From 40c8de9b3c5de57019535298a87605d2fb9dfa98 Mon Sep 17 00:00:00 2001 From: Mathias Fredriksson Date: Mon, 28 Apr 2025 11:03:44 +0000 Subject: [PATCH 01/23] feat(agent/agentcontainers): add file watcher and dirty status Fixes coder/internal#479 Fixes coder/internal#480 --- agent/agent.go | 5 +- agent/agentcontainers/api.go | 283 +++++++++++++++--- agent/agentcontainers/api_internal_test.go | 2 + agent/agentcontainers/api_test.go | 201 +++++++++++++ agent/agentcontainers/watcher/noop.go | 38 +++ agent/agentcontainers/watcher/noop_test.go | 70 +++++ agent/agentcontainers/watcher/watcher.go | 89 ++++++ agent/agentcontainers/watcher/watcher_test.go | 66 ++++ agent/api.go | 6 +- codersdk/workspaceagents.go | 1 + go.mod | 1 + 11 files changed, 714 insertions(+), 48 deletions(-) create mode 100644 agent/agentcontainers/watcher/noop.go create mode 100644 agent/agentcontainers/watcher/noop_test.go create mode 100644 agent/agentcontainers/watcher/watcher.go create mode 100644 agent/agentcontainers/watcher/watcher_test.go diff --git a/agent/agent.go b/agent/agent.go index a7434b90d4854..1ca5bd8faf858 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -1481,8 +1481,10 @@ func (a *agent) createTailnet( }() if err = a.trackGoroutine(func() { defer apiListener.Close() + apiHandler, closeAPIHAndler := a.apiHandler() server := &http.Server{ - Handler: a.apiHandler(), + BaseContext: func(net.Listener) context.Context { return ctx }, + Handler: apiHandler, ReadTimeout: 20 * time.Second, ReadHeaderTimeout: 20 * time.Second, WriteTimeout: 20 * time.Second, @@ -1493,6 +1495,7 @@ func (a *agent) createTailnet( case <-ctx.Done(): case <-a.hardCtx.Done(): } + _ = closeAPIHAndler() _ = server.Close() }() diff --git a/agent/agentcontainers/api.go b/agent/agentcontainers/api.go index 9a028e565b6ca..fa61bc18b0d15 100644 --- a/agent/agentcontainers/api.go +++ b/agent/agentcontainers/api.go @@ -10,11 +10,13 @@ import ( "strings" "time" + "github.com/fsnotify/fsnotify" "github.com/go-chi/chi/v5" "github.com/google/uuid" "golang.org/x/xerrors" "cdr.dev/slog" + "github.com/coder/coder/v2/agent/agentcontainers/watcher" "github.com/coder/coder/v2/agent/agentexec" "github.com/coder/coder/v2/coderd/httpapi" "github.com/coder/coder/v2/codersdk" @@ -30,6 +32,12 @@ const ( // API is responsible for container-related operations in the agent. // It provides methods to list and manage containers. type API struct { + ctx context.Context + cancel context.CancelFunc + done chan struct{} + logger slog.Logger + watcher watcher.Watcher + cacheDuration time.Duration cl Lister dccli DevcontainerCLI @@ -37,11 +45,12 @@ type API struct { // lockCh protects the below fields. We use a channel instead of a // mutex so we can handle cancellation properly. - lockCh chan struct{} - containers codersdk.WorkspaceAgentListContainersResponse - mtime time.Time - devcontainerNames map[string]struct{} // Track devcontainer names to avoid duplicates. - knownDevcontainers []codersdk.WorkspaceAgentDevcontainer // Track predefined and runtime-detected devcontainers. + lockCh chan struct{} + containers codersdk.WorkspaceAgentListContainersResponse + mtime time.Time + devcontainerNames map[string]struct{} // Track devcontainer names to avoid duplicates. + knownDevcontainers []codersdk.WorkspaceAgentDevcontainer // Track predefined and runtime-detected devcontainers. + configModifiedTimes map[string]time.Time // Track when config files were last modified. } // Option is a functional option for API. @@ -55,6 +64,16 @@ func WithLister(cl Lister) Option { } } +// WithClock sets the quartz.Clock implementation to use. +// This is primarily used for testing to control time. +func WithClock(clock quartz.Clock) Option { + return func(api *API) { + api.clock = clock + } +} + +// WithDevcontainerCLI sets the DevcontainerCLI implementation to use. +// This can be used in tests to modify @devcontainer/cli behavior. func WithDevcontainerCLI(dccli DevcontainerCLI) Option { return func(api *API) { api.dccli = dccli @@ -76,14 +95,29 @@ func WithDevcontainers(devcontainers []codersdk.WorkspaceAgentDevcontainer) Opti } } +// WithWatcher sets the file watcher implementation to use. By default a +// noop watcher is used. This can be used in tests to modify the watcher +// behavior or to use an actual file watcher (e.g. fsnotify). +func WithWatcher(w watcher.Watcher) Option { + return func(api *API) { + api.watcher = w + } +} + // NewAPI returns a new API with the given options applied. func NewAPI(logger slog.Logger, options ...Option) *API { + ctx, cancel := context.WithCancel(context.Background()) api := &API{ - clock: quartz.NewReal(), - cacheDuration: defaultGetContainersCacheDuration, - lockCh: make(chan struct{}, 1), - devcontainerNames: make(map[string]struct{}), - knownDevcontainers: []codersdk.WorkspaceAgentDevcontainer{}, + ctx: ctx, + cancel: cancel, + done: make(chan struct{}), + logger: logger, + clock: quartz.NewReal(), + cacheDuration: defaultGetContainersCacheDuration, + lockCh: make(chan struct{}, 1), + devcontainerNames: make(map[string]struct{}), + knownDevcontainers: []codersdk.WorkspaceAgentDevcontainer{}, + configModifiedTimes: make(map[string]time.Time), } for _, opt := range options { opt(api) @@ -92,12 +126,60 @@ func NewAPI(logger slog.Logger, options ...Option) *API { api.cl = &DockerCLILister{} } if api.dccli == nil { - api.dccli = NewDevcontainerCLI(logger, agentexec.DefaultExecer) + api.dccli = NewDevcontainerCLI(logger.Named("devcontainer-cli"), agentexec.DefaultExecer) + } + if api.watcher == nil { + api.watcher = watcher.NewNoop() + } + + // Make sure we watch the devcontainer config files for changes. + for _, devcontainer := range api.knownDevcontainers { + if devcontainer.ConfigPath != "" { + if err := api.watcher.Add(devcontainer.ConfigPath); err != nil { + api.logger.Error(ctx, "watch devcontainer config file failed", slog.Error(err), slog.F("file", devcontainer.ConfigPath)) + } + } } + go api.start() + return api } +func (api *API) start() { + defer close(api.done) + + for { + event, err := api.watcher.Next(api.ctx) + if err != nil { + if errors.Is(err, context.Canceled) { + api.logger.Debug(api.ctx, "watcher closed") + return + } + api.logger.Error(api.ctx, "watcher error waiting for next event", slog.Error(err)) + continue + } + if event == nil { + continue + } + + now := api.clock.Now() + switch { + case event.Has(fsnotify.Create | fsnotify.Write): + api.logger.Debug(api.ctx, "devcontainer config file changed", slog.F("file", event.Name)) + api.markDevcontainerDirty(event.Name, now) + case event.Has(fsnotify.Remove): + api.logger.Debug(api.ctx, "devcontainer config file removed", slog.F("file", event.Name)) + api.markDevcontainerDirty(event.Name, now) + case event.Has(fsnotify.Rename): + api.logger.Debug(api.ctx, "devcontainer config file renamed", slog.F("file", event.Name)) + api.markDevcontainerDirty(event.Name, now) + default: + api.logger.Debug(api.ctx, "devcontainer config file event ignored", slog.F("file", event.Name), slog.F("event", event)) + } + } +} + // Routes returns the HTTP handler for container-related routes. func (api *API) Routes() http.Handler { r := chi.NewRouter() @@ -143,12 +225,12 @@ func copyListContainersResponse(resp codersdk.WorkspaceAgentListContainersRespon func (api *API) getContainers(ctx context.Context) (codersdk.WorkspaceAgentListContainersResponse, error) { select { + case <-api.ctx.Done(): + return codersdk.WorkspaceAgentListContainersResponse{}, api.ctx.Err() case <-ctx.Done(): return codersdk.WorkspaceAgentListContainersResponse{}, ctx.Err() case api.lockCh <- struct{}{}: - defer func() { - <-api.lockCh - }() + defer func() { <-api.lockCh }() } now := api.clock.Now() @@ -165,51 +247,96 @@ func (api *API) getContainers(ctx context.Context) (codersdk.WorkspaceAgentListC api.containers = updated api.mtime = now - // Reset all known devcontainers to not running. + dirtyStates := make(map[string]bool) for i := range api.knownDevcontainers { api.knownDevcontainers[i].Running = false api.knownDevcontainers[i].Container = nil + // Preserve the dirty state and store in map for lookup. + dirtyStates[api.knownDevcontainers[i].WorkspaceFolder] = api.knownDevcontainers[i].Dirty } // Check if the container is running and update the known devcontainers. for _, container := range updated.Containers { workspaceFolder := container.Labels[DevcontainerLocalFolderLabel] - if workspaceFolder != "" { - // Check if this is already in our known list. - if knownIndex := slices.IndexFunc(api.knownDevcontainers, func(dc codersdk.WorkspaceAgentDevcontainer) bool { - return dc.WorkspaceFolder == workspaceFolder - }); knownIndex != -1 { - // Update existing entry with runtime information. - if api.knownDevcontainers[knownIndex].ConfigPath == "" { - api.knownDevcontainers[knownIndex].ConfigPath = container.Labels[DevcontainerConfigFileLabel] + configFile := container.Labels[DevcontainerConfigFileLabel] + + if workspaceFolder == "" { + continue + } + + // Check if this is already in our known list. + if knownIndex := slices.IndexFunc(api.knownDevcontainers, func(dc codersdk.WorkspaceAgentDevcontainer) bool { + return dc.WorkspaceFolder == workspaceFolder + }); knownIndex != -1 { + // Update existing entry with runtime information. + if configFile != "" && api.knownDevcontainers[knownIndex].ConfigPath == "" { + api.knownDevcontainers[knownIndex].ConfigPath = configFile + if err := api.watcher.Add(configFile); err != nil { + api.logger.Error(ctx, "watch devcontainer config file failed", slog.Error(err), slog.F("file", configFile)) + } + } + api.knownDevcontainers[knownIndex].Running = container.Running + api.knownDevcontainers[knownIndex].Container = &container + + // Check if this container was created after the config + // file was modified. + if configFile != "" && api.knownDevcontainers[knownIndex].Dirty { + lastModified, hasModTime := api.configModifiedTimes[configFile] + if hasModTime && container.CreatedAt.After(lastModified) { + api.logger.Info(ctx, "clearing dirty flag for container created after config modification", + slog.F("container", container.ID), + slog.F("created_at", container.CreatedAt), + slog.F("config_modified_at", lastModified), + slog.F("file", configFile), + ) + api.knownDevcontainers[knownIndex].Dirty = false } - api.knownDevcontainers[knownIndex].Running = container.Running - api.knownDevcontainers[knownIndex].Container = &container - continue } + continue + } - // If not in our known list, add as a runtime detected entry. - name := path.Base(workspaceFolder) - if _, ok := api.devcontainerNames[name]; ok { - // Try to find a unique name by appending a number. - for i := 2; ; i++ { - newName := fmt.Sprintf("%s-%d", name, i) - if _, ok := api.devcontainerNames[newName]; !ok { - name = newName - break - } + // If not in our known list, add as a runtime detected entry. + name := path.Base(workspaceFolder) + if _, ok := api.devcontainerNames[name]; ok { + // Try to find a unique name by appending a number. + for i := 2; ; i++ { + newName := fmt.Sprintf("%s-%d", name, i) + if _, ok := api.devcontainerNames[newName]; !ok { + name = newName + break } } - api.devcontainerNames[name] = struct{}{} - api.knownDevcontainers = append(api.knownDevcontainers, codersdk.WorkspaceAgentDevcontainer{ - ID: uuid.New(), - Name: name, - WorkspaceFolder: workspaceFolder, - ConfigPath: container.Labels[DevcontainerConfigFileLabel], - Running: container.Running, - Container: &container, - }) } + api.devcontainerNames[name] = struct{}{} + if configFile != "" { + if err := api.watcher.Add(configFile); err != nil { + api.logger.Error(ctx, "watch devcontainer config file failed", slog.Error(err), slog.F("file", configFile)) + } + } + + dirty := dirtyStates[workspaceFolder] + if dirty { + lastModified, hasModTime := api.configModifiedTimes[configFile] + if hasModTime && container.CreatedAt.After(lastModified) { + api.logger.Info(ctx, "new container created after config modification, not marking as dirty", + slog.F("container", container.ID), + slog.F("created_at", container.CreatedAt), + slog.F("config_modified_at", lastModified), + slog.F("file", configFile), + ) + dirty = false + } + } + + api.knownDevcontainers = append(api.knownDevcontainers, codersdk.WorkspaceAgentDevcontainer{ + ID: uuid.New(), + Name: name, + WorkspaceFolder: workspaceFolder, + ConfigPath: configFile, + Running: container.Running, + Dirty: dirty, + Container: &container, + }) } return copyListContainersResponse(api.containers), nil @@ -271,6 +398,29 @@ func (api *API) handleRecreate(w http.ResponseWriter, r *http.Request) { return } + // TODO(mafredri): Temporarily handle clearing the dirty state after + // recreation, later on this should be handled by a "container watcher". + select { + case <-api.ctx.Done(): + return + case <-ctx.Done(): + return + case api.lockCh <- struct{}{}: + defer func() { <-api.lockCh }() + } + for i := range api.knownDevcontainers { + if api.knownDevcontainers[i].WorkspaceFolder == workspaceFolder { + if api.knownDevcontainers[i].Dirty { + api.logger.Info(ctx, "clearing dirty flag after recreation", + slog.F("workspace_folder", workspaceFolder), + slog.F("name", api.knownDevcontainers[i].Name), + ) + api.knownDevcontainers[i].Dirty = false + } + break + } + } + w.WriteHeader(http.StatusNoContent) } @@ -289,6 +439,8 @@ func (api *API) handleListDevcontainers(w http.ResponseWriter, r *http.Request) } select { + case <-api.ctx.Done(): + return case <-ctx.Done(): return case api.lockCh <- struct{}{}: @@ -309,3 +461,44 @@ func (api *API) handleListDevcontainers(w http.ResponseWriter, r *http.Request) httpapi.Write(ctx, w, http.StatusOK, response) } + +// markDevcontainerDirty finds the devcontainer with the given config file path +// and marks it as dirty. It acquires the lock before modifying the state. +func (api *API) markDevcontainerDirty(configPath string, modifiedAt time.Time) { + select { + case <-api.ctx.Done(): + return + case api.lockCh <- struct{}{}: + defer func() { <-api.lockCh }() + } + + // Record the timestamp of when this configuration file was modified. + api.configModifiedTimes[configPath] = modifiedAt + + for i := range api.knownDevcontainers { + if api.knownDevcontainers[i].ConfigPath == configPath { + // TODO(mafredri): Simplistic mark for now, we should check if the + // container is running and if the config file was modified after + // the container was created. + if !api.knownDevcontainers[i].Dirty { + api.logger.Info(api.ctx, "marking devcontainer as dirty", + slog.F("file", configPath), + slog.F("name", api.knownDevcontainers[i].Name), + slog.F("workspace_folder", api.knownDevcontainers[i].WorkspaceFolder), + slog.F("modified_at", modifiedAt), + ) + api.knownDevcontainers[i].Dirty = true + } + } + } +} + +func (api *API) Close() error { + api.cancel() + <-api.done + err := api.watcher.Close() + if err != nil { + return err + } + return nil +} diff --git a/agent/agentcontainers/api_internal_test.go b/agent/agentcontainers/api_internal_test.go index 756526d341d68..331c41e8df10b 100644 --- a/agent/agentcontainers/api_internal_test.go +++ b/agent/agentcontainers/api_internal_test.go @@ -103,6 +103,8 @@ func TestAPI(t *testing.T) { logger = slogtest.Make(t, nil).Leveled(slog.LevelDebug) api = NewAPI(logger, WithLister(mockLister)) ) + defer api.Close() + api.cacheDuration = tc.cacheDur api.clock = clk api.containers = tc.cacheData diff --git a/agent/agentcontainers/api_test.go b/agent/agentcontainers/api_test.go index 6f2fe5ce84919..1deff56f84650 100644 --- a/agent/agentcontainers/api_test.go +++ b/agent/agentcontainers/api_test.go @@ -6,7 +6,9 @@ import ( "net/http" "net/http/httptest" "testing" + "time" + "github.com/fsnotify/fsnotify" "github.com/go-chi/chi/v5" "github.com/google/uuid" "github.com/stretchr/testify/assert" @@ -17,6 +19,8 @@ import ( "cdr.dev/slog/sloggers/slogtest" "github.com/coder/coder/v2/agent/agentcontainers" "github.com/coder/coder/v2/codersdk" + "github.com/coder/coder/v2/testutil" + "github.com/coder/quartz" ) // fakeLister implements the agentcontainers.Lister interface for @@ -41,6 +45,103 @@ func (f *fakeDevcontainerCLI) Up(_ context.Context, _, _ string, _ ...agentconta return f.id, f.err } +// fakeWatcher implements the watcher.Watcher interface for testing. +// It allows controlling what events are sent and when. +type fakeWatcher struct { + t testing.TB + events chan *fsnotify.Event + closeNotify chan struct{} + addedPaths []string + closed bool + nextCalled chan struct{} + nextErr error + closeErr error +} + +func newFakeWatcher(t testing.TB) *fakeWatcher { + return &fakeWatcher{ + t: t, + events: make(chan *fsnotify.Event, 10), // Buffered to avoid blocking tests. + closeNotify: make(chan struct{}), + addedPaths: make([]string, 0), + nextCalled: make(chan struct{}, 1), + } +} + +func (w *fakeWatcher) Add(file string) error { + w.addedPaths = append(w.addedPaths, file) + return nil +} + +func (w *fakeWatcher) Remove(file string) error { + for i, path := range w.addedPaths { + if path == file { + w.addedPaths = append(w.addedPaths[:i], w.addedPaths[i+1:]...) + break + } + } + return nil +} + +func (w *fakeWatcher) clearNext() { + select { + case <-w.nextCalled: + default: + } +} + +func (w *fakeWatcher) waitNext(ctx context.Context) bool { + select { + case <-w.t.Context().Done(): + return false + case <-ctx.Done(): + return false + case <-w.closeNotify: + return false + case <-w.nextCalled: + return true + } +} + +func (w *fakeWatcher) Next(ctx context.Context) (*fsnotify.Event, error) { + select { + case w.nextCalled <- struct{}{}: + default: + } + + if w.nextErr != nil { + err := w.nextErr + w.nextErr = nil + return nil, err + } + + select { + case <-ctx.Done(): + return nil, ctx.Err() + case <-w.closeNotify: + return nil, xerrors.New("watcher closed") + case event := <-w.events: + return event, nil + } +} + +func (w *fakeWatcher) Close() error { + if w.closed { + return nil + } + + w.closed = true + close(w.closeNotify) + return w.closeErr +} + +// sendEvent sends a file system event through the fake watcher. +func (w *fakeWatcher) sendEventWaitNextCalled(ctx context.Context, event fsnotify.Event) { + w.clearNext() + w.events <- &event + w.waitNext(ctx) +} + func TestAPI(t *testing.T) { t.Parallel() @@ -153,6 +254,7 @@ func TestAPI(t *testing.T) { agentcontainers.WithLister(tt.lister), agentcontainers.WithDevcontainerCLI(tt.devcontainerCLI), ) + defer api.Close() r.Mount("/", api.Routes()) // Simulate HTTP request to the recreate endpoint. @@ -463,6 +565,7 @@ func TestAPI(t *testing.T) { } api := agentcontainers.NewAPI(logger, apiOptions...) + defer api.Close() r.Mount("/", api.Routes()) req := httptest.NewRequest(http.MethodGet, "/devcontainers", nil) @@ -489,6 +592,104 @@ func TestAPI(t *testing.T) { }) } }) + + t.Run("FileWatcher", func(t *testing.T) { + t.Parallel() + + ctx := testutil.Context(t, testutil.WaitMedium) + + startTime := time.Date(2025, 1, 1, 12, 0, 0, 0, time.UTC) + mClock := quartz.NewMock(t) + mClock.Set(startTime) + fWatcher := newFakeWatcher(t) + + // Create a fake container with a config file. + configPath := "/workspace/project/.devcontainer/devcontainer.json" + container := codersdk.WorkspaceAgentContainer{ + ID: "container-id", + FriendlyName: "container-name", + Running: true, + CreatedAt: startTime.Add(-1 * time.Hour), // Created 1 hour before test start. + Labels: map[string]string{ + agentcontainers.DevcontainerLocalFolderLabel: "/workspace/project", + agentcontainers.DevcontainerConfigFileLabel: configPath, + }, + } + + fLister := &fakeLister{ + containers: codersdk.WorkspaceAgentListContainersResponse{ + Containers: []codersdk.WorkspaceAgentContainer{container}, + }, + } + + logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug) + api := agentcontainers.NewAPI( + logger, + agentcontainers.WithLister(fLister), + agentcontainers.WithWatcher(fWatcher), + agentcontainers.WithClock(mClock), + ) + defer api.Close() + + r := chi.NewRouter() + r.Mount("/", api.Routes()) + + // Call the list endpoint first to ensure config files are + // detected and watched. + req := httptest.NewRequest(http.MethodGet, "/devcontainers", nil) + rec := httptest.NewRecorder() + r.ServeHTTP(rec, req) + require.Equal(t, http.StatusOK, rec.Code) + + var response codersdk.WorkspaceAgentDevcontainersResponse + err := json.NewDecoder(rec.Body).Decode(&response) + require.NoError(t, err) + require.Len(t, response.Devcontainers, 1) + assert.False(t, response.Devcontainers[0].Dirty, + "container should not be marked as dirty initially") + + // Verify the watcher is watching the config file. + assert.Contains(t, fWatcher.addedPaths, configPath, + "watcher should be watching the container's config file") + + // Send a file modification event and check if the container is + // marked dirty. + fWatcher.sendEventWaitNextCalled(ctx, fsnotify.Event{ + Name: configPath, + Op: fsnotify.Write, + }) + + // Check if the container is marked as dirty. + req = httptest.NewRequest(http.MethodGet, "/devcontainers", nil) + rec = httptest.NewRecorder() + r.ServeHTTP(rec, req) + require.Equal(t, http.StatusOK, rec.Code) + + err = json.NewDecoder(rec.Body).Decode(&response) + require.NoError(t, err) + require.Len(t, response.Devcontainers, 1) + assert.True(t, response.Devcontainers[0].Dirty, + "container should be marked as dirty after config file was modified") + + mClock.Advance(10 * time.Minute).MustWait(ctx) + + container.ID = "new-container-id" // Simulate a new container ID after recreation. + container.FriendlyName = "new-container-name" + container.CreatedAt = mClock.Now() // Update the creation time. + fLister.containers.Containers = []codersdk.WorkspaceAgentContainer{container} + + // Check if dirty flag is cleared. + req = httptest.NewRequest(http.MethodGet, "/devcontainers", nil) + rec = httptest.NewRecorder() + r.ServeHTTP(rec, req) + require.Equal(t, http.StatusOK, rec.Code) + + err = json.NewDecoder(rec.Body).Decode(&response) + require.NoError(t, err) + require.Len(t, response.Devcontainers, 1) + assert.False(t, response.Devcontainers[0].Dirty, + "dirty flag should be cleared after container recreation") + }) } // mustFindDevcontainerByPath returns the devcontainer with the given workspace diff --git a/agent/agentcontainers/watcher/noop.go b/agent/agentcontainers/watcher/noop.go new file mode 100644 index 0000000000000..7332215951c96 --- /dev/null +++ b/agent/agentcontainers/watcher/noop.go @@ -0,0 +1,38 @@ +package watcher + +import ( + "context" + + "github.com/fsnotify/fsnotify" + "golang.org/x/xerrors" +) + +func NewNoop() Watcher { + return &noopWatcher{closed: make(chan struct{})} +} + +type noopWatcher struct { + closed chan struct{} +} + +func (*noopWatcher) Add(string) error { + return nil +} + +func (*noopWatcher) Remove(string) error { + return nil +} + +func (n *noopWatcher) Next(ctx context.Context) (*fsnotify.Event, error) { + select { + case <-ctx.Done(): + return nil, ctx.Err() + case <-n.closed: + return nil, xerrors.New("watcher closed") + } +} + +func (n *noopWatcher) Close() error { + close(n.closed) + return nil +} diff --git a/agent/agentcontainers/watcher/noop_test.go b/agent/agentcontainers/watcher/noop_test.go new file mode 100644 index 0000000000000..5e9aa07f89925 --- /dev/null +++ b/agent/agentcontainers/watcher/noop_test.go @@ -0,0 +1,70 @@ +package watcher_test + +import ( + "context" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/coder/coder/v2/agent/agentcontainers/watcher" + "github.com/coder/coder/v2/testutil" +) + +func TestNoopWatcher(t *testing.T) { + t.Parallel() + + // Create the noop watcher under test. + wut := watcher.NewNoop() + + // Test adding/removing files (should have no effect). + err := wut.Add("some-file.txt") + assert.NoError(t, err, "noop watcher should not return error on Add") + + err = wut.Remove("some-file.txt") + assert.NoError(t, err, "noop watcher should not return error on Remove") + + ctx, cancel := context.WithCancel(t.Context()) + defer cancel() + + // Start a goroutine to wait for Next to return. + errC := make(chan error, 1) + go func() { + _, err := wut.Next(ctx) + errC <- err + }() + + select { + case <-errC: + require.Fail(t, "want Next to block") + default: + } + + // Cancel the context and check that Next returns. + cancel() + + select { + case err := <-errC: + assert.Error(t, err, "want Next error when context is canceled") + case <-time.After(testutil.WaitShort): + t.Fatal("want Next to return after context was canceled") + } + + // Test Close. + err = wut.Close() + assert.NoError(t, err, "want no error on Close") +} + +func TestNoopWatcher_CloseBeforeNext(t *testing.T) { + t.Parallel() + + wut := watcher.NewNoop() + + err := wut.Close() + require.NoError(t, err, "close watcher failed") + + ctx := context.Background() + _, err = wut.Next(ctx) + assert.Error(t, err, "want Next to return error when watcher is closed") +} diff --git a/agent/agentcontainers/watcher/watcher.go b/agent/agentcontainers/watcher/watcher.go new file mode 100644 index 0000000000000..505e1b43d1dd1 --- /dev/null +++ b/agent/agentcontainers/watcher/watcher.go @@ -0,0 +1,89 @@ +// Package watcher provides file system watching capabilities for the +// agent. It defines an interface for monitoring file changes and +// implementations that can be used to detect when configuration files +// are modified. This is primarily used to track changes to devcontainer +// configuration files and notify users when containers need to be +// recreated to apply the new configuration. +package watcher + +import ( + "context" + + "github.com/fsnotify/fsnotify" + "golang.org/x/xerrors" +) + +// Watcher defines an interface for monitoring file system changes. +// Implementations track file modifications and provide an event stream +// that clients can consume to react to changes. +type Watcher interface { + // Add starts watching a file for changes. + Add(file string) error + + // Remove stops watching a file for changes. + Remove(file string) error + + // Next blocks until a file system event occurs or the context is canceled. + // It returns the next event or an error if the watcher encountered a problem. + Next(context.Context) (*fsnotify.Event, error) + + // Close shuts down the watcher and releases any resources. + Close() error +} + +type fsnotifyWatcher struct { + *fsnotify.Watcher + closed chan struct{} +} + +func NewFSNotify() (Watcher, error) { + w, err := fsnotify.NewWatcher() + if err != nil { + return nil, xerrors.Errorf("create fsnotify watcher: %w", err) + } + return &fsnotifyWatcher{ + Watcher: w, + closed: make(chan struct{}), + }, nil +} + +func (f *fsnotifyWatcher) Add(file string) error { + if err := f.Watcher.Add(file); err != nil { + return xerrors.Errorf("add file to watcher: %w", err) + } + return nil +} + +func (f *fsnotifyWatcher) Remove(file string) error { + if err := f.Watcher.Remove(file); err != nil { + return xerrors.Errorf("remove file from watcher: %w", err) + } + return nil +} + +func (f *fsnotifyWatcher) Next(ctx context.Context) (*fsnotify.Event, error) { + select { + case <-ctx.Done(): + return nil, ctx.Err() + case event, ok := <-f.Events: + if !ok { + return nil, xerrors.New("watcher closed") + } + return &event, nil + case err, ok := <-f.Errors: + if !ok { + return nil, xerrors.New("watcher closed") + } + return nil, xerrors.Errorf("watcher error: %w", err) + case <-f.closed: + return nil, xerrors.New("watcher closed") + } +} + +func (f *fsnotifyWatcher) Close() error { + if err := f.Watcher.Close(); err != nil { + return xerrors.Errorf("close watcher: %w", err) + } + close(f.closed) + return nil +} diff --git a/agent/agentcontainers/watcher/watcher_test.go b/agent/agentcontainers/watcher/watcher_test.go new file mode 100644 index 0000000000000..35418a466e2e1 --- /dev/null +++ b/agent/agentcontainers/watcher/watcher_test.go @@ -0,0 +1,66 @@ +package watcher_test + +import ( + "context" + "os" + "path/filepath" + "testing" + + "github.com/fsnotify/fsnotify" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/coder/coder/v2/agent/agentcontainers/watcher" + "github.com/coder/coder/v2/testutil" +) + +func TestFSNotifyWatcher(t *testing.T) { + t.Parallel() + + // Create test files. + dir := t.TempDir() + testFile := filepath.Join(dir, "test.json") + err := os.WriteFile(testFile, []byte(`{"test": "initial"}`), 0o600) + require.NoError(t, err, "create test file failed") + + // Create the watcher under test. + wut, err := watcher.NewFSNotify() + require.NoError(t, err, "create FSNotify watcher failed") + defer wut.Close() + + // Add the test file to the watch list. + err = wut.Add(testFile) + require.NoError(t, err, "add file to watcher failed") + + ctx := testutil.Context(t, testutil.WaitShort) + + // Modify the test file to trigger an event. + err = os.WriteFile(testFile, []byte(`{"test": "modified"}`), 0o600) + require.NoError(t, err, "modify test file failed") + + // Verify that we receive the event we want. + event, err := wut.Next(ctx) + require.NoError(t, err, "next event failed") + + require.NotNil(t, event, "want non-nil event") + require.True(t, event.Has(fsnotify.Write), "want write event") + require.Equal(t, event.Name, testFile, "want event for test file") + + // Test removing the file from the watcher. + err = wut.Remove(testFile) + require.NoError(t, err, "remove file from watcher failed") +} + +func TestFSNotifyWatcher_CloseBeforeNext(t *testing.T) { + t.Parallel() + + wut, err := watcher.NewFSNotify() + require.NoError(t, err, "create FSNotify watcher failed") + + err = wut.Close() + require.NoError(t, err, "close watcher failed") + + ctx := context.Background() + _, err = wut.Next(ctx) + assert.Error(t, err, "want Next to return error when watcher is closed") +} diff --git a/agent/api.go b/agent/api.go index 0813deb77a146..97a04333f147e 100644 --- a/agent/api.go +++ b/agent/api.go @@ -12,7 +12,7 @@ import ( "github.com/coder/coder/v2/codersdk" ) -func (a *agent) apiHandler() http.Handler { +func (a *agent) apiHandler() (http.Handler, func() error) { r := chi.NewRouter() r.Get("/", func(rw http.ResponseWriter, r *http.Request) { httpapi.Write(r.Context(), rw, http.StatusOK, codersdk.Response{ @@ -63,7 +63,9 @@ func (a *agent) apiHandler() http.Handler { r.Get("/debug/manifest", a.HandleHTTPDebugManifest) r.Get("/debug/prometheus", promHandler.ServeHTTP) - return r + return r, func() error { + return containerAPI.Close() + } } type listeningPortsHandler struct { diff --git a/codersdk/workspaceagents.go b/codersdk/workspaceagents.go index 6a72de5ae4ff3..5c7171f70a627 100644 --- a/codersdk/workspaceagents.go +++ b/codersdk/workspaceagents.go @@ -408,6 +408,7 @@ type WorkspaceAgentDevcontainer struct { // Additional runtime fields. Running bool `json:"running"` + Dirty bool `json:"dirty"` Container *WorkspaceAgentContainer `json:"container,omitempty"` } diff --git a/go.mod b/go.mod index 230c911779b2f..8861406255434 100644 --- a/go.mod +++ b/go.mod @@ -488,6 +488,7 @@ require ( require ( github.com/coder/preview v0.0.1 + github.com/fsnotify/fsnotify v1.9.0 github.com/kylecarbs/aisdk-go v0.0.5 github.com/mark3labs/mcp-go v0.22.0 ) From a997f8238640c482143b25b4f351c3a50e6b53f0 Mon Sep 17 00:00:00 2001 From: Mathias Fredriksson Date: Mon, 28 Apr 2025 11:18:44 +0000 Subject: [PATCH 02/23] make gen --- site/src/api/typesGenerated.ts | 1 + 1 file changed, 1 insertion(+) diff --git a/site/src/api/typesGenerated.ts b/site/src/api/typesGenerated.ts index 634c2da3f2bb1..f9c6ae423c822 100644 --- a/site/src/api/typesGenerated.ts +++ b/site/src/api/typesGenerated.ts @@ -3252,6 +3252,7 @@ export interface WorkspaceAgentDevcontainer { readonly workspace_folder: string; readonly config_path?: string; readonly running: boolean; + readonly dirty: boolean; readonly container?: WorkspaceAgentContainer; } From 0e568fb0779596bf1e23c032f0141257e945f6c3 Mon Sep 17 00:00:00 2001 From: Mathias Fredriksson Date: Mon, 28 Apr 2025 11:25:02 +0000 Subject: [PATCH 03/23] fix copilot nits --- agent/agent.go | 3 +++ agent/agentcontainers/api.go | 38 ++++++++++++++++++------------------ 2 files changed, 22 insertions(+), 19 deletions(-) diff --git a/agent/agent.go b/agent/agent.go index 1ca5bd8faf858..b195368338242 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -1482,6 +1482,9 @@ func (a *agent) createTailnet( if err = a.trackGoroutine(func() { defer apiListener.Close() apiHandler, closeAPIHAndler := a.apiHandler() + defer func() { + _ = closeAPIHAndler() + }() server := &http.Server{ BaseContext: func(net.Listener) context.Context { return ctx }, Handler: apiHandler, diff --git a/agent/agentcontainers/api.go b/agent/agentcontainers/api.go index fa61bc18b0d15..344661cfd76f9 100644 --- a/agent/agentcontainers/api.go +++ b/agent/agentcontainers/api.go @@ -45,12 +45,12 @@ type API struct { // lockCh protects the below fields. We use a channel instead of a // mutex so we can handle cancellation properly. - lockCh chan struct{} - containers codersdk.WorkspaceAgentListContainersResponse - mtime time.Time - devcontainerNames map[string]struct{} // Track devcontainer names to avoid duplicates. - knownDevcontainers []codersdk.WorkspaceAgentDevcontainer // Track predefined and runtime-detected devcontainers. - configModifiedTimes map[string]time.Time // Track when config files were last modified. + lockCh chan struct{} + containers codersdk.WorkspaceAgentListContainersResponse + mtime time.Time + devcontainerNames map[string]struct{} // Track devcontainer names to avoid duplicates. + knownDevcontainers []codersdk.WorkspaceAgentDevcontainer // Track predefined and runtime-detected devcontainers. + configFileModifiedTimes map[string]time.Time // Track when config files were last modified. } // Option is a functional option for API. @@ -108,16 +108,16 @@ func WithWatcher(w watcher.Watcher) Option { func NewAPI(logger slog.Logger, options ...Option) *API { ctx, cancel := context.WithCancel(context.Background()) api := &API{ - ctx: ctx, - cancel: cancel, - done: make(chan struct{}), - logger: logger, - clock: quartz.NewReal(), - cacheDuration: defaultGetContainersCacheDuration, - lockCh: make(chan struct{}, 1), - devcontainerNames: make(map[string]struct{}), - knownDevcontainers: []codersdk.WorkspaceAgentDevcontainer{}, - configModifiedTimes: make(map[string]time.Time), + ctx: ctx, + cancel: cancel, + done: make(chan struct{}), + logger: logger, + clock: quartz.NewReal(), + cacheDuration: defaultGetContainersCacheDuration, + lockCh: make(chan struct{}, 1), + devcontainerNames: make(map[string]struct{}), + knownDevcontainers: []codersdk.WorkspaceAgentDevcontainer{}, + configFileModifiedTimes: make(map[string]time.Time), } for _, opt := range options { opt(api) @@ -281,7 +281,7 @@ func (api *API) getContainers(ctx context.Context) (codersdk.WorkspaceAgentListC // Check if this container was created after the config // file was modified. if configFile != "" && api.knownDevcontainers[knownIndex].Dirty { - lastModified, hasModTime := api.configModifiedTimes[configFile] + lastModified, hasModTime := api.configFileModifiedTimes[configFile] if hasModTime && container.CreatedAt.After(lastModified) { api.logger.Info(ctx, "clearing dirty flag for container created after config modification", slog.F("container", container.ID), @@ -316,7 +316,7 @@ func (api *API) getContainers(ctx context.Context) (codersdk.WorkspaceAgentListC dirty := dirtyStates[workspaceFolder] if dirty { - lastModified, hasModTime := api.configModifiedTimes[configFile] + lastModified, hasModTime := api.configFileModifiedTimes[configFile] if hasModTime && container.CreatedAt.After(lastModified) { api.logger.Info(ctx, "new container created after config modification, not marking as dirty", slog.F("container", container.ID), @@ -473,7 +473,7 @@ func (api *API) markDevcontainerDirty(configPath string, modifiedAt time.Time) { } // Record the timestamp of when this configuration file was modified. - api.configModifiedTimes[configPath] = modifiedAt + api.configFileModifiedTimes[configPath] = modifiedAt for i := range api.knownDevcontainers { if api.knownDevcontainers[i].ConfigPath == configPath { From 4f788d9f0d36c138cc575fa402a8fea9459910e9 Mon Sep 17 00:00:00 2001 From: Mathias Fredriksson Date: Mon, 28 Apr 2025 11:31:31 +0000 Subject: [PATCH 04/23] log event string in case not write --- agent/agentcontainers/watcher/watcher_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/agent/agentcontainers/watcher/watcher_test.go b/agent/agentcontainers/watcher/watcher_test.go index 35418a466e2e1..fdda564cc5cfa 100644 --- a/agent/agentcontainers/watcher/watcher_test.go +++ b/agent/agentcontainers/watcher/watcher_test.go @@ -43,7 +43,7 @@ func TestFSNotifyWatcher(t *testing.T) { require.NoError(t, err, "next event failed") require.NotNil(t, event, "want non-nil event") - require.True(t, event.Has(fsnotify.Write), "want write event") + require.True(t, event.Has(fsnotify.Write), "want write event", event.String()) require.Equal(t, event.Name, testFile, "want event for test file") // Test removing the file from the watcher. From bfeb1da6c01a2e231638714ca42707850f0cedfe Mon Sep 17 00:00:00 2001 From: Mathias Fredriksson Date: Mon, 28 Apr 2025 11:37:35 +0000 Subject: [PATCH 05/23] fix chmod on macos --- agent/agentcontainers/watcher/watcher_test.go | 19 +++++++++++++------ 1 file changed, 13 insertions(+), 6 deletions(-) diff --git a/agent/agentcontainers/watcher/watcher_test.go b/agent/agentcontainers/watcher/watcher_test.go index fdda564cc5cfa..895e327e171d9 100644 --- a/agent/agentcontainers/watcher/watcher_test.go +++ b/agent/agentcontainers/watcher/watcher_test.go @@ -39,12 +39,19 @@ func TestFSNotifyWatcher(t *testing.T) { require.NoError(t, err, "modify test file failed") // Verify that we receive the event we want. - event, err := wut.Next(ctx) - require.NoError(t, err, "next event failed") - - require.NotNil(t, event, "want non-nil event") - require.True(t, event.Has(fsnotify.Write), "want write event", event.String()) - require.Equal(t, event.Name, testFile, "want event for test file") + for { + event, err := wut.Next(ctx) + require.NoError(t, err, "next event failed") + + require.NotNil(t, event, "want non-nil event") + if event.Has(fsnotify.Chmod) && !event.Has(fsnotify.Write) { + // Ignore plain chmod events. + continue + } + require.Truef(t, event.Has(fsnotify.Write), "want write event: %s", event.String()) + require.Equal(t, event.Name, testFile, "want event for test file") + break + } // Test removing the file from the watcher. err = wut.Remove(testFile) From 45a978c0d151b3f64a2ba93008faf17043bcad46 Mon Sep 17 00:00:00 2001 From: Mathias Fredriksson Date: Mon, 28 Apr 2025 11:56:28 +0000 Subject: [PATCH 06/23] fix close of closed channel --- agent/agentcontainers/watcher/noop.go | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/agent/agentcontainers/watcher/noop.go b/agent/agentcontainers/watcher/noop.go index 7332215951c96..7ef6137f12d53 100644 --- a/agent/agentcontainers/watcher/noop.go +++ b/agent/agentcontainers/watcher/noop.go @@ -12,7 +12,9 @@ func NewNoop() Watcher { } type noopWatcher struct { - closed chan struct{} + mu synx.Mutex + closed bool + done chan struct{} } func (*noopWatcher) Add(string) error { @@ -27,12 +29,18 @@ func (n *noopWatcher) Next(ctx context.Context) (*fsnotify.Event, error) { select { case <-ctx.Done(): return nil, ctx.Err() - case <-n.closed: + case <-n.done: return nil, xerrors.New("watcher closed") } } func (n *noopWatcher) Close() error { - close(n.closed) + n.mu.Lock() + defer n.mu.Unlock() + if n.closed { + return nil + } + n.closed = true + close(n.done) return nil } From d2721a7b8e292189915552bc994ea0be496d09b5 Mon Sep 17 00:00:00 2001 From: Mathias Fredriksson Date: Mon, 28 Apr 2025 11:58:51 +0000 Subject: [PATCH 07/23] add close protection to fsnotify watcher as well --- agent/agentcontainers/watcher/watcher.go | 18 +++++++++++------- 1 file changed, 11 insertions(+), 7 deletions(-) diff --git a/agent/agentcontainers/watcher/watcher.go b/agent/agentcontainers/watcher/watcher.go index 505e1b43d1dd1..f0076a50b5e09 100644 --- a/agent/agentcontainers/watcher/watcher.go +++ b/agent/agentcontainers/watcher/watcher.go @@ -8,6 +8,7 @@ package watcher import ( "context" + "sync" "github.com/fsnotify/fsnotify" "golang.org/x/xerrors" @@ -33,7 +34,8 @@ type Watcher interface { type fsnotifyWatcher struct { *fsnotify.Watcher - closed chan struct{} + closeOnce sync.Once + closed chan struct{} } func NewFSNotify() (Watcher, error) { @@ -80,10 +82,12 @@ func (f *fsnotifyWatcher) Next(ctx context.Context) (*fsnotify.Event, error) { } } -func (f *fsnotifyWatcher) Close() error { - if err := f.Watcher.Close(); err != nil { - return xerrors.Errorf("close watcher: %w", err) - } - close(f.closed) - return nil +func (f *fsnotifyWatcher) Close() (err error) { + f.closeOnce.Do(func() { + if err := f.Watcher.Close(); err != nil { + err = xerrors.Errorf("close watcher: %w", err) + } + close(f.closed) + }) + return err } From c03bb7d74715de56a216c5a0a103d7bcf6ba50d1 Mon Sep 17 00:00:00 2001 From: Mathias Fredriksson Date: Mon, 28 Apr 2025 12:02:11 +0000 Subject: [PATCH 08/23] fix typo --- agent/agentcontainers/watcher/noop.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/agent/agentcontainers/watcher/noop.go b/agent/agentcontainers/watcher/noop.go index 7ef6137f12d53..770fb8d0c6e9a 100644 --- a/agent/agentcontainers/watcher/noop.go +++ b/agent/agentcontainers/watcher/noop.go @@ -2,17 +2,19 @@ package watcher import ( "context" + "sync" "github.com/fsnotify/fsnotify" "golang.org/x/xerrors" ) +// NewNoop creates a new watcher that does nothing. func NewNoop() Watcher { - return &noopWatcher{closed: make(chan struct{})} + return &noopWatcher{done: make(chan struct{})} } type noopWatcher struct { - mu synx.Mutex + mu sync.Mutex closed bool done chan struct{} } From aed7c3023cee9dd12ebd18de4f030afb0e81092b Mon Sep 17 00:00:00 2001 From: Mathias Fredriksson Date: Mon, 28 Apr 2025 12:07:23 +0000 Subject: [PATCH 09/23] fix pr suggestions and improve start loop --- agent/agentcontainers/api.go | 6 +++++- agent/agentcontainers/watcher/watcher.go | 20 +++++++++++++++----- 2 files changed, 20 insertions(+), 6 deletions(-) diff --git a/agent/agentcontainers/api.go b/agent/agentcontainers/api.go index 344661cfd76f9..3401f865cd774 100644 --- a/agent/agentcontainers/api.go +++ b/agent/agentcontainers/api.go @@ -152,10 +152,14 @@ func (api *API) start() { for { event, err := api.watcher.Next(api.ctx) if err != nil { - if errors.Is(err, context.Canceled) { + if errors.Is(err, watcher.ErrWatcherClosed) { api.logger.Debug(api.ctx, "watcher closed") return } + if api.ctx.Err() != nil { + api.logger.Debug(api.ctx, "api context canceled") + return + } api.logger.Error(api.ctx, "watcher error waiting for next event", slog.Error(err)) continue } diff --git a/agent/agentcontainers/watcher/watcher.go b/agent/agentcontainers/watcher/watcher.go index f0076a50b5e09..31351e3a81cbb 100644 --- a/agent/agentcontainers/watcher/watcher.go +++ b/agent/agentcontainers/watcher/watcher.go @@ -14,6 +14,8 @@ import ( "golang.org/x/xerrors" ) +var ErrWatcherClosed = xerrors.New("watcher closed") + // Watcher defines an interface for monitoring file system changes. // Implementations track file modifications and provide an event stream // that clients can consume to react to changes. @@ -63,28 +65,36 @@ func (f *fsnotifyWatcher) Remove(file string) error { return nil } -func (f *fsnotifyWatcher) Next(ctx context.Context) (*fsnotify.Event, error) { +func (f *fsnotifyWatcher) Next(ctx context.Context) (event *fsnotify.Event, err error) { + defer func() { + if ctx.Err() != nil { + event = nil + err = ctx.Err() + } + }() + select { case <-ctx.Done(): return nil, ctx.Err() case event, ok := <-f.Events: if !ok { - return nil, xerrors.New("watcher closed") + return nil, ErrWatcherClosed } return &event, nil case err, ok := <-f.Errors: if !ok { - return nil, xerrors.New("watcher closed") + return nil, ErrWatcherClosed } return nil, xerrors.Errorf("watcher error: %w", err) case <-f.closed: - return nil, xerrors.New("watcher closed") + return nil, ErrWatcherClosed } } func (f *fsnotifyWatcher) Close() (err error) { + err = ErrWatcherClosed f.closeOnce.Do(func() { - if err := f.Watcher.Close(); err != nil { + if err = f.Watcher.Close(); err != nil { err = xerrors.Errorf("close watcher: %w", err) } close(f.closed) From 39526e647d6734b90201b443eee5be508a0a339f Mon Sep 17 00:00:00 2001 From: Mathias Fredriksson Date: Mon, 28 Apr 2025 12:14:27 +0000 Subject: [PATCH 10/23] update noop impl. --- agent/agentcontainers/watcher/noop.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/agent/agentcontainers/watcher/noop.go b/agent/agentcontainers/watcher/noop.go index 770fb8d0c6e9a..9401a4fe48ab6 100644 --- a/agent/agentcontainers/watcher/noop.go +++ b/agent/agentcontainers/watcher/noop.go @@ -5,7 +5,6 @@ import ( "sync" "github.com/fsnotify/fsnotify" - "golang.org/x/xerrors" ) // NewNoop creates a new watcher that does nothing. @@ -27,12 +26,13 @@ func (*noopWatcher) Remove(string) error { return nil } +// Next blocks until the context is canceled or the watcher is closed. func (n *noopWatcher) Next(ctx context.Context) (*fsnotify.Event, error) { select { case <-ctx.Done(): return nil, ctx.Err() case <-n.done: - return nil, xerrors.New("watcher closed") + return nil, ErrWatcherClosed } } @@ -40,7 +40,7 @@ func (n *noopWatcher) Close() error { n.mu.Lock() defer n.mu.Unlock() if n.closed { - return nil + return ErrWatcherClosed } n.closed = true close(n.done) From 7edee90bbd678258f1e8d46acb4bf95dfb37644a Mon Sep 17 00:00:00 2001 From: Mathias Fredriksson Date: Mon, 28 Apr 2025 12:14:34 +0000 Subject: [PATCH 11/23] s/file/path --- agent/agentcontainers/watcher/watcher.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/agent/agentcontainers/watcher/watcher.go b/agent/agentcontainers/watcher/watcher.go index 31351e3a81cbb..e31732376e5db 100644 --- a/agent/agentcontainers/watcher/watcher.go +++ b/agent/agentcontainers/watcher/watcher.go @@ -51,16 +51,16 @@ func NewFSNotify() (Watcher, error) { }, nil } -func (f *fsnotifyWatcher) Add(file string) error { - if err := f.Watcher.Add(file); err != nil { - return xerrors.Errorf("add file to watcher: %w", err) +func (f *fsnotifyWatcher) Add(path string) error { + if err := f.Watcher.Add(path); err != nil { + return xerrors.Errorf("add path to watcher: %w", err) } return nil } -func (f *fsnotifyWatcher) Remove(file string) error { - if err := f.Watcher.Remove(file); err != nil { - return xerrors.Errorf("remove file from watcher: %w", err) +func (f *fsnotifyWatcher) Remove(path string) error { + if err := f.Watcher.Remove(path); err != nil { + return xerrors.Errorf("remove path from watcher: %w", err) } return nil } From 9cf5415a0aee8d3c399ec19b88ce65b3d8f6914d Mon Sep 17 00:00:00 2001 From: Mathias Fredriksson Date: Mon, 28 Apr 2025 12:36:27 +0000 Subject: [PATCH 12/23] rewrite watcher impl. to watch dirs --- agent/agentcontainers/watcher/watcher.go | 148 ++++++++++++++++++----- 1 file changed, 117 insertions(+), 31 deletions(-) diff --git a/agent/agentcontainers/watcher/watcher.go b/agent/agentcontainers/watcher/watcher.go index e31732376e5db..34e8658166e94 100644 --- a/agent/agentcontainers/watcher/watcher.go +++ b/agent/agentcontainers/watcher/watcher.go @@ -8,6 +8,7 @@ package watcher import ( "context" + "path/filepath" "sync" "github.com/fsnotify/fsnotify" @@ -36,32 +37,91 @@ type Watcher interface { type fsnotifyWatcher struct { *fsnotify.Watcher - closeOnce sync.Once - closed chan struct{} + + mu sync.Mutex // Protects following. + watchedFiles map[string]bool // Files being watched (absolute path -> bool). + watchedDirs map[string]int // Refcount of directories being watched (absolute path -> count). + closed bool // Protects closing of done. + done chan struct{} } +// NewFSNotify creates a new file system watcher that watches parent directories +// instead of individual files for more reliable event detection. func NewFSNotify() (Watcher, error) { w, err := fsnotify.NewWatcher() if err != nil { return nil, xerrors.Errorf("create fsnotify watcher: %w", err) } return &fsnotifyWatcher{ - Watcher: w, - closed: make(chan struct{}), + Watcher: w, + done: make(chan struct{}), + watchedFiles: make(map[string]bool), + watchedDirs: make(map[string]int), }, nil } -func (f *fsnotifyWatcher) Add(path string) error { - if err := f.Watcher.Add(path); err != nil { - return xerrors.Errorf("add path to watcher: %w", err) +func (f *fsnotifyWatcher) Add(file string) error { + absPath, err := filepath.Abs(file) + if err != nil { + return xerrors.Errorf("absolute path: %w", err) + } + + dir := filepath.Dir(absPath) + + f.mu.Lock() + defer f.mu.Unlock() + + // Already watching this file. + if f.watchedFiles[absPath] { + return nil + } + + // Start watching the parent directory if not already watching. + if f.watchedDirs[dir] == 0 { + if err := f.Watcher.Add(dir); err != nil { + return xerrors.Errorf("add directory to watcher: %w", err) + } } + + // Increment the reference count for this directory. + f.watchedDirs[dir]++ + // Mark this file as watched. + f.watchedFiles[absPath] = true + return nil } -func (f *fsnotifyWatcher) Remove(path string) error { - if err := f.Watcher.Remove(path); err != nil { - return xerrors.Errorf("remove path from watcher: %w", err) +func (f *fsnotifyWatcher) Remove(file string) error { + absPath, err := filepath.Abs(file) + if err != nil { + return xerrors.Errorf("absolute path: %w", err) + } + + dir := filepath.Dir(absPath) + + f.mu.Lock() + defer f.mu.Unlock() + + // Not watching this file. + if !f.watchedFiles[absPath] { + return nil + } + + // Remove the file from our watch list. + delete(f.watchedFiles, absPath) + + // Decrement the reference count for this directory. + f.watchedDirs[dir]-- + + // If no more files in this directory are being watched, stop + // watching the directory. + if f.watchedDirs[dir] <= 0 { + if err := f.Watcher.Remove(dir); err != nil { + return xerrors.Errorf("remove directory from watcher: %w", err) + } + delete(f.watchedDirs, dir) } + return nil } @@ -73,31 +133,57 @@ func (f *fsnotifyWatcher) Next(ctx context.Context) (event *fsnotify.Event, err } }() - select { - case <-ctx.Done(): - return nil, ctx.Err() - case event, ok := <-f.Events: - if !ok { - return nil, ErrWatcherClosed - } - return &event, nil - case err, ok := <-f.Errors: - if !ok { + for { + select { + case <-ctx.Done(): + return nil, ctx.Err() + case evt, ok := <-f.Events: + if !ok { + return nil, ErrWatcherClosed + } + + // Get the absolute path to match against our watched files. + absPath, err := filepath.Abs(evt.Name) + if err != nil { + continue + } + + f.mu.Lock() + isWatched := f.watchedFiles[absPath] + f.mu.Unlock() + if isWatched { + return &evt, nil + } + + continue // Ignore events for files not being watched. + + case err, ok := <-f.Errors: + if !ok { + return nil, ErrWatcherClosed + } + return nil, xerrors.Errorf("watcher error: %w", err) + case <-f.done: return nil, ErrWatcherClosed } - return nil, xerrors.Errorf("watcher error: %w", err) - case <-f.closed: - return nil, ErrWatcherClosed } } func (f *fsnotifyWatcher) Close() (err error) { - err = ErrWatcherClosed - f.closeOnce.Do(func() { - if err = f.Watcher.Close(); err != nil { - err = xerrors.Errorf("close watcher: %w", err) - } - close(f.closed) - }) - return err + f.mu.Lock() + f.watchedFiles = nil + f.watchedDirs = nil + closed := f.closed + f.mu.Unlock() + + if closed { + return ErrWatcherClosed + } + + close(f.done) + + if err := f.Watcher.Close(); err != nil { + return xerrors.Errorf("close watcher: %w", err) + } + + return nil } From de2a42e0ffcdc7ad99ea1e9c7040afe79be177ab Mon Sep 17 00:00:00 2001 From: Mathias Fredriksson Date: Mon, 28 Apr 2025 12:45:53 +0000 Subject: [PATCH 13/23] add rename and create to fsnotify test --- agent/agentcontainers/watcher/watcher_test.go | 39 ++++++++++++++++++- 1 file changed, 37 insertions(+), 2 deletions(-) diff --git a/agent/agentcontainers/watcher/watcher_test.go b/agent/agentcontainers/watcher/watcher_test.go index 895e327e171d9..9144fb5e5c724 100644 --- a/agent/agentcontainers/watcher/watcher_test.go +++ b/agent/agentcontainers/watcher/watcher_test.go @@ -44,8 +44,8 @@ func TestFSNotifyWatcher(t *testing.T) { require.NoError(t, err, "next event failed") require.NotNil(t, event, "want non-nil event") - if event.Has(fsnotify.Chmod) && !event.Has(fsnotify.Write) { - // Ignore plain chmod events. + if !event.Has(fsnotify.Write) { + t.Logf("Ignoring event: %s", event) continue } require.Truef(t, event.Has(fsnotify.Write), "want write event: %s", event.String()) @@ -53,6 +53,41 @@ func TestFSNotifyWatcher(t *testing.T) { break } + // Rename the test file to trigger a rename event. + err = os.Rename(testFile, testFile+".bak") + require.NoError(t, err, "rename test file failed") + + // Verify that we receive the event we want. + for { + event, err := wut.Next(ctx) + require.NoError(t, err, "next event failed") + require.NotNil(t, event, "want non-nil event") + if !event.Has(fsnotify.Rename) { + t.Logf("Ignoring event: %s", event) + continue + } + require.Truef(t, event.Has(fsnotify.Rename), "want rename event: %s", event.String()) + require.Equal(t, event.Name, testFile, "want event for test file") + break + } + + err = os.WriteFile(testFile, []byte(`{"test": "new"}`), 0o600) + require.NoError(t, err, "write new test file failed") + + // Verify that we receive the event we want. + for { + event, err := wut.Next(ctx) + require.NoError(t, err, "next event failed") + require.NotNil(t, event, "want non-nil event") + if !event.Has(fsnotify.Create) { + t.Logf("Ignoring event: %s", event) + continue + } + require.Truef(t, event.Has(fsnotify.Create), "want create event: %s", event.String()) + require.Equal(t, event.Name, testFile, "want event for test file") + break + } + // Test removing the file from the watcher. err = wut.Remove(testFile) require.NoError(t, err, "remove file from watcher failed") From 0cbee38c20153463aa8133dc2b7b8814338446b0 Mon Sep 17 00:00:00 2001 From: Mathias Fredriksson Date: Mon, 28 Apr 2025 12:47:57 +0000 Subject: [PATCH 14/23] add atomic file replacement --- agent/agentcontainers/watcher/watcher_test.go | 20 +++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/agent/agentcontainers/watcher/watcher_test.go b/agent/agentcontainers/watcher/watcher_test.go index 9144fb5e5c724..6cddfbdcee276 100644 --- a/agent/agentcontainers/watcher/watcher_test.go +++ b/agent/agentcontainers/watcher/watcher_test.go @@ -88,6 +88,26 @@ func TestFSNotifyWatcher(t *testing.T) { break } + err = os.WriteFile(testFile+".atomic", []byte(`{"test": "atomic"}`), 0o600) + require.NoError(t, err, "write new atomic test file failed") + + err = os.Rename(testFile+".atomic", testFile) + require.NoError(t, err, "rename atomic test file failed") + + // Verify that we receive the event we want. + for { + event, err := wut.Next(ctx) + require.NoError(t, err, "next event failed") + require.NotNil(t, event, "want non-nil event") + if !event.Has(fsnotify.Create) { + t.Logf("Ignoring event: %s", event) + continue + } + require.Truef(t, event.Has(fsnotify.Create), "want create event: %s", event.String()) + require.Equal(t, event.Name, testFile, "want event for test file") + break + } + // Test removing the file from the watcher. err = wut.Remove(testFile) require.NoError(t, err, "remove file from watcher failed") From 5ced7e1107ccd828c1bb7bada89a38963bab5130 Mon Sep 17 00:00:00 2001 From: Mathias Fredriksson Date: Mon, 28 Apr 2025 12:53:25 +0000 Subject: [PATCH 15/23] add paranoid reset --- agent/agentcontainers/watcher/watcher.go | 1 + 1 file changed, 1 insertion(+) diff --git a/agent/agentcontainers/watcher/watcher.go b/agent/agentcontainers/watcher/watcher.go index 34e8658166e94..fd65872fc7e86 100644 --- a/agent/agentcontainers/watcher/watcher.go +++ b/agent/agentcontainers/watcher/watcher.go @@ -116,6 +116,7 @@ func (f *fsnotifyWatcher) Remove(file string) error { // If no more files in this directory are being watched, stop // watching the directory. if f.watchedDirs[dir] <= 0 { + f.watchedDirs[dir] = 0 // Ensure non-negative count. if err := f.Watcher.Remove(dir); err != nil { return xerrors.Errorf("remove directory from watcher: %w", err) } From 5a96a55e0c1d7854d8a8e8b6f542d129b59b9334 Mon Sep 17 00:00:00 2001 From: Mathias Fredriksson Date: Mon, 28 Apr 2025 12:57:16 +0000 Subject: [PATCH 16/23] fix oopsie --- agent/agentcontainers/watcher/watcher.go | 1 + 1 file changed, 1 insertion(+) diff --git a/agent/agentcontainers/watcher/watcher.go b/agent/agentcontainers/watcher/watcher.go index fd65872fc7e86..89d3f88120870 100644 --- a/agent/agentcontainers/watcher/watcher.go +++ b/agent/agentcontainers/watcher/watcher.go @@ -174,6 +174,7 @@ func (f *fsnotifyWatcher) Close() (err error) { f.watchedFiles = nil f.watchedDirs = nil closed := f.closed + f.closed = true f.mu.Unlock() if closed { From 0b16448052173f8a890bec005578ea1c8a3e018b Mon Sep 17 00:00:00 2001 From: Mathias Fredriksson Date: Mon, 28 Apr 2025 13:03:59 +0000 Subject: [PATCH 17/23] s/ErrWatcherClosed/ErrClosed/g --- agent/agentcontainers/api.go | 2 +- agent/agentcontainers/watcher/noop.go | 4 ++-- agent/agentcontainers/watcher/watcher.go | 10 +++++----- 3 files changed, 8 insertions(+), 8 deletions(-) diff --git a/agent/agentcontainers/api.go b/agent/agentcontainers/api.go index 3401f865cd774..1c3505a49dd12 100644 --- a/agent/agentcontainers/api.go +++ b/agent/agentcontainers/api.go @@ -152,7 +152,7 @@ func (api *API) start() { for { event, err := api.watcher.Next(api.ctx) if err != nil { - if errors.Is(err, watcher.ErrWatcherClosed) { + if errors.Is(err, watcher.ErrClosed) { api.logger.Debug(api.ctx, "watcher closed") return } diff --git a/agent/agentcontainers/watcher/noop.go b/agent/agentcontainers/watcher/noop.go index 9401a4fe48ab6..4d1307b71c9ad 100644 --- a/agent/agentcontainers/watcher/noop.go +++ b/agent/agentcontainers/watcher/noop.go @@ -32,7 +32,7 @@ func (n *noopWatcher) Next(ctx context.Context) (*fsnotify.Event, error) { case <-ctx.Done(): return nil, ctx.Err() case <-n.done: - return nil, ErrWatcherClosed + return nil, ErrClosed } } @@ -40,7 +40,7 @@ func (n *noopWatcher) Close() error { n.mu.Lock() defer n.mu.Unlock() if n.closed { - return ErrWatcherClosed + return ErrClosed } n.closed = true close(n.done) diff --git a/agent/agentcontainers/watcher/watcher.go b/agent/agentcontainers/watcher/watcher.go index 89d3f88120870..fa641e4753b65 100644 --- a/agent/agentcontainers/watcher/watcher.go +++ b/agent/agentcontainers/watcher/watcher.go @@ -15,7 +15,7 @@ import ( "golang.org/x/xerrors" ) -var ErrWatcherClosed = xerrors.New("watcher closed") +var ErrClosed = xerrors.New("watcher closed") // Watcher defines an interface for monitoring file system changes. // Implementations track file modifications and provide an event stream @@ -140,7 +140,7 @@ func (f *fsnotifyWatcher) Next(ctx context.Context) (event *fsnotify.Event, err return nil, ctx.Err() case evt, ok := <-f.Events: if !ok { - return nil, ErrWatcherClosed + return nil, ErrClosed } // Get the absolute path to match against our watched files. @@ -160,11 +160,11 @@ func (f *fsnotifyWatcher) Next(ctx context.Context) (event *fsnotify.Event, err case err, ok := <-f.Errors: if !ok { - return nil, ErrWatcherClosed + return nil, ErrClosed } return nil, xerrors.Errorf("watcher error: %w", err) case <-f.done: - return nil, ErrWatcherClosed + return nil, ErrClosed } } } @@ -178,7 +178,7 @@ func (f *fsnotifyWatcher) Close() (err error) { f.mu.Unlock() if closed { - return ErrWatcherClosed + return ErrClosed } close(f.done) From eaf59220ba9043eb6a27cca1353cc9c6ded2d060 Mon Sep 17 00:00:00 2001 From: Mathias Fredriksson Date: Mon, 28 Apr 2025 13:05:10 +0000 Subject: [PATCH 18/23] inverse continue/return for clarity --- agent/agentcontainers/watcher/watcher.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/agent/agentcontainers/watcher/watcher.go b/agent/agentcontainers/watcher/watcher.go index fa641e4753b65..22ec426f2828d 100644 --- a/agent/agentcontainers/watcher/watcher.go +++ b/agent/agentcontainers/watcher/watcher.go @@ -152,11 +152,11 @@ func (f *fsnotifyWatcher) Next(ctx context.Context) (event *fsnotify.Event, err f.mu.Lock() isWatched := f.watchedFiles[absPath] f.mu.Unlock() - if isWatched { - return &evt, nil + if !isWatched { + continue // Ignore events for files not being watched. } - continue // Ignore events for files not being watched. + return &evt, nil case err, ok := <-f.Errors: if !ok { From 617fed256cc99e961d854d796158f4163a31e4b2 Mon Sep 17 00:00:00 2001 From: Mathias Fredriksson Date: Mon, 28 Apr 2025 13:07:52 +0000 Subject: [PATCH 19/23] better close handling --- agent/agentcontainers/watcher/watcher.go | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/agent/agentcontainers/watcher/watcher.go b/agent/agentcontainers/watcher/watcher.go index 22ec426f2828d..8e1acb9697cce 100644 --- a/agent/agentcontainers/watcher/watcher.go +++ b/agent/agentcontainers/watcher/watcher.go @@ -72,7 +72,7 @@ func (f *fsnotifyWatcher) Add(file string) error { defer f.mu.Unlock() // Already watching this file. - if f.watchedFiles[absPath] { + if f.closed || f.watchedFiles[absPath] { return nil } @@ -103,7 +103,7 @@ func (f *fsnotifyWatcher) Remove(file string) error { defer f.mu.Unlock() // Not watching this file. - if !f.watchedFiles[absPath] { + if f.closed || !f.watchedFiles[absPath] { return nil } @@ -150,6 +150,10 @@ func (f *fsnotifyWatcher) Next(ctx context.Context) (event *fsnotify.Event, err } f.mu.Lock() + if f.closed { + f.mu.Unlock() + return nil, ErrClosed + } isWatched := f.watchedFiles[absPath] f.mu.Unlock() if !isWatched { From 8ee991482a4ab6980e76f9ad437a82a7298c7d0c Mon Sep 17 00:00:00 2001 From: Mathias Fredriksson Date: Mon, 28 Apr 2025 13:11:29 +0000 Subject: [PATCH 20/23] restore removed comment --- agent/agentcontainers/api.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/agent/agentcontainers/api.go b/agent/agentcontainers/api.go index 1c3505a49dd12..330117dc65320 100644 --- a/agent/agentcontainers/api.go +++ b/agent/agentcontainers/api.go @@ -252,9 +252,11 @@ func (api *API) getContainers(ctx context.Context) (codersdk.WorkspaceAgentListC api.mtime = now dirtyStates := make(map[string]bool) + // Reset all known devcontainers to not running. for i := range api.knownDevcontainers { api.knownDevcontainers[i].Running = false api.knownDevcontainers[i].Container = nil + // Preserve the dirty state and store in map for lookup. dirtyStates[api.knownDevcontainers[i].WorkspaceFolder] = api.knownDevcontainers[i].Dirty } From 263c683914373c711b1840ead0e48058d3026db1 Mon Sep 17 00:00:00 2001 From: Mathias Fredriksson Date: Mon, 28 Apr 2025 13:13:30 +0000 Subject: [PATCH 21/23] add note about name --- agent/agentcontainers/api.go | 1 + 1 file changed, 1 insertion(+) diff --git a/agent/agentcontainers/api.go b/agent/agentcontainers/api.go index 330117dc65320..23a682f3a853e 100644 --- a/agent/agentcontainers/api.go +++ b/agent/agentcontainers/api.go @@ -301,6 +301,7 @@ func (api *API) getContainers(ctx context.Context) (codersdk.WorkspaceAgentListC continue } + // NOTE(mafredri): This name impl. may change to accommodate devcontainer agents RFC. // If not in our known list, add as a runtime detected entry. name := path.Base(workspaceFolder) if _, ok := api.devcontainerNames[name]; ok { From 58a70e8d2f9d0e637f59359e85d763241bd591fb Mon Sep 17 00:00:00 2001 From: Mathias Fredriksson Date: Mon, 28 Apr 2025 13:39:07 +0000 Subject: [PATCH 22/23] fix test setup/clock advancement --- agent/agentcontainers/api_test.go | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/agent/agentcontainers/api_test.go b/agent/agentcontainers/api_test.go index 1deff56f84650..a246d929d9089 100644 --- a/agent/agentcontainers/api_test.go +++ b/agent/agentcontainers/api_test.go @@ -652,6 +652,9 @@ func TestAPI(t *testing.T) { assert.Contains(t, fWatcher.addedPaths, configPath, "watcher should be watching the container's config file") + // Make sure the start loop has been called. + fWatcher.waitNext(ctx) + // Send a file modification event and check if the container is // marked dirty. fWatcher.sendEventWaitNextCalled(ctx, fsnotify.Event{ @@ -659,6 +662,8 @@ func TestAPI(t *testing.T) { Op: fsnotify.Write, }) + mClock.Advance(time.Minute).MustWait(ctx) + // Check if the container is marked as dirty. req = httptest.NewRequest(http.MethodGet, "/devcontainers", nil) rec = httptest.NewRecorder() @@ -671,7 +676,7 @@ func TestAPI(t *testing.T) { assert.True(t, response.Devcontainers[0].Dirty, "container should be marked as dirty after config file was modified") - mClock.Advance(10 * time.Minute).MustWait(ctx) + mClock.Advance(time.Minute).MustWait(ctx) container.ID = "new-container-id" // Simulate a new container ID after recreation. container.FriendlyName = "new-container-name" From 9f739ca261290616df08bd9dc1f3789cb5d404b3 Mon Sep 17 00:00:00 2001 From: Mathias Fredriksson Date: Mon, 28 Apr 2025 14:45:32 +0000 Subject: [PATCH 23/23] outdent --- agent/agentcontainers/api.go | 28 +++++++++++++++------------- 1 file changed, 15 insertions(+), 13 deletions(-) diff --git a/agent/agentcontainers/api.go b/agent/agentcontainers/api.go index 23a682f3a853e..489bc1e55194c 100644 --- a/agent/agentcontainers/api.go +++ b/agent/agentcontainers/api.go @@ -483,19 +483,21 @@ func (api *API) markDevcontainerDirty(configPath string, modifiedAt time.Time) { api.configFileModifiedTimes[configPath] = modifiedAt for i := range api.knownDevcontainers { - if api.knownDevcontainers[i].ConfigPath == configPath { - // TODO(mafredri): Simplistic mark for now, we should check if the - // container is running and if the config file was modified after - // the container was created. - if !api.knownDevcontainers[i].Dirty { - api.logger.Info(api.ctx, "marking devcontainer as dirty", - slog.F("file", configPath), - slog.F("name", api.knownDevcontainers[i].Name), - slog.F("workspace_folder", api.knownDevcontainers[i].WorkspaceFolder), - slog.F("modified_at", modifiedAt), - ) - api.knownDevcontainers[i].Dirty = true - } + if api.knownDevcontainers[i].ConfigPath != configPath { + continue + } + + // TODO(mafredri): Simplistic mark for now, we should check if the + // container is running and if the config file was modified after + // the container was created. + if !api.knownDevcontainers[i].Dirty { + api.logger.Info(api.ctx, "marking devcontainer as dirty", + slog.F("file", configPath), + slog.F("name", api.knownDevcontainers[i].Name), + slog.F("workspace_folder", api.knownDevcontainers[i].WorkspaceFolder), + slog.F("modified_at", modifiedAt), + ) + api.knownDevcontainers[i].Dirty = true } } }