Skip to content

Commit 249076e

Browse files
committed
feat: use agent v2 API to send agent logs
1 parent 2f05031 commit 249076e

File tree

6 files changed

+197
-46
lines changed

6 files changed

+197
-46
lines changed

agent/agent.go

Lines changed: 25 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,6 @@ type Client interface {
9191
ConnectRPC(ctx context.Context) (drpc.Conn, error)
9292
PostLifecycle(ctx context.Context, state agentsdk.PostLifecycleRequest) error
9393
PostMetadata(ctx context.Context, req agentsdk.PostMetadataRequest) error
94-
PatchLogs(ctx context.Context, req agentsdk.PatchLogs) error
9594
RewriteDERPMap(derpMap *tailcfg.DERPMap)
9695
}
9796

@@ -170,6 +169,7 @@ func New(options Options) Agent {
170169
syscaller: options.Syscaller,
171170
modifiedProcs: options.ModifiedProcesses,
172171
processManagementTick: options.ProcessManagementTick,
172+
logSender: agentsdk.NewLogSender(options.Logger),
173173

174174
prometheusRegistry: prometheusRegistry,
175175
metrics: newAgentMetrics(prometheusRegistry),
@@ -234,6 +234,7 @@ type agent struct {
234234
network *tailnet.Conn
235235
addresses []netip.Prefix
236236
statsReporter *statsReporter
237+
logSender *agentsdk.LogSender
237238

238239
connCountReconnectingPTY atomic.Int64
239240

@@ -271,7 +272,9 @@ func (a *agent) init() {
271272
Logger: a.logger,
272273
SSHServer: sshSrv,
273274
Filesystem: a.filesystem,
274-
PatchLogs: a.client.PatchLogs,
275+
GetScriptLogger: func(logSourceID uuid.UUID) agentscripts.ScriptLogger {
276+
return a.logSender.GetScriptLogger(logSourceID)
277+
},
275278
})
276279
// Register runner metrics. If the prom registry is nil, the metrics
277280
// will not report anywhere.
@@ -751,6 +754,20 @@ func (a *agent) run() (retErr error) {
751754
},
752755
)
753756

757+
// sending logs gets gracefulShutdownBehaviorRemain because we want to send logs generated by
758+
// shutdown scripts.
759+
arm.run("send logs", gracefulShutdownBehaviorRemain,
760+
func(ctx context.Context, conn drpc.Conn) error {
761+
err := a.logSender.SendLoop(ctx, proto.NewDRPCAgentClient(conn))
762+
if xerrors.Is(err, agentsdk.LogLimitExceededError) {
763+
// we don't want this error to tear down the API connection and propagate to the
764+
// other routines that use the API. The LogSender has already dropped a warning
765+
// log, so just return nil here.
766+
return nil
767+
}
768+
return err
769+
})
770+
754771
// channels to sync goroutines below
755772
// handle manifest
756773
// |
@@ -1747,6 +1764,12 @@ lifecycleWaitLoop:
17471764
a.logger.Debug(context.Background(), "coordinator RPC disconnected")
17481765
}
17491766

1767+
// Wait for logs to be sent
1768+
err = a.logSender.WaitUntilEmpty(a.hardCtx)
1769+
if err != nil {
1770+
a.logger.Warn(context.Background(), "timed out waiting for all logs to be sent", slog.Error(err))
1771+
}
1772+
17501773
close(a.closed)
17511774
a.hardCancel()
17521775
_ = a.sshServer.Close()

agent/agent_test.go

Lines changed: 78 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2054,6 +2054,80 @@ func TestAgent_DebugServer(t *testing.T) {
20542054
})
20552055
}
20562056

2057+
func TestAgent_ScriptLogging(t *testing.T) {
2058+
if runtime.GOOS == "windows" {
2059+
t.Skip("bash scripts only")
2060+
}
2061+
t.Parallel()
2062+
ctx := testutil.Context(t, testutil.WaitMedium)
2063+
2064+
derpMap, _ := tailnettest.RunDERPAndSTUN(t)
2065+
logsCh := make(chan *proto.BatchCreateLogsRequest, 100)
2066+
lsStart := uuid.UUID{0x11}
2067+
lsStop := uuid.UUID{0x22}
2068+
//nolint:dogsled
2069+
_, _, _, _, agnt := setupAgent(
2070+
t,
2071+
agentsdk.Manifest{
2072+
DERPMap: derpMap,
2073+
Scripts: []codersdk.WorkspaceAgentScript{
2074+
{
2075+
LogSourceID: lsStart,
2076+
RunOnStart: true,
2077+
Script: `#!/bin/sh
2078+
i=0
2079+
while [ $i -ne 5 ]
2080+
do
2081+
i=$(($i+1))
2082+
echo "start $i"
2083+
done
2084+
`,
2085+
},
2086+
{
2087+
LogSourceID: lsStop,
2088+
RunOnStop: true,
2089+
Script: `#!/bin/sh
2090+
i=0
2091+
while [ $i -ne 3000 ]
2092+
do
2093+
i=$(($i+1))
2094+
echo "stop $i"
2095+
done
2096+
`, // send a lot of stop logs to make sure we don't truncate shutdown logs before closing the API conn
2097+
},
2098+
},
2099+
},
2100+
0,
2101+
func(cl *agenttest.Client, _ *agent.Options) {
2102+
cl.SetLogsChannel(logsCh)
2103+
},
2104+
)
2105+
2106+
n := 1
2107+
for n <= 5 {
2108+
logs := testutil.RequireRecvCtx(ctx, t, logsCh)
2109+
require.NotNil(t, logs)
2110+
for _, l := range logs.GetLogs() {
2111+
require.Equal(t, fmt.Sprintf("start %d", n), l.GetOutput())
2112+
n++
2113+
}
2114+
}
2115+
2116+
err := agnt.Close()
2117+
require.NoError(t, err)
2118+
2119+
n = 1
2120+
for n <= 3000 {
2121+
logs := testutil.RequireRecvCtx(ctx, t, logsCh)
2122+
require.NotNil(t, logs)
2123+
for _, l := range logs.GetLogs() {
2124+
require.Equal(t, fmt.Sprintf("stop %d", n), l.GetOutput())
2125+
n++
2126+
}
2127+
t.Logf("got %d stop logs", n-1)
2128+
}
2129+
}
2130+
20572131
// setupAgentSSHClient creates an agent, dials it, and sets up an ssh.Client for it
20582132
func setupAgentSSHClient(ctx context.Context, t *testing.T) *ssh.Client {
20592133
//nolint: dogsled
@@ -2129,7 +2203,7 @@ func setupAgent(t *testing.T, metadata agentsdk.Manifest, ptyTimeout time.Durati
21292203
})
21302204
statsCh := make(chan *proto.Stats, 50)
21312205
fs := afero.NewMemMapFs()
2132-
c := agenttest.NewClient(t, logger.Named("agent"), metadata.AgentID, metadata, statsCh, coordinator)
2206+
c := agenttest.NewClient(t, logger.Named("agenttest"), metadata.AgentID, metadata, statsCh, coordinator)
21332207
t.Cleanup(c.Close)
21342208

21352209
options := agent.Options{
@@ -2144,9 +2218,9 @@ func setupAgent(t *testing.T, metadata agentsdk.Manifest, ptyTimeout time.Durati
21442218
opt(c, &options)
21452219
}
21462220

2147-
closer := agent.New(options)
2221+
agnt := agent.New(options)
21482222
t.Cleanup(func() {
2149-
_ = closer.Close()
2223+
_ = agnt.Close()
21502224
})
21512225
conn, err := tailnet.NewConn(&tailnet.Options{
21522226
Addresses: []netip.Prefix{netip.PrefixFrom(tailnet.IP(), 128)},
@@ -2183,7 +2257,7 @@ func setupAgent(t *testing.T, metadata agentsdk.Manifest, ptyTimeout time.Durati
21832257
if !agentConn.AwaitReachable(ctx) {
21842258
t.Fatal("agent not reachable")
21852259
}
2186-
return agentConn, c, statsCh, fs, closer
2260+
return agentConn, c, statsCh, fs, agnt
21872261
}
21882262

21892263
var dialTestPayload = []byte("dean-was-here123")

agent/agentscripts/agentscripts.go

Lines changed: 15 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import (
1313
"sync/atomic"
1414
"time"
1515

16+
"github.com/google/uuid"
1617
"github.com/prometheus/client_golang/prometheus"
1718
"github.com/robfig/cron/v3"
1819
"github.com/spf13/afero"
@@ -41,13 +42,18 @@ var (
4142
parser = cron.NewParser(cron.Second | cron.Minute | cron.Hour | cron.Dom | cron.Month | cron.DowOptional)
4243
)
4344

45+
type ScriptLogger interface {
46+
Send(ctx context.Context, log ...agentsdk.Log) error
47+
Flush(context.Context) error
48+
}
49+
4450
// Options are a set of options for the runner.
4551
type Options struct {
46-
LogDir string
47-
Logger slog.Logger
48-
SSHServer *agentssh.Server
49-
Filesystem afero.Fs
50-
PatchLogs func(ctx context.Context, req agentsdk.PatchLogs) error
52+
LogDir string
53+
Logger slog.Logger
54+
SSHServer *agentssh.Server
55+
Filesystem afero.Fs
56+
GetScriptLogger func(logSourceID uuid.UUID) ScriptLogger
5157
}
5258

5359
// New creates a runner for the provided scripts.
@@ -238,20 +244,20 @@ func (r *Runner) run(ctx context.Context, script codersdk.WorkspaceAgentScript)
238244
cmd.WaitDelay = 10 * time.Second
239245
cmd.Cancel = cmdCancel(cmd)
240246

241-
send, flushAndClose := agentsdk.LogsSender(script.LogSourceID, r.PatchLogs, logger)
247+
scriptLogger := r.GetScriptLogger(script.LogSourceID)
242248
// If ctx is canceled here (or in a writer below), we may be
243249
// discarding logs, but that's okay because we're shutting down
244250
// anyway. We could consider creating a new context here if we
245251
// want better control over flush during shutdown.
246252
defer func() {
247-
if err := flushAndClose(ctx); err != nil {
253+
if err := scriptLogger.Flush(ctx); err != nil {
248254
logger.Warn(ctx, "flush startup logs failed", slog.Error(err))
249255
}
250256
}()
251257

252-
infoW := agentsdk.LogsWriter(ctx, send, script.LogSourceID, codersdk.LogLevelInfo)
258+
infoW := agentsdk.LogsWriter(ctx, scriptLogger.Send, script.LogSourceID, codersdk.LogLevelInfo)
253259
defer infoW.Close()
254-
errW := agentsdk.LogsWriter(ctx, send, script.LogSourceID, codersdk.LogLevelError)
260+
errW := agentsdk.LogsWriter(ctx, scriptLogger.Send, script.LogSourceID, codersdk.LogLevelError)
255261
defer errW.Close()
256262
cmd.Stdout = io.MultiWriter(fileWriter, infoW)
257263
cmd.Stderr = io.MultiWriter(fileWriter, errW)

agent/agentscripts/agentscripts_test.go

Lines changed: 51 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"testing"
66
"time"
77

8+
"github.com/google/uuid"
89
"github.com/prometheus/client_golang/prometheus"
910
"github.com/spf13/afero"
1011
"github.com/stretchr/testify/require"
@@ -15,6 +16,7 @@ import (
1516
"github.com/coder/coder/v2/agent/agentssh"
1617
"github.com/coder/coder/v2/codersdk"
1718
"github.com/coder/coder/v2/codersdk/agentsdk"
19+
"github.com/coder/coder/v2/testutil"
1820
)
1921

2022
func TestMain(m *testing.M) {
@@ -23,10 +25,10 @@ func TestMain(m *testing.M) {
2325

2426
func TestExecuteBasic(t *testing.T) {
2527
t.Parallel()
26-
logs := make(chan agentsdk.PatchLogs, 1)
27-
runner := setup(t, func(ctx context.Context, req agentsdk.PatchLogs) error {
28-
logs <- req
29-
return nil
28+
ctx := testutil.Context(t, testutil.WaitShort)
29+
fLogger := newFakeScriptLogger()
30+
runner := setup(t, func(uuid2 uuid.UUID) agentscripts.ScriptLogger {
31+
return fLogger
3032
})
3133
defer runner.Close()
3234
err := runner.Init([]codersdk.WorkspaceAgentScript{{
@@ -36,8 +38,8 @@ func TestExecuteBasic(t *testing.T) {
3638
require.NoError(t, runner.Execute(context.Background(), func(script codersdk.WorkspaceAgentScript) bool {
3739
return true
3840
}))
39-
log := <-logs
40-
require.Equal(t, "hello", log.Logs[0].Output)
41+
log := testutil.RequireRecvCtx(ctx, t, fLogger.logs)
42+
require.Equal(t, "hello", log.Output)
4143
}
4244

4345
func TestTimeout(t *testing.T) {
@@ -61,12 +63,12 @@ func TestCronClose(t *testing.T) {
6163
require.NoError(t, runner.Close(), "close runner")
6264
}
6365

64-
func setup(t *testing.T, patchLogs func(ctx context.Context, req agentsdk.PatchLogs) error) *agentscripts.Runner {
66+
func setup(t *testing.T, getScriptLogger func(logSourceID uuid.UUID) agentscripts.ScriptLogger) *agentscripts.Runner {
6567
t.Helper()
66-
if patchLogs == nil {
68+
if getScriptLogger == nil {
6769
// noop
68-
patchLogs = func(ctx context.Context, req agentsdk.PatchLogs) error {
69-
return nil
70+
getScriptLogger = func(uuid uuid.UUID) agentscripts.ScriptLogger {
71+
return noopScriptLogger{}
7072
}
7173
}
7274
fs := afero.NewMemMapFs()
@@ -77,10 +79,44 @@ func setup(t *testing.T, patchLogs func(ctx context.Context, req agentsdk.PatchL
7779
_ = s.Close()
7880
})
7981
return agentscripts.New(agentscripts.Options{
80-
LogDir: t.TempDir(),
81-
Logger: logger,
82-
SSHServer: s,
83-
Filesystem: fs,
84-
PatchLogs: patchLogs,
82+
LogDir: t.TempDir(),
83+
Logger: logger,
84+
SSHServer: s,
85+
Filesystem: fs,
86+
GetScriptLogger: getScriptLogger,
8587
})
8688
}
89+
90+
type noopScriptLogger struct{}
91+
92+
func (noopScriptLogger) Send(context.Context, ...agentsdk.Log) error {
93+
return nil
94+
}
95+
96+
func (noopScriptLogger) Flush(context.Context) error {
97+
return nil
98+
}
99+
100+
type fakeScriptLogger struct {
101+
logs chan agentsdk.Log
102+
}
103+
104+
func (f *fakeScriptLogger) Send(ctx context.Context, logs ...agentsdk.Log) error {
105+
for _, log := range logs {
106+
select {
107+
case <-ctx.Done():
108+
return ctx.Err()
109+
case f.logs <- log:
110+
// OK!
111+
}
112+
}
113+
return nil
114+
}
115+
116+
func (*fakeScriptLogger) Flush(context.Context) error {
117+
return nil
118+
}
119+
120+
func newFakeScriptLogger() *fakeScriptLogger {
121+
return &fakeScriptLogger{make(chan agentsdk.Log, 100)}
122+
}

0 commit comments

Comments
 (0)