From 0b002ec9f5c4cc19c05b208798b0f2cc6f3d0084 Mon Sep 17 00:00:00 2001 From: Mathias Fredriksson Date: Wed, 21 May 2025 15:09:22 +0000 Subject: [PATCH 01/11] feat(agent/agentcontainers): update containers periodically This change introduces a significant refactor to the agentcontainers API and enables periodic updates of Docker containers rather than on-demand. Consequently this change also allows us to move away from using a locking channel and replace it with a mutex, which simplifies usage. Additionally a previous oversight was fixed, and testing added, to clear devcontainer running/dirty status when the container has been removed. Updates coder/coder#16424 Updates coder/internal#621 --- agent/agent.go | 6 - agent/agentcontainers/api.go | 518 +++++++++++++++++------------- agent/agentcontainers/api_test.go | 237 +++++++++----- cli/ssh_test.go | 11 +- coderd/workspaceagents_test.go | 22 +- 5 files changed, 467 insertions(+), 327 deletions(-) diff --git a/agent/agent.go b/agent/agent.go index ffdacfb64ba75..cbb12f9655f5a 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -1176,12 +1176,6 @@ func (a *agent) handleManifest(manifestOK *checkpoint) func(ctx context.Context, } a.metrics.startupScriptSeconds.WithLabelValues(label).Set(dur) a.scriptRunner.StartCron() - if containerAPI := a.containerAPI.Load(); containerAPI != nil { - // Inform the container API that the agent is ready. - // This allows us to start watching for changes to - // the devcontainer configuration files. 
- containerAPI.SignalReady() - } }) if err != nil { return xerrors.Errorf("track conn goroutine: %w", err) diff --git a/agent/agentcontainers/api.go b/agent/agentcontainers/api.go index f2164c9a874ff..74a47d37954d9 100644 --- a/agent/agentcontainers/api.go +++ b/agent/agentcontainers/api.go @@ -8,6 +8,7 @@ import ( "path" "slices" "strings" + "sync" "time" "github.com/fsnotify/fsnotify" @@ -25,35 +26,35 @@ import ( ) const ( - defaultGetContainersCacheDuration = 10 * time.Second - dockerCreatedAtTimeFormat = "2006-01-02 15:04:05 -0700 MST" - getContainersTimeout = 5 * time.Second + defaultUpdateInterval = 10 * time.Second + listContainersTimeout = 15 * time.Second ) // API is responsible for container-related operations in the agent. // It provides methods to list and manage containers. type API struct { - ctx context.Context - cancel context.CancelFunc - done chan struct{} - logger slog.Logger - watcher watcher.Watcher - - cacheDuration time.Duration - execer agentexec.Execer - cl Lister - dccli DevcontainerCLI - clock quartz.Clock - scriptLogger func(logSourceID uuid.UUID) ScriptLogger - - // lockCh protects the below fields. We use a channel instead of a - // mutex so we can handle cancellation properly. - lockCh chan struct{} - containers codersdk.WorkspaceAgentListContainersResponse - mtime time.Time - devcontainerNames map[string]struct{} // Track devcontainer names to avoid duplicates. - knownDevcontainers []codersdk.WorkspaceAgentDevcontainer // Track predefined and runtime-detected devcontainers. - configFileModifiedTimes map[string]time.Time // Track when config files were last modified. + ctx context.Context + cancel context.CancelFunc + watcherDone chan struct{} + updaterDone chan struct{} + initialUpdateDone chan struct{} // Closed after first update in updaterLoop. + refreshTrigger chan chan error // Channel to trigger manual refresh. + updateInterval time.Duration // Interval for periodic container updates. 
+ logger slog.Logger + watcher watcher.Watcher + execer agentexec.Execer + cl Lister + dccli DevcontainerCLI + clock quartz.Clock + scriptLogger func(logSourceID uuid.UUID) ScriptLogger + + mu sync.RWMutex + closed bool + containers codersdk.WorkspaceAgentListContainersResponse // Output from the last list operation. + containersErr error // Error from the last list operation. + devcontainerNames map[string]struct{} + knownDevcontainers []codersdk.WorkspaceAgentDevcontainer + configFileModifiedTimes map[string]time.Time devcontainerLogSourceIDs map[string]uuid.UUID // Track devcontainer log source IDs. } @@ -69,15 +70,6 @@ func WithClock(clock quartz.Clock) Option { } } -// WithCacheDuration sets the cache duration for the API. -// This is used to control how often the API refreshes the list of -// containers. The default is 10 seconds. -func WithCacheDuration(d time.Duration) Option { - return func(api *API) { - api.cacheDuration = d - } -} - // WithExecer sets the agentexec.Execer implementation to use. 
func WithExecer(execer agentexec.Execer) Option { return func(api *API) { @@ -169,12 +161,14 @@ func NewAPI(logger slog.Logger, options ...Option) *API { api := &API{ ctx: ctx, cancel: cancel, - done: make(chan struct{}), + watcherDone: make(chan struct{}), + updaterDone: make(chan struct{}), + initialUpdateDone: make(chan struct{}), + refreshTrigger: make(chan chan error), + updateInterval: defaultUpdateInterval, logger: logger, clock: quartz.NewReal(), execer: agentexec.DefaultExecer, - cacheDuration: defaultGetContainersCacheDuration, - lockCh: make(chan struct{}, 1), devcontainerNames: make(map[string]struct{}), knownDevcontainers: []codersdk.WorkspaceAgentDevcontainer{}, configFileModifiedTimes: make(map[string]time.Time), @@ -200,33 +194,16 @@ func NewAPI(logger slog.Logger, options ...Option) *API { } } - go api.loop() + go api.watcherLoop() + go api.updaterLoop() return api } -// SignalReady signals the API that we are ready to begin watching for -// file changes. This is used to prime the cache with the current list -// of containers and to start watching the devcontainer config files for -// changes. It should be called after the agent ready. -func (api *API) SignalReady() { - // Prime the cache with the current list of containers. - _, _ = api.cl.List(api.ctx) - - // Make sure we watch the devcontainer config files for changes. 
- for _, devcontainer := range api.knownDevcontainers { - if devcontainer.ConfigPath == "" { - continue - } - - if err := api.watcher.Add(devcontainer.ConfigPath); err != nil { - api.logger.Error(api.ctx, "watch devcontainer config file failed", slog.Error(err), slog.F("file", devcontainer.ConfigPath)) - } - } -} - -func (api *API) loop() { - defer close(api.done) +func (api *API) watcherLoop() { + defer close(api.watcherDone) + defer api.logger.Debug(api.ctx, "watcher loop stopped") + api.logger.Debug(api.ctx, "watcher loop started") for { event, err := api.watcher.Next(api.ctx) @@ -263,10 +240,102 @@ func (api *API) loop() { } } +// updaterLoop is responsible for periodically updating the container +// list and handling manual refresh requests. +func (api *API) updaterLoop() { + defer close(api.updaterDone) + defer api.logger.Debug(api.ctx, "updater loop stopped") + api.logger.Debug(api.ctx, "updater loop started") + + // Ensure that only one instance of updateContainers is running + // at a time. This is a workaround since quartz.Ticker does not + // allow us to know if the routine has completed. + sema := make(chan struct{}, 1) + sema <- struct{}{} + + // Ensure only one updateContainers is running at a time, others are + // queued. + doUpdate := func() error { + select { + case <-api.ctx.Done(): + return api.ctx.Err() + case <-sema: + } + defer func() { sema <- struct{}{} }() + + return api.updateContainers(api.ctx) + } + + api.logger.Debug(api.ctx, "performing initial containers update") + if err := doUpdate(); err != nil { + api.logger.Error(api.ctx, "initial containers update failed", slog.Error(err)) + } else { + api.logger.Debug(api.ctx, "initial containers update complete") + } + // Signal that the initial update attempt (successful or not) is done. + // Other services can wait on this if they need the first data to be available. 
+ close(api.initialUpdateDone) + + // Use a ticker func to ensure that doUpdate has run to completion + // when advancing time. + waiter := api.clock.TickerFunc(api.ctx, api.updateInterval, func() error { + err := doUpdate() + if err != nil { + api.logger.Error(api.ctx, "periodic containers update failed", slog.Error(err)) + } + return nil // Always nil, keep going. + }) + defer func() { + if err := waiter.Wait(); err != nil { + api.logger.Error(api.ctx, "updater loop ticker failed", slog.Error(err)) + } + }() + + for { + select { + case <-api.ctx.Done(): + api.logger.Debug(api.ctx, "updater loop context canceled") + return + case ch := <-api.refreshTrigger: + api.logger.Debug(api.ctx, "manual containers update triggered") + err := doUpdate() + if err != nil { + api.logger.Error(api.ctx, "manual containers update failed", slog.Error(err)) + } + ch <- err + close(ch) + } + } +} + // Routes returns the HTTP handler for container-related routes. func (api *API) Routes() http.Handler { r := chi.NewRouter() + ensureInitialUpdateDoneMW := func(next http.Handler) http.Handler { + return http.HandlerFunc(func(rw http.ResponseWriter, r *http.Request) { + select { + case <-api.ctx.Done(): + httpapi.Write(r.Context(), rw, http.StatusServiceUnavailable, codersdk.Response{ + Message: "API closed", + Detail: "The API is closed and cannot process requests.", + }) + return + case <-r.Context().Done(): + return + case <-api.initialUpdateDone: + // Initial update is done, we can start processing + // requests. + } + next.ServeHTTP(rw, r) + }) + } + + // For now, all endpoints require the initial update to be done. + // If we want to allow some endpoints to be available before + // the initial update, we can enable this per-route. 
+ r.Use(ensureInitialUpdateDoneMW) + r.Get("/", api.handleList) r.Route("/devcontainers", func(r chi.Router) { r.Get("/", api.handleDevcontainersList) @@ -278,62 +347,53 @@ func (api *API) Routes() http.Handler { // handleList handles the HTTP request to list containers. func (api *API) handleList(rw http.ResponseWriter, r *http.Request) { - select { - case <-r.Context().Done(): - // Client went away. + ct, err := api.getContainers() + if err != nil { + httpapi.Write(r.Context(), rw, http.StatusInternalServerError, codersdk.Response{ + Message: "Could not get containers", + Detail: err.Error(), + }) return - default: - ct, err := api.getContainers(r.Context()) - if err != nil { - if errors.Is(err, context.Canceled) { - httpapi.Write(r.Context(), rw, http.StatusRequestTimeout, codersdk.Response{ - Message: "Could not get containers.", - Detail: "Took too long to list containers.", - }) - return - } - httpapi.Write(r.Context(), rw, http.StatusInternalServerError, codersdk.Response{ - Message: "Could not get containers.", - Detail: err.Error(), - }) - return - } - - httpapi.Write(r.Context(), rw, http.StatusOK, ct) } + httpapi.Write(r.Context(), rw, http.StatusOK, ct) } -func copyListContainersResponse(resp codersdk.WorkspaceAgentListContainersResponse) codersdk.WorkspaceAgentListContainersResponse { - return codersdk.WorkspaceAgentListContainersResponse{ - Containers: slices.Clone(resp.Containers), - Warnings: slices.Clone(resp.Warnings), - } -} +// updateContainers fetches the latest container list, processes it, and +// updates the cache. It performs locking for updating shared API state. 
+func (api *API) updateContainers(ctx context.Context) error { + listCtx, listCancel := context.WithTimeout(ctx, listContainersTimeout) + defer listCancel() -func (api *API) getContainers(ctx context.Context) (codersdk.WorkspaceAgentListContainersResponse, error) { - select { - case <-api.ctx.Done(): - return codersdk.WorkspaceAgentListContainersResponse{}, api.ctx.Err() - case <-ctx.Done(): - return codersdk.WorkspaceAgentListContainersResponse{}, ctx.Err() - case api.lockCh <- struct{}{}: - defer func() { <-api.lockCh }() - } + updated, err := api.cl.List(listCtx) + if err != nil { + // If the context was canceled, we hold off on clearing the + // containers cache. This is to avoid clearing the cache if + // the update was canceled due to a timeout. Hopefully this + // will clear up on the next update. + if !errors.Is(err, context.Canceled) { + api.mu.Lock() + api.containers = codersdk.WorkspaceAgentListContainersResponse{} + api.containersErr = err + api.mu.Unlock() + } - now := api.clock.Now() - if now.Sub(api.mtime) < api.cacheDuration { - return copyListContainersResponse(api.containers), nil + return xerrors.Errorf("list containers failed: %w", err) } - timeoutCtx, timeoutCancel := context.WithTimeout(ctx, getContainersTimeout) - defer timeoutCancel() - updated, err := api.cl.List(timeoutCtx) - if err != nil { - return codersdk.WorkspaceAgentListContainersResponse{}, xerrors.Errorf("get containers: %w", err) - } - api.containers = updated - api.mtime = now + api.mu.Lock() + defer api.mu.Unlock() + + api.processUpdatedContainersLocked(ctx, updated) + api.logger.Debug(ctx, "containers updated successfully", slog.F("container_count", len(api.containers.Containers)), slog.F("warning_count", len(api.containers.Warnings)), slog.F("devcontainer_count", len(api.knownDevcontainers))) + + return nil +} + +// processUpdatedContainersLocked updates the devcontainer state based +// on the latest list of containers. This method assumes that api.mu is +// held. 
+func (api *API) processUpdatedContainersLocked(ctx context.Context, updated codersdk.WorkspaceAgentListContainersResponse) { dirtyStates := make(map[string]bool) // Reset all known devcontainers to not running. for i := range api.knownDevcontainers { @@ -345,6 +405,7 @@ func (api *API) getContainers(ctx context.Context) (codersdk.WorkspaceAgentListC } // Check if the container is running and update the known devcontainers. + updatedDevcontainers := make(map[string]bool) for i := range updated.Containers { container := &updated.Containers[i] workspaceFolder := container.Labels[DevcontainerLocalFolderLabel] @@ -354,18 +415,16 @@ func (api *API) getContainers(ctx context.Context) (codersdk.WorkspaceAgentListC continue } - container.DevcontainerDirty = dirtyStates[workspaceFolder] - if container.DevcontainerDirty { - lastModified, hasModTime := api.configFileModifiedTimes[configFile] - if hasModTime && container.CreatedAt.After(lastModified) { - api.logger.Info(ctx, "new container created after config modification, not marking as dirty", - slog.F("container", container.ID), - slog.F("created_at", container.CreatedAt), - slog.F("config_modified_at", lastModified), - slog.F("file", configFile), - ) - container.DevcontainerDirty = false - } + if lastModified, hasModTime := api.configFileModifiedTimes[configFile]; !hasModTime || container.CreatedAt.Before(lastModified) { + api.logger.Debug(ctx, "container created before config modification, setting dirty state from devcontainer", + slog.F("container", container.ID), + slog.F("created_at", container.CreatedAt), + slog.F("config_modified_at", lastModified), + slog.F("file", configFile), + slog.F("workspace_folder", workspaceFolder), + slog.F("dirty", dirtyStates[workspaceFolder]), + ) + container.DevcontainerDirty = dirtyStates[workspaceFolder] } // Check if this is already in our known list. 
@@ -373,29 +432,17 @@ func (api *API) getContainers(ctx context.Context) (codersdk.WorkspaceAgentListC return dc.WorkspaceFolder == workspaceFolder }); knownIndex != -1 { // Update existing entry with runtime information. - if configFile != "" && api.knownDevcontainers[knownIndex].ConfigPath == "" { - api.knownDevcontainers[knownIndex].ConfigPath = configFile + dc := &api.knownDevcontainers[knownIndex] + if configFile != "" && dc.ConfigPath == "" { + dc.ConfigPath = configFile if err := api.watcher.Add(configFile); err != nil { api.logger.Error(ctx, "watch devcontainer config file failed", slog.Error(err), slog.F("file", configFile)) } } - api.knownDevcontainers[knownIndex].Running = container.Running - api.knownDevcontainers[knownIndex].Container = container - - // Check if this container was created after the config - // file was modified. - if configFile != "" && api.knownDevcontainers[knownIndex].Dirty { - lastModified, hasModTime := api.configFileModifiedTimes[configFile] - if hasModTime && container.CreatedAt.After(lastModified) { - api.logger.Info(ctx, "clearing dirty flag for container created after config modification", - slog.F("container", container.ID), - slog.F("created_at", container.CreatedAt), - slog.F("config_modified_at", lastModified), - slog.F("file", configFile), - ) - api.knownDevcontainers[knownIndex].Dirty = false - } - } + dc.Running = container.Running + dc.Container = container + dc.Dirty = container.DevcontainerDirty + updatedDevcontainers[workspaceFolder] = true continue } @@ -428,9 +475,67 @@ func (api *API) getContainers(ctx context.Context) (codersdk.WorkspaceAgentListC Dirty: container.DevcontainerDirty, Container: container, }) + updatedDevcontainers[workspaceFolder] = true + } + + for i := range api.knownDevcontainers { + if _, ok := updatedDevcontainers[api.knownDevcontainers[i].WorkspaceFolder]; ok { + continue + } + + dc := &api.knownDevcontainers[i] + + if !dc.Running && !dc.Dirty && dc.Container == nil { + // Already marked 
as not running, skip. + continue + } + + api.logger.Debug(ctx, "devcontainer is not running anymore, marking as not running", + slog.F("workspace_folder", dc.WorkspaceFolder), + slog.F("config_path", dc.ConfigPath), + slog.F("name", dc.Name), + ) + dc.Running = false + dc.Dirty = false + dc.Container = nil + } + + api.containers = updated + api.containersErr = nil +} + +// refreshContainers triggers an immediate update of the container list +// and waits for it to complete. +func (api *API) refreshContainers(ctx context.Context) error { + done := make(chan error, 1) + select { + case <-api.ctx.Done(): + return xerrors.Errorf("API closed, cannot send refresh trigger: %w", api.ctx.Err()) + case <-ctx.Done(): + return ctx.Err() + case api.refreshTrigger <- done: + select { + case <-api.ctx.Done(): + return xerrors.Errorf("API closed, cannot wait for refresh: %w", api.ctx.Err()) + case <-ctx.Done(): + return ctx.Err() + case err := <-done: + return err + } } +} - return copyListContainersResponse(api.containers), nil +func (api *API) getContainers() (codersdk.WorkspaceAgentListContainersResponse, error) { + api.mu.RLock() + defer api.mu.RUnlock() + + if api.containersErr != nil { + return codersdk.WorkspaceAgentListContainersResponse{}, api.containersErr + } + return codersdk.WorkspaceAgentListContainersResponse{ + Containers: slices.Clone(api.containers.Containers), + Warnings: slices.Clone(api.containers.Warnings), + }, nil } // handleDevcontainerRecreate handles the HTTP request to recreate a @@ -447,7 +552,7 @@ func (api *API) handleDevcontainerRecreate(w http.ResponseWriter, r *http.Reques return } - containers, err := api.getContainers(ctx) + containers, err := api.getContainers() if err != nil { httpapi.Write(ctx, w, http.StatusInternalServerError, codersdk.Response{ Message: "Could not list containers", @@ -509,30 +614,9 @@ func (api *API) handleDevcontainerRecreate(w http.ResponseWriter, r *http.Reques return } - // TODO(mafredri): Temporarily handle clearing 
the dirty state after - // recreation, later on this should be handled by a "container watcher". - if !api.doLockedHandler(w, r, func() { - for i := range api.knownDevcontainers { - if api.knownDevcontainers[i].WorkspaceFolder == workspaceFolder { - if api.knownDevcontainers[i].Dirty { - api.logger.Info(ctx, "clearing dirty flag after recreation", - slog.F("workspace_folder", workspaceFolder), - slog.F("name", api.knownDevcontainers[i].Name), - ) - api.knownDevcontainers[i].Dirty = false - // TODO(mafredri): This should be handled by a service that - // updates the devcontainer state periodically and on-demand. - api.knownDevcontainers[i].Container = nil - // Set the modified time to the zero value to indicate that - // the containers list must be refreshed. This will see to - // it that the new container is re-assigned. - api.mtime = time.Time{} - } - return - } - } - }) { - return + // NOTE(mafredri): This won't be needed once recreation is done async. + if err := api.refreshContainers(r.Context()); err != nil { + api.logger.Error(ctx, "failed to trigger immediate refresh after devcontainer recreation", slog.Error(err)) } w.WriteHeader(http.StatusNoContent) @@ -542,8 +626,10 @@ func (api *API) handleDevcontainerRecreate(w http.ResponseWriter, r *http.Reques func (api *API) handleDevcontainersList(w http.ResponseWriter, r *http.Request) { ctx := r.Context() - // Run getContainers to detect the latest devcontainers and their state. 
- _, err := api.getContainers(ctx) + api.mu.RLock() + err := api.containersErr + devcontainers := slices.Clone(api.knownDevcontainers) + api.mu.RUnlock() if err != nil { httpapi.Write(ctx, w, http.StatusInternalServerError, codersdk.Response{ Message: "Could not list containers", @@ -552,13 +638,6 @@ func (api *API) handleDevcontainersList(w http.ResponseWriter, r *http.Request) return } - var devcontainers []codersdk.WorkspaceAgentDevcontainer - if !api.doLockedHandler(w, r, func() { - devcontainers = slices.Clone(api.knownDevcontainers) - }) { - return - } - slices.SortFunc(devcontainers, func(a, b codersdk.WorkspaceAgentDevcontainer) int { if cmp := strings.Compare(a.WorkspaceFolder, b.WorkspaceFolder); cmp != 0 { return cmp @@ -576,75 +655,52 @@ func (api *API) handleDevcontainersList(w http.ResponseWriter, r *http.Request) // markDevcontainerDirty finds the devcontainer with the given config file path // and marks it as dirty. It acquires the lock before modifying the state. func (api *API) markDevcontainerDirty(configPath string, modifiedAt time.Time) { - ok := api.doLocked(func() { - // Record the timestamp of when this configuration file was modified. - api.configFileModifiedTimes[configPath] = modifiedAt + api.mu.Lock() + defer api.mu.Unlock() - for i := range api.knownDevcontainers { - if api.knownDevcontainers[i].ConfigPath != configPath { - continue - } + // Record the timestamp of when this configuration file was modified. + api.configFileModifiedTimes[configPath] = modifiedAt - // TODO(mafredri): Simplistic mark for now, we should check if the - // container is running and if the config file was modified after - // the container was created. 
- if !api.knownDevcontainers[i].Dirty { - api.logger.Info(api.ctx, "marking devcontainer as dirty", - slog.F("file", configPath), - slog.F("name", api.knownDevcontainers[i].Name), - slog.F("workspace_folder", api.knownDevcontainers[i].WorkspaceFolder), - slog.F("modified_at", modifiedAt), - ) - api.knownDevcontainers[i].Dirty = true - if api.knownDevcontainers[i].Container != nil { - api.knownDevcontainers[i].Container.DevcontainerDirty = true - } + for i := range api.knownDevcontainers { + if api.knownDevcontainers[i].ConfigPath != configPath { + continue + } + + // TODO(mafredri): Simplistic mark for now, we should check if the + // container is running and if the config file was modified after + // the container was created. + if !api.knownDevcontainers[i].Dirty { + api.logger.Info(api.ctx, "marking devcontainer as dirty", + slog.F("file", configPath), + slog.F("name", api.knownDevcontainers[i].Name), + slog.F("workspace_folder", api.knownDevcontainers[i].WorkspaceFolder), + slog.F("modified_at", modifiedAt), + ) + api.knownDevcontainers[i].Dirty = true + if api.knownDevcontainers[i].Container != nil { + api.knownDevcontainers[i].Container.DevcontainerDirty = true } } - }) - if !ok { - api.logger.Debug(api.ctx, "mark devcontainer dirty failed", slog.F("file", configPath)) } } -func (api *API) doLockedHandler(w http.ResponseWriter, r *http.Request, f func()) bool { - select { - case <-r.Context().Done(): - httpapi.Write(r.Context(), w, http.StatusRequestTimeout, codersdk.Response{ - Message: "Request canceled", - Detail: "Request was canceled before we could process it.", - }) - return false - case <-api.ctx.Done(): - httpapi.Write(r.Context(), w, http.StatusServiceUnavailable, codersdk.Response{ - Message: "API closed", - Detail: "The API is closed and cannot process requests.", - }) - return false - case api.lockCh <- struct{}{}: - defer func() { <-api.lockCh }() +func (api *API) Close() error { + api.mu.Lock() + if api.closed { + api.mu.Unlock() + return nil } 
- f() - return true -} + api.closed = true -func (api *API) doLocked(f func()) bool { - select { - case <-api.ctx.Done(): - return false - case api.lockCh <- struct{}{}: - defer func() { <-api.lockCh }() - } - f() - return true -} + api.logger.Debug(api.ctx, "closing API") + defer api.logger.Debug(api.ctx, "closed API") -func (api *API) Close() error { api.cancel() - <-api.done err := api.watcher.Close() - if err != nil { - return err - } - return nil + + api.mu.Unlock() + <-api.watcherDone + <-api.updaterDone + + return err } diff --git a/agent/agentcontainers/api_test.go b/agent/agentcontainers/api_test.go index 2e173b7d5a6b4..ee81b4069ecc2 100644 --- a/agent/agentcontainers/api_test.go +++ b/agent/agentcontainers/api_test.go @@ -164,12 +164,8 @@ func TestAPI(t *testing.T) { // Each test case is called multiple times to ensure idempotency for _, tc := range []struct { name string - // data to be stored in the handler - cacheData codersdk.WorkspaceAgentListContainersResponse - // duration of cache - cacheDur time.Duration - // relative age of the cached data - cacheAge time.Duration + // initialData to be stored in the handler + initialData codersdk.WorkspaceAgentListContainersResponse // function to set up expectations for the mock setupMock func(mcl *acmock.MockLister, preReq *gomock.Call) // expected result @@ -178,28 +174,17 @@ func TestAPI(t *testing.T) { expectedErr string }{ { - name: "no cache", + name: "no initial data", + initialData: makeResponse(), setupMock: func(mcl *acmock.MockLister, preReq *gomock.Call) { mcl.EXPECT().List(gomock.Any()).Return(makeResponse(fakeCt), nil).After(preReq).AnyTimes() }, expected: makeResponse(fakeCt), }, { - name: "no data", - cacheData: makeResponse(), - cacheAge: 2 * time.Second, - cacheDur: time.Second, - setupMock: func(mcl *acmock.MockLister, preReq *gomock.Call) { - mcl.EXPECT().List(gomock.Any()).Return(makeResponse(fakeCt), nil).After(preReq).AnyTimes() - }, - expected: makeResponse(fakeCt), - }, - { - name: 
"cached data", - cacheAge: time.Second, - cacheData: makeResponse(fakeCt), - cacheDur: 2 * time.Second, - expected: makeResponse(fakeCt), + name: "repeat initial data", + initialData: makeResponse(fakeCt), + expected: makeResponse(fakeCt), }, { name: "lister error", @@ -209,10 +194,8 @@ func TestAPI(t *testing.T) { expectedErr: assert.AnError.Error(), }, { - name: "stale cache", - cacheAge: 2 * time.Second, - cacheData: makeResponse(fakeCt), - cacheDur: time.Second, + name: "updated data", + initialData: makeResponse(fakeCt), setupMock: func(mcl *acmock.MockLister, preReq *gomock.Call) { mcl.EXPECT().List(gomock.Any()).Return(makeResponse(fakeCt2), nil).After(preReq).AnyTimes() }, @@ -227,54 +210,55 @@ func TestAPI(t *testing.T) { clk = quartz.NewMock(t) ctrl = gomock.NewController(t) mockLister = acmock.NewMockLister(ctrl) - now = time.Now().UTC() - logger = slogtest.Make(t, nil).Leveled(slog.LevelDebug) + logger = slogtest.Make(t, &slogtest.Options{IgnoreErrors: true}).Leveled(slog.LevelDebug) r = chi.NewRouter() - api = agentcontainers.NewAPI(logger, - agentcontainers.WithCacheDuration(tc.cacheDur), - agentcontainers.WithClock(clk), - agentcontainers.WithLister(mockLister), - ) ) - defer api.Close() - - r.Mount("/", api.Routes()) - preReq := mockLister.EXPECT().List(gomock.Any()).Return(tc.cacheData, nil).Times(1) + initialData := mockLister.EXPECT().List(gomock.Any()).Return(tc.initialData, nil) if tc.setupMock != nil { - tc.setupMock(mockLister, preReq) - } - - if tc.cacheAge != 0 { - clk.Set(now.Add(-tc.cacheAge)).MustWait(ctx) + tc.setupMock(mockLister, initialData.Times(1)) } else { - clk.Set(now).MustWait(ctx) + initialData.AnyTimes() } - // Prime the cache with the initial data. - req := httptest.NewRequest(http.MethodGet, "/", nil) + api := agentcontainers.NewAPI(logger, + agentcontainers.WithClock(clk), + agentcontainers.WithLister(mockLister), + ) + defer api.Close() + r.Mount("/", api.Routes()) + + // Initial request returns the initial data. 
+ req := httptest.NewRequest(http.MethodGet, "/", nil). + WithContext(ctx) rec := httptest.NewRecorder() r.ServeHTTP(rec, req) - clk.Set(now).MustWait(ctx) - - // Repeat the test to ensure idempotency - for i := 0; i < 2; i++ { - req = httptest.NewRequest(http.MethodGet, "/", nil) - rec = httptest.NewRecorder() - r.ServeHTTP(rec, req) - - if tc.expectedErr != "" { - got := &codersdk.Error{} - err := json.NewDecoder(rec.Body).Decode(got) - require.NoError(t, err, "unmarshal response failed") - require.ErrorContains(t, got, tc.expectedErr, "expected error (attempt %d)", i) - } else { - var got codersdk.WorkspaceAgentListContainersResponse - err := json.NewDecoder(rec.Body).Decode(&got) - require.NoError(t, err, "unmarshal response failed") - require.Equal(t, tc.expected, got, "expected containers to be equal (attempt %d)", i) - } + var got codersdk.WorkspaceAgentListContainersResponse + err := json.NewDecoder(rec.Body).Decode(&got) + require.NoError(t, err, "unmarshal response failed") + require.Equal(t, tc.initialData, got, "want initial data") + + // Advance the clock to run updateLoop. + _, aw := clk.AdvanceNext() + aw.MustWait(ctx) + + // Second request returns the updated data. + req = httptest.NewRequest(http.MethodGet, "/", nil). 
+ WithContext(ctx) + rec = httptest.NewRecorder() + r.ServeHTTP(rec, req) + + if tc.expectedErr != "" { + got := &codersdk.Error{} + err := json.NewDecoder(rec.Body).Decode(got) + require.NoError(t, err, "unmarshal response failed") + require.ErrorContains(t, got, tc.expectedErr, "want error") + } else { + var got codersdk.WorkspaceAgentListContainersResponse + err := json.NewDecoder(rec.Body).Decode(&got) + require.NoError(t, err, "unmarshal response failed") + require.Equal(t, tc.expected, got, "want updated data") } }) } @@ -380,7 +364,7 @@ func TestAPI(t *testing.T) { t.Run(tt.name, func(t *testing.T) { t.Parallel() - logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug) + logger := slogtest.Make(t, &slogtest.Options{IgnoreErrors: true}).Leveled(slog.LevelDebug) // Setup router with the handler under test. r := chi.NewRouter() @@ -393,8 +377,11 @@ func TestAPI(t *testing.T) { defer api.Close() r.Mount("/", api.Routes()) + ctx := testutil.Context(t, testutil.WaitShort) + // Simulate HTTP request to the recreate endpoint. - req := httptest.NewRequest(http.MethodPost, "/devcontainers/container/"+tt.containerID+"/recreate", nil) + req := httptest.NewRequest(http.MethodPost, "/devcontainers/container/"+tt.containerID+"/recreate", nil). + WithContext(ctx) rec := httptest.NewRecorder() r.ServeHTTP(rec, req) @@ -688,7 +675,7 @@ func TestAPI(t *testing.T) { t.Run(tt.name, func(t *testing.T) { t.Parallel() - logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug) + logger := slogtest.Make(t, &slogtest.Options{IgnoreErrors: true}).Leveled(slog.LevelDebug) // Setup router with the handler under test. r := chi.NewRouter() @@ -712,9 +699,13 @@ func TestAPI(t *testing.T) { api := agentcontainers.NewAPI(logger, apiOptions...) defer api.Close() + r.Mount("/", api.Routes()) - req := httptest.NewRequest(http.MethodGet, "/devcontainers", nil) + ctx := testutil.Context(t, testutil.WaitShort) + + req := httptest.NewRequest(http.MethodGet, "/devcontainers", nil). 
+ WithContext(ctx) rec := httptest.NewRecorder() r.ServeHTTP(rec, req) @@ -739,10 +730,97 @@ func TestAPI(t *testing.T) { } }) + t.Run("List devcontainers running then not running", func(t *testing.T) { + t.Parallel() + + container := codersdk.WorkspaceAgentContainer{ + ID: "container-id", + FriendlyName: "container-name", + Running: true, + CreatedAt: time.Now(), + Labels: map[string]string{ + agentcontainers.DevcontainerLocalFolderLabel: "/home/coder/project", + agentcontainers.DevcontainerConfigFileLabel: "/home/coder/project/.devcontainer/devcontainer.json", + }, + } + dc := codersdk.WorkspaceAgentDevcontainer{ + ID: uuid.New(), + Name: "test-devcontainer", + WorkspaceFolder: "/home/coder/project", + ConfigPath: "/home/coder/project/.devcontainer/devcontainer.json", + } + + logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug) + lister := &fakeLister{ + containers: codersdk.WorkspaceAgentListContainersResponse{ + Containers: []codersdk.WorkspaceAgentContainer{container}, + }, + } + watcher := newFakeWatcher(t) + clk := quartz.NewMock(t) + api := agentcontainers.NewAPI(logger, + agentcontainers.WithClock(clk), + agentcontainers.WithLister(lister), + agentcontainers.WithWatcher(watcher), + agentcontainers.WithDevcontainers( + []codersdk.WorkspaceAgentDevcontainer{dc}, + []codersdk.WorkspaceAgentScript{{LogSourceID: uuid.New(), ID: dc.ID}}, + ), + ) + defer api.Close() + + ctx := testutil.Context(t, testutil.WaitShort) + + // Simulate a file modification event to make the devcontainer dirty. + watcher.sendEventWaitNextCalled(ctx, fsnotify.Event{ + Name: "/home/coder/project/.devcontainer/devcontainer.json", + Op: fsnotify.Write, + }) + + // Initially the devcontainer should be running and dirty. + req := httptest.NewRequest(http.MethodGet, "/devcontainers", nil). 
+ WithContext(ctx) + rec := httptest.NewRecorder() + api.Routes().ServeHTTP(rec, req) + + require.Equal(t, http.StatusOK, rec.Code) + var resp1 codersdk.WorkspaceAgentDevcontainersResponse + err := json.NewDecoder(rec.Body).Decode(&resp1) + require.NoError(t, err) + require.Len(t, resp1.Devcontainers, 1) + require.True(t, resp1.Devcontainers[0].Running, "devcontainer should be running initially") + require.True(t, resp1.Devcontainers[0].Dirty, "devcontainer should be dirty initially") + require.NotNil(t, resp1.Devcontainers[0].Container, "devcontainer should have a container initially") + + // Next, simulate a situation where the container is no longer + // running. + lister.containers.Containers = []codersdk.WorkspaceAgentContainer{} + + // Trigger a refresh which will use the second response from mock + // lister (no containers). + _, aw := clk.AdvanceNext() + aw.MustWait(ctx) + + // Afterwards the devcontainer should not be running and not dirty. + req = httptest.NewRequest(http.MethodGet, "/devcontainers", nil). 
+ WithContext(ctx) + rec = httptest.NewRecorder() + api.Routes().ServeHTTP(rec, req) + + require.Equal(t, http.StatusOK, rec.Code) + var resp2 codersdk.WorkspaceAgentDevcontainersResponse + err = json.NewDecoder(rec.Body).Decode(&resp2) + require.NoError(t, err) + require.Len(t, resp2.Devcontainers, 1) + require.False(t, resp2.Devcontainers[0].Running, "devcontainer should not be running after empty list") + require.False(t, resp2.Devcontainers[0].Dirty, "devcontainer should not be dirty after empty list") + require.Nil(t, resp2.Devcontainers[0].Container, "devcontainer should not have a container after empty list") + }) + t.Run("FileWatcher", func(t *testing.T) { t.Parallel() - ctx := testutil.Context(t, testutil.WaitMedium) + ctx := testutil.Context(t, testutil.WaitShort) startTime := time.Date(2025, 1, 1, 12, 0, 0, 0, time.UTC) mClock := quartz.NewMock(t) @@ -777,14 +855,13 @@ func TestAPI(t *testing.T) { ) defer api.Close() - api.SignalReady() - r := chi.NewRouter() r.Mount("/", api.Routes()) // Call the list endpoint first to ensure config files are // detected and watched. - req := httptest.NewRequest(http.MethodGet, "/devcontainers", nil) + req := httptest.NewRequest(http.MethodGet, "/devcontainers", nil). + WithContext(ctx) rec := httptest.NewRecorder() r.ServeHTTP(rec, req) require.Equal(t, http.StatusOK, rec.Code) @@ -813,10 +890,13 @@ func TestAPI(t *testing.T) { Op: fsnotify.Write, }) - mClock.Advance(time.Minute).MustWait(ctx) + // Advance the clock to run updateLoop. + _, aw := mClock.AdvanceNext() + aw.MustWait(ctx) // Check if the container is marked as dirty. - req = httptest.NewRequest(http.MethodGet, "/devcontainers", nil) + req = httptest.NewRequest(http.MethodGet, "/devcontainers", nil). 
+ WithContext(ctx) rec = httptest.NewRecorder() r.ServeHTTP(rec, req) require.Equal(t, http.StatusOK, rec.Code) @@ -830,15 +910,18 @@ func TestAPI(t *testing.T) { assert.True(t, response.Devcontainers[0].Container.DevcontainerDirty, "container should be marked as dirty after config file was modified") - mClock.Advance(time.Minute).MustWait(ctx) - container.ID = "new-container-id" // Simulate a new container ID after recreation. container.FriendlyName = "new-container-name" container.CreatedAt = mClock.Now() // Update the creation time. fLister.containers.Containers = []codersdk.WorkspaceAgentContainer{container} + // Advance the clock to run updateLoop. + _, aw = mClock.AdvanceNext() + aw.MustWait(ctx) + // Check if dirty flag is cleared. - req = httptest.NewRequest(http.MethodGet, "/devcontainers", nil) + req = httptest.NewRequest(http.MethodGet, "/devcontainers", nil). + WithContext(ctx) rec = httptest.NewRecorder() r.ServeHTTP(rec, req) require.Equal(t, http.StatusOK, rec.Code) diff --git a/cli/ssh_test.go b/cli/ssh_test.go index 49f83daa0612a..3feef218c5b24 100644 --- a/cli/ssh_test.go +++ b/cli/ssh_test.go @@ -2056,12 +2056,6 @@ func TestSSH_Container(t *testing.T) { client, workspace, agentToken := setupWorkspaceForAgent(t) ctrl := gomock.NewController(t) mLister := acmock.NewMockLister(ctrl) - _ = agenttest.New(t, client.URL, agentToken, func(o *agent.Options) { - o.ExperimentalDevcontainersEnabled = true - o.ContainerAPIOptions = append(o.ContainerAPIOptions, agentcontainers.WithLister(mLister)) - }) - _ = coderdtest.NewWorkspaceAgentWaiter(t, client, workspace.ID).Wait() - mLister.EXPECT().List(gomock.Any()).Return(codersdk.WorkspaceAgentListContainersResponse{ Containers: []codersdk.WorkspaceAgentContainer{ { @@ -2071,6 +2065,11 @@ func TestSSH_Container(t *testing.T) { }, Warnings: nil, }, nil) + _ = agenttest.New(t, client.URL, agentToken, func(o *agent.Options) { + o.ExperimentalDevcontainersEnabled = true + o.ContainerAPIOptions = 
append(o.ContainerAPIOptions, agentcontainers.WithLister(mLister)) + }) + _ = coderdtest.NewWorkspaceAgentWaiter(t, client, workspace.ID).Wait() cID := uuid.NewString() inv, root := clitest.New(t, "ssh", workspace.Name, "-c", cID) diff --git a/coderd/workspaceagents_test.go b/coderd/workspaceagents_test.go index bd335e20b0fbb..3c829320768e6 100644 --- a/coderd/workspaceagents_test.go +++ b/coderd/workspaceagents_test.go @@ -1295,14 +1295,14 @@ func TestWorkspaceAgentContainers(t *testing.T) { { name: "test response", setupMock: func(mcl *acmock.MockLister) (codersdk.WorkspaceAgentListContainersResponse, error) { - mcl.EXPECT().List(gomock.Any()).Return(testResponse, nil).Times(1) + mcl.EXPECT().List(gomock.Any()).Return(testResponse, nil).AnyTimes() return testResponse, nil }, }, { name: "error response", setupMock: func(mcl *acmock.MockLister) (codersdk.WorkspaceAgentListContainersResponse, error) { - mcl.EXPECT().List(gomock.Any()).Return(codersdk.WorkspaceAgentListContainersResponse{}, assert.AnError).Times(1) + mcl.EXPECT().List(gomock.Any()).Return(codersdk.WorkspaceAgentListContainersResponse{}, assert.AnError).AnyTimes() return codersdk.WorkspaceAgentListContainersResponse{}, assert.AnError }, }, @@ -1314,7 +1314,10 @@ func TestWorkspaceAgentContainers(t *testing.T) { ctrl := gomock.NewController(t) mcl := acmock.NewMockLister(ctrl) expected, expectedErr := tc.setupMock(mcl) - client, db := coderdtest.NewWithDatabase(t, &coderdtest.Options{}) + logger := slogtest.Make(t, &slogtest.Options{IgnoreErrors: true}).Leveled(slog.LevelDebug) + client, db := coderdtest.NewWithDatabase(t, &coderdtest.Options{ + Logger: &logger, + }) user := coderdtest.CreateFirstUser(t, client) r := dbfake.WorkspaceBuild(t, db, database.WorkspaceTable{ OrganizationID: user.OrganizationID, @@ -1323,6 +1326,7 @@ func TestWorkspaceAgentContainers(t *testing.T) { return agents }).Do() _ = agenttest.New(t, client.URL, r.AgentToken, func(o *agent.Options) { + o.Logger = 
logger.Named("agent") o.ExperimentalDevcontainersEnabled = true o.ContainerAPIOptions = append(o.ContainerAPIOptions, agentcontainers.WithLister(mcl)) }) @@ -1392,7 +1396,7 @@ func TestWorkspaceAgentRecreateDevcontainer(t *testing.T) { setupMock: func(mcl *acmock.MockLister, mdccli *acmock.MockDevcontainerCLI) int { mcl.EXPECT().List(gomock.Any()).Return(codersdk.WorkspaceAgentListContainersResponse{ Containers: []codersdk.WorkspaceAgentContainer{devContainer}, - }, nil).Times(1) + }, nil).AnyTimes() mdccli.EXPECT().Up(gomock.Any(), workspaceFolder, configFile, gomock.Any()).Return("someid", nil).Times(1) return 0 }, @@ -1400,7 +1404,7 @@ func TestWorkspaceAgentRecreateDevcontainer(t *testing.T) { { name: "Container does not exist", setupMock: func(mcl *acmock.MockLister, mdccli *acmock.MockDevcontainerCLI) int { - mcl.EXPECT().List(gomock.Any()).Return(codersdk.WorkspaceAgentListContainersResponse{}, nil).Times(1) + mcl.EXPECT().List(gomock.Any()).Return(codersdk.WorkspaceAgentListContainersResponse{}, nil).AnyTimes() return http.StatusNotFound }, }, @@ -1409,7 +1413,7 @@ func TestWorkspaceAgentRecreateDevcontainer(t *testing.T) { setupMock: func(mcl *acmock.MockLister, mdccli *acmock.MockDevcontainerCLI) int { mcl.EXPECT().List(gomock.Any()).Return(codersdk.WorkspaceAgentListContainersResponse{ Containers: []codersdk.WorkspaceAgentContainer{plainContainer}, - }, nil).Times(1) + }, nil).AnyTimes() return http.StatusNotFound }, }, @@ -1421,7 +1425,10 @@ func TestWorkspaceAgentRecreateDevcontainer(t *testing.T) { mcl := acmock.NewMockLister(ctrl) mdccli := acmock.NewMockDevcontainerCLI(ctrl) wantStatus := tc.setupMock(mcl, mdccli) - client, db := coderdtest.NewWithDatabase(t, &coderdtest.Options{}) + logger := slogtest.Make(t, &slogtest.Options{IgnoreErrors: true}).Leveled(slog.LevelDebug) + client, db := coderdtest.NewWithDatabase(t, &coderdtest.Options{ + Logger: &logger, + }) user := coderdtest.CreateFirstUser(t, client) r := dbfake.WorkspaceBuild(t, db, 
database.WorkspaceTable{ OrganizationID: user.OrganizationID, @@ -1430,6 +1437,7 @@ func TestWorkspaceAgentRecreateDevcontainer(t *testing.T) { return agents }).Do() _ = agenttest.New(t, client.URL, r.AgentToken, func(o *agent.Options) { + o.Logger = logger.Named("agent") o.ExperimentalDevcontainersEnabled = true o.ContainerAPIOptions = append( o.ContainerAPIOptions, From 58b65a156d147222be63c34af2541efff025c08b Mon Sep 17 00:00:00 2001 From: Mathias Fredriksson Date: Wed, 21 May 2025 16:06:04 +0000 Subject: [PATCH 02/11] try to fix potential race in test --- agent/agentcontainers/api_test.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/agent/agentcontainers/api_test.go b/agent/agentcontainers/api_test.go index ee81b4069ecc2..c34bb6aaeb992 100644 --- a/agent/agentcontainers/api_test.go +++ b/agent/agentcontainers/api_test.go @@ -737,7 +737,7 @@ func TestAPI(t *testing.T) { ID: "container-id", FriendlyName: "container-name", Running: true, - CreatedAt: time.Now(), + CreatedAt: time.Now().Add(-1 * time.Minute), Labels: map[string]string{ agentcontainers.DevcontainerLocalFolderLabel: "/home/coder/project", agentcontainers.DevcontainerConfigFileLabel: "/home/coder/project/.devcontainer/devcontainer.json", @@ -771,6 +771,8 @@ func TestAPI(t *testing.T) { ctx := testutil.Context(t, testutil.WaitShort) + clk.Set(time.Now()).MustWait(ctx) + // Simulate a file modification event to make the devcontainer dirty. 
watcher.sendEventWaitNextCalled(ctx, fsnotify.Event{ Name: "/home/coder/project/.devcontainer/devcontainer.json", From 3cd9cc1a89fd5b9ee6b4175bcf189a86aea92109 Mon Sep 17 00:00:00 2001 From: Mathias Fredriksson Date: Wed, 21 May 2025 16:11:48 +0000 Subject: [PATCH 03/11] fix bug in marking container as dirty --- agent/agentcontainers/api.go | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/agent/agentcontainers/api.go b/agent/agentcontainers/api.go index 74a47d37954d9..228c4e5aa37c0 100644 --- a/agent/agentcontainers/api.go +++ b/agent/agentcontainers/api.go @@ -677,9 +677,15 @@ func (api *API) markDevcontainerDirty(configPath string, modifiedAt time.Time) { slog.F("modified_at", modifiedAt), ) api.knownDevcontainers[i].Dirty = true - if api.knownDevcontainers[i].Container != nil { - api.knownDevcontainers[i].Container.DevcontainerDirty = true - } + } + if api.knownDevcontainers[i].Container != nil && !api.knownDevcontainers[i].Container.DevcontainerDirty { + api.logger.Info(api.ctx, "marking devcontainer container as dirty", + slog.F("file", configPath), + slog.F("name", api.knownDevcontainers[i].Name), + slog.F("workspace_folder", api.knownDevcontainers[i].WorkspaceFolder), + slog.F("modified_at", modifiedAt), + ) + api.knownDevcontainers[i].Container.DevcontainerDirty = true } } } From 1aa6a48f5663faf6e01860d220fc41757af0f2b5 Mon Sep 17 00:00:00 2001 From: Mathias Fredriksson Date: Wed, 21 May 2025 16:15:43 +0000 Subject: [PATCH 04/11] make sure watcher is synced --- agent/agentcontainers/api_test.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/agent/agentcontainers/api_test.go b/agent/agentcontainers/api_test.go index c34bb6aaeb992..dae7abcb08937 100644 --- a/agent/agentcontainers/api_test.go +++ b/agent/agentcontainers/api_test.go @@ -773,6 +773,9 @@ func TestAPI(t *testing.T) { clk.Set(time.Now()).MustWait(ctx) + // Make sure the start loop has been called. 
+ watcher.waitNext(ctx) + // Simulate a file modification event to make the devcontainer dirty. watcher.sendEventWaitNextCalled(ctx, fsnotify.Event{ Name: "/home/coder/project/.devcontainer/devcontainer.json", From 0122e4396fdad13a885717ad130ba8776c1eec49 Mon Sep 17 00:00:00 2001 From: Mathias Fredriksson Date: Wed, 21 May 2025 17:00:48 +0000 Subject: [PATCH 05/11] improve readability of markDevcontainerDirty --- agent/agentcontainers/api.go | 32 +++++++++++++++----------------- 1 file changed, 15 insertions(+), 17 deletions(-) diff --git a/agent/agentcontainers/api.go b/agent/agentcontainers/api.go index 228c4e5aa37c0..425abec0a9556 100644 --- a/agent/agentcontainers/api.go +++ b/agent/agentcontainers/api.go @@ -662,30 +662,28 @@ func (api *API) markDevcontainerDirty(configPath string, modifiedAt time.Time) { api.configFileModifiedTimes[configPath] = modifiedAt for i := range api.knownDevcontainers { - if api.knownDevcontainers[i].ConfigPath != configPath { + dc := &api.knownDevcontainers[i] + if dc.ConfigPath != configPath { continue } + logger := api.logger.With( + slog.F("file", configPath), + slog.F("name", dc.Name), + slog.F("workspace_folder", dc.WorkspaceFolder), + slog.F("modified_at", modifiedAt), + ) + // TODO(mafredri): Simplistic mark for now, we should check if the // container is running and if the config file was modified after // the container was created. 
- if !api.knownDevcontainers[i].Dirty { - api.logger.Info(api.ctx, "marking devcontainer as dirty", - slog.F("file", configPath), - slog.F("name", api.knownDevcontainers[i].Name), - slog.F("workspace_folder", api.knownDevcontainers[i].WorkspaceFolder), - slog.F("modified_at", modifiedAt), - ) - api.knownDevcontainers[i].Dirty = true + if !dc.Dirty { + logger.Info(api.ctx, "marking devcontainer as dirty") + dc.Dirty = true } - if api.knownDevcontainers[i].Container != nil && !api.knownDevcontainers[i].Container.DevcontainerDirty { - api.logger.Info(api.ctx, "marking devcontainer container as dirty", - slog.F("file", configPath), - slog.F("name", api.knownDevcontainers[i].Name), - slog.F("workspace_folder", api.knownDevcontainers[i].WorkspaceFolder), - slog.F("modified_at", modifiedAt), - ) - api.knownDevcontainers[i].Container.DevcontainerDirty = true + if dc.Container != nil && !dc.Container.DevcontainerDirty { + logger.Info(api.ctx, "marking devcontainer container as dirty") + dc.Container.DevcontainerDirty = true } } } From e225434339123b9f91fc29ecf35728e9bd540457 Mon Sep 17 00:00:00 2001 From: Mathias Fredriksson Date: Thu, 22 May 2025 08:28:50 +0000 Subject: [PATCH 06/11] refactor ticker and refresh --- agent/agentcontainers/api.go | 84 +++++++++++++++++------------------- 1 file changed, 40 insertions(+), 44 deletions(-) diff --git a/agent/agentcontainers/api.go b/agent/agentcontainers/api.go index 425abec0a9556..ca6f0bf3e7cd7 100644 --- a/agent/agentcontainers/api.go +++ b/agent/agentcontainers/api.go @@ -38,7 +38,7 @@ type API struct { watcherDone chan struct{} updaterDone chan struct{} initialUpdateDone chan struct{} // Closed after first update in updaterLoop. - refreshTrigger chan chan error // Channel to trigger manual refresh. + updateTrigger chan chan error // Channel to trigger manual refresh. updateInterval time.Duration // Interval for periodic container updates. 
logger slog.Logger watcher watcher.Watcher @@ -164,7 +164,7 @@ func NewAPI(logger slog.Logger, options ...Option) *API { watcherDone: make(chan struct{}), updaterDone: make(chan struct{}), initialUpdateDone: make(chan struct{}), - refreshTrigger: make(chan chan error), + updateTrigger: make(chan chan error), updateInterval: defaultUpdateInterval, logger: logger, clock: quartz.NewReal(), @@ -247,27 +247,12 @@ func (api *API) updaterLoop() { defer api.logger.Debug(api.ctx, "updater loop stopped") api.logger.Debug(api.ctx, "updater loop started") - // Ensure that only once instance of the updateContainers is running - // at a time. This is a workaround since quartz.Ticker does not - // allow us to know if the routine has completed. - sema := make(chan struct{}, 1) - sema <- struct{}{} - - // Ensure only one updateContainers is running at a time, others are - // queued. - doUpdate := func() error { - select { - case <-api.ctx.Done(): - return api.ctx.Err() - case <-sema: - } - defer func() { sema <- struct{}{} }() - - return api.updateContainers(api.ctx) - } - + // Perform an initial update to populate the container list, this + // gives us a guarantee that the API has loaded the initial state + // before returning any responses. This is useful for both tests + // and anyone looking to interact with the API. api.logger.Debug(api.ctx, "performing initial containers update") - if err := doUpdate(); err != nil { + if err := api.updateContainers(api.ctx); err != nil { api.logger.Error(api.ctx, "initial containers update failed", slog.Error(err)) } else { api.logger.Debug(api.ctx, "initial containers update complete") @@ -276,17 +261,29 @@ func (api *API) updaterLoop() { // Other services can wait on this if they need the first data to be available. close(api.initialUpdateDone) - // Use a ticker func to ensure that doUpdate has run to completion - // when advancing time. 
- waiter := api.clock.TickerFunc(api.ctx, api.updateInterval, func() error { - err := doUpdate() - if err != nil { - api.logger.Error(api.ctx, "periodic containers update failed", slog.Error(err)) + // We utilize a TickerFunc here instead of a regular Ticker so that + // we can guarantee execution of the updateContainers method after + // advancing the clock. + ticker := api.clock.TickerFunc(api.ctx, api.updateInterval, func() error { + done := make(chan error, 1) + defer close(done) + + select { + case <-api.ctx.Done(): + return api.ctx.Err() + case api.updateTrigger <- done: + err := <-done + if err != nil { + api.logger.Error(api.ctx, "updater loop ticker failed", slog.Error(err)) + } + default: + api.logger.Debug(api.ctx, "updater loop ticker skipped, update in progress") } - return nil // Always nil, keep going. - }) + + return nil // Always nil to keep the ticker going. + }, "updaterLoop") defer func() { - if err := waiter.Wait(); err != nil { + if err := ticker.Wait("updaterLoop"); err != nil && !errors.Is(err, context.Canceled) { api.logger.Error(api.ctx, "updater loop ticker failed", slog.Error(err)) } }() @@ -294,16 +291,9 @@ func (api *API) updaterLoop() { for { select { case <-api.ctx.Done(): - api.logger.Debug(api.ctx, "updater loop context canceled") return - case ch := <-api.refreshTrigger: - api.logger.Debug(api.ctx, "manual containers update triggered") - err := doUpdate() - if err != nil { - api.logger.Error(api.ctx, "manual containers update failed", slog.Error(err)) - } - ch <- err - close(ch) + case done := <-api.updateTrigger: + done <- api.updateContainers(api.ctx) } } } @@ -506,17 +496,23 @@ func (api *API) processUpdatedContainersLocked(ctx context.Context, updated code // refreshContainers triggers an immediate update of the container list // and waits for it to complete. 
-func (api *API) refreshContainers(ctx context.Context) error { +func (api *API) refreshContainers(ctx context.Context) (err error) { + defer func() { + if err != nil { + err = xerrors.Errorf("refresh containers failed: %w", err) + } + }() + done := make(chan error, 1) select { case <-api.ctx.Done(): - return xerrors.Errorf("API closed, cannot send refresh trigger: %w", api.ctx.Err()) + return xerrors.Errorf("API closed: %w", api.ctx.Err()) case <-ctx.Done(): return ctx.Err() - case api.refreshTrigger <- done: + case api.updateTrigger <- done: select { case <-api.ctx.Done(): - return xerrors.Errorf("API closed, cannot wait for refresh: %w", api.ctx.Err()) + return xerrors.Errorf("API closed: %w", api.ctx.Err()) case <-ctx.Done(): return ctx.Err() case err := <-done: From 04431a57838e43c469d8fd027f222bd35da577b1 Mon Sep 17 00:00:00 2001 From: Mathias Fredriksson Date: Thu, 22 May 2025 09:08:39 +0000 Subject: [PATCH 07/11] update tests, add test for error during initial data --- agent/agentcontainers/api_test.go | 120 ++++++++++++++++++++---------- 1 file changed, 82 insertions(+), 38 deletions(-) diff --git a/agent/agentcontainers/api_test.go b/agent/agentcontainers/api_test.go index dae7abcb08937..edbf2e235f342 100644 --- a/agent/agentcontainers/api_test.go +++ b/agent/agentcontainers/api_test.go @@ -161,11 +161,16 @@ func TestAPI(t *testing.T) { return codersdk.WorkspaceAgentListContainersResponse{Containers: cts} } + type initialDataPayload struct { + val codersdk.WorkspaceAgentListContainersResponse + err error + } + // Each test case is called multiple times to ensure idempotency for _, tc := range []struct { name string // initialData to be stored in the handler - initialData codersdk.WorkspaceAgentListContainersResponse + initialData initialDataPayload // function to set up expectations for the mock setupMock func(mcl *acmock.MockLister, preReq *gomock.Call) // expected result @@ -175,7 +180,7 @@ func TestAPI(t *testing.T) { }{ { name: "no initial data", - 
initialData: makeResponse(), + initialData: initialDataPayload{makeResponse(), nil}, setupMock: func(mcl *acmock.MockLister, preReq *gomock.Call) { mcl.EXPECT().List(gomock.Any()).Return(makeResponse(fakeCt), nil).After(preReq).AnyTimes() }, @@ -183,11 +188,25 @@ func TestAPI(t *testing.T) { }, { name: "repeat initial data", - initialData: makeResponse(fakeCt), + initialData: initialDataPayload{makeResponse(fakeCt), nil}, expected: makeResponse(fakeCt), }, { - name: "lister error", + name: "lister error always", + initialData: initialDataPayload{makeResponse(), assert.AnError}, + expectedErr: assert.AnError.Error(), + }, + { + name: "lister error only during initial data", + initialData: initialDataPayload{makeResponse(), assert.AnError}, + setupMock: func(mcl *acmock.MockLister, preReq *gomock.Call) { + mcl.EXPECT().List(gomock.Any()).Return(makeResponse(fakeCt), nil).After(preReq).AnyTimes() + }, + expectedErr: assert.AnError.Error(), + }, + { + name: "lister error after initial data", + initialData: initialDataPayload{makeResponse(fakeCt), nil}, setupMock: func(mcl *acmock.MockLister, preReq *gomock.Call) { mcl.EXPECT().List(gomock.Any()).Return(makeResponse(), assert.AnError).After(preReq).AnyTimes() }, @@ -195,52 +214,65 @@ func TestAPI(t *testing.T) { }, { name: "updated data", - initialData: makeResponse(fakeCt), + initialData: initialDataPayload{makeResponse(fakeCt), nil}, setupMock: func(mcl *acmock.MockLister, preReq *gomock.Call) { mcl.EXPECT().List(gomock.Any()).Return(makeResponse(fakeCt2), nil).After(preReq).AnyTimes() }, expected: makeResponse(fakeCt2), }, } { - tc := tc t.Run(tc.name, func(t *testing.T) { t.Parallel() var ( ctx = testutil.Context(t, testutil.WaitShort) - clk = quartz.NewMock(t) - ctrl = gomock.NewController(t) - mockLister = acmock.NewMockLister(ctrl) + mClock = quartz.NewMock(t) + tickerTrap = mClock.Trap().TickerFunc("updaterLoop") + mCtrl = gomock.NewController(t) + mLister = acmock.NewMockLister(mCtrl) logger = slogtest.Make(t, 
&slogtest.Options{IgnoreErrors: true}).Leveled(slog.LevelDebug) r = chi.NewRouter() ) - initialData := mockLister.EXPECT().List(gomock.Any()).Return(tc.initialData, nil) + initialDataCall := mLister.EXPECT().List(gomock.Any()).Return(tc.initialData.val, tc.initialData.err) if tc.setupMock != nil { - tc.setupMock(mockLister, initialData.Times(1)) + tc.setupMock(mLister, initialDataCall.Times(1)) } else { - initialData.AnyTimes() + initialDataCall.AnyTimes() } api := agentcontainers.NewAPI(logger, - agentcontainers.WithClock(clk), - agentcontainers.WithLister(mockLister), + agentcontainers.WithClock(mClock), + agentcontainers.WithLister(mLister), ) defer api.Close() r.Mount("/", api.Routes()) + // Make sure the ticker function has been registered + // before advancing the clock. + tickerTrap.MustWait(ctx).Release() + tickerTrap.Close() + // Initial request returns the initial data. req := httptest.NewRequest(http.MethodGet, "/", nil). WithContext(ctx) rec := httptest.NewRecorder() r.ServeHTTP(rec, req) - var got codersdk.WorkspaceAgentListContainersResponse - err := json.NewDecoder(rec.Body).Decode(&got) - require.NoError(t, err, "unmarshal response failed") - require.Equal(t, tc.initialData, got, "want initial data") + if tc.initialData.err != nil { + got := &codersdk.Error{} + err := json.NewDecoder(rec.Body).Decode(got) + require.NoError(t, err, "unmarshal response failed") + require.ErrorContains(t, got, tc.initialData.err.Error(), "want error") + return + } else { + var got codersdk.WorkspaceAgentListContainersResponse + err := json.NewDecoder(rec.Body).Decode(&got) + require.NoError(t, err, "unmarshal response failed") + require.Equal(t, tc.initialData.val, got, "want initial data") + } - // Advance the clock to run updateLoop. - _, aw := clk.AdvanceNext() + // Advance the clock to run updaterLoop. + _, aw := mClock.AdvanceNext() aw.MustWait(ctx) // Second request returns the updated data. 
@@ -750,18 +782,23 @@ func TestAPI(t *testing.T) { ConfigPath: "/home/coder/project/.devcontainer/devcontainer.json", } + ctx := testutil.Context(t, testutil.WaitShort) + logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug) - lister := &fakeLister{ + fLister := &fakeLister{ containers: codersdk.WorkspaceAgentListContainersResponse{ Containers: []codersdk.WorkspaceAgentContainer{container}, }, } - watcher := newFakeWatcher(t) - clk := quartz.NewMock(t) + fWatcher := newFakeWatcher(t) + mClock := quartz.NewMock(t) + mClock.Set(time.Now()).MustWait(ctx) + tickerTrap := mClock.Trap().TickerFunc("updaterLoop") + api := agentcontainers.NewAPI(logger, - agentcontainers.WithClock(clk), - agentcontainers.WithLister(lister), - agentcontainers.WithWatcher(watcher), + agentcontainers.WithClock(mClock), + agentcontainers.WithLister(fLister), + agentcontainers.WithWatcher(fWatcher), agentcontainers.WithDevcontainers( []codersdk.WorkspaceAgentDevcontainer{dc}, []codersdk.WorkspaceAgentScript{{LogSourceID: uuid.New(), ID: dc.ID}}, @@ -769,15 +806,16 @@ func TestAPI(t *testing.T) { ) defer api.Close() - ctx := testutil.Context(t, testutil.WaitShort) - - clk.Set(time.Now()).MustWait(ctx) + // Make sure the ticker function has been registered + // before advancing any use of mClock.Advance. + tickerTrap.MustWait(ctx).Release() + tickerTrap.Close() // Make sure the start loop has been called. - watcher.waitNext(ctx) + fWatcher.waitNext(ctx) // Simulate a file modification event to make the devcontainer dirty. - watcher.sendEventWaitNextCalled(ctx, fsnotify.Event{ + fWatcher.sendEventWaitNextCalled(ctx, fsnotify.Event{ Name: "/home/coder/project/.devcontainer/devcontainer.json", Op: fsnotify.Write, }) @@ -799,11 +837,11 @@ func TestAPI(t *testing.T) { // Next, simulate a situation where the container is no longer // running. 
- lister.containers.Containers = []codersdk.WorkspaceAgentContainer{} + fLister.containers.Containers = []codersdk.WorkspaceAgentContainer{} // Trigger a refresh which will use the second response from mock // lister (no containers). - _, aw := clk.AdvanceNext() + _, aw := mClock.AdvanceNext() aw.MustWait(ctx) // Afterwards the devcontainer should not be running and not dirty. @@ -828,9 +866,6 @@ func TestAPI(t *testing.T) { ctx := testutil.Context(t, testutil.WaitShort) startTime := time.Date(2025, 1, 1, 12, 0, 0, 0, time.UTC) - mClock := quartz.NewMock(t) - mClock.Set(startTime) - fWatcher := newFakeWatcher(t) // Create a fake container with a config file. configPath := "/workspace/project/.devcontainer/devcontainer.json" @@ -845,6 +880,10 @@ func TestAPI(t *testing.T) { }, } + mClock := quartz.NewMock(t) + mClock.Set(startTime) + tickerTrap := mClock.Trap().TickerFunc("updaterLoop") + fWatcher := newFakeWatcher(t) fLister := &fakeLister{ containers: codersdk.WorkspaceAgentListContainersResponse{ Containers: []codersdk.WorkspaceAgentContainer{container}, @@ -863,6 +902,11 @@ func TestAPI(t *testing.T) { r := chi.NewRouter() r.Mount("/", api.Routes()) + // Make sure the ticker function has been registered + // before advancing any use of mClock.Advance. + tickerTrap.MustWait(ctx).Release() + tickerTrap.Close() + // Call the list endpoint first to ensure config files are // detected and watched. req := httptest.NewRequest(http.MethodGet, "/devcontainers", nil). @@ -895,7 +939,7 @@ func TestAPI(t *testing.T) { Op: fsnotify.Write, }) - // Advance the clock to run updateLoop. + // Advance the clock to run updaterLoop. _, aw := mClock.AdvanceNext() aw.MustWait(ctx) @@ -920,7 +964,7 @@ func TestAPI(t *testing.T) { container.CreatedAt = mClock.Now() // Update the creation time. fLister.containers.Containers = []codersdk.WorkspaceAgentContainer{container} - // Advance the clock to run updateLoop. + // Advance the clock to run updaterLoop. 
_, aw = mClock.AdvanceNext() aw.MustWait(ctx) From 0455dd90ab51bd65ae2b9753cd7ca2e951725733 Mon Sep 17 00:00:00 2001 From: Mathias Fredriksson Date: Thu, 22 May 2025 09:12:24 +0000 Subject: [PATCH 08/11] document updateContainers internal timeout --- agent/agentcontainers/api.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/agent/agentcontainers/api.go b/agent/agentcontainers/api.go index ca6f0bf3e7cd7..7fd42175db7d4 100644 --- a/agent/agentcontainers/api.go +++ b/agent/agentcontainers/api.go @@ -293,6 +293,8 @@ func (api *API) updaterLoop() { case <-api.ctx.Done(): return case done := <-api.updateTrigger: + // Note that although we pass api.ctx here, updateContainers + // has an internal timeout to prevent long blocking calls. done <- api.updateContainers(api.ctx) } } From 75d691cb3f506c946e4c81f2bfa0b3e65521d854 Mon Sep 17 00:00:00 2001 From: Mathias Fredriksson Date: Thu, 22 May 2025 09:24:39 +0000 Subject: [PATCH 09/11] anytimes --- cli/open_test.go | 4 ++-- cli/ssh_test.go | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/cli/open_test.go b/cli/open_test.go index 9ba16a32674e2..97d24f0634d9d 100644 --- a/cli/open_test.go +++ b/cli/open_test.go @@ -326,7 +326,7 @@ func TestOpenVSCodeDevContainer(t *testing.T) { }, }, }, nil, - ) + ).AnyTimes() client, workspace, agentToken := setupWorkspaceForAgent(t, func(agents []*proto.Agent) []*proto.Agent { agents[0].Directory = agentDir @@ -501,7 +501,7 @@ func TestOpenVSCodeDevContainer_NoAgentDirectory(t *testing.T) { }, }, }, nil, - ) + ).AnyTimes() client, workspace, agentToken := setupWorkspaceForAgent(t, func(agents []*proto.Agent) []*proto.Agent { agents[0].Name = agentName diff --git a/cli/ssh_test.go b/cli/ssh_test.go index 3feef218c5b24..147fc07372032 100644 --- a/cli/ssh_test.go +++ b/cli/ssh_test.go @@ -2064,7 +2064,7 @@ func TestSSH_Container(t *testing.T) { }, }, Warnings: nil, - }, nil) + }, nil).AnyTimes() _ = agenttest.New(t, client.URL, agentToken, func(o *agent.Options) { 
o.ExperimentalDevcontainersEnabled = true o.ContainerAPIOptions = append(o.ContainerAPIOptions, agentcontainers.WithLister(mLister)) From 101e011243854a29ae11de882a38d20bbdb9b5e5 Mon Sep 17 00:00:00 2001 From: Mathias Fredriksson Date: Thu, 22 May 2025 10:07:32 +0000 Subject: [PATCH 10/11] appease the linter --- agent/agentcontainers/api_test.go | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/agent/agentcontainers/api_test.go b/agent/agentcontainers/api_test.go index edbf2e235f342..40ddec54cf6ad 100644 --- a/agent/agentcontainers/api_test.go +++ b/agent/agentcontainers/api_test.go @@ -286,12 +286,13 @@ func TestAPI(t *testing.T) { err := json.NewDecoder(rec.Body).Decode(got) require.NoError(t, err, "unmarshal response failed") require.ErrorContains(t, got, tc.expectedErr, "want error") - } else { - var got codersdk.WorkspaceAgentListContainersResponse - err := json.NewDecoder(rec.Body).Decode(&got) - require.NoError(t, err, "unmarshal response failed") - require.Equal(t, tc.expected, got, "want updated data") + return } + + var got codersdk.WorkspaceAgentListContainersResponse + err := json.NewDecoder(rec.Body).Decode(&got) + require.NoError(t, err, "unmarshal response failed") + require.Equal(t, tc.expected, got, "want updated data") }) } }) From 361ac82f1ba718fe9c3b481052d74b37cf502799 Mon Sep 17 00:00:00 2001 From: Mathias Fredriksson Date: Thu, 22 May 2025 10:30:41 +0000 Subject: [PATCH 11/11] fix test and appease linter again --- agent/agentcontainers/api_test.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/agent/agentcontainers/api_test.go b/agent/agentcontainers/api_test.go index 40ddec54cf6ad..a687cb8c001f8 100644 --- a/agent/agentcontainers/api_test.go +++ b/agent/agentcontainers/api_test.go @@ -202,7 +202,7 @@ func TestAPI(t *testing.T) { setupMock: func(mcl *acmock.MockLister, preReq *gomock.Call) { mcl.EXPECT().List(gomock.Any()).Return(makeResponse(fakeCt), nil).After(preReq).AnyTimes() }, - 
expectedErr: assert.AnError.Error(), + expected: makeResponse(fakeCt), }, { name: "lister error after initial data", @@ -263,7 +263,6 @@ func TestAPI(t *testing.T) { err := json.NewDecoder(rec.Body).Decode(got) require.NoError(t, err, "unmarshal response failed") require.ErrorContains(t, got, tc.initialData.err.Error(), "want error") - return } else { var got codersdk.WorkspaceAgentListContainersResponse err := json.NewDecoder(rec.Body).Decode(&got)