From 1b9e884b0836e226b2d30dd1c3da73531e469085 Mon Sep 17 00:00:00 2001 From: Mathias Fredriksson Date: Tue, 17 Jun 2025 07:32:52 +0000 Subject: [PATCH] fix(agent/agentcontainers): update sub agent client on reconnect Fixes coder/internal#697 --- agent/agent.go | 14 +++++++++++++ agent/agentcontainers/api.go | 39 +++++++++++++++++++++++++----------- 2 files changed, 41 insertions(+), 12 deletions(-) diff --git a/agent/agent.go b/agent/agent.go index 9f105ee296f5c..79f3feb21c50e 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -1188,6 +1188,14 @@ func (a *agent) handleManifest(manifestOK *checkpoint) func(ctx context.Context, } a.metrics.startupScriptSeconds.WithLabelValues(label).Set(dur) a.scriptRunner.StartCron() + + // If the container API is enabled, trigger an immediate refresh + // for quick sub agent injection. + if cAPI := a.containerAPI.Load(); cAPI != nil { + if err := cAPI.RefreshContainers(ctx); err != nil { + a.logger.Error(ctx, "failed to refresh containers", slog.Error(err)) + } + } }) if err != nil { return xerrors.Errorf("track conn goroutine: %w", err) @@ -1253,6 +1261,12 @@ func (a *agent) createOrUpdateNetwork(manifestOK, networkOK *checkpoint) func(co network.SetDERPMap(manifest.DERPMap) network.SetDERPForceWebSockets(manifest.DERPForceWebSockets) network.SetBlockEndpoints(manifest.DisableDirectConnections) + + // Update the subagent client if the container API is available. + if cAPI := a.containerAPI.Load(); cAPI != nil { + client := agentcontainers.NewSubAgentClientFromAPI(a.logger, aAPI) + cAPI.UpdateSubAgentClient(client) + } } return nil } diff --git a/agent/agentcontainers/api.go b/agent/agentcontainers/api.go index 71b5267f40fec..cdc4992022a85 100644 --- a/agent/agentcontainers/api.go +++ b/agent/agentcontainers/api.go @@ -14,6 +14,7 @@ import ( "slices" "strings" "sync" + "sync/atomic" "time" "github.com/fsnotify/fsnotify" @@ -59,7 +60,7 @@ type API struct { dccli DevcontainerCLI clock quartz.Clock scriptLogger func(logSourceID uuid.UUID) ScriptLogger - subAgentClient SubAgentClient + subAgentClient atomic.Pointer[SubAgentClient] subAgentURL string subAgentEnv []string @@ -133,7 +134,7 @@ func WithDevcontainerCLI(dccli DevcontainerCLI) Option { // This is used to list, create, and delete devcontainer agents. func WithSubAgentClient(client SubAgentClient) Option { return func(api *API) { - api.subAgentClient = client + api.subAgentClient.Store(&client) } } @@ -230,7 +231,6 @@ func NewAPI(logger slog.Logger, options ...Option) *API { logger: logger, clock: quartz.NewReal(), execer: agentexec.DefaultExecer, - subAgentClient: noopSubAgentClient{}, containerLabelIncludeFilter: make(map[string]string), devcontainerNames: make(map[string]bool), knownDevcontainers: make(map[string]codersdk.WorkspaceAgentDevcontainer), @@ -259,6 +259,10 @@ func NewAPI(logger slog.Logger, options ...Option) *API { api.watcher = watcher.NewNoop() } } + if api.subAgentClient.Load() == nil { + var c SubAgentClient = noopSubAgentClient{} + api.subAgentClient.Store(&c) + } go api.watcherLoop() go api.updaterLoop() @@ -375,6 +379,11 @@ func (api *API) updaterLoop() { } } +// UpdateSubAgentClient updates the `SubAgentClient` for the API. +func (api *API) UpdateSubAgentClient(client SubAgentClient) { + api.subAgentClient.Store(&client) +} + // Routes returns the HTTP handler for container-related routes. func (api *API) Routes() http.Handler { r := chi.NewRouter() @@ -623,9 +632,9 @@ func safeFriendlyName(name string) string { return name } -// refreshContainers triggers an immediate update of the container list +// RefreshContainers triggers an immediate update of the container list // and waits for it to complete. -func (api *API) refreshContainers(ctx context.Context) (err error) { +func (api *API) RefreshContainers(ctx context.Context) (err error) { defer func() { if err != nil { err = xerrors.Errorf("refresh containers failed: %w", err) @@ -860,7 +869,7 @@ func (api *API) recreateDevcontainer(dc codersdk.WorkspaceAgentDevcontainer, con // Ensure an immediate refresh to accurately reflect the // devcontainer state after recreation. - if err := api.refreshContainers(ctx); err != nil { + if err := api.RefreshContainers(ctx); err != nil { logger.Error(ctx, "failed to trigger immediate refresh after devcontainer recreation", slog.Error(err)) } } @@ -904,7 +913,8 @@ func (api *API) markDevcontainerDirty(configPath string, modifiedAt time.Time) { // slate. This method has an internal timeout to prevent blocking // indefinitely if something goes wrong with the subagent deletion. func (api *API) cleanupSubAgents(ctx context.Context) error { - agents, err := api.subAgentClient.List(ctx) + client := *api.subAgentClient.Load() + agents, err := client.List(ctx) if err != nil { return xerrors.Errorf("list agents: %w", err) } @@ -927,7 +937,8 @@ func (api *API) cleanupSubAgents(ctx context.Context) error { if injected[agent.ID] { continue } - err := api.subAgentClient.Delete(ctx, agent.ID) + client := *api.subAgentClient.Load() + err := client.Delete(ctx, agent.ID) if err != nil { api.logger.Error(ctx, "failed to delete agent", slog.Error(err), @@ -1101,7 +1112,8 @@ func (api *API) maybeInjectSubAgentIntoContainerLocked(ctx context.Context, dc c if proc.agent.ID != uuid.Nil && recreateSubAgent { logger.Debug(ctx, "deleting existing subagent for recreation", slog.F("agent_id", proc.agent.ID)) - err = api.subAgentClient.Delete(ctx, proc.agent.ID) + client := *api.subAgentClient.Load() + err = client.Delete(ctx, proc.agent.ID) if err != nil { return xerrors.Errorf("delete existing subagent failed: %w", err) } @@ -1144,7 +1156,8 @@ func (api *API) maybeInjectSubAgentIntoContainerLocked(ctx context.Context, dc c ) // Create new subagent record in the database to receive the auth token. - proc.agent, err = api.subAgentClient.Create(ctx, SubAgent{ + client := *api.subAgentClient.Load() + proc.agent, err = client.Create(ctx, SubAgent{ Name: dc.Name, Directory: directory, OperatingSystem: "linux", // Assuming Linux for devcontainers. @@ -1163,7 +1176,8 @@ func (api *API) maybeInjectSubAgentIntoContainerLocked(ctx context.Context, dc c if api.closed { deleteCtx, deleteCancel := context.WithTimeout(context.Background(), defaultOperationTimeout) defer deleteCancel() - err := api.subAgentClient.Delete(deleteCtx, proc.agent.ID) + client := *api.subAgentClient.Load() + err := client.Delete(deleteCtx, proc.agent.ID) if err != nil { return xerrors.Errorf("delete existing subagent failed after API closed: %w", err) } @@ -1249,8 +1263,9 @@ func (api *API) Close() error { // Note: We can't use api.ctx here because it's canceled. deleteCtx, deleteCancel := context.WithTimeout(context.Background(), defaultOperationTimeout) defer deleteCancel() + client := *api.subAgentClient.Load() for _, id := range subAgentIDs { - err := api.subAgentClient.Delete(deleteCtx, id) + err := client.Delete(deleteCtx, id) if err != nil { api.logger.Error(api.ctx, "delete subagent record during shutdown failed", slog.Error(err),