Skip to content

Commit c30f1c6

Browse files
committed
WIP — agent report metadata loop
1 parent 826cca3 commit c30f1c6

File tree

3 files changed

+123
-31
lines changed

3 files changed

+123
-31
lines changed

agent/agent.go

Lines changed: 56 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"encoding/binary"
99
"encoding/json"
1010
"errors"
11+
"flag"
1112
"fmt"
1213
"io"
1314
"net"
@@ -87,6 +88,7 @@ type Client interface {
8788
PostLifecycle(ctx context.Context, state agentsdk.PostLifecycleRequest) error
8889
PostAppHealth(ctx context.Context, req agentsdk.PostAppHealthsRequest) error
8990
PostStartup(ctx context.Context, req agentsdk.PostStartupRequest) error
91+
PostMetadata(ctx context.Context, req agentsdk.PostMetadataRequest) error
9092
}
9193

9294
func New(options Options) io.Closer {
@@ -152,8 +154,8 @@ type agent struct {
152154
closed chan struct{}
153155

154156
envVars map[string]string
155-
// metadata is atomic because values can change after reconnection.
156-
metadata atomic.Value
157+
// manifest is atomic because values can change after reconnection.
158+
manifest atomic.Pointer[agentsdk.Manifest]
157159
sessionToken atomic.Pointer[string]
158160
sshServer *ssh.Server
159161

@@ -178,6 +180,7 @@ type agent struct {
178180
// failure, you'll want the agent to reconnect.
179181
func (a *agent) runLoop(ctx context.Context) {
180182
go a.reportLifecycleLoop(ctx)
183+
go a.reportMetadataLoop(ctx)
181184

182185
for retrier := retry.New(100*time.Millisecond, 10*time.Second); retrier.Wait(ctx); {
183186
a.logger.Info(ctx, "connecting to coderd")
@@ -200,6 +203,32 @@ func (a *agent) runLoop(ctx context.Context) {
200203
}
201204
}
202205

206+
func (a *agent) reportMetadata(ctx context.Context) error {
207+
ma := a.manifest.Load()
208+
tickers := make([]time.Ticker, 0, len(ma.Metadata))
209+
}
210+
211+
func (a *agent) reportMetadataLoop(ctx context.Context) {
212+
// In production, the minimum report interval is one second.
213+
ticker := time.Second
214+
if flag.Lookup("test.v") != nil {
215+
ticker = time.Millisecond * 100
216+
}
217+
baseTicker := time.NewTicker(ticker)
218+
219+
for {
220+
select {
221+
case <-ctx.Done():
222+
return
223+
case <-baseTicker.C:
224+
err := a.reportMetadata(ctx)
225+
if err != nil {
226+
a.logger.Error(ctx, "report metadata", slog.Error(err))
227+
}
228+
}
229+
}
230+
}
231+
203232
// reportLifecycleLoop reports the current lifecycle state once.
204233
// Only the latest state is reported, intermediate states may be
205234
// lost if the agent can't communicate with the API.
@@ -274,30 +303,30 @@ func (a *agent) run(ctx context.Context) error {
274303
}
275304
a.sessionToken.Store(&sessionToken)
276305

277-
metadata, err := a.client.Manifest(ctx)
306+
manifest, err := a.client.Manifest(ctx)
278307
if err != nil {
279308
return xerrors.Errorf("fetch metadata: %w", err)
280309
}
281-
a.logger.Info(ctx, "fetched metadata", slog.F("metadata", metadata))
310+
a.logger.Info(ctx, "fetched metadata", slog.F("metadata", manifest))
282311

283312
// Expand the directory and send it back to coderd so external
284313
// applications that rely on the directory can use it.
285314
//
286315
// An example is VS Code Remote, which must know the directory
287316
// before initializing a connection.
288-
metadata.Directory, err = expandDirectory(metadata.Directory)
317+
manifest.Directory, err = expandDirectory(manifest.Directory)
289318
if err != nil {
290319
return xerrors.Errorf("expand directory: %w", err)
291320
}
292321
err = a.client.PostStartup(ctx, agentsdk.PostStartupRequest{
293322
Version: buildinfo.Version(),
294-
ExpandedDirectory: metadata.Directory,
323+
ExpandedDirectory: manifest.Directory,
295324
})
296325
if err != nil {
297326
return xerrors.Errorf("update workspace agent version: %w", err)
298327
}
299328

300-
oldMetadata := a.metadata.Swap(metadata)
329+
oldMetadata := a.manifest.Swap(&manifest)
301330

302331
// The startup script should only execute on the first run!
303332
if oldMetadata == nil {
@@ -307,7 +336,7 @@ func (a *agent) run(ctx context.Context) error {
307336
// connect to a workspace that is not yet ready. We don't run this
308337
// concurrently with the startup script to avoid conflicts between
309338
// them.
310-
if metadata.GitAuthConfigs > 0 {
339+
if manifest.GitAuthConfigs > 0 {
311340
// If this fails, we should consider surfacing the error in the
312341
// startup log and setting the lifecycle state to be "start_error"
313342
// (after startup script completion), but for now we'll just log it.
@@ -322,7 +351,7 @@ func (a *agent) run(ctx context.Context) error {
322351
scriptStart := time.Now()
323352
err = a.trackConnGoroutine(func() {
324353
defer close(scriptDone)
325-
scriptDone <- a.runStartupScript(ctx, metadata.StartupScript)
354+
scriptDone <- a.runStartupScript(ctx, manifest.StartupScript)
326355
})
327356
if err != nil {
328357
return xerrors.Errorf("track startup script: %w", err)
@@ -331,8 +360,8 @@ func (a *agent) run(ctx context.Context) error {
331360
var timeout <-chan time.Time
332361
// If timeout is zero, an older version of the coder
333362
// provider was used. Otherwise a timeout is always > 0.
334-
if metadata.StartupScriptTimeout > 0 {
335-
t := time.NewTimer(metadata.StartupScriptTimeout)
363+
if manifest.StartupScriptTimeout > 0 {
364+
t := time.NewTimer(manifest.StartupScriptTimeout)
336365
defer t.Stop()
337366
timeout = t.C
338367
}
@@ -349,7 +378,7 @@ func (a *agent) run(ctx context.Context) error {
349378
return
350379
}
351380
// Only log if there was a startup script.
352-
if metadata.StartupScript != "" {
381+
if manifest.StartupScript != "" {
353382
execTime := time.Since(scriptStart)
354383
if err != nil {
355384
a.logger.Warn(ctx, "startup script failed", slog.F("execution_time", execTime), slog.Error(err))
@@ -366,13 +395,13 @@ func (a *agent) run(ctx context.Context) error {
366395
appReporterCtx, appReporterCtxCancel := context.WithCancel(ctx)
367396
defer appReporterCtxCancel()
368397
go NewWorkspaceAppHealthReporter(
369-
a.logger, metadata.Apps, a.client.PostAppHealth)(appReporterCtx)
398+
a.logger, manifest.Apps, a.client.PostAppHealth)(appReporterCtx)
370399

371400
a.closeMutex.Lock()
372401
network := a.network
373402
a.closeMutex.Unlock()
374403
if network == nil {
375-
network, err = a.createTailnet(ctx, metadata.DERPMap)
404+
network, err = a.createTailnet(ctx, manifest.DERPMap)
376405
if err != nil {
377406
return xerrors.Errorf("create tailnet: %w", err)
378407
}
@@ -391,7 +420,7 @@ func (a *agent) run(ctx context.Context) error {
391420
a.startReportingConnectionStats(ctx)
392421
} else {
393422
// Update the DERP map!
394-
network.SetDERPMap(metadata.DERPMap)
423+
network.SetDERPMap(manifest.DERPMap)
395424
}
396425

397426
a.logger.Debug(ctx, "running tailnet connection coordinator")
@@ -800,14 +829,10 @@ func (a *agent) createCommand(ctx context.Context, rawCommand string, env []stri
800829
return nil, xerrors.Errorf("get user shell: %w", err)
801830
}
802831

803-
rawMetadata := a.metadata.Load()
804-
if rawMetadata == nil {
832+
manifest := a.manifest.Load()
833+
if manifest == nil {
805834
return nil, xerrors.Errorf("no metadata was provided")
806835
}
807-
metadata, valid := rawMetadata.(agentsdk.Manifest)
808-
if !valid {
809-
return nil, xerrors.Errorf("metadata is the wrong type: %T", metadata)
810-
}
811836

812837
// OpenSSH executes all commands with the users current shell.
813838
// We replicate that behavior for IDE support.
@@ -829,7 +854,7 @@ func (a *agent) createCommand(ctx context.Context, rawCommand string, env []stri
829854
}
830855

831856
cmd := exec.CommandContext(ctx, shell, args...)
832-
cmd.Dir = metadata.Directory
857+
cmd.Dir = manifest.Directory
833858

834859
// If the metadata directory doesn't exist, we run the command
835860
// in the users home directory.
@@ -870,14 +895,14 @@ func (a *agent) createCommand(ctx context.Context, rawCommand string, env []stri
870895

871896
// This adds the ports dialog to code-server that enables
872897
// proxying a port dynamically.
873-
cmd.Env = append(cmd.Env, fmt.Sprintf("VSCODE_PROXY_URI=%s", metadata.VSCodePortProxyURI))
898+
cmd.Env = append(cmd.Env, fmt.Sprintf("VSCODE_PROXY_URI=%s", manifest.VSCodePortProxyURI))
874899

875900
// Hide Coder message on code-server's "Getting Started" page
876901
cmd.Env = append(cmd.Env, "CS_DISABLE_GETTING_STARTED_OVERRIDE=true")
877902

878903
// Load environment variables passed via the agent.
879904
// These should override all variables we manually specify.
880-
for envKey, value := range metadata.EnvironmentVariables {
905+
for envKey, value := range manifest.EnvironmentVariables {
881906
// Expanding environment variables allows for customization
882907
// of the $PATH, among other variables. Customers can prepend
883908
// or append to the $PATH, so allowing expand is required!
@@ -940,9 +965,9 @@ func (a *agent) handleSSHSession(session ssh.Session) (retErr error) {
940965
session.DisablePTYEmulation()
941966

942967
if !isQuietLogin(session.RawCommand()) {
943-
metadata, ok := a.metadata.Load().(agentsdk.Manifest)
944-
if ok {
945-
err = showMOTD(session, metadata.MOTDFile)
968+
manifest := a.manifest.Load()
969+
if manifest != nil {
970+
err = showMOTD(session, manifest.MOTDFile)
946971
if err != nil {
947972
a.logger.Error(ctx, "show MOTD", slog.Error(err))
948973
}
@@ -1330,19 +1355,19 @@ func (a *agent) Close() error {
13301355
a.setLifecycle(ctx, codersdk.WorkspaceAgentLifecycleShuttingDown)
13311356

13321357
lifecycleState := codersdk.WorkspaceAgentLifecycleOff
1333-
if metadata, ok := a.metadata.Load().(agentsdk.Manifest); ok && metadata.ShutdownScript != "" {
1358+
if manifest := a.manifest.Load(); manifest != nil && manifest.ShutdownScript != "" {
13341359
scriptDone := make(chan error, 1)
13351360
scriptStart := time.Now()
13361361
go func() {
13371362
defer close(scriptDone)
1338-
scriptDone <- a.runShutdownScript(ctx, metadata.ShutdownScript)
1363+
scriptDone <- a.runShutdownScript(ctx, manifest.ShutdownScript)
13391364
}()
13401365

13411366
var timeout <-chan time.Time
13421367
// If timeout is zero, an older version of the coder
13431368
// provider was used. Otherwise a timeout is always > 0.
1344-
if metadata.ShutdownScriptTimeout > 0 {
1345-
t := time.NewTimer(metadata.ShutdownScriptTimeout)
1369+
if manifest.ShutdownScriptTimeout > 0 {
1370+
t := time.NewTimer(manifest.ShutdownScriptTimeout)
13461371
defer t.Stop()
13471372
timeout = t.C
13481373
}

agent/agent_test.go

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ import (
3131
"github.com/stretchr/testify/require"
3232
"go.uber.org/goleak"
3333
"golang.org/x/crypto/ssh"
34+
"golang.org/x/exp/maps"
3435
"golang.org/x/text/encoding/unicode"
3536
"golang.org/x/text/transform"
3637
"golang.org/x/xerrors"
@@ -772,6 +773,36 @@ func TestAgent_StartupScript(t *testing.T) {
772773
require.Equal(t, content, strings.TrimSpace(gotContent))
773774
}
774775

776+
func TestAgent_Metadata(t *testing.T) {
777+
t.Parallel()
778+
779+
//nolint:dogsled
780+
_, client, _, _, _ := setupAgent(t, agentsdk.Manifest{
781+
Metadata: []agentsdk.Metadata{
782+
{
783+
Key: "greeting",
784+
Interval: time.Millisecond * 100,
785+
Cmd: []string{"echo", "hello"},
786+
},
787+
{
788+
Key: "bad",
789+
Interval: time.Millisecond * 100,
790+
Cmd: []string{"sh", "-c", "exit 1"},
791+
},
792+
},
793+
}, 0)
794+
795+
var gotMd agentsdk.PostMetadataRequest
796+
require.Eventually(t, func() bool {
797+
gotMd = client.getMetadata()
798+
return len(gotMd) == 2
799+
}, testutil.WaitShort, testutil.IntervalMedium)
800+
801+
require.Equal(t, "hello", gotMd["greeting"].Value)
802+
require.Empty(t, gotMd["bad"].Value)
803+
require.Equal(t, "exit status 1", gotMd["bad"].Error)
804+
}
805+
775806
func TestAgent_Lifecycle(t *testing.T) {
776807
t.Parallel()
777808

@@ -1492,6 +1523,7 @@ type client struct {
14921523
t *testing.T
14931524
agentID uuid.UUID
14941525
manifest agentsdk.Manifest
1526+
metadata agentsdk.PostMetadataRequest
14951527
statsChan chan *agentsdk.Stats
14961528
coordinator tailnet.Coordinator
14971529
lastWorkspaceAgent func()
@@ -1576,6 +1608,19 @@ func (c *client) getStartup() agentsdk.PostStartupRequest {
15761608
return c.startup
15771609
}
15781610

1611+
func (c *client) getMetadata() agentsdk.PostMetadataRequest {
1612+
c.mu.Lock()
1613+
defer c.mu.Unlock()
1614+
return maps.Clone(c.metadata)
1615+
}
1616+
1617+
func (c *client) PostMetadata(_ context.Context, req agentsdk.PostMetadataRequest) error {
1618+
c.mu.Lock()
1619+
defer c.mu.Unlock()
1620+
c.metadata = req
1621+
return nil
1622+
}
1623+
15791624
func (c *client) PostStartup(_ context.Context, startup agentsdk.PostStartupRequest) error {
15801625
c.mu.Lock()
15811626
defer c.mu.Unlock()

codersdk/agentsdk/agentsdk.go

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,28 @@ type Metadata struct {
7474
Interval time.Duration
7575
}
7676

77+
type (
	// MetadataResult is the reported outcome for a single metadata key.
	// NOTE(review): the agent test suggests Value carries the command's
	// output and Error the failure text (e.g. "exit status 1"); confirm
	// against the collector once it is implemented.
	MetadataResult struct {
		Key   string
		Value string
		Error string
	}

	// PostMetadataRequest is the body sent by PostMetadata: a set of
	// results indexed by string (the agent test looks entries up by
	// metadata key).
	PostMetadataRequest map[string]MetadataResult
)
84+
85+
func (c *Client) PostMetadata(ctx context.Context, req PostMetadataRequest) error {
86+
res, err := c.SDK.Request(ctx, http.MethodPost, "/api/v2/workspaceagents/me/metadata", req)
87+
if err != nil {
88+
return xerrors.Errorf("execute request: %w", err)
89+
}
90+
defer res.Body.Close()
91+
92+
if res.StatusCode != http.StatusOK {
93+
return codersdk.ReadBodyAsError(res)
94+
}
95+
96+
return nil
97+
}
98+
7799
type Manifest struct {
78100
// GitAuthConfigs stores the number of Git configurations
79101
// the Coder deployment has. If this number is >0, we

0 commit comments

Comments
 (0)