Skip to content

Commit 0947224

Browse files
committed
WIP: agent reinitialization
1 parent 8b9e30d commit 0947224

33 files changed

+1129
-458
lines changed

agent/agent.go

+11-2
Original file line number | Diff line number | Diff line change
@@ -36,6 +36,9 @@ import (
3636
"tailscale.com/util/clientmetric"
3737

3838
"cdr.dev/slog"
39+
40+
"github.com/coder/retry"
41+
3942
"github.com/coder/clistat"
4043
"github.com/coder/coder/v2/agent/agentcontainers"
4144
"github.com/coder/coder/v2/agent/agentexec"
@@ -53,7 +56,6 @@ import (
5356
"github.com/coder/coder/v2/tailnet"
5457
tailnetproto "github.com/coder/coder/v2/tailnet/proto"
5558
"github.com/coder/quartz"
56-
"github.com/coder/retry"
5759
)
5860

5961
const (
@@ -365,9 +367,11 @@ func (a *agent) runLoop() {
365367
if ctx.Err() != nil {
366368
// Context canceled errors may come from websocket pings, so we
367369
// don't want to use `errors.Is(err, context.Canceled)` here.
370+
a.logger.Warn(ctx, "runLoop exited with error", slog.Error(ctx.Err()))
368371
return
369372
}
370373
if a.isClosed() {
374+
a.logger.Warn(ctx, "runLoop exited because agent is closed")
371375
return
372376
}
373377
if errors.Is(err, io.EOF) {
@@ -1048,7 +1052,12 @@ func (a *agent) run() (retErr error) {
10481052
return a.statsReporter.reportLoop(ctx, aAPI)
10491053
})
10501054

1051-
return connMan.wait()
1055+
err = connMan.wait()
1056+
// TODO: This broke some tests at some point; investigate.
1057+
if err != nil {
1058+
a.logger.Warn(context.Background(), "connection manager errored", slog.Error(err))
1059+
}
1060+
return err
10521061
}
10531062

10541063
// handleManifest returns a function that fetches and processes the manifest

agent/metrics.go

+10
Original file line number | Diff line number | Diff line change
@@ -20,6 +20,7 @@ type agentMetrics struct {
2020
// took to run. This is reported once per agent.
2121
startupScriptSeconds *prometheus.GaugeVec
2222
currentConnections *prometheus.GaugeVec
23+
manifestsReceived prometheus.Counter
2324
}
2425

2526
func newAgentMetrics(registerer prometheus.Registerer) *agentMetrics {
@@ -54,11 +55,20 @@ func newAgentMetrics(registerer prometheus.Registerer) *agentMetrics {
5455
}, []string{"connection_type"})
5556
registerer.MustRegister(currentConnections)
5657

58+
manifestsReceived := prometheus.NewCounter(prometheus.CounterOpts{
59+
Namespace: "coderd",
60+
Subsystem: "agentstats",
61+
Name: "manifests_received",
62+
Help: "The number of manifests this agent has received from the control plane.",
63+
})
64+
registerer.MustRegister(manifestsReceived)
65+
5766
return &agentMetrics{
5867
connectionsTotal: connectionsTotal,
5968
reconnectingPTYErrors: reconnectingPTYErrors,
6069
startupScriptSeconds: startupScriptSeconds,
6170
currentConnections: currentConnections,
71+
manifestsReceived: manifestsReceived,
6272
}
6373
}
6474

agent/reaper/reaper_unix.go

+5
Original file line number | Diff line number | Diff line change
@@ -3,6 +3,7 @@
33
package reaper
44

55
import (
6+
"fmt"
67
"os"
78
"os/signal"
89
"syscall"
@@ -29,6 +30,10 @@ func catchSignals(pid int, sigs []os.Signal) {
2930
s := <-sc
3031
sig, ok := s.(syscall.Signal)
3132
if ok {
33+
// TODO:
34+
// Tried using a logger here but the I/O streams are already closed at this point...
35+
// Why is os.Stderr still working then?
36+
_, _ = fmt.Fprintf(os.Stderr, "reaper caught %q signal, killing process %v\n", sig.String(), pid)
3237
_ = syscall.Kill(pid, sig)
3338
}
3439
}

cli/agent.go

+89-41
Original file line number | Diff line number | Diff line change
@@ -19,12 +19,16 @@ import (
1919
"golang.org/x/xerrors"
2020
"gopkg.in/natefinch/lumberjack.v2"
2121

22+
"github.com/coder/retry"
23+
2224
"github.com/prometheus/client_golang/prometheus"
2325

2426
"cdr.dev/slog"
2527
"cdr.dev/slog/sloggers/sloghuman"
2628
"cdr.dev/slog/sloggers/slogjson"
2729
"cdr.dev/slog/sloggers/slogstackdriver"
30+
"github.com/coder/serpent"
31+
2832
"github.com/coder/coder/v2/agent"
2933
"github.com/coder/coder/v2/agent/agentcontainers"
3034
"github.com/coder/coder/v2/agent/agentexec"
@@ -34,7 +38,6 @@ import (
3438
"github.com/coder/coder/v2/cli/clilog"
3539
"github.com/coder/coder/v2/codersdk"
3640
"github.com/coder/coder/v2/codersdk/agentsdk"
37-
"github.com/coder/serpent"
3841
)
3942

4043
func (r *RootCmd) workspaceAgent() *serpent.Command {
@@ -63,8 +66,10 @@ func (r *RootCmd) workspaceAgent() *serpent.Command {
6366
// This command isn't useful to manually execute.
6467
Hidden: true,
6568
Handler: func(inv *serpent.Invocation) error {
66-
ctx, cancel := context.WithCancel(inv.Context())
67-
defer cancel()
69+
ctx, cancel := context.WithCancelCause(inv.Context())
70+
defer func() {
71+
cancel(xerrors.New("defer"))
72+
}()
6873

6974
var (
7075
ignorePorts = map[int]string{}
@@ -281,7 +286,6 @@ func (r *RootCmd) workspaceAgent() *serpent.Command {
281286
return xerrors.Errorf("add executable to $PATH: %w", err)
282287
}
283288

284-
prometheusRegistry := prometheus.NewRegistry()
285289
subsystemsRaw := inv.Environ.Get(agent.EnvAgentSubsystem)
286290
subsystems := []codersdk.AgentSubsystem{}
287291
for _, s := range strings.Split(subsystemsRaw, ",") {
@@ -328,46 +332,90 @@ func (r *RootCmd) workspaceAgent() *serpent.Command {
328332
containerLister = agentcontainers.NewDocker(execer)
329333
}
330334

331-
agnt := agent.New(agent.Options{
332-
Client: client,
333-
Logger: logger,
334-
LogDir: logDir,
335-
ScriptDataDir: scriptDataDir,
336-
// #nosec G115 - Safe conversion as tailnet listen port is within uint16 range (0-65535)
337-
TailnetListenPort: uint16(tailnetListenPort),
338-
ExchangeToken: func(ctx context.Context) (string, error) {
339-
if exchangeToken == nil {
340-
return client.SDK.SessionToken(), nil
335+
// TODO: Confirm that a 24-hour timeout is appropriate for the reinit wait context.
336+
reinitCtx, reinitCancel := context.WithTimeout(context.Background(), time.Hour*24)
337+
defer reinitCancel()
338+
reinitEvents := make(chan agentsdk.ReinitializationResponse)
339+
340+
go func() {
341+
// Retry to wait for reinit, main context cancels the retrier.
342+
for retrier := retry.New(100*time.Millisecond, 10*time.Second); retrier.Wait(ctx); {
343+
select {
344+
case <-reinitCtx.Done():
345+
return
346+
default:
341347
}
342-
resp, err := exchangeToken(ctx)
348+
349+
err := client.WaitForReinit(reinitCtx, reinitEvents)
343350
if err != nil {
344-
return "", err
351+
logger.Error(ctx, "failed to wait for reinit instructions, will retry", slog.Error(err))
345352
}
346-
client.SetSessionToken(resp.SessionToken)
347-
return resp.SessionToken, nil
348-
},
349-
EnvironmentVariables: environmentVariables,
350-
IgnorePorts: ignorePorts,
351-
SSHMaxTimeout: sshMaxTimeout,
352-
Subsystems: subsystems,
353-
354-
PrometheusRegistry: prometheusRegistry,
355-
BlockFileTransfer: blockFileTransfer,
356-
Execer: execer,
357-
ContainerLister: containerLister,
358-
359-
ExperimentalDevcontainersEnabled: experimentalDevcontainersEnabled,
360-
})
361-
362-
promHandler := agent.PrometheusMetricsHandler(prometheusRegistry, logger)
363-
prometheusSrvClose := ServeHandler(ctx, logger, promHandler, prometheusAddress, "prometheus")
364-
defer prometheusSrvClose()
365-
366-
debugSrvClose := ServeHandler(ctx, logger, agnt.HTTPDebug(), debugAddress, "debug")
367-
defer debugSrvClose()
368-
369-
<-ctx.Done()
370-
return agnt.Close()
353+
}
354+
}()
355+
356+
var (
357+
lastErr error
358+
mustExit bool
359+
)
360+
for {
361+
prometheusRegistry := prometheus.NewRegistry()
362+
363+
agnt := agent.New(agent.Options{
364+
Client: client,
365+
Logger: logger,
366+
LogDir: logDir,
367+
ScriptDataDir: scriptDataDir,
368+
// #nosec G115 - Safe conversion as tailnet listen port is within uint16 range (0-65535)
369+
TailnetListenPort: uint16(tailnetListenPort),
370+
ExchangeToken: func(ctx context.Context) (string, error) {
371+
if exchangeToken == nil {
372+
return client.SDK.SessionToken(), nil
373+
}
374+
resp, err := exchangeToken(ctx)
375+
if err != nil {
376+
return "", err
377+
}
378+
client.SetSessionToken(resp.SessionToken)
379+
return resp.SessionToken, nil
380+
},
381+
EnvironmentVariables: environmentVariables,
382+
IgnorePorts: ignorePorts,
383+
SSHMaxTimeout: sshMaxTimeout,
384+
Subsystems: subsystems,
385+
386+
PrometheusRegistry: prometheusRegistry,
387+
BlockFileTransfer: blockFileTransfer,
388+
Execer: execer,
389+
ContainerLister: containerLister,
390+
ExperimentalDevcontainersEnabled: experimentalDevcontainersEnabled,
391+
})
392+
393+
promHandler := agent.PrometheusMetricsHandler(prometheusRegistry, logger)
394+
prometheusSrvClose := ServeHandler(ctx, logger, promHandler, prometheusAddress, "prometheus")
395+
396+
debugSrvClose := ServeHandler(ctx, logger, agnt.HTTPDebug(), debugAddress, "debug")
397+
398+
select {
399+
case <-ctx.Done():
400+
logger.Warn(ctx, "agent shutting down", slog.Error(ctx.Err()), slog.F("cause", context.Cause(ctx)))
401+
mustExit = true
402+
case event := <-reinitEvents:
403+
logger.Warn(ctx, "agent received instruction to reinitialize",
404+
slog.F("message", event.Message), slog.F("reason", event.Reason))
405+
}
406+
407+
lastErr = agnt.Close()
408+
debugSrvClose()
409+
prometheusSrvClose()
410+
411+
if mustExit {
412+
reinitCancel()
413+
break
414+
}
415+
416+
logger.Info(ctx, "reinitializing...")
417+
}
418+
return lastErr
371419
},
372420
}
373421

cli/testdata/coder_server_--help.golden

+6
Original file line number | Diff line number | Diff line change
@@ -670,6 +670,12 @@ workspaces stopping during the day due to template scheduling.
670670
must be *. Only one hour and minute can be specified (ranges or comma
671671
separated values are not supported).
672672

673+
WORKSPACE PREBUILDS OPTIONS:
674+
Configure how workspace prebuilds behave.
675+
676+
--workspace-prebuilds-reconciliation-interval duration, $CODER_WORKSPACE_PREBUILDS_RECONCILIATION_INTERVAL (default: 15s)
677+
How often to reconcile workspace prebuilds state.
678+
673679
⚠️ DANGEROUS OPTIONS:
674680
--dangerous-allow-path-app-sharing bool, $CODER_DANGEROUS_ALLOW_PATH_APP_SHARING
675681
Allow workspace apps that are not served from subdomains to be shared.

cli/testdata/server-config.yaml.golden

+12
Original file line number | Diff line number | Diff line change
@@ -688,3 +688,15 @@ notifications:
688688
# How often to query the database for queued notifications.
689689
# (default: 15s, type: duration)
690690
fetchInterval: 15s
691+
# Configure how workspace prebuilds behave.
692+
workspace_prebuilds:
693+
# How often to reconcile workspace prebuilds state.
694+
# (default: 15s, type: duration)
695+
reconciliation_interval: 15s
696+
# Interval to increase reconciliation backoff by when unrecoverable errors occur.
697+
# (default: 15s, type: duration)
698+
reconciliation_backoff_interval: 15s
699+
# Interval to look back to determine number of failed builds, which influences
700+
# backoff.
701+
# (default: 1h0m0s, type: duration)
702+
reconciliation_backoff_lookback_period: 1h0m0s

coderd/agentapi/api.go

+2
Original file line number | Diff line number | Diff line change
@@ -109,6 +109,8 @@ func New(opts Options) *API {
109109
Database: opts.Database,
110110
DerpMapFn: opts.DerpMapFn,
111111
WorkspaceID: opts.WorkspaceID,
112+
Log: opts.Log.Named("manifests"),
113+
Pubsub: opts.Pubsub,
112114
}
113115

114116
api.AnnouncementBannerAPI = &AnnouncementBannerAPI{

coderd/agentapi/manifest.go

+6
Original file line number | Diff line number | Diff line change
@@ -8,6 +8,10 @@ import (
88
"strings"
99
"time"
1010

11+
"cdr.dev/slog"
12+
13+
"github.com/coder/coder/v2/coderd/database/pubsub"
14+
1115
"github.com/google/uuid"
1216
"golang.org/x/sync/errgroup"
1317
"golang.org/x/xerrors"
@@ -35,6 +39,8 @@ type ManifestAPI struct {
3539
AgentFn func(context.Context) (database.WorkspaceAgent, error)
3640
Database database.Store
3741
DerpMapFn func() *tailcfg.DERPMap
42+
Pubsub pubsub.Pubsub
43+
Log slog.Logger
3844
}
3945

4046
func (a *ManifestAPI) GetManifest(ctx context.Context, _ *agentproto.GetManifestRequest) (*agentproto.Manifest, error) {

0 commit comments

Comments
 (0)