Skip to content

fix(agent/agentcontainers): update sub agent client on reconnect #18399

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jun 17, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -1188,6 +1188,14 @@ func (a *agent) handleManifest(manifestOK *checkpoint) func(ctx context.Context,
}
a.metrics.startupScriptSeconds.WithLabelValues(label).Set(dur)
a.scriptRunner.StartCron()

// If the container API is enabled, trigger an immediate refresh
// for quick sub agent injection.
if cAPI := a.containerAPI.Load(); cAPI != nil {
if err := cAPI.RefreshContainers(ctx); err != nil {
a.logger.Error(ctx, "failed to refresh containers", slog.Error(err))
}
}
})
if err != nil {
return xerrors.Errorf("track conn goroutine: %w", err)
Expand Down Expand Up @@ -1253,6 +1261,12 @@ func (a *agent) createOrUpdateNetwork(manifestOK, networkOK *checkpoint) func(co
network.SetDERPMap(manifest.DERPMap)
network.SetDERPForceWebSockets(manifest.DERPForceWebSockets)
network.SetBlockEndpoints(manifest.DisableDirectConnections)

// Update the subagent client if the container API is available.
if cAPI := a.containerAPI.Load(); cAPI != nil {
client := agentcontainers.NewSubAgentClientFromAPI(a.logger, aAPI)
cAPI.UpdateSubAgentClient(client)
}
}
return nil
}
Expand Down
39 changes: 27 additions & 12 deletions agent/agentcontainers/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"slices"
"strings"
"sync"
"sync/atomic"
"time"

"github.com/fsnotify/fsnotify"
Expand Down Expand Up @@ -59,7 +60,7 @@ type API struct {
dccli DevcontainerCLI
clock quartz.Clock
scriptLogger func(logSourceID uuid.UUID) ScriptLogger
subAgentClient SubAgentClient
subAgentClient atomic.Pointer[SubAgentClient]
subAgentURL string
subAgentEnv []string

Expand Down Expand Up @@ -133,7 +134,7 @@ func WithDevcontainerCLI(dccli DevcontainerCLI) Option {
// This is used to list, create, and delete devcontainer agents.
func WithSubAgentClient(client SubAgentClient) Option {
return func(api *API) {
api.subAgentClient = client
api.subAgentClient.Store(&client)
}
}

Expand Down Expand Up @@ -230,7 +231,6 @@ func NewAPI(logger slog.Logger, options ...Option) *API {
logger: logger,
clock: quartz.NewReal(),
execer: agentexec.DefaultExecer,
subAgentClient: noopSubAgentClient{},
containerLabelIncludeFilter: make(map[string]string),
devcontainerNames: make(map[string]bool),
knownDevcontainers: make(map[string]codersdk.WorkspaceAgentDevcontainer),
Expand Down Expand Up @@ -259,6 +259,10 @@ func NewAPI(logger slog.Logger, options ...Option) *API {
api.watcher = watcher.NewNoop()
}
}
if api.subAgentClient.Load() == nil {
var c SubAgentClient = noopSubAgentClient{}
api.subAgentClient.Store(&c)
}

go api.watcherLoop()
go api.updaterLoop()
Expand Down Expand Up @@ -375,6 +379,11 @@ func (api *API) updaterLoop() {
}
}

// UpdateSubAgentClient replaces the SubAgentClient used by the API.
// The replacement is published with an atomic pointer store, so code
// that loads the client afterwards observes the new value.
func (api *API) UpdateSubAgentClient(client SubAgentClient) {
	next := client
	api.subAgentClient.Store(&next)
}

// Routes returns the HTTP handler for container-related routes.
func (api *API) Routes() http.Handler {
r := chi.NewRouter()
Expand Down Expand Up @@ -623,9 +632,9 @@ func safeFriendlyName(name string) string {
return name
}

// refreshContainers triggers an immediate update of the container list
// RefreshContainers triggers an immediate update of the container list
// and waits for it to complete.
func (api *API) refreshContainers(ctx context.Context) (err error) {
func (api *API) RefreshContainers(ctx context.Context) (err error) {
defer func() {
if err != nil {
err = xerrors.Errorf("refresh containers failed: %w", err)
Expand Down Expand Up @@ -860,7 +869,7 @@ func (api *API) recreateDevcontainer(dc codersdk.WorkspaceAgentDevcontainer, con

// Ensure an immediate refresh to accurately reflect the
// devcontainer state after recreation.
if err := api.refreshContainers(ctx); err != nil {
if err := api.RefreshContainers(ctx); err != nil {
logger.Error(ctx, "failed to trigger immediate refresh after devcontainer recreation", slog.Error(err))
}
}
Expand Down Expand Up @@ -904,7 +913,8 @@ func (api *API) markDevcontainerDirty(configPath string, modifiedAt time.Time) {
// slate. This method has an internal timeout to prevent blocking
// indefinitely if something goes wrong with the subagent deletion.
func (api *API) cleanupSubAgents(ctx context.Context) error {
agents, err := api.subAgentClient.List(ctx)
client := *api.subAgentClient.Load()
agents, err := client.List(ctx)
if err != nil {
return xerrors.Errorf("list agents: %w", err)
}
Expand All @@ -927,7 +937,8 @@ func (api *API) cleanupSubAgents(ctx context.Context) error {
if injected[agent.ID] {
continue
}
err := api.subAgentClient.Delete(ctx, agent.ID)
client := *api.subAgentClient.Load()
err := client.Delete(ctx, agent.ID)
if err != nil {
api.logger.Error(ctx, "failed to delete agent",
slog.Error(err),
Expand Down Expand Up @@ -1101,7 +1112,8 @@ func (api *API) maybeInjectSubAgentIntoContainerLocked(ctx context.Context, dc c

if proc.agent.ID != uuid.Nil && recreateSubAgent {
logger.Debug(ctx, "deleting existing subagent for recreation", slog.F("agent_id", proc.agent.ID))
err = api.subAgentClient.Delete(ctx, proc.agent.ID)
client := *api.subAgentClient.Load()
err = client.Delete(ctx, proc.agent.ID)
if err != nil {
return xerrors.Errorf("delete existing subagent failed: %w", err)
}
Expand Down Expand Up @@ -1144,7 +1156,8 @@ func (api *API) maybeInjectSubAgentIntoContainerLocked(ctx context.Context, dc c
)

// Create a new subagent record in the database to receive the auth token.
proc.agent, err = api.subAgentClient.Create(ctx, SubAgent{
client := *api.subAgentClient.Load()
proc.agent, err = client.Create(ctx, SubAgent{
Name: dc.Name,
Directory: directory,
OperatingSystem: "linux", // Assuming Linux for devcontainers.
Expand All @@ -1163,7 +1176,8 @@ func (api *API) maybeInjectSubAgentIntoContainerLocked(ctx context.Context, dc c
if api.closed {
deleteCtx, deleteCancel := context.WithTimeout(context.Background(), defaultOperationTimeout)
defer deleteCancel()
err := api.subAgentClient.Delete(deleteCtx, proc.agent.ID)
client := *api.subAgentClient.Load()
err := client.Delete(deleteCtx, proc.agent.ID)
if err != nil {
return xerrors.Errorf("delete existing subagent failed after API closed: %w", err)
}
Expand Down Expand Up @@ -1249,8 +1263,9 @@ func (api *API) Close() error {
// Note: We can't use api.ctx here because it's canceled.
deleteCtx, deleteCancel := context.WithTimeout(context.Background(), defaultOperationTimeout)
defer deleteCancel()
client := *api.subAgentClient.Load()
for _, id := range subAgentIDs {
err := api.subAgentClient.Delete(deleteCtx, id)
err := client.Delete(deleteCtx, id)
if err != nil {
api.logger.Error(api.ctx, "delete subagent record during shutdown failed",
slog.Error(err),
Expand Down
Loading