From 279f8740ea70661015a99521f36c37380a10f59b Mon Sep 17 00:00:00 2001 From: Cian Johnston Date: Mon, 18 Mar 2024 10:53:49 +0000 Subject: [PATCH 01/21] add tests to assert last used at updated on port-forward --- cli/portforward_test.go | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/cli/portforward_test.go b/cli/portforward_test.go index 9ea5335c43365..902ed1f6bb247 100644 --- a/cli/portforward_test.go +++ b/cli/portforward_test.go @@ -148,6 +148,10 @@ func TestPortForward(t *testing.T) { cancel() err = <-errC require.ErrorIs(t, err, context.Canceled) + + updated, err := client.Workspace(context.Background(), workspace.ID) + require.NoError(t, err) + require.Greater(t, updated.LastUsedAt, workspace.LastUsedAt) }) t.Run(c.name+"_TwoPorts", func(t *testing.T) { @@ -196,6 +200,10 @@ func TestPortForward(t *testing.T) { cancel() err = <-errC require.ErrorIs(t, err, context.Canceled) + + updated, err := client.Workspace(context.Background(), workspace.ID) + require.NoError(t, err) + require.Greater(t, updated.LastUsedAt, workspace.LastUsedAt) }) } @@ -257,6 +265,10 @@ func TestPortForward(t *testing.T) { cancel() err := <-errC require.ErrorIs(t, err, context.Canceled) + + updated, err := client.Workspace(context.Background(), workspace.ID) + require.NoError(t, err) + require.Greater(t, updated.LastUsedAt, workspace.LastUsedAt) }) } From 8f9b945d44ab2e206bcaf8bd0b6e02aecaa97cbd Mon Sep 17 00:00:00 2001 From: Cian Johnston Date: Mon, 18 Mar 2024 14:32:40 +0000 Subject: [PATCH 02/21] add workspaceusage package --- coderd/workspaceusage/tracker.go | 181 ++++++++++++++++++++++++++ coderd/workspaceusage/tracker_test.go | 80 ++++++++++++ 2 files changed, 261 insertions(+) create mode 100644 coderd/workspaceusage/tracker.go create mode 100644 coderd/workspaceusage/tracker_test.go diff --git a/coderd/workspaceusage/tracker.go b/coderd/workspaceusage/tracker.go new file mode 100644 index 0000000000000..1b8c0ba86aa23 --- /dev/null +++ b/coderd/workspaceusage/tracker.go @@ -0,0 +1,181 @@ +package workspaceusage + +import ( + "context" + "os" + "sort" + "strings" + "sync" + "time" + + "github.com/google/uuid" + + "github.com/coder/coder/v2/coderd/database" + "github.com/coder/coder/v2/coderd/database/dbauthz" + + "cdr.dev/slog" + "cdr.dev/slog/sloggers/sloghuman" +) + +var DefaultFlushInterval = 60 * time.Second + +// Store is a subset of database.Store +type Store interface { + BatchUpdateWorkspaceLastUsedAt(context.Context, database.BatchUpdateWorkspaceLastUsedAtParams) error +} + +// Tracker tracks and de-bounces updates to workspace usage activity. +// It keeps an internal map of workspace IDs that have been used and +// periodically flushes this to its configured Store. +type Tracker struct { + log slog.Logger // you know, for logs + mut sync.Mutex // protects m + m map[uuid.UUID]struct{} // stores workspace ids + s Store // for flushing data + tickCh <-chan time.Time // controls flush interval + stopTick func() // stops flushing + stopCh chan struct{} // signals us to stop + stopOnce sync.Once // because you only stop once + doneCh chan struct{} // signifies that we have stopped + flushCh chan int // used for testing. +} + +// New returns a new Tracker. It is the caller's responsibility +// to call Close(). +func New(s Store, opts ...Option) *Tracker { + hb := &Tracker{ + log: slog.Make(sloghuman.Sink(os.Stderr)), + m: make(map[uuid.UUID]struct{}, 0), + s: s, + tickCh: nil, + stopTick: nil, + stopCh: make(chan struct{}), + doneCh: make(chan struct{}), + flushCh: nil, + } + for _, opt := range opts { + opt(hb) + } + if hb.tickCh == nil && hb.stopTick == nil { + ticker := time.NewTicker(DefaultFlushInterval) + hb.tickCh = ticker.C + hb.stopTick = ticker.Stop + } + return hb +} + +type Option func(*Tracker) + +// WithLogger sets the logger to be used by Tracker. +func WithLogger(log slog.Logger) Option { + return func(h *Tracker) { + h.log = log + } +} + +// WithFlushInterval allows configuring the flush interval of Tracker. +func WithFlushInterval(d time.Duration) Option { + return func(h *Tracker) { + ticker := time.NewTicker(d) + h.tickCh = ticker.C + h.stopTick = ticker.Stop + } +} + +// WithFlushChannel allows passing a channel that receives +// the number of marked workspaces every time Tracker flushes. +// For testing only. +func WithFlushChannel(c chan int) Option { + return func(h *Tracker) { + h.flushCh = c + } +} + +// WithTickChannel allows passing a channel to replace a ticker. +// For testing only. +func WithTickChannel(c chan time.Time) Option { + return func(h *Tracker) { + h.tickCh = c + h.stopTick = func() {} + } +} + +// Add marks the workspace with the given ID as having been used recently. +// Tracker will periodically flush this to its configured Store. +func (wut *Tracker) Add(workspaceID uuid.UUID) { + wut.mut.Lock() + wut.m[workspaceID] = struct{}{} + wut.mut.Unlock() +} + +// flushLocked updates last_used_at of all current workspace IDs. +// MUST HOLD LOCK BEFORE CALLING +func (wut *Tracker) flushLocked(now time.Time) { + if wut.mut.TryLock() { + panic("developer error: must lock before calling flush()") + } + count := len(wut.m) + defer func() { // only used for testing + if wut.flushCh != nil { + wut.flushCh <- count + } + }() + if count == 0 { + wut.log.Debug(context.Background(), "nothing to flush") + return + } + // Copy our current set of IDs + ids := make([]uuid.UUID, 0) + for k := range wut.m { + ids = append(ids, k) + } + // Reset our internal map + wut.m = make(map[uuid.UUID]struct{}) + // For ease of testing, sort the IDs lexically + sort.Slice(ids, func(i, j int) bool { + // For some unfathomable reason, byte arrays are not comparable? + return strings.Compare(ids[i].String(), ids[j].String()) < 0 + }) + // Set a short-ish timeout for this. We don't want to hang forever. + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + // nolint: gocritic // system function + authCtx := dbauthz.AsSystemRestricted(ctx) + if err := wut.s.BatchUpdateWorkspaceLastUsedAt(authCtx, database.BatchUpdateWorkspaceLastUsedAtParams{ + LastUsedAt: now, + IDs: ids, + }); err != nil { + wut.log.Error(ctx, "failed updating workspaces last_used_at", slog.F("count", count), slog.Error(err)) + return + } + wut.log.Info(ctx, "updated workspaces last_used_at", slog.F("count", count), slog.F("now", now)) +} + +func (wut *Tracker) Loop() { + defer func() { + wut.log.Debug(context.Background(), "workspace usage tracker loop exited") + }() + for { + select { + case <-wut.stopCh: + close(wut.doneCh) + return + case now, ok := <-wut.tickCh: + if !ok { + return + } + wut.mut.Lock() + wut.flushLocked(now.UTC()) + wut.mut.Unlock() + } + } +} + +// Close stops Tracker and performs a final flush. +func (wut *Tracker) Close() { + wut.stopOnce.Do(func() { + wut.stopCh <- struct{}{} + wut.stopTick() + <-wut.doneCh + }) +} diff --git a/coderd/workspaceusage/tracker_test.go b/coderd/workspaceusage/tracker_test.go new file mode 100644 index 0000000000000..3981b08301b3b --- /dev/null +++ b/coderd/workspaceusage/tracker_test.go @@ -0,0 +1,80 @@ +package workspaceusage_test + +import ( + "sort" + "strings" + "testing" + "time" + + "github.com/google/uuid" + "github.com/stretchr/testify/require" + "go.uber.org/goleak" + "go.uber.org/mock/gomock" + + "cdr.dev/slog" + "cdr.dev/slog/sloggers/slogtest" + "github.com/coder/coder/v2/coderd/database" + "github.com/coder/coder/v2/coderd/database/dbmock" + "github.com/coder/coder/v2/coderd/database/dbtime" + "github.com/coder/coder/v2/coderd/workspaceusage" +) + +func TestTracker(t *testing.T) { + t.Parallel() + + ctrl := gomock.NewController(t) + mDB := dbmock.NewMockStore(ctrl) + log := slogtest.Make(t, nil).Leveled(slog.LevelDebug) + + tickCh := make(chan time.Time) + flushCh := make(chan int, 1) + wut := workspaceusage.New(mDB, workspaceusage.WithLogger(log), workspaceusage.WithTickChannel(tickCh), workspaceusage.WithFlushChannel(flushCh)) + t.Cleanup(wut.Close) + + go wut.Loop() + + // 1. No marked workspaces should imply no flush. + now := dbtime.Now() + tickCh <- now + count := <-flushCh + require.Equal(t, 0, count, "expected zero flushes") + + // 2. One marked workspace should cause a flush. + ids := []uuid.UUID{uuid.New()} + now = dbtime.Now() + wut.Add(ids[0]) + mDB.EXPECT().BatchUpdateWorkspaceLastUsedAt(gomock.Any(), database.BatchUpdateWorkspaceLastUsedAtParams{ + LastUsedAt: now, + IDs: ids, + }).Times(1) + tickCh <- now + count = <-flushCh + require.Equal(t, 1, count, "expected one flush with one id") + + // 3. Lots of marked workspaces should also cause a flush. + for i := 0; i < 10; i++ { + ids = append(ids, uuid.New()) + } + + // Sort ids so mockDB knows what to expect + sort.Slice(ids, func(i, j int) bool { + return strings.Compare(ids[i].String(), ids[j].String()) < 0 + }) + + for _, id := range ids { + wut.Add(id) + } + + now = dbtime.Now() + mDB.EXPECT().BatchUpdateWorkspaceLastUsedAt(gomock.Any(), database.BatchUpdateWorkspaceLastUsedAtParams{ + LastUsedAt: now, + IDs: ids, + }).Times(1) + tickCh <- now + count = <-flushCh + require.Equal(t, 11, count, "expected one flush with eleven ids") +} + +func TestMain(m *testing.M) { + goleak.VerifyTestMain(m) +} From e8c842cfc32d1d968c5cfdad982911e0f0b939d1 Mon Sep 17 00:00:00 2001 From: Cian Johnston Date: Mon, 18 Mar 2024 16:03:11 +0000 Subject: [PATCH 03/21] add workspace usager tracking to coderd, add endpoint --- coderd/coderd.go | 17 +++++++++++++++++ coderd/coderdtest/coderdtest.go | 17 +++++++++++++++++ coderd/workspaces.go | 18 ++++++++++++++++++ 3 files changed, 52 insertions(+) diff --git a/coderd/coderd.go b/coderd/coderd.go index bdd20512fa556..a6aaf269eb125 100644 --- a/coderd/coderd.go +++ b/coderd/coderd.go @@ -66,6 +66,7 @@ import ( "github.com/coder/coder/v2/coderd/updatecheck" "github.com/coder/coder/v2/coderd/util/slice" "github.com/coder/coder/v2/coderd/workspaceapps" + "github.com/coder/coder/v2/coderd/workspaceusage" "github.com/coder/coder/v2/codersdk" "github.com/coder/coder/v2/codersdk/drpc" "github.com/coder/coder/v2/provisionerd/proto" @@ -190,6 +191,9 @@ type Options struct { // NewTicker is used for unit tests to replace "time.NewTicker". NewTicker func(duration time.Duration) (tick <-chan time.Time, done func()) + + // WorkspaceUsageTracker tracks workspace usage by the CLI. + WorkspaceUsageTracker *workspaceusage.Tracker } // @title Coder API @@ -362,6 +366,14 @@ func New(options *Options) *API { OIDC: options.OIDCConfig, } + if options.WorkspaceUsageTracker == nil { + options.WorkspaceUsageTracker = workspaceusage.New(options.Database, + workspaceusage.WithFlushInterval(workspaceusage.DefaultFlushInterval), + workspaceusage.WithLogger(options.Logger.Named("workspace_usage_tracker")), + ) + } + go options.WorkspaceUsageTracker.Loop() + ctx, cancel := context.WithCancel(context.Background()) r := chi.NewRouter() @@ -405,6 +417,7 @@ func New(options *Options) *API { options.Logger.Named("acquirer"), options.Database, options.Pubsub), + workspaceUsageTracker: options.WorkspaceUsageTracker, } api.AppearanceFetcher.Store(&appearance.DefaultFetcher) @@ -972,6 +985,7 @@ func New(options *Options) *API { }) r.Get("/watch", api.watchWorkspace) r.Put("/extend", api.putExtendWorkspace) + r.Post("/usage", api.postWorkspaceUsage) r.Put("/dormant", api.putWorkspaceDormant) r.Put("/favorite", api.putFavoriteWorkspace) r.Delete("/favorite", api.deleteFavoriteWorkspace) @@ -1179,6 +1193,8 @@ type API struct { statsBatcher *batchstats.Batcher Acquirer *provisionerdserver.Acquirer + + workspaceUsageTracker *workspaceusage.Tracker } // Close waits for all WebSocket connections to drain before returning. @@ -1200,6 +1216,7 @@ func (api *API) Close() error { _ = (*coordinator).Close() } _ = api.agentProvider.Close() + api.workspaceUsageTracker.Close() return nil } diff --git a/coderd/coderdtest/coderdtest.go b/coderd/coderdtest/coderdtest.go index 4d315c3e2b058..577c10d9e8528 100644 --- a/coderd/coderdtest/coderdtest.go +++ b/coderd/coderdtest/coderdtest.go @@ -70,6 +70,7 @@ import ( "github.com/coder/coder/v2/coderd/util/ptr" "github.com/coder/coder/v2/coderd/workspaceapps" "github.com/coder/coder/v2/coderd/workspaceapps/appurl" + "github.com/coder/coder/v2/coderd/workspaceusage" "github.com/coder/coder/v2/codersdk" "github.com/coder/coder/v2/codersdk/agentsdk" "github.com/coder/coder/v2/codersdk/drpc" @@ -146,6 +147,7 @@ type Options struct { WorkspaceAppsStatsCollectorOptions workspaceapps.StatsCollectorOptions AllowWorkspaceRenames bool NewTicker func(duration time.Duration) (<-chan time.Time, func()) + WorkspaceUsageTracker *workspaceusage.Tracker } // New constructs a codersdk client connected to an in-memory API instance. @@ -306,6 +308,20 @@ func NewOptions(t testing.TB, options *Options) (func(http.Handler), context.Can hangDetector.Start() t.Cleanup(hangDetector.Close) + if options.WorkspaceUsageTracker == nil { + // Workspace usage tracking must be triggered manually in tests. + // To do this, pass in your own WorkspaceUsageTracker. + wutFlush := make(chan int) + wutTick := make(chan time.Time) + options.WorkspaceUsageTracker = workspaceusage.New( + options.Database, + workspaceusage.WithLogger(options.Logger.Named("workspace_usage_tracker")), + workspaceusage.WithFlushChannel(wutFlush), + workspaceusage.WithTickChannel(wutTick), + ) + } + t.Cleanup(options.WorkspaceUsageTracker.Close) + var mutex sync.RWMutex var handler http.Handler srv := httptest.NewUnstartedServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { @@ -454,6 +470,7 @@ func NewOptions(t testing.TB, options *Options) (func(http.Handler), context.Can WorkspaceAppsStatsCollectorOptions: options.WorkspaceAppsStatsCollectorOptions, AllowWorkspaceRenames: options.AllowWorkspaceRenames, NewTicker: options.NewTicker, + WorkspaceUsageTracker: options.WorkspaceUsageTracker, } } diff --git a/coderd/workspaces.go b/coderd/workspaces.go index 05bf9688e7536..fdbe5ddfa2589 100644 --- a/coderd/workspaces.go +++ b/coderd/workspaces.go @@ -1096,6 +1096,24 @@ func (api *API) putExtendWorkspace(rw http.ResponseWriter, r *http.Request) { httpapi.Write(ctx, rw, code, resp) } +// @Summary Post Workspace Usage by ID +// @ID post-workspace-usage-by-id +// @Security CoderSessionToken +// @Tags Workspaces +// @Param workspace path string true "Workspace ID" format(uuid) +// @Success 204 +// @Router /workspaces/{workspace}/usage [post] +func (api *API) postWorkspaceUsage(rw http.ResponseWriter, r *http.Request) { + workspace := httpmw.WorkspaceParam(r) + if !api.Authorize(r, rbac.ActionUpdate, workspace) { + httpapi.Forbidden(rw) + return + } + + api.workspaceUsageTracker.Add(workspace.ID) + rw.WriteHeader(http.StatusNoContent) +} + // @Summary Favorite workspace by ID. // @ID favorite-workspace-by-id // @Security CoderSessionToken From 86704a1da70e7356fd2ca064f01795378091daf9 Mon Sep 17 00:00:00 2001 From: Cian Johnston Date: Tue, 19 Mar 2024 09:50:06 +0000 Subject: [PATCH 04/21] add workspace usage tracking to cli/portforward, fix tests --- cli/portforward.go | 3 +++ cli/portforward_test.go | 19 +++++++++++++++- codersdk/workspaces.go | 50 +++++++++++++++++++++++++++++++++++++++++ 3 files changed, 71 insertions(+), 1 deletion(-) diff --git a/cli/portforward.go b/cli/portforward.go index 68a076d5908e1..ebe925a6a3801 100644 --- a/cli/portforward.go +++ b/cli/portforward.go @@ -136,6 +136,8 @@ func (r *RootCmd) portForward() *serpent.Command { listeners[i] = l } + stopUpdating := client.UpdateWorkspaceUsageContext(ctx, workspace.ID) + // Wait for the context to be canceled or for a signal and close // all listeners. var closeErr error @@ -156,6 +158,7 @@ func (r *RootCmd) portForward() *serpent.Command { } cancel() + stopUpdating() closeAllListeners() }() diff --git a/cli/portforward_test.go b/cli/portforward_test.go index 902ed1f6bb247..1fb18eee1baf2 100644 --- a/cli/portforward_test.go +++ b/cli/portforward_test.go @@ -21,6 +21,9 @@ import ( "github.com/coder/coder/v2/coderd/coderdtest" "github.com/coder/coder/v2/coderd/database" "github.com/coder/coder/v2/coderd/database/dbfake" + "github.com/coder/coder/v2/coderd/database/dbtestutil" + "github.com/coder/coder/v2/coderd/database/dbtime" + "github.com/coder/coder/v2/coderd/workspaceusage" "github.com/coder/coder/v2/codersdk" "github.com/coder/coder/v2/pty/ptytest" "github.com/coder/coder/v2/testutil" @@ -96,7 +99,15 @@ func TestPortForward(t *testing.T) { // Setup agent once to be shared between test-cases (avoid expensive // non-parallel setup). var ( - client, db = coderdtest.NewWithDatabase(t, nil) + db, ps = dbtestutil.NewDB(t) + wuTick = make(chan time.Time) + wuFlush = make(chan int, 1) + wut = workspaceusage.New(db, workspaceusage.WithFlushChannel(wuFlush), workspaceusage.WithTickChannel(wuTick)) + client = coderdtest.New(t, &coderdtest.Options{ + WorkspaceUsageTracker: wut, + Database: db, + Pubsub: ps, + }) admin = coderdtest.CreateFirstUser(t, client) member, memberUser = coderdtest.CreateAnotherUser(t, client, admin.OrganizationID) workspace = runAgent(t, client, memberUser.ID, db) @@ -149,6 +160,8 @@ func TestPortForward(t *testing.T) { err = <-errC require.ErrorIs(t, err, context.Canceled) + wuTick <- dbtime.Now() + <-wuFlush updated, err := client.Workspace(context.Background(), workspace.ID) require.NoError(t, err) require.Greater(t, updated.LastUsedAt, workspace.LastUsedAt) @@ -201,6 +214,8 @@ func TestPortForward(t *testing.T) { err = <-errC require.ErrorIs(t, err, context.Canceled) + wuTick <- dbtime.Now() + <-wuFlush updated, err := client.Workspace(context.Background(), workspace.ID) require.NoError(t, err) require.Greater(t, updated.LastUsedAt, workspace.LastUsedAt) @@ -266,6 +281,8 @@ func TestPortForward(t *testing.T) { err := <-errC require.ErrorIs(t, err, context.Canceled) + wuTick <- dbtime.Now() + <-wuFlush updated, err := client.Workspace(context.Background(), workspace.ID) require.NoError(t, err) require.Greater(t, updated.LastUsedAt, workspace.LastUsedAt) diff --git a/codersdk/workspaces.go b/codersdk/workspaces.go index ecdc99d7775d0..71d6a60d200e8 100644 --- a/codersdk/workspaces.go +++ b/codersdk/workspaces.go @@ -11,6 +11,8 @@ import ( "github.com/google/uuid" "golang.org/x/xerrors" + "cdr.dev/slog" + "github.com/coder/coder/v2/coderd/tracing" ) @@ -314,6 +316,54 @@ func (c *Client) PutExtendWorkspace(ctx context.Context, id uuid.UUID, req PutEx return nil } +// PostWorkspaceUsage marks the workspace as having been used recently. +func (c *Client) PostWorkspaceUsage(ctx context.Context, id uuid.UUID) error { + path := fmt.Sprintf("/api/v2/workspaces/%s/usage", id.String()) + res, err := c.Request(ctx, http.MethodPost, path, nil) + if err != nil { + return xerrors.Errorf("post workspace usage: %w", err) + } + defer res.Body.Close() + if res.StatusCode != http.StatusNoContent { + return ReadBodyAsError(res) + } + return nil +} + +// UpdateWorkspaceUsageContext periodically posts workspace usage for the workspace +// with the given id in the background. +// The caller is responsible for calling the returned function to stop the background +// process. +func (c *Client) UpdateWorkspaceUsageContext(ctx context.Context, id uuid.UUID) func() { + hbCtx, hbCancel := context.WithCancel(ctx) + // Perform one initial heartbeat + if err := c.PostWorkspaceUsage(hbCtx, id); err != nil { + c.logger.Warn(ctx, "failed to post workspace usage", slog.Error(err)) + } + ticker := time.NewTicker(time.Minute) + doneCh := make(chan struct{}) + go func() { + defer func() { + ticker.Stop() + close(doneCh) + }() + for { + select { + case <-ticker.C: + if err := c.PostWorkspaceUsage(hbCtx, id); err != nil { + c.logger.Warn(ctx, "failed to post workspace usage in background", slog.Error(err)) + } + case <-hbCtx.Done(): + return + } + } + }() + return func() { + hbCancel() + <-doneCh + } +} + // UpdateWorkspaceDormancy is a request to activate or make a workspace dormant. // A value of false will activate a dormant workspace. type UpdateWorkspaceDormancy struct { From c99327cd2de69931d44cbea06969fe3a978654b1 Mon Sep 17 00:00:00 2001 From: Cian Johnston Date: Tue, 19 Mar 2024 12:32:22 +0000 Subject: [PATCH 05/21] make gen --- coderd/apidoc/docs.go | 29 +++++++++++++++++++++++++++++ coderd/apidoc/swagger.json | 27 +++++++++++++++++++++++++++ docs/api/workspaces.md | 26 ++++++++++++++++++++++++++ 3 files changed, 82 insertions(+) diff --git a/coderd/apidoc/docs.go b/coderd/apidoc/docs.go index d947ec67da909..213f088bf9997 100644 --- a/coderd/apidoc/docs.go +++ b/coderd/apidoc/docs.go @@ -7592,6 +7592,35 @@ const docTemplate = `{ } } }, + "/workspaces/{workspace}/usage": { + "post": { + "security": [ + { + "CoderSessionToken": [] + } + ], + "tags": [ + "Workspaces" + ], + "summary": "Post Workspace Usage by ID", + "operationId": "post-workspace-usage-by-id", + "parameters": [ + { + "type": "string", + "format": "uuid", + "description": "Workspace ID", + "name": "workspace", + "in": "path", + "required": true + } + ], + "responses": { + "204": { + "description": "No Content" + } + } + } + }, "/workspaces/{workspace}/watch": { "get": { "security": [ diff --git a/coderd/apidoc/swagger.json b/coderd/apidoc/swagger.json index 149d63578b86f..425025867e30c 100644 --- a/coderd/apidoc/swagger.json +++ b/coderd/apidoc/swagger.json @@ -6711,6 +6711,33 @@ } } }, + "/workspaces/{workspace}/usage": { + "post": { + "security": [ + { + "CoderSessionToken": [] + } + ], + "tags": ["Workspaces"], + "summary": "Post Workspace Usage by ID", + "operationId": "post-workspace-usage-by-id", + "parameters": [ + { + "type": "string", + "format": "uuid", + "description": "Workspace ID", + "name": "workspace", + "in": "path", + "required": true + } + ], + "responses": { + "204": { + "description": "No Content" + } + } + } + }, "/workspaces/{workspace}/watch": { "get": { "security": [ diff --git a/docs/api/workspaces.md b/docs/api/workspaces.md index f176653a171dd..c16dd970a5cff 100644 --- a/docs/api/workspaces.md +++ b/docs/api/workspaces.md @@ -1385,6 +1385,32 @@ curl -X PUT http://coder-server:8080/api/v2/workspaces/{workspace}/ttl \ To perform this operation, you must be authenticated. [Learn more](authentication.md). +## Post Workspace Usage by ID + +### Code samples + +```shell +# Example request using curl +curl -X POST http://coder-server:8080/api/v2/workspaces/{workspace}/usage \ + -H 'Coder-Session-Token: API_KEY' +``` + +`POST /workspaces/{workspace}/usage` + +### Parameters + +| Name | In | Type | Required | Description | +| ----------- | ---- | ------------ | -------- | ------------ | +| `workspace` | path | string(uuid) | true | Workspace ID | + +### Responses + +| Status | Meaning | Description | Schema | +| ------ | --------------------------------------------------------------- | ----------- | ------ | +| 204 | [No Content](https://tools.ietf.org/html/rfc7231#section-6.3.5) | No Content | | + +To perform this operation, you must be authenticated. [Learn more](authentication.md). + ## Watch workspace by ID ### Code samples From 5876edd8cf0f4e04273406c87ef8c1ee973a2267 Mon Sep 17 00:00:00 2001 From: Cian Johnston Date: Tue, 19 Mar 2024 19:17:19 +0000 Subject: [PATCH 06/21] workspaceusage: improve locking and tests --- coderd/workspaceusage/tracker.go | 98 +++++++++++++++++---------- coderd/workspaceusage/tracker_test.go | 31 +++++++-- 2 files changed, 87 insertions(+), 42 deletions(-) diff --git a/coderd/workspaceusage/tracker.go b/coderd/workspaceusage/tracker.go index 1b8c0ba86aa23..ff68ad7931492 100644 --- a/coderd/workspaceusage/tracker.go +++ b/coderd/workspaceusage/tracker.go @@ -28,16 +28,16 @@ type Store interface { // It keeps an internal map of workspace IDs that have been used and // periodically flushes this to its configured Store. type Tracker struct { - log slog.Logger // you know, for logs - mut sync.Mutex // protects m - m map[uuid.UUID]struct{} // stores workspace ids - s Store // for flushing data - tickCh <-chan time.Time // controls flush interval - stopTick func() // stops flushing - stopCh chan struct{} // signals us to stop - stopOnce sync.Once // because you only stop once - doneCh chan struct{} // signifies that we have stopped - flushCh chan int // used for testing. + log slog.Logger // you know, for logs + flushLock sync.Mutex // protects m + m *uuidSet // stores workspace ids + s Store // for flushing data + tickCh <-chan time.Time // controls flush interval + stopTick func() // stops flushing + stopCh chan struct{} // signals us to stop + stopOnce sync.Once // because you only stop once + doneCh chan struct{} // signifies that we have stopped + flushCh chan int // used for testing. } // New returns a new Tracker. It is the caller's responsibility @@ -45,7 +45,7 @@ type Tracker struct { func New(s Store, opts ...Option) *Tracker { hb := &Tracker{ log: slog.Make(sloghuman.Sink(os.Stderr)), - m: make(map[uuid.UUID]struct{}, 0), + m: &uuidSet{}, s: s, tickCh: nil, stopTick: nil, @@ -103,44 +103,40 @@ func WithTickChannel(c chan time.Time) Option { // Add marks the workspace with the given ID as having been used recently. // Tracker will periodically flush this to its configured Store. func (wut *Tracker) Add(workspaceID uuid.UUID) { - wut.mut.Lock() - wut.m[workspaceID] = struct{}{} - wut.mut.Unlock() + wut.m.Add(workspaceID) } -// flushLocked updates last_used_at of all current workspace IDs. -// MUST HOLD LOCK BEFORE CALLING -func (wut *Tracker) flushLocked(now time.Time) { - if wut.mut.TryLock() { - panic("developer error: must lock before calling flush()") - } - count := len(wut.m) - defer func() { // only used for testing - if wut.flushCh != nil { +// flush updates last_used_at of all current workspace IDs. +// If this is held while a previous flush is in progress, it will +// deadlock until the previous flush has completed. +func (wut *Tracker) flush(now time.Time) { + var count int + if wut.flushCh != nil { // only used for testing + defer func() { wut.flushCh <- count - } - }() + }() + } + + // Copy our current set of IDs + ids := wut.m.UniqueAndClear() + count = len(ids) if count == 0 { wut.log.Debug(context.Background(), "nothing to flush") return } - // Copy our current set of IDs - ids := make([]uuid.UUID, 0) - for k := range wut.m { - ids = append(ids, k) - } - // Reset our internal map - wut.m = make(map[uuid.UUID]struct{}) + // For ease of testing, sort the IDs lexically sort.Slice(ids, func(i, j int) bool { // For some unfathomable reason, byte arrays are not comparable? return strings.Compare(ids[i].String(), ids[j].String()) < 0 }) // Set a short-ish timeout for this. We don't want to hang forever. - ctx, cancel := context.WithTimeout(context.Background(), time.Second) + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() // nolint: gocritic // system function authCtx := dbauthz.AsSystemRestricted(ctx) + wut.flushLock.Lock() + defer wut.flushLock.Unlock() if err := wut.s.BatchUpdateWorkspaceLastUsedAt(authCtx, database.BatchUpdateWorkspaceLastUsedAtParams{ LastUsedAt: now, IDs: ids, @@ -164,9 +160,7 @@ func (wut *Tracker) Loop() { if !ok { return } - wut.mut.Lock() - wut.flushLocked(now.UTC()) - wut.mut.Unlock() + wut.flush(now.UTC()) } } } @@ -179,3 +173,35 @@ func (wut *Tracker) Close() { <-wut.doneCh }) } + +// uuidSet is a set of UUIDs. Safe for concurrent usage. +// The zero value can be used. +type uuidSet struct { + l sync.Mutex + m map[uuid.UUID]struct{} +} + +func (s *uuidSet) Add(id uuid.UUID) { + s.l.Lock() + defer s.l.Unlock() + if s.m == nil { + s.m = make(map[uuid.UUID]struct{}) + } + s.m[id] = struct{}{} +} + +// UniqueAndClear returns the unique set of entries in s and +// resets the internal map. +func (s *uuidSet) UniqueAndClear() []uuid.UUID { + s.l.Lock() + defer s.l.Unlock() + if s.m == nil { + s.m = make(map[uuid.UUID]struct{}) + } + l := make([]uuid.UUID, 0) + for k := range s.m { + l = append(l, k) + } + s.m = make(map[uuid.UUID]struct{}) + return l +} diff --git a/coderd/workspaceusage/tracker_test.go b/coderd/workspaceusage/tracker_test.go index 3981b08301b3b..9dd93d3bca885 100644 --- a/coderd/workspaceusage/tracker_test.go +++ b/coderd/workspaceusage/tracker_test.go @@ -3,6 +3,7 @@ package workspaceusage_test import ( "sort" "strings" + "sync" "testing" "time" @@ -61,17 +62,35 @@ func TestTracker(t *testing.T) { return strings.Compare(ids[i].String(), ids[j].String()) < 0 }) - for _, id := range ids { - wut.Add(id) - } - now = dbtime.Now() mDB.EXPECT().BatchUpdateWorkspaceLastUsedAt(gomock.Any(), database.BatchUpdateWorkspaceLastUsedAtParams{ LastUsedAt: now, IDs: ids, }).Times(1) - tickCh <- now - count = <-flushCh + // Try to force a race condition. + var wg sync.WaitGroup + numTicks := 10 + count = 0 + wg.Add(1) + go func() { + defer wg.Done() + for _, id := range ids { + wut.Add(id) + } + }() + for i := 0; i < numTicks; i++ { + wg.Add(1) + go func() { + defer wg.Done() + tickCh <- now + }() + } + + for i := 0; i < numTicks; i++ { + count += <-flushCh + } + + wg.Wait() require.Equal(t, 11, count, "expected one flush with eleven ids") } From e4e031163723225c14c9b494c138d40d71dccf8c Mon Sep 17 00:00:00 2001 From: Cian Johnston Date: Tue, 19 Mar 2024 19:30:10 +0000 Subject: [PATCH 07/21] address more PR comments --- coderd/workspaceusage/tracker.go | 34 ++++++++++++++++++++------- coderd/workspaceusage/tracker_test.go | 7 ++++++ 2 files changed, 32 insertions(+), 9 deletions(-) diff --git a/coderd/workspaceusage/tracker.go b/coderd/workspaceusage/tracker.go index ff68ad7931492..02e43901cb537 100644 --- a/coderd/workspaceusage/tracker.go +++ b/coderd/workspaceusage/tracker.go @@ -1,10 +1,11 @@ package workspaceusage import ( + "bytes" "context" + "flag" "os" "sort" - "strings" "sync" "time" @@ -84,16 +85,22 @@ func WithFlushInterval(d time.Duration) Option { // WithFlushChannel allows passing a channel that receives // the number of marked workspaces every time Tracker flushes. -// For testing only. +// For testing only and will panic if used outside of tests. func WithFlushChannel(c chan int) Option { + if flag.Lookup("test.v") == nil { + panic("developer error: WithFlushChannel is not to be used outside of tests.") + } return func(h *Tracker) { h.flushCh = c } } // WithTickChannel allows passing a channel to replace a ticker. -// For testing only. +// For testing only and will panic if used outside of tests. func WithTickChannel(c chan time.Time) Option { + if flag.Lookup("test.v") == nil { + panic("developer error: WithTickChannel is not to be used outside of tests.") + } return func(h *Tracker) { h.tickCh = c h.stopTick = func() {} @@ -125,11 +132,6 @@ func (wut *Tracker) flush(now time.Time) { return } - // For ease of testing, sort the IDs lexically - sort.Slice(ids, func(i, j int) bool { - // For some unfathomable reason, byte arrays are not comparable? - return strings.Compare(ids[i].String(), ids[j].String()) < 0 - }) // Set a short-ish timeout for this. We don't want to hang forever. ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() @@ -147,7 +149,15 @@ func (wut *Tracker) flush(now time.Time) { wut.log.Info(ctx, "updated workspaces last_used_at", slog.F("count", count), slog.F("now", now)) } +// Loop periodically flushes every tick. +// If Loop is called after Close, it will panic. Don't do this. func (wut *Tracker) Loop() { + // Calling Loop after Close() is an error. + select { + case <-wut.doneCh: + panic("developer error: Loop called after Close") + default: + } defer func() { wut.log.Debug(context.Background(), "workspace usage tracker loop exited") }() @@ -165,7 +175,8 @@ func (wut *Tracker) Loop() { } } -// Close stops Tracker and performs a final flush. +// Close stops Tracker and returns once Loop has exited. +// After calling Close(), Loop must not be called. func (wut *Tracker) Close() { wut.stopOnce.Do(func() { wut.stopCh <- struct{}{} @@ -202,6 +213,11 @@ func (s *uuidSet) UniqueAndClear() []uuid.UUID { for k := range s.m { l = append(l, k) } + // For ease of testing, sort the IDs lexically + sort.Slice(l, func(i, j int) bool { + // For some unfathomable reason, byte arrays are not comparable? + return bytes.Compare(l[i][:], l[j][:]) < 0 + }) s.m = make(map[uuid.UUID]struct{}) return l } diff --git a/coderd/workspaceusage/tracker_test.go b/coderd/workspaceusage/tracker_test.go index 9dd93d3bca885..ec59e8083efc8 100644 --- a/coderd/workspaceusage/tracker_test.go +++ b/coderd/workspaceusage/tracker_test.go @@ -92,6 +92,13 @@ func TestTracker(t *testing.T) { wg.Wait() require.Equal(t, 11, count, "expected one flush with eleven ids") + + // 4. Closing multiple times should not be a problem. + wut.Close() + wut.Close() + + // 5. Running Loop() again should panic. + require.Panics(t, wut.Loop) } func TestMain(m *testing.M) { From 958d1d131b0d85ab765afcbbcd50a09a167d54e8 Mon Sep 17 00:00:00 2001 From: Cian Johnston Date: Tue, 19 Mar 2024 19:53:44 +0000 Subject: [PATCH 08/21] try to race harder --- coderd/workspaceusage/tracker.go | 8 ++---- coderd/workspaceusage/tracker_test.go | 40 +++++++++++++++------------ 2 files changed, 25 insertions(+), 23 deletions(-) diff --git a/coderd/workspaceusage/tracker.go b/coderd/workspaceusage/tracker.go index 02e43901cb537..9a9906021b13c 100644 --- a/coderd/workspaceusage/tracker.go +++ b/coderd/workspaceusage/tracker.go @@ -117,16 +117,14 @@ func (wut *Tracker) Add(workspaceID uuid.UUID) { // If this is held while a previous flush is in progress, it will // deadlock until the previous flush has completed. func (wut *Tracker) flush(now time.Time) { - var count int + // Copy our current set of IDs + ids := wut.m.UniqueAndClear() + count := len(ids) if wut.flushCh != nil { // only used for testing defer func() { wut.flushCh <- count }() } - - // Copy our current set of IDs - ids := wut.m.UniqueAndClear() - count = len(ids) if count == 0 { wut.log.Debug(context.Background(), "nothing to flush") return diff --git a/coderd/workspaceusage/tracker_test.go b/coderd/workspaceusage/tracker_test.go index ec59e8083efc8..5213ae33dc837 100644 --- a/coderd/workspaceusage/tracker_test.go +++ b/coderd/workspaceusage/tracker_test.go @@ -1,8 +1,8 @@ package workspaceusage_test import ( + "bytes" "sort" - "strings" "sync" "testing" "time" @@ -53,51 +53,55 @@ func TestTracker(t *testing.T) { require.Equal(t, 1, count, "expected one flush with one id") // 3. Lots of marked workspaces should also cause a flush. - for i := 0; i < 10; i++ { + for i := 0; i < 31; i++ { ids = append(ids, uuid.New()) } - // Sort ids so mockDB knows what to expect + // Sort ids so mDB know what to expect. sort.Slice(ids, func(i, j int) bool { - return strings.Compare(ids[i].String(), ids[j].String()) < 0 + return bytes.Compare(ids[i][:], ids[j][:]) < 0 }) now = dbtime.Now() mDB.EXPECT().BatchUpdateWorkspaceLastUsedAt(gomock.Any(), database.BatchUpdateWorkspaceLastUsedAtParams{ LastUsedAt: now, IDs: ids, - }).Times(1) + }) + for _, id := range ids { + wut.Add(id) + } + tickCh <- now + count = <-flushCh + require.Equal(t, len(ids), count, "incorrect number of ids flushed") + + // 4. Try to cause a race condition! + now = dbtime.Now() + // Difficult to know what to EXPECT here, so we won't check strictly here. + mDB.EXPECT().BatchUpdateWorkspaceLastUsedAt(gomock.Any(), gomock.Any()).MinTimes(1).MaxTimes(len(ids)) // Try to force a race condition. var wg sync.WaitGroup - numTicks := 10 count = 0 - wg.Add(1) - go func() { - defer wg.Done() - for _, id := range ids { - wut.Add(id) - } - }() - for i := 0; i < numTicks; i++ { + for i := 0; i < len(ids); i++ { wg.Add(1) go func() { defer wg.Done() tickCh <- now }() + wut.Add(ids[i]) } - for i := 0; i < numTicks; i++ { + for i := 0; i < len(ids); i++ { count += <-flushCh } wg.Wait() - require.Equal(t, 11, count, "expected one flush with eleven ids") + require.Equal(t, len(ids), count, "incorrect number of ids flushed") - // 4. Closing multiple times should not be a problem. + // 5. Closing multiple times should not be a problem. wut.Close() wut.Close() - // 5. Running Loop() again should panic. + // 6. Running Loop() again should panic. require.Panics(t, wut.Loop) } From a36aeb9711ca5844fdb2a9af5fd663bcb96851c9 Mon Sep 17 00:00:00 2001 From: Cian Johnston Date: Wed, 20 Mar 2024 09:10:54 +0000 Subject: [PATCH 09/21] add danny's suggestions Co-authored-by: Danny Kopping --- coderd/workspaceusage/tracker.go | 4 +++- codersdk/workspaces.go | 2 +- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/coderd/workspaceusage/tracker.go b/coderd/workspaceusage/tracker.go index 9a9906021b13c..a6691d1322d10 100644 --- a/coderd/workspaceusage/tracker.go +++ b/coderd/workspaceusage/tracker.go @@ -206,6 +206,7 @@ func (s *uuidSet) UniqueAndClear() []uuid.UUID { defer s.l.Unlock() if s.m == nil { s.m = make(map[uuid.UUID]struct{}) + return []uuid.UUID{} } l := make([]uuid.UUID, 0) for k := range s.m { @@ -214,8 +215,9 @@ func (s *uuidSet) UniqueAndClear() []uuid.UUID { // For ease of testing, sort the IDs lexically sort.Slice(l, func(i, j int) bool { // For some unfathomable reason, byte arrays are not comparable? + // See https://github.com/golang/go/issues/61004 return bytes.Compare(l[i][:], l[j][:]) < 0 }) - s.m = make(map[uuid.UUID]struct{}) + clear(s.m) return l } diff --git a/codersdk/workspaces.go b/codersdk/workspaces.go index 71d6a60d200e8..0007e85de8ee4 100644 --- a/codersdk/workspaces.go +++ b/codersdk/workspaces.go @@ -336,7 +336,7 @@ func (c *Client) PostWorkspaceUsage(ctx context.Context, id uuid.UUID) error { // process. func (c *Client) UpdateWorkspaceUsageContext(ctx context.Context, id uuid.UUID) func() { hbCtx, hbCancel := context.WithCancel(ctx) - // Perform one initial heartbeat + // Perform one initial update if err := c.PostWorkspaceUsage(hbCtx, id); err != nil { c.logger.Warn(ctx, "failed to post workspace usage", slog.Error(err)) } From 692f666414fd8d8f050bf89f5233cd7fecd0fc8c Mon Sep 17 00:00:00 2001 From: Cian Johnston Date: Wed, 20 Mar 2024 09:30:52 +0000 Subject: [PATCH 10/21] add big big comments --- coderd/coderdtest/coderdtest.go | 18 +++++++++++++++++- coderd/workspaceusage/tracker.go | 6 ++++++ 2 files changed, 23 insertions(+), 1 deletion(-) diff --git a/coderd/coderdtest/coderdtest.go b/coderd/coderdtest/coderdtest.go index 577c10d9e8528..3c1ba02b0f4dc 100644 --- a/coderd/coderdtest/coderdtest.go +++ b/coderd/coderdtest/coderdtest.go @@ -309,8 +309,24 @@ func NewOptions(t testing.TB, options *Options) (func(http.Handler), context.Can t.Cleanup(hangDetector.Close) if options.WorkspaceUsageTracker == nil { + // Did last_used_at not update? Scratching your noggin? Here's why. // Workspace usage tracking must be triggered manually in tests. - // To do this, pass in your own WorkspaceUsageTracker. + // The vast majority of existing tests do not depend on last_used_at + // and adding an extra background goroutine to all existing tests may + // lead to future flakes and goleak complaints. + // To do this, pass in your own WorkspaceUsageTracker like so: + // + // db, ps = dbtestutil.NewDB(t) + // wuTick = make(chan time.Time) + // wuFlush = make(chan int, 1) + // wut = workspaceusage.New(db, workspaceusage.WithFlushChannel(wuFlush), workspaceusage.WithTickChannel(wuTick)) + // client = coderdtest.New(t, &coderdtest.Options{ + // WorkspaceUsageTracker: wut, + // Database: db, + // Pubsub: ps, + // }) + // + // See TestPortForward for how this works in practice. wutFlush := make(chan int) wutTick := make(chan time.Time) options.WorkspaceUsageTracker = workspaceusage.New( diff --git a/coderd/workspaceusage/tracker.go b/coderd/workspaceusage/tracker.go index a6691d1322d10..061b78c00db3e 100644 --- a/coderd/workspaceusage/tracker.go +++ b/coderd/workspaceusage/tracker.go @@ -168,6 +168,12 @@ func (wut *Tracker) Loop() { if !ok { return } + // NOTE: we do not update last_used_at with the time at which each workspace was added. + // Instead, we update with the time of the flush. If the BatchUpdateWorkspacesLastUsedAt + // query can be rewritten to update each id with a corresponding last_used_at timestamp + // then we could capture the exact usage time of each workspace. For now however, as + // we perform this query at a regular interval, the time of the flush is 'close enough' + // for the purposes of both dormancy (and for autostop, in future). wut.flush(now.UTC()) } } From d794e00c633dbef1eb67d62b777f761151c53c5d Mon Sep 17 00:00:00 2001 From: Cian Johnston Date: Wed, 20 Mar 2024 10:23:19 +0000 Subject: [PATCH 11/21] fix(database): BatchUpdateWorkspaceLastUsedAt: avoid overwriting older data --- coderd/database/dbmem/dbmem.go | 4 ++++ coderd/database/queries.sql.go | 3 +++ coderd/database/queries/workspaces.sql | 5 ++++- 3 files changed, 11 insertions(+), 1 deletion(-) diff --git a/coderd/database/dbmem/dbmem.go b/coderd/database/dbmem/dbmem.go index 131020e53fb3e..3dee48714fa84 100644 --- a/coderd/database/dbmem/dbmem.go +++ b/coderd/database/dbmem/dbmem.go @@ -1046,6 +1046,10 @@ func (q *FakeQuerier) BatchUpdateWorkspaceLastUsedAt(_ context.Context, arg data if _, found := m[q.workspaces[i].ID]; !found { continue } + // WHERE last_used_at < @last_used_at + if !q.workspaces[i].LastUsedAt.Before(arg.LastUsedAt) { + continue + } q.workspaces[i].LastUsedAt = arg.LastUsedAt n++ } diff --git a/coderd/database/queries.sql.go b/coderd/database/queries.sql.go index 85ff4a91cd02b..f3609632a8a9a 100644 --- a/coderd/database/queries.sql.go +++ b/coderd/database/queries.sql.go @@ -11518,6 +11518,9 @@ SET last_used_at = $1 WHERE id = ANY($2 :: uuid[]) +AND + -- Do not overwrite with older data + last_used_at < $1 ` type BatchUpdateWorkspaceLastUsedAtParams struct { diff --git a/coderd/database/queries/workspaces.sql b/coderd/database/queries/workspaces.sql index 0482fd5135179..01c86cb41e1b1 100644 --- a/coderd/database/queries/workspaces.sql +++ b/coderd/database/queries/workspaces.sql @@ -433,7 +433,10 @@ UPDATE SET last_used_at = @last_used_at WHERE - id = ANY(@ids :: uuid[]); + id = ANY(@ids :: uuid[]) +AND + -- Do not overwrite with older data + last_used_at < @last_used_at; -- name: GetDeploymentWorkspaceStats :one WITH workspaces_with_jobs AS ( From 45a0eef34d60529d2ea69cbfc70e4ba348431def Mon Sep 17 00:00:00 2001 From: Cian Johnston Date: Wed, 20 Mar 2024 10:23:50 +0000 Subject: [PATCH 12/21] fix(coderd/workspaceusage): log number of consecutive flush errors --- coderd/workspaceusage/tracker.go | 27 ++++++++++++++++----------- 1 file changed, 16 insertions(+), 11 deletions(-) diff --git a/coderd/workspaceusage/tracker.go b/coderd/workspaceusage/tracker.go index 061b78c00db3e..f3cd25efeaf77 100644 --- a/coderd/workspaceusage/tracker.go +++ b/coderd/workspaceusage/tracker.go @@ -29,16 +29,17 @@ type Store interface { // It keeps an internal map of workspace IDs that have been used and // periodically flushes this to its configured Store. type Tracker struct { - log slog.Logger // you know, for logs - flushLock sync.Mutex // protects m - m *uuidSet // stores workspace ids - s Store // for flushing data - tickCh <-chan time.Time // controls flush interval - stopTick func() // stops flushing - stopCh chan struct{} // signals us to stop - stopOnce sync.Once // because you only stop once - doneCh chan struct{} // signifies that we have stopped - flushCh chan int // used for testing. + log slog.Logger // you know, for logs + flushLock sync.Mutex // protects m + flushErrors int // tracks the number of consecutive errors flushing + m *uuidSet // stores workspace ids + s Store // for flushing data + tickCh <-chan time.Time // controls flush interval + stopTick func() // stops flushing + stopCh chan struct{} // signals us to stop + stopOnce sync.Once // because you only stop once + doneCh chan struct{} // signifies that we have stopped + flushCh chan int // used for testing. } // New returns a new Tracker. It is the caller's responsibility @@ -141,9 +142,13 @@ func (wut *Tracker) flush(now time.Time) { LastUsedAt: now, IDs: ids, }); err != nil { - wut.log.Error(ctx, "failed updating workspaces last_used_at", slog.F("count", count), slog.Error(err)) + wut.flushErrors++ + wut.log.Error(ctx, "failed updating workspaces last_used_at", slog.F("count", count), slog.F("consecutive_errors", wut.flushErrors), slog.Error(err)) + // TODO: if this keeps failing, it indicates a fundamental problem with the database connection. + // How to surface it correctly to admins besides just screaming into the logs? return } + wut.flushErrors = 0 wut.log.Info(ctx, "updated workspaces last_used_at", slog.F("count", count), slog.F("now", now)) } From 8e40efd51d6fcd8cbeeef3597111c706f06e6f12 Mon Sep 17 00:00:00 2001 From: Cian Johnston Date: Wed, 20 Mar 2024 10:27:30 +0000 Subject: [PATCH 13/21] upgrade to error log on multiple flush failures --- coderd/workspaceusage/tracker.go | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/coderd/workspaceusage/tracker.go b/coderd/workspaceusage/tracker.go index f3cd25efeaf77..6559c29ad9513 100644 --- a/coderd/workspaceusage/tracker.go +++ b/coderd/workspaceusage/tracker.go @@ -142,10 +142,17 @@ func (wut *Tracker) flush(now time.Time) { LastUsedAt: now, IDs: ids, }); err != nil { + // A single failure to flush is likely not a huge problem. If the workspace is still connected at + // the next iteration, either another coderd instance will likely have this data or the CLI + // will tell us again that the workspace is in use. wut.flushErrors++ - wut.log.Error(ctx, "failed updating workspaces last_used_at", slog.F("count", count), slog.F("consecutive_errors", wut.flushErrors), slog.Error(err)) - // TODO: if this keeps failing, it indicates a fundamental problem with the database connection. - // How to surface it correctly to admins besides just screaming into the logs? + if wut.flushErrors > 1 { + wut.log.Error(ctx, "multiple failures updating workspaces last_used_at", slog.F("count", count), slog.F("consecutive_errors", wut.flushErrors), slog.Error(err)) + // TODO: if this keeps failing, it indicates a fundamental problem with the database connection. + // How to surface it correctly to admins besides just screaming into the logs? + } else { + wut.log.Warn(ctx, "failed updating workspaces last_used_at", slog.F("count", count), slog.Error(err)) + } return } wut.flushErrors = 0 From 591e1abffe1f47651ff6fcd1a204575ddacd4498 Mon Sep 17 00:00:00 2001 From: Cian Johnston Date: Wed, 20 Mar 2024 11:22:53 +0000 Subject: [PATCH 14/21] chore(coderd/workspaceusage): add integration-style test with multiple instances --- coderd/workspaceusage/tracker_test.go | 113 ++++++++++++++++++++++++++ 1 file changed, 113 insertions(+) diff --git a/coderd/workspaceusage/tracker_test.go b/coderd/workspaceusage/tracker_test.go index 5213ae33dc837..bf56e74386a53 100644 --- a/coderd/workspaceusage/tracker_test.go +++ b/coderd/workspaceusage/tracker_test.go @@ -14,10 +14,15 @@ import ( "cdr.dev/slog" "cdr.dev/slog/sloggers/slogtest" + "github.com/coder/coder/v2/coderd/coderdtest" "github.com/coder/coder/v2/coderd/database" + "github.com/coder/coder/v2/coderd/database/dbfake" "github.com/coder/coder/v2/coderd/database/dbmock" + "github.com/coder/coder/v2/coderd/database/dbtestutil" "github.com/coder/coder/v2/coderd/database/dbtime" "github.com/coder/coder/v2/coderd/workspaceusage" + "github.com/coder/coder/v2/codersdk" + "github.com/coder/coder/v2/testutil" ) func TestTracker(t *testing.T) { @@ -105,6 +110,114 @@ func TestTracker(t *testing.T) { require.Panics(t, wut.Loop) } +// This test performs a more 'integration-style' test with multiple instances. +func TestTracker_MultipleInstances(t *testing.T) { + t.Parallel() + if !dbtestutil.WillUsePostgres() { + t.Skip("this test only makes sense with postgres") + } + + // Given we have two coderd instances connected to the same database + var ( + ctx = testutil.Context(t, testutil.WaitLong) + db, ps = dbtestutil.NewDB(t) + wuTickA = make(chan time.Time) + wuFlushA = make(chan int, 1) + wutA = workspaceusage.New(db, workspaceusage.WithFlushChannel(wuFlushA), workspaceusage.WithTickChannel(wuTickA)) + wuTickB = make(chan time.Time) + wuFlushB = make(chan int, 1) + wutB = workspaceusage.New(db, workspaceusage.WithFlushChannel(wuFlushB), workspaceusage.WithTickChannel(wuTickB)) + clientA = coderdtest.New(t, &coderdtest.Options{ + WorkspaceUsageTracker: wutA, + Database: db, + Pubsub: ps, + }) + clientB = coderdtest.New(t, &coderdtest.Options{ + WorkspaceUsageTracker: wutB, + Database: db, + Pubsub: ps, + }) + owner = coderdtest.CreateFirstUser(t, clientA) + now = dbtime.Now() + ) + + clientB.SetSessionToken(clientA.SessionToken()) + + // Create a number of workspaces + numWorkspaces := 10 + w := make([]dbfake.WorkspaceResponse, numWorkspaces) + for i := 0; i < numWorkspaces; i++ { + wr := dbfake.WorkspaceBuild(t, db, database.Workspace{ + OwnerID: owner.UserID, + OrganizationID: owner.OrganizationID, + LastUsedAt: now, + }).WithAgent().Do() + w[i] = wr + } + + // Use client A to update LastUsedAt of the first three + require.NoError(t, clientA.PostWorkspaceUsage(ctx, w[0].Workspace.ID)) + require.NoError(t, clientA.PostWorkspaceUsage(ctx, w[1].Workspace.ID)) + require.NoError(t, clientA.PostWorkspaceUsage(ctx, w[2].Workspace.ID)) + // Use client B to update LastUsedAt of the next three + require.NoError(t, clientB.PostWorkspaceUsage(ctx, w[3].Workspace.ID)) + require.NoError(t, clientB.PostWorkspaceUsage(ctx, w[4].Workspace.ID)) + require.NoError(t, clientB.PostWorkspaceUsage(ctx, w[5].Workspace.ID)) + // The next two will have updated from both instances + require.NoError(t, clientA.PostWorkspaceUsage(ctx, w[6].Workspace.ID)) + require.NoError(t, clientB.PostWorkspaceUsage(ctx, w[6].Workspace.ID)) + require.NoError(t, clientA.PostWorkspaceUsage(ctx, w[7].Workspace.ID)) + require.NoError(t, clientB.PostWorkspaceUsage(ctx, w[7].Workspace.ID)) + // The last two will not report any usage. + + // Tick both with different times and wait for both flushes to complete + nowA := now.Add(time.Minute) + nowB := now.Add(2 * time.Minute) + var wg sync.WaitGroup + var flushedA, flushedB int + wg.Add(1) + go func() { + defer wg.Done() + wuTickA <- nowA + flushedA = <-wuFlushA + }() + wg.Add(1) + go func() { + defer wg.Done() + wuTickB <- nowB + flushedB = <-wuFlushB + }() + wg.Wait() + + // We expect 5 flushed IDs each + require.Equal(t, 5, flushedA) + require.Equal(t, 5, flushedB) + + // Fetch updated workspaces + updated := make([]codersdk.Workspace, numWorkspaces) + for i := 0; i < numWorkspaces; i++ { + ws, err := clientA.Workspace(ctx, w[i].Workspace.ID) + require.NoError(t, err) + updated[i] = ws + } + // We expect the first three to have the timestamp of flushA + require.Equal(t, nowA.UTC(), updated[0].LastUsedAt.UTC()) + require.Equal(t, nowA.UTC(), updated[1].LastUsedAt.UTC()) + require.Equal(t, nowA.UTC(), updated[2].LastUsedAt.UTC()) + // We expect the next three to have the timestamp of flushB + require.Equal(t, nowB.UTC(), updated[3].LastUsedAt.UTC()) + require.Equal(t, nowB.UTC(), updated[4].LastUsedAt.UTC()) + require.Equal(t, nowB.UTC(), updated[5].LastUsedAt.UTC()) + // The next two should have the timestamp of flushB as it is newer than flushA + require.Equal(t, nowB.UTC(), updated[6].LastUsedAt.UTC()) + require.Equal(t, nowB.UTC(), updated[7].LastUsedAt.UTC()) + // And the last two should be untouched + require.Equal(t, w[8].Workspace.LastUsedAt.UTC(), updated[8].LastUsedAt.UTC()) + require.Equal(t, w[8].Workspace.LastUsedAt.UTC(), updated[8].LastUsedAt.UTC()) + require.Equal(t, w[9].Workspace.LastUsedAt.UTC(), updated[9].LastUsedAt.UTC()) + require.Equal(t, w[9].Workspace.LastUsedAt.UTC(), updated[9].LastUsedAt.UTC()) +} + func TestMain(m *testing.M) { goleak.VerifyTestMain(m) } From 0caaf3a92c7756d49470969a9406ee2c3cf727f2 Mon Sep 17 00:00:00 2001 From: Cian Johnston Date: Wed, 20 Mar 2024 13:01:05 +0000 Subject: [PATCH 15/21] fix(cli/portforward_test.go): use testutil.RequireRecv/SendCtx --- cli/portforward_test.go | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/cli/portforward_test.go b/cli/portforward_test.go index 1fb18eee1baf2..2ec7674377814 100644 --- a/cli/portforward_test.go +++ b/cli/portforward_test.go @@ -160,8 +160,9 @@ func TestPortForward(t *testing.T) { err = <-errC require.ErrorIs(t, err, context.Canceled) - wuTick <- dbtime.Now() - <-wuFlush + flushCtx := testutil.Context(t, testutil.WaitShort) + testutil.RequireSendCtx(flushCtx, t, wuTick, dbtime.Now()) + _ = testutil.RequireRecvCtx(flushCtx, t, wuFlush) updated, err := client.Workspace(context.Background(), workspace.ID) require.NoError(t, err) require.Greater(t, updated.LastUsedAt, workspace.LastUsedAt) @@ -214,8 +215,9 @@ func TestPortForward(t *testing.T) { err = <-errC require.ErrorIs(t, err, context.Canceled) - wuTick <- dbtime.Now() - <-wuFlush + flushCtx := testutil.Context(t, testutil.WaitShort) + testutil.RequireSendCtx(flushCtx, t, wuTick, dbtime.Now()) + _ = testutil.RequireRecvCtx(flushCtx, t, wuFlush) updated, err := client.Workspace(context.Background(), workspace.ID) require.NoError(t, err) require.Greater(t, updated.LastUsedAt, workspace.LastUsedAt) @@ -281,8 +283,9 @@ func TestPortForward(t *testing.T) { err := <-errC require.ErrorIs(t, err, context.Canceled) - wuTick <- dbtime.Now() - <-wuFlush + flushCtx := testutil.Context(t, testutil.WaitShort) + testutil.RequireSendCtx(flushCtx, t, wuTick, dbtime.Now()) + _ = testutil.RequireRecvCtx(flushCtx, t, wuFlush) updated, err := client.Workspace(context.Background(), workspace.ID) require.NoError(t, err) require.Greater(t, updated.LastUsedAt, workspace.LastUsedAt) From cc72868f24c7d3e56524db0c6982765bdd2a6952 Mon Sep 17 00:00:00 2001 From: Cian Johnston Date: Wed, 20 Mar 2024 13:04:05 +0000 Subject: [PATCH 16/21] just use default flush interval --- coderd/coderd.go | 1 - 1 file changed, 1 deletion(-) diff --git a/coderd/coderd.go b/coderd/coderd.go index a6aaf269eb125..af4b4c51a9725 100644 --- a/coderd/coderd.go +++ b/coderd/coderd.go @@ -368,7 +368,6 @@ func New(options *Options) *API { if options.WorkspaceUsageTracker == nil { options.WorkspaceUsageTracker = workspaceusage.New(options.Database, - workspaceusage.WithFlushInterval(workspaceusage.DefaultFlushInterval), workspaceusage.WithLogger(options.Logger.Named("workspace_usage_tracker")), ) } From f5f8d75b018641a7b4772a7c3c73ca73efea9e7a Mon Sep 17 00:00:00 2001 From: Cian Johnston Date: Wed, 20 Mar 2024 13:04:12 +0000 Subject: [PATCH 17/21] rename receiver --- coderd/workspaceusage/tracker.go | 56 ++++++++++++++++---------------- 1 file changed, 28 insertions(+), 28 deletions(-) diff --git a/coderd/workspaceusage/tracker.go b/coderd/workspaceusage/tracker.go index 6559c29ad9513..936836814e31e 100644 --- a/coderd/workspaceusage/tracker.go +++ b/coderd/workspaceusage/tracker.go @@ -110,24 +110,24 @@ func WithTickChannel(c chan time.Time) Option { // Add marks the workspace with the given ID as having been used recently. // Tracker will periodically flush this to its configured Store. -func (wut *Tracker) Add(workspaceID uuid.UUID) { - wut.m.Add(workspaceID) +func (tr *Tracker) Add(workspaceID uuid.UUID) { + tr.m.Add(workspaceID) } // flush updates last_used_at of all current workspace IDs. // If this is held while a previous flush is in progress, it will // deadlock until the previous flush has completed. -func (wut *Tracker) flush(now time.Time) { +func (tr *Tracker) flush(now time.Time) { // Copy our current set of IDs - ids := wut.m.UniqueAndClear() + ids := tr.m.UniqueAndClear() count := len(ids) - if wut.flushCh != nil { // only used for testing + if tr.flushCh != nil { // only used for testing defer func() { - wut.flushCh <- count + tr.flushCh <- count }() } if count == 0 { - wut.log.Debug(context.Background(), "nothing to flush") + tr.log.Debug(context.Background(), "nothing to flush") return } @@ -136,47 +136,47 @@ func (wut *Tracker) flush(now time.Time) { defer cancel() // nolint: gocritic // system function authCtx := dbauthz.AsSystemRestricted(ctx) - wut.flushLock.Lock() - defer wut.flushLock.Unlock() - if err := wut.s.BatchUpdateWorkspaceLastUsedAt(authCtx, database.BatchUpdateWorkspaceLastUsedAtParams{ + tr.flushLock.Lock() + defer tr.flushLock.Unlock() + if err := tr.s.BatchUpdateWorkspaceLastUsedAt(authCtx, database.BatchUpdateWorkspaceLastUsedAtParams{ LastUsedAt: now, IDs: ids, }); err != nil { // A single failure to flush is likely not a huge problem. If the workspace is still connected at // the next iteration, either another coderd instance will likely have this data or the CLI // will tell us again that the workspace is in use. - wut.flushErrors++ - if wut.flushErrors > 1 { - wut.log.Error(ctx, "multiple failures updating workspaces last_used_at", slog.F("count", count), slog.F("consecutive_errors", wut.flushErrors), slog.Error(err)) + tr.flushErrors++ + if tr.flushErrors > 1 { + tr.log.Error(ctx, "multiple failures updating workspaces last_used_at", slog.F("count", count), slog.F("consecutive_errors", tr.flushErrors), slog.Error(err)) // TODO: if this keeps failing, it indicates a fundamental problem with the database connection. // How to surface it correctly to admins besides just screaming into the logs? } else { - wut.log.Warn(ctx, "failed updating workspaces last_used_at", slog.F("count", count), slog.Error(err)) + tr.log.Warn(ctx, "failed updating workspaces last_used_at", slog.F("count", count), slog.Error(err)) } return } - wut.flushErrors = 0 - wut.log.Info(ctx, "updated workspaces last_used_at", slog.F("count", count), slog.F("now", now)) + tr.flushErrors = 0 + tr.log.Info(ctx, "updated workspaces last_used_at", slog.F("count", count), slog.F("now", now)) } // Loop periodically flushes every tick. // If Loop is called after Close, it will panic. Don't do this. -func (wut *Tracker) Loop() { +func (tr *Tracker) Loop() { // Calling Loop after Close() is an error. select { - case <-wut.doneCh: + case <-tr.doneCh: panic("developer error: Loop called after Close") default: } defer func() { - wut.log.Debug(context.Background(), "workspace usage tracker loop exited") + tr.log.Debug(context.Background(), "workspace usage tracker loop exited") }() for { select { - case <-wut.stopCh: - close(wut.doneCh) + case <-tr.stopCh: + close(tr.doneCh) return - case now, ok := <-wut.tickCh: + case now, ok := <-tr.tickCh: if !ok { return } @@ -186,18 +186,18 @@ func (wut *Tracker) Loop() { // then we could capture the exact usage time of each workspace. For now however, as // we perform this query at a regular interval, the time of the flush is 'close enough' // for the purposes of both dormancy (and for autostop, in future). - wut.flush(now.UTC()) + tr.flush(now.UTC()) } } } // Close stops Tracker and returns once Loop has exited. // After calling Close(), Loop must not be called. -func (wut *Tracker) Close() { - wut.stopOnce.Do(func() { - wut.stopCh <- struct{}{} - wut.stopTick() - <-wut.doneCh +func (tr *Tracker) Close() { + tr.stopOnce.Do(func() { + tr.stopCh <- struct{}{} + tr.stopTick() + <-tr.doneCh }) } From a2e716d73d25b34abf1e8d15636debe63568e204 Mon Sep 17 00:00:00 2001 From: Cian Johnston Date: Wed, 20 Mar 2024 13:15:54 +0000 Subject: [PATCH 18/21] defer close doneCh --- coderd/workspaceusage/tracker.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/coderd/workspaceusage/tracker.go b/coderd/workspaceusage/tracker.go index 936836814e31e..c6c071096c1ea 100644 --- a/coderd/workspaceusage/tracker.go +++ b/coderd/workspaceusage/tracker.go @@ -169,12 +169,12 @@ func (tr *Tracker) Loop() { default: } defer func() { + close(tr.doneCh) tr.log.Debug(context.Background(), "workspace usage tracker loop exited") }() for { select { case <-tr.stopCh: - close(tr.doneCh) return case now, ok := <-tr.tickCh: if !ok { From 5b64f96a5041d7ae13b318b938e77dc809b428cd Mon Sep 17 00:00:00 2001 From: Cian Johnston Date: Wed, 20 Mar 2024 13:16:15 +0000 Subject: [PATCH 19/21] defer instead of cleanup, avoid data race in real pubsub --- coderd/workspaceusage/tracker_test.go | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/coderd/workspaceusage/tracker_test.go b/coderd/workspaceusage/tracker_test.go index bf56e74386a53..21a66249ea496 100644 --- a/coderd/workspaceusage/tracker_test.go +++ b/coderd/workspaceusage/tracker_test.go @@ -20,6 +20,7 @@ import ( "github.com/coder/coder/v2/coderd/database/dbmock" "github.com/coder/coder/v2/coderd/database/dbtestutil" "github.com/coder/coder/v2/coderd/database/dbtime" + "github.com/coder/coder/v2/coderd/database/pubsub" "github.com/coder/coder/v2/coderd/workspaceusage" "github.com/coder/coder/v2/codersdk" "github.com/coder/coder/v2/testutil" @@ -35,7 +36,7 @@ func TestTracker(t *testing.T) { tickCh := make(chan time.Time) flushCh := make(chan int, 1) wut := workspaceusage.New(mDB, workspaceusage.WithLogger(log), workspaceusage.WithTickChannel(tickCh), workspaceusage.WithFlushChannel(flushCh)) - t.Cleanup(wut.Close) + defer wut.Close() go wut.Loop() @@ -119,8 +120,11 @@ func TestTracker_MultipleInstances(t *testing.T) { // Given we have two coderd instances connected to the same database var ( - ctx = testutil.Context(t, testutil.WaitLong) - db, ps = dbtestutil.NewDB(t) + ctx = testutil.Context(t, testutil.WaitLong) + db, _ = dbtestutil.NewDB(t) + // real pubsub is not safe for concurrent use, and this test currently + // does not depend on pubsub + ps = pubsub.NewInMemory() wuTickA = make(chan time.Time) wuFlushA = make(chan int, 1) wutA = workspaceusage.New(db, workspaceusage.WithFlushChannel(wuFlushA), workspaceusage.WithTickChannel(wuTickA)) @@ -140,6 +144,8 @@ func TestTracker_MultipleInstances(t *testing.T) { owner = coderdtest.CreateFirstUser(t, clientA) now = dbtime.Now() ) + defer wutA.Close() + defer wutB.Close() clientB.SetSessionToken(clientA.SessionToken()) From 23ccf21894ceeabde6cfbe382b97f0497b8052d5 Mon Sep 17 00:00:00 2001 From: Cian Johnston Date: Wed, 20 Mar 2024 13:17:16 +0000 Subject: [PATCH 20/21] fix(coderdtest): buffer just in case --- coderd/coderdtest/coderdtest.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/coderd/coderdtest/coderdtest.go b/coderd/coderdtest/coderdtest.go index 3c1ba02b0f4dc..c1b32628f3bc8 100644 --- a/coderd/coderdtest/coderdtest.go +++ b/coderd/coderdtest/coderdtest.go @@ -327,8 +327,8 @@ func NewOptions(t testing.TB, options *Options) (func(http.Handler), context.Can // }) // // See TestPortForward for how this works in practice. - wutFlush := make(chan int) - wutTick := make(chan time.Time) + wutFlush := make(chan int, 1) // buffering just in case + wutTick := make(chan time.Time, 1) // buffering just in case options.WorkspaceUsageTracker = workspaceusage.New( options.Database, workspaceusage.WithLogger(options.Logger.Named("workspace_usage_tracker")), From c9ac9d23b1deefad63c407cd448a8c7d6bbff7a4 Mon Sep 17 00:00:00 2001 From: Cian Johnston Date: Wed, 20 Mar 2024 14:36:15 +0000 Subject: [PATCH 21/21] refactor: unexport Loop, remove panic, simplify external API --- cli/portforward_test.go | 15 +++---- cli/server.go | 8 ++++ coderd/coderd.go | 1 - coderd/coderdtest/coderdtest.go | 61 ++++++++++++++------------- coderd/workspaceusage/tracker.go | 51 ++++++++++------------ coderd/workspaceusage/tracker_test.go | 28 ++++++------ 6 files changed, 78 insertions(+), 86 deletions(-) diff --git a/cli/portforward_test.go b/cli/portforward_test.go index 2ec7674377814..edef520c23dc6 100644 --- a/cli/portforward_test.go +++ b/cli/portforward_test.go @@ -21,9 +21,7 @@ import ( "github.com/coder/coder/v2/coderd/coderdtest" "github.com/coder/coder/v2/coderd/database" "github.com/coder/coder/v2/coderd/database/dbfake" - "github.com/coder/coder/v2/coderd/database/dbtestutil" "github.com/coder/coder/v2/coderd/database/dbtime" - "github.com/coder/coder/v2/coderd/workspaceusage" "github.com/coder/coder/v2/codersdk" "github.com/coder/coder/v2/pty/ptytest" "github.com/coder/coder/v2/testutil" @@ -99,14 +97,11 @@ func TestPortForward(t *testing.T) { // Setup agent once to be shared between test-cases (avoid expensive // non-parallel setup). var ( - db, ps = dbtestutil.NewDB(t) - wuTick = make(chan time.Time) - wuFlush = make(chan int, 1) - wut = workspaceusage.New(db, workspaceusage.WithFlushChannel(wuFlush), workspaceusage.WithTickChannel(wuTick)) - client = coderdtest.New(t, &coderdtest.Options{ - WorkspaceUsageTracker: wut, - Database: db, - Pubsub: ps, + wuTick = make(chan time.Time) + wuFlush = make(chan int, 1) + client, db = coderdtest.NewWithDatabase(t, &coderdtest.Options{ + WorkspaceUsageTrackerTick: wuTick, + WorkspaceUsageTrackerFlush: wuFlush, }) admin = coderdtest.CreateFirstUser(t, client) member, memberUser = coderdtest.CreateAnotherUser(t, client, admin.OrganizationID) diff --git a/cli/server.go b/cli/server.go index 94648bb900282..f371c30156135 100644 --- a/cli/server.go +++ b/cli/server.go @@ -86,6 +86,7 @@ import ( stringutil "github.com/coder/coder/v2/coderd/util/strings" "github.com/coder/coder/v2/coderd/workspaceapps" "github.com/coder/coder/v2/coderd/workspaceapps/appurl" + "github.com/coder/coder/v2/coderd/workspaceusage" "github.com/coder/coder/v2/codersdk" "github.com/coder/coder/v2/codersdk/drpc" "github.com/coder/coder/v2/cryptorand" @@ -968,6 +969,13 @@ func (r *RootCmd) Server(newAPI func(context.Context, *coderd.Options) (*coderd. purger := dbpurge.New(ctx, logger, options.Database) defer purger.Close() + // Updates workspace usage + tracker := workspaceusage.New(options.Database, + workspaceusage.WithLogger(logger.Named("workspace_usage_tracker")), + ) + options.WorkspaceUsageTracker = tracker + defer tracker.Close() + // Wrap the server in middleware that redirects to the access URL if // the request is not to a local IP. var handler http.Handler = coderAPI.RootHandler diff --git a/coderd/coderd.go b/coderd/coderd.go index af4b4c51a9725..6b89f4d09556d 100644 --- a/coderd/coderd.go +++ b/coderd/coderd.go @@ -371,7 +371,6 @@ func New(options *Options) *API { workspaceusage.WithLogger(options.Logger.Named("workspace_usage_tracker")), ) } - go options.WorkspaceUsageTracker.Loop() ctx, cancel := context.WithCancel(context.Background()) r := chi.NewRouter() diff --git a/coderd/coderdtest/coderdtest.go b/coderd/coderdtest/coderdtest.go index c1b32628f3bc8..303f840938817 100644 --- a/coderd/coderdtest/coderdtest.go +++ b/coderd/coderdtest/coderdtest.go @@ -147,7 +147,8 @@ type Options struct { WorkspaceAppsStatsCollectorOptions workspaceapps.StatsCollectorOptions AllowWorkspaceRenames bool NewTicker func(duration time.Duration) (<-chan time.Time, func()) - WorkspaceUsageTracker *workspaceusage.Tracker + WorkspaceUsageTrackerFlush chan int + WorkspaceUsageTrackerTick chan time.Time } // New constructs a codersdk client connected to an in-memory API instance. @@ -308,35 +309,35 @@ func NewOptions(t testing.TB, options *Options) (func(http.Handler), context.Can hangDetector.Start() t.Cleanup(hangDetector.Close) - if options.WorkspaceUsageTracker == nil { - // Did last_used_at not update? Scratching your noggin? Here's why. - // Workspace usage tracking must be triggered manually in tests. - // The vast majority of existing tests do not depend on last_used_at - // and adding an extra background goroutine to all existing tests may - // lead to future flakes and goleak complaints. - // To do this, pass in your own WorkspaceUsageTracker like so: - // - // db, ps = dbtestutil.NewDB(t) - // wuTick = make(chan time.Time) - // wuFlush = make(chan int, 1) - // wut = workspaceusage.New(db, workspaceusage.WithFlushChannel(wuFlush), workspaceusage.WithTickChannel(wuTick)) - // client = coderdtest.New(t, &coderdtest.Options{ - // WorkspaceUsageTracker: wut, - // Database: db, - // Pubsub: ps, - // }) - // - // See TestPortForward for how this works in practice. - wutFlush := make(chan int, 1) // buffering just in case - wutTick := make(chan time.Time, 1) // buffering just in case - options.WorkspaceUsageTracker = workspaceusage.New( - options.Database, - workspaceusage.WithLogger(options.Logger.Named("workspace_usage_tracker")), - workspaceusage.WithFlushChannel(wutFlush), - workspaceusage.WithTickChannel(wutTick), - ) + // Did last_used_at not update? Scratching your noggin? Here's why. + // Workspace usage tracking must be triggered manually in tests. + // The vast majority of existing tests do not depend on last_used_at + // and adding an extra time-based background goroutine to all existing + // tests may lead to future flakes and goleak complaints. + // Instead, pass in your own flush and ticker like so: + // + // tickCh = make(chan time.Time) + // flushCh = make(chan int, 1) + // client = coderdtest.New(t, &coderdtest.Options{ + // WorkspaceUsageTrackerFlush: flushCh, + // WorkspaceUsageTrackerTick: tickCh + // }) + // + // Now to trigger a tick, just write to `tickCh`. + // Reading from `flushCh` will ensure that workspaceusage.Tracker flushed. + // See TestPortForward or TestTracker_MultipleInstances for how this works in practice. + if options.WorkspaceUsageTrackerFlush == nil { + options.WorkspaceUsageTrackerFlush = make(chan int, 1) // buffering just in case + } + if options.WorkspaceUsageTrackerTick == nil { + options.WorkspaceUsageTrackerTick = make(chan time.Time, 1) // buffering just in case } - t.Cleanup(options.WorkspaceUsageTracker.Close) + // Close is called by API.Close() + wuTracker := workspaceusage.New( + options.Database, + workspaceusage.WithLogger(options.Logger.Named("workspace_usage_tracker")), + workspaceusage.WithTickFlush(options.WorkspaceUsageTrackerTick, options.WorkspaceUsageTrackerFlush), + ) var mutex sync.RWMutex var handler http.Handler @@ -486,7 +487,7 @@ func NewOptions(t testing.TB, options *Options) (func(http.Handler), context.Can WorkspaceAppsStatsCollectorOptions: options.WorkspaceAppsStatsCollectorOptions, AllowWorkspaceRenames: options.AllowWorkspaceRenames, NewTicker: options.NewTicker, - WorkspaceUsageTracker: options.WorkspaceUsageTracker, + WorkspaceUsageTracker: wuTracker, } } diff --git a/coderd/workspaceusage/tracker.go b/coderd/workspaceusage/tracker.go index c6c071096c1ea..6a3659a5008d8 100644 --- a/coderd/workspaceusage/tracker.go +++ b/coderd/workspaceusage/tracker.go @@ -45,7 +45,7 @@ type Tracker struct { // New returns a new Tracker. It is the caller's responsibility // to call Close(). func New(s Store, opts ...Option) *Tracker { - hb := &Tracker{ + tr := &Tracker{ log: slog.Make(sloghuman.Sink(os.Stderr)), m: &uuidSet{}, s: s, @@ -56,14 +56,15 @@ func New(s Store, opts ...Option) *Tracker { flushCh: nil, } for _, opt := range opts { - opt(hb) + opt(tr) } - if hb.tickCh == nil && hb.stopTick == nil { - ticker := time.NewTicker(DefaultFlushInterval) - hb.tickCh = ticker.C - hb.stopTick = ticker.Stop + if tr.tickCh == nil && tr.stopTick == nil { + tick := time.NewTicker(DefaultFlushInterval) + tr.tickCh = tick.C + tr.stopTick = tick.Stop } - return hb + go tr.loop() + return tr } type Option func(*Tracker) @@ -84,27 +85,18 @@ func WithFlushInterval(d time.Duration) Option { } } -// WithFlushChannel allows passing a channel that receives -// the number of marked workspaces every time Tracker flushes. +// WithTickFlush allows passing two channels: one that reads +// a time.Time, and one that returns the number of marked workspaces +// every time Tracker flushes. // For testing only and will panic if used outside of tests. -func WithFlushChannel(c chan int) Option { +func WithTickFlush(tickCh <-chan time.Time, flushCh chan int) Option { if flag.Lookup("test.v") == nil { - panic("developer error: WithFlushChannel is not to be used outside of tests.") + panic("developer error: WithTickFlush is not to be used outside of tests.") } return func(h *Tracker) { - h.flushCh = c - } -} - -// WithTickChannel allows passing a channel to replace a ticker. -// For testing only and will panic if used outside of tests. -func WithTickChannel(c chan time.Time) Option { - if flag.Lookup("test.v") == nil { - panic("developer error: WithTickChannel is not to be used outside of tests.") - } - return func(h *Tracker) { - h.tickCh = c + h.tickCh = tickCh h.stopTick = func() {} + h.flushCh = flushCh } } @@ -159,13 +151,13 @@ func (tr *Tracker) flush(now time.Time) { tr.log.Info(ctx, "updated workspaces last_used_at", slog.F("count", count), slog.F("now", now)) } -// Loop periodically flushes every tick. -// If Loop is called after Close, it will panic. Don't do this. -func (tr *Tracker) Loop() { - // Calling Loop after Close() is an error. +// loop periodically flushes every tick. +// If loop is called after Close, it will exit immediately and log an error. +func (tr *Tracker) loop() { select { case <-tr.doneCh: - panic("developer error: Loop called after Close") + tr.log.Error(context.Background(), "developer error: Loop called after Close") + return default: } defer func() { @@ -193,12 +185,13 @@ func (tr *Tracker) Loop() { // Close stops Tracker and returns once Loop has exited. // After calling Close(), Loop must not be called. -func (tr *Tracker) Close() { +func (tr *Tracker) Close() error { tr.stopOnce.Do(func() { tr.stopCh <- struct{}{} tr.stopTick() <-tr.doneCh }) + return nil } // uuidSet is a set of UUIDs. Safe for concurrent usage. diff --git a/coderd/workspaceusage/tracker_test.go b/coderd/workspaceusage/tracker_test.go index 21a66249ea496..ae9a9d2162d1c 100644 --- a/coderd/workspaceusage/tracker_test.go +++ b/coderd/workspaceusage/tracker_test.go @@ -35,11 +35,12 @@ func TestTracker(t *testing.T) { tickCh := make(chan time.Time) flushCh := make(chan int, 1) - wut := workspaceusage.New(mDB, workspaceusage.WithLogger(log), workspaceusage.WithTickChannel(tickCh), workspaceusage.WithFlushChannel(flushCh)) + wut := workspaceusage.New(mDB, + workspaceusage.WithLogger(log), + workspaceusage.WithTickFlush(tickCh, flushCh), + ) defer wut.Close() - go wut.Loop() - // 1. No marked workspaces should imply no flush. now := dbtime.Now() tickCh <- now @@ -106,9 +107,6 @@ func TestTracker(t *testing.T) { // 5. Closing multiple times should not be a problem. wut.Close() wut.Close() - - // 6. Running Loop() again should panic. - require.Panics(t, wut.Loop) } // This test performs a more 'integration-style' test with multiple instances. @@ -127,25 +125,23 @@ func TestTracker_MultipleInstances(t *testing.T) { ps = pubsub.NewInMemory() wuTickA = make(chan time.Time) wuFlushA = make(chan int, 1) - wutA = workspaceusage.New(db, workspaceusage.WithFlushChannel(wuFlushA), workspaceusage.WithTickChannel(wuTickA)) wuTickB = make(chan time.Time) wuFlushB = make(chan int, 1) - wutB = workspaceusage.New(db, workspaceusage.WithFlushChannel(wuFlushB), workspaceusage.WithTickChannel(wuTickB)) clientA = coderdtest.New(t, &coderdtest.Options{ - WorkspaceUsageTracker: wutA, - Database: db, - Pubsub: ps, + WorkspaceUsageTrackerTick: wuTickA, + WorkspaceUsageTrackerFlush: wuFlushA, + Database: db, + Pubsub: ps, }) clientB = coderdtest.New(t, &coderdtest.Options{ - WorkspaceUsageTracker: wutB, - Database: db, - Pubsub: ps, + WorkspaceUsageTrackerTick: wuTickB, + WorkspaceUsageTrackerFlush: wuFlushB, + Database: db, + Pubsub: ps, }) owner = coderdtest.CreateFirstUser(t, clientA) now = dbtime.Now() ) - defer wutA.Close() - defer wutB.Close() clientB.SetSessionToken(clientA.SessionToken())