Skip to content

Commit 7d7df61

Browse files
committed
feat: use agent v2 API to send agent logs
1 parent 2014110 commit 7d7df61

File tree

8 files changed

+118
-375
lines changed

8 files changed

+118
-375
lines changed

agent/agent.go

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,6 @@ type Client interface {
9191
ConnectRPC(ctx context.Context) (drpc.Conn, error)
9292
PostLifecycle(ctx context.Context, state agentsdk.PostLifecycleRequest) error
9393
PostMetadata(ctx context.Context, req agentsdk.PostMetadataRequest) error
94-
PatchLogs(ctx context.Context, req agentsdk.PatchLogs) error
9594
RewriteDERPMap(derpMap *tailcfg.DERPMap)
9695
}
9796

@@ -165,6 +164,7 @@ func New(options Options) Agent {
165164
syscaller: options.Syscaller,
166165
modifiedProcs: options.ModifiedProcesses,
167166
processManagementTick: options.ProcessManagementTick,
167+
logSender: newLogSender(options.Logger),
168168

169169
prometheusRegistry: prometheusRegistry,
170170
metrics: newAgentMetrics(prometheusRegistry),
@@ -215,6 +215,7 @@ type agent struct {
215215
network *tailnet.Conn
216216
addresses []netip.Prefix
217217
statsReporter *statsReporter
218+
logSender *logSender
218219

219220
connCountReconnectingPTY atomic.Int64
220221

@@ -245,11 +246,11 @@ func (a *agent) init(ctx context.Context) {
245246
sshSrv.ServiceBanner = &a.serviceBanner
246247
a.sshServer = sshSrv
247248
a.scriptRunner = agentscripts.New(agentscripts.Options{
248-
LogDir: a.logDir,
249-
Logger: a.logger,
250-
SSHServer: sshSrv,
251-
Filesystem: a.filesystem,
252-
PatchLogs: a.client.PatchLogs,
249+
LogDir: a.logDir,
250+
Logger: a.logger,
251+
SSHServer: sshSrv,
252+
Filesystem: a.filesystem,
253+
GetScriptLogger: a.logSender.getScriptLogger,
253254
})
254255
// Register runner metrics. If the prom registry is nil, the metrics
255256
// will not report anywhere.
@@ -876,6 +877,15 @@ func (a *agent) run(ctx context.Context) error {
876877
return nil
877878
})
878879

880+
eg.Go(func() error {
881+
a.logger.Debug(egCtx, "running send logs loop")
882+
err := a.logSender.sendLoop(egCtx, aAPI)
883+
if err != nil {
884+
return xerrors.Errorf("send logs loop: %w", err)
885+
}
886+
return nil
887+
})
888+
879889
return eg.Wait()
880890
}
881891

agent/agentscripts/agentscripts.go

Lines changed: 16 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@ import (
1313
"sync/atomic"
1414
"time"
1515

16+
"github.com/google/uuid"
17+
1618
"github.com/prometheus/client_golang/prometheus"
1719
"github.com/robfig/cron/v3"
1820
"github.com/spf13/afero"
@@ -41,13 +43,18 @@ var (
4143
parser = cron.NewParser(cron.Second | cron.Minute | cron.Hour | cron.Dom | cron.Month | cron.DowOptional)
4244
)
4345

46+
type ScriptLogger interface {
47+
Send(ctx context.Context, log ...agentsdk.Log) error
48+
Flush(context.Context) error
49+
}
50+
4451
// Options are a set of options for the runner.
4552
type Options struct {
46-
LogDir string
47-
Logger slog.Logger
48-
SSHServer *agentssh.Server
49-
Filesystem afero.Fs
50-
PatchLogs func(ctx context.Context, req agentsdk.PatchLogs) error
53+
LogDir string
54+
Logger slog.Logger
55+
SSHServer *agentssh.Server
56+
Filesystem afero.Fs
57+
GetScriptLogger func(logSourceID uuid.UUID) ScriptLogger
5158
}
5259

5360
// New creates a runner for the provided scripts.
@@ -238,20 +245,20 @@ func (r *Runner) run(ctx context.Context, script codersdk.WorkspaceAgentScript)
238245
cmd.WaitDelay = 10 * time.Second
239246
cmd.Cancel = cmdCancel(cmd)
240247

241-
send, flushAndClose := agentsdk.LogsSender(script.LogSourceID, r.PatchLogs, logger)
248+
scriptLogger := r.GetScriptLogger(script.LogSourceID)
242249
// If ctx is canceled here (or in a writer below), we may be
243250
// discarding logs, but that's okay because we're shutting down
244251
// anyway. We could consider creating a new context here if we
245252
// want better control over flush during shutdown.
246253
defer func() {
247-
if err := flushAndClose(ctx); err != nil {
254+
if err := scriptLogger.Flush(ctx); err != nil {
248255
logger.Warn(ctx, "flush startup logs failed", slog.Error(err))
249256
}
250257
}()
251258

252-
infoW := agentsdk.LogsWriter(ctx, send, script.LogSourceID, codersdk.LogLevelInfo)
259+
infoW := agentsdk.LogsWriter(ctx, scriptLogger.Send, script.LogSourceID, codersdk.LogLevelInfo)
253260
defer infoW.Close()
254-
errW := agentsdk.LogsWriter(ctx, send, script.LogSourceID, codersdk.LogLevelError)
261+
errW := agentsdk.LogsWriter(ctx, scriptLogger.Send, script.LogSourceID, codersdk.LogLevelError)
255262
defer errW.Close()
256263
cmd.Stdout = io.MultiWriter(fileWriter, infoW)
257264
cmd.Stderr = io.MultiWriter(fileWriter, errW)

agent/agentscripts/agentscripts_test.go

Lines changed: 53 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,10 @@ import (
55
"testing"
66
"time"
77

8+
"github.com/coder/coder/v2/testutil"
9+
10+
"github.com/google/uuid"
11+
812
"github.com/prometheus/client_golang/prometheus"
913
"github.com/spf13/afero"
1014
"github.com/stretchr/testify/require"
@@ -24,10 +28,10 @@ func TestMain(m *testing.M) {
2428

2529
func TestExecuteBasic(t *testing.T) {
2630
t.Parallel()
27-
logs := make(chan agentsdk.PatchLogs, 1)
28-
runner := setup(t, func(ctx context.Context, req agentsdk.PatchLogs) error {
29-
logs <- req
30-
return nil
31+
ctx := testutil.Context(t, testutil.WaitShort)
32+
fLogger := newFakeScriptLogger()
33+
runner := setup(t, func(uuid2 uuid.UUID) agentscripts.ScriptLogger {
34+
return fLogger
3135
})
3236
defer runner.Close()
3337
err := runner.Init([]codersdk.WorkspaceAgentScript{{
@@ -37,8 +41,8 @@ func TestExecuteBasic(t *testing.T) {
3741
require.NoError(t, runner.Execute(context.Background(), func(script codersdk.WorkspaceAgentScript) bool {
3842
return true
3943
}))
40-
log := <-logs
41-
require.Equal(t, "hello", log.Logs[0].Output)
44+
log := testutil.RequireRecvCtx(ctx, t, fLogger.logs)
45+
require.Equal(t, "hello", log.Output)
4246
}
4347

4448
func TestTimeout(t *testing.T) {
@@ -62,12 +66,12 @@ func TestCronClose(t *testing.T) {
6266
require.NoError(t, runner.Close(), "close runner")
6367
}
6468

65-
func setup(t *testing.T, patchLogs func(ctx context.Context, req agentsdk.PatchLogs) error) *agentscripts.Runner {
69+
func setup(t *testing.T, getScriptLogger func(logSourceID uuid.UUID) agentscripts.ScriptLogger) *agentscripts.Runner {
6670
t.Helper()
67-
if patchLogs == nil {
71+
if getScriptLogger == nil {
6872
// noop
69-
patchLogs = func(ctx context.Context, req agentsdk.PatchLogs) error {
70-
return nil
73+
getScriptLogger = func(uuid uuid.UUID) agentscripts.ScriptLogger {
74+
return noopScriptLogger{}
7175
}
7276
}
7377
fs := afero.NewMemMapFs()
@@ -80,10 +84,44 @@ func setup(t *testing.T, patchLogs func(ctx context.Context, req agentsdk.PatchL
8084
_ = s.Close()
8185
})
8286
return agentscripts.New(agentscripts.Options{
83-
LogDir: t.TempDir(),
84-
Logger: logger,
85-
SSHServer: s,
86-
Filesystem: fs,
87-
PatchLogs: patchLogs,
87+
LogDir: t.TempDir(),
88+
Logger: logger,
89+
SSHServer: s,
90+
Filesystem: fs,
91+
GetScriptLogger: getScriptLogger,
8892
})
8993
}
94+
95+
type noopScriptLogger struct{}
96+
97+
func (noopScriptLogger) Send(context.Context, ...agentsdk.Log) error {
98+
return nil
99+
}
100+
101+
func (noopScriptLogger) Flush(context.Context) error {
102+
return nil
103+
}
104+
105+
type fakeScriptLogger struct {
106+
logs chan agentsdk.Log
107+
}
108+
109+
func (f *fakeScriptLogger) Send(ctx context.Context, logs ...agentsdk.Log) error {
110+
for _, log := range logs {
111+
select {
112+
case <-ctx.Done():
113+
return ctx.Err()
114+
case f.logs <- log:
115+
// OK!
116+
}
117+
}
118+
return nil
119+
}
120+
121+
func (*fakeScriptLogger) Flush(context.Context) error {
122+
return nil
123+
}
124+
125+
func newFakeScriptLogger() *fakeScriptLogger {
126+
return &fakeScriptLogger{make(chan agentsdk.Log, 100)}
127+
}

agent/agenttest/client.go

Lines changed: 3 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,6 @@ type Client struct {
8585
server *drpcserver.Server
8686
fakeAgentAPI *FakeAgentAPI
8787
LastWorkspaceAgent func()
88-
PatchWorkspaceLogs func() error
8988

9089
mu sync.Mutex // Protects following.
9190
lifecycleStates []codersdk.WorkspaceAgentLifecycle
@@ -165,17 +164,6 @@ func (c *Client) GetStartupLogs() []agentsdk.Log {
165164
return c.logs
166165
}
167166

168-
func (c *Client) PatchLogs(ctx context.Context, logs agentsdk.PatchLogs) error {
169-
c.mu.Lock()
170-
defer c.mu.Unlock()
171-
if c.PatchWorkspaceLogs != nil {
172-
return c.PatchWorkspaceLogs()
173-
}
174-
c.logs = append(c.logs, logs.Logs...)
175-
c.logger.Debug(ctx, "patch startup logs", slog.F("req", logs))
176-
return nil
177-
}
178-
179167
func (c *Client) SetServiceBannerFunc(f func() (codersdk.ServiceBannerConfig, error)) {
180168
c.fakeAgentAPI.SetServiceBannerFunc(f)
181169
}
@@ -257,9 +245,9 @@ func (*FakeAgentAPI) BatchUpdateMetadata(context.Context, *agentproto.BatchUpdat
257245
panic("implement me")
258246
}
259247

260-
func (*FakeAgentAPI) BatchCreateLogs(context.Context, *agentproto.BatchCreateLogsRequest) (*agentproto.BatchCreateLogsResponse, error) {
261-
// TODO implement me
262-
panic("implement me")
248+
func (f *FakeAgentAPI) BatchCreateLogs(ctx context.Context, req *agentproto.BatchCreateLogsRequest) (*agentproto.BatchCreateLogsResponse, error) {
249+
f.logger.Info(ctx, "batch create logs called", slog.F("req", req))
250+
return &agentproto.BatchCreateLogsResponse{}, nil
263251
}
264252

265253
func NewFakeAgentAPI(t testing.TB, logger slog.Logger, manifest *agentproto.Manifest, statsCh chan *agentproto.Stats) *FakeAgentAPI {

agent/logs.go

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"golang.org/x/xerrors"
1010

1111
"cdr.dev/slog"
12+
"github.com/coder/coder/v2/agent/agentscripts"
1213
"github.com/coder/coder/v2/agent/proto"
1314
"github.com/coder/coder/v2/codersdk/agentsdk"
1415
)
@@ -234,3 +235,30 @@ func (l *logSender) getPendingWorkLocked() (src uuid.UUID, q *logQueue) {
234235
}
235236
return src, q
236237
}
238+
239+
func (l *logSender) getScriptLogger(logSourceID uuid.UUID) agentscripts.ScriptLogger {
240+
return scriptLogger{srcID: logSourceID, sender: l}
241+
}
242+
243+
type scriptLogger struct {
244+
sender *logSender
245+
srcID uuid.UUID
246+
}
247+
248+
func (s scriptLogger) Send(ctx context.Context, log ...agentsdk.Log) error {
249+
select {
250+
case <-ctx.Done():
251+
return ctx.Err()
252+
default:
253+
return s.sender.enqueue(s.srcID, log...)
254+
}
255+
}
256+
257+
func (s scriptLogger) Flush(ctx context.Context) error {
258+
select {
259+
case <-ctx.Done():
260+
return ctx.Err()
261+
default:
262+
return s.sender.flush(s.srcID)
263+
}
264+
}

codersdk/agentsdk/agentsdk.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -512,6 +512,8 @@ type PatchLogs struct {
512512

513513
// PatchLogs writes log messages to the agent startup script.
514514
// Log messages are limited to 1MB in total.
515+
//
516+
// Deprecated: use the DRPCAgentClient.BatchCreateLogs instead
515517
func (c *Client) PatchLogs(ctx context.Context, req PatchLogs) error {
516518
res, err := c.SDK.Request(ctx, http.MethodPatch, "/api/v2/workspaceagents/me/logs", req)
517519
if err != nil {

0 commit comments

Comments
 (0)