Skip to content

Commit 4cc132c

Browse files
authored
feat: switch agent to use v2 API for sending logs (#12068)
Changes the agent to use the new v2 API for sending logs, via the logSender component. The PatchLogs function is kept so that we can still test the v1 endpoint, but it is now marked as deprecated.
1 parent af3fdc6 commit 4cc132c

File tree

6 files changed

+203
-63
lines changed

6 files changed

+203
-63
lines changed

agent/agent.go

+25-2
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,6 @@ type Client interface {
9292
ConnectRPC(ctx context.Context) (drpc.Conn, error)
9393
PostLifecycle(ctx context.Context, state agentsdk.PostLifecycleRequest) error
9494
PostMetadata(ctx context.Context, req agentsdk.PostMetadataRequest) error
95-
PatchLogs(ctx context.Context, req agentsdk.PatchLogs) error
9695
RewriteDERPMap(derpMap *tailcfg.DERPMap)
9796
}
9897

@@ -181,6 +180,7 @@ func New(options Options) Agent {
181180
syscaller: options.Syscaller,
182181
modifiedProcs: options.ModifiedProcesses,
183182
processManagementTick: options.ProcessManagementTick,
183+
logSender: agentsdk.NewLogSender(options.Logger),
184184

185185
prometheusRegistry: prometheusRegistry,
186186
metrics: newAgentMetrics(prometheusRegistry),
@@ -245,6 +245,7 @@ type agent struct {
245245
network *tailnet.Conn
246246
addresses []netip.Prefix
247247
statsReporter *statsReporter
248+
logSender *agentsdk.LogSender
248249

249250
connCountReconnectingPTY atomic.Int64
250251

@@ -283,7 +284,9 @@ func (a *agent) init() {
283284
Logger: a.logger,
284285
SSHServer: sshSrv,
285286
Filesystem: a.filesystem,
286-
PatchLogs: a.client.PatchLogs,
287+
GetScriptLogger: func(logSourceID uuid.UUID) agentscripts.ScriptLogger {
288+
return a.logSender.GetScriptLogger(logSourceID)
289+
},
287290
})
288291
// Register runner metrics. If the prom registry is nil, the metrics
289292
// will not report anywhere.
@@ -763,6 +766,20 @@ func (a *agent) run() (retErr error) {
763766
},
764767
)
765768

769+
// sending logs gets gracefulShutdownBehaviorRemain because we want to send logs generated by
770+
// shutdown scripts.
771+
connMan.start("send logs", gracefulShutdownBehaviorRemain,
772+
func(ctx context.Context, conn drpc.Conn) error {
773+
err := a.logSender.SendLoop(ctx, proto.NewDRPCAgentClient(conn))
774+
if xerrors.Is(err, agentsdk.LogLimitExceededError) {
775+
// we don't want this error to tear down the API connection and propagate to the
776+
// other routines that use the API. The LogSender has already dropped a warning
777+
// log, so just return nil here.
778+
return nil
779+
}
780+
return err
781+
})
782+
766783
// channels to sync goroutines below
767784
// handle manifest
768785
// |
@@ -1769,6 +1786,12 @@ lifecycleWaitLoop:
17691786
a.logger.Debug(context.Background(), "coordinator RPC disconnected")
17701787
}
17711788

1789+
// Wait for logs to be sent
1790+
err = a.logSender.WaitUntilEmpty(a.hardCtx)
1791+
if err != nil {
1792+
a.logger.Warn(context.Background(), "timed out waiting for all logs to be sent", slog.Error(err))
1793+
}
1794+
17721795
a.hardCancel()
17731796
if a.network != nil {
17741797
_ = a.network.Close()

agent/agent_test.go

+78-4
Original file line numberDiff line numberDiff line change
@@ -2062,6 +2062,80 @@ func TestAgent_DebugServer(t *testing.T) {
20622062
})
20632063
}
20642064

2065+
func TestAgent_ScriptLogging(t *testing.T) {
2066+
if runtime.GOOS == "windows" {
2067+
t.Skip("bash scripts only")
2068+
}
2069+
t.Parallel()
2070+
ctx := testutil.Context(t, testutil.WaitMedium)
2071+
2072+
derpMap, _ := tailnettest.RunDERPAndSTUN(t)
2073+
logsCh := make(chan *proto.BatchCreateLogsRequest, 100)
2074+
lsStart := uuid.UUID{0x11}
2075+
lsStop := uuid.UUID{0x22}
2076+
//nolint:dogsled
2077+
_, _, _, _, agnt := setupAgent(
2078+
t,
2079+
agentsdk.Manifest{
2080+
DERPMap: derpMap,
2081+
Scripts: []codersdk.WorkspaceAgentScript{
2082+
{
2083+
LogSourceID: lsStart,
2084+
RunOnStart: true,
2085+
Script: `#!/bin/sh
2086+
i=0
2087+
while [ $i -ne 5 ]
2088+
do
2089+
i=$(($i+1))
2090+
echo "start $i"
2091+
done
2092+
`,
2093+
},
2094+
{
2095+
LogSourceID: lsStop,
2096+
RunOnStop: true,
2097+
Script: `#!/bin/sh
2098+
i=0
2099+
while [ $i -ne 3000 ]
2100+
do
2101+
i=$(($i+1))
2102+
echo "stop $i"
2103+
done
2104+
`, // send a lot of stop logs to make sure we don't truncate shutdown logs before closing the API conn
2105+
},
2106+
},
2107+
},
2108+
0,
2109+
func(cl *agenttest.Client, _ *agent.Options) {
2110+
cl.SetLogsChannel(logsCh)
2111+
},
2112+
)
2113+
2114+
n := 1
2115+
for n <= 5 {
2116+
logs := testutil.RequireRecvCtx(ctx, t, logsCh)
2117+
require.NotNil(t, logs)
2118+
for _, l := range logs.GetLogs() {
2119+
require.Equal(t, fmt.Sprintf("start %d", n), l.GetOutput())
2120+
n++
2121+
}
2122+
}
2123+
2124+
err := agnt.Close()
2125+
require.NoError(t, err)
2126+
2127+
n = 1
2128+
for n <= 3000 {
2129+
logs := testutil.RequireRecvCtx(ctx, t, logsCh)
2130+
require.NotNil(t, logs)
2131+
for _, l := range logs.GetLogs() {
2132+
require.Equal(t, fmt.Sprintf("stop %d", n), l.GetOutput())
2133+
n++
2134+
}
2135+
t.Logf("got %d stop logs", n-1)
2136+
}
2137+
}
2138+
20652139
// setupAgentSSHClient creates an agent, dials it, and sets up an ssh.Client for it
20662140
func setupAgentSSHClient(ctx context.Context, t *testing.T) *ssh.Client {
20672141
//nolint: dogsled
@@ -2137,7 +2211,7 @@ func setupAgent(t *testing.T, metadata agentsdk.Manifest, ptyTimeout time.Durati
21372211
})
21382212
statsCh := make(chan *proto.Stats, 50)
21392213
fs := afero.NewMemMapFs()
2140-
c := agenttest.NewClient(t, logger.Named("agent"), metadata.AgentID, metadata, statsCh, coordinator)
2214+
c := agenttest.NewClient(t, logger.Named("agenttest"), metadata.AgentID, metadata, statsCh, coordinator)
21412215
t.Cleanup(c.Close)
21422216

21432217
options := agent.Options{
@@ -2152,9 +2226,9 @@ func setupAgent(t *testing.T, metadata agentsdk.Manifest, ptyTimeout time.Durati
21522226
opt(c, &options)
21532227
}
21542228

2155-
closer := agent.New(options)
2229+
agnt := agent.New(options)
21562230
t.Cleanup(func() {
2157-
_ = closer.Close()
2231+
_ = agnt.Close()
21582232
})
21592233
conn, err := tailnet.NewConn(&tailnet.Options{
21602234
Addresses: []netip.Prefix{netip.PrefixFrom(tailnet.IP(), 128)},
@@ -2191,7 +2265,7 @@ func setupAgent(t *testing.T, metadata agentsdk.Manifest, ptyTimeout time.Durati
21912265
if !agentConn.AwaitReachable(ctx) {
21922266
t.Fatal("agent not reachable")
21932267
}
2194-
return agentConn, c, statsCh, fs, closer
2268+
return agentConn, c, statsCh, fs, agnt
21952269
}
21962270

21972271
var dialTestPayload = []byte("dean-was-here123")

agent/agentscripts/agentscripts.go

+16-10
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import (
1313
"sync/atomic"
1414
"time"
1515

16+
"github.com/google/uuid"
1617
"github.com/prometheus/client_golang/prometheus"
1718
"github.com/robfig/cron/v3"
1819
"github.com/spf13/afero"
@@ -41,14 +42,19 @@ var (
4142
parser = cron.NewParser(cron.Second | cron.Minute | cron.Hour | cron.Dom | cron.Month | cron.DowOptional)
4243
)
4344

45+
type ScriptLogger interface {
46+
Send(ctx context.Context, log ...agentsdk.Log) error
47+
Flush(context.Context) error
48+
}
49+
4450
// Options are a set of options for the runner.
4551
type Options struct {
46-
DataDirBase string
47-
LogDir string
48-
Logger slog.Logger
49-
SSHServer *agentssh.Server
50-
Filesystem afero.Fs
51-
PatchLogs func(ctx context.Context, req agentsdk.PatchLogs) error
52+
DataDirBase string
53+
LogDir string
54+
Logger slog.Logger
55+
SSHServer *agentssh.Server
56+
Filesystem afero.Fs
57+
GetScriptLogger func(logSourceID uuid.UUID) ScriptLogger
5258
}
5359

5460
// New creates a runner for the provided scripts.
@@ -275,20 +281,20 @@ func (r *Runner) run(ctx context.Context, script codersdk.WorkspaceAgentScript)
275281
cmd.Env = append(cmd.Env, "CODER_SCRIPT_DATA_DIR="+scriptDataDir)
276282
cmd.Env = append(cmd.Env, "CODER_SCRIPT_BIN_DIR="+r.ScriptBinDir())
277283

278-
send, flushAndClose := agentsdk.LogsSender(script.LogSourceID, r.PatchLogs, logger)
284+
scriptLogger := r.GetScriptLogger(script.LogSourceID)
279285
// If ctx is canceled here (or in a writer below), we may be
280286
// discarding logs, but that's okay because we're shutting down
281287
// anyway. We could consider creating a new context here if we
282288
// want better control over flush during shutdown.
283289
defer func() {
284-
if err := flushAndClose(ctx); err != nil {
290+
if err := scriptLogger.Flush(ctx); err != nil {
285291
logger.Warn(ctx, "flush startup logs failed", slog.Error(err))
286292
}
287293
}()
288294

289-
infoW := agentsdk.LogsWriter(ctx, send, script.LogSourceID, codersdk.LogLevelInfo)
295+
infoW := agentsdk.LogsWriter(ctx, scriptLogger.Send, script.LogSourceID, codersdk.LogLevelInfo)
290296
defer infoW.Close()
291-
errW := agentsdk.LogsWriter(ctx, send, script.LogSourceID, codersdk.LogLevelError)
297+
errW := agentsdk.LogsWriter(ctx, scriptLogger.Send, script.LogSourceID, codersdk.LogLevelError)
292298
defer errW.Close()
293299
cmd.Stdout = io.MultiWriter(fileWriter, infoW)
294300
cmd.Stderr = io.MultiWriter(fileWriter, errW)

agent/agentscripts/agentscripts_test.go

+56-31
Original file line numberDiff line numberDiff line change
@@ -28,13 +28,10 @@ func TestMain(m *testing.M) {
2828

2929
func TestExecuteBasic(t *testing.T) {
3030
t.Parallel()
31-
logs := make(chan agentsdk.PatchLogs, 1)
32-
runner := setup(t, func(ctx context.Context, req agentsdk.PatchLogs) error {
33-
select {
34-
case <-ctx.Done():
35-
case logs <- req:
36-
}
37-
return nil
31+
ctx := testutil.Context(t, testutil.WaitShort)
32+
fLogger := newFakeScriptLogger()
33+
runner := setup(t, func(uuid2 uuid.UUID) agentscripts.ScriptLogger {
34+
return fLogger
3835
})
3936
defer runner.Close()
4037
err := runner.Init([]codersdk.WorkspaceAgentScript{{
@@ -45,19 +42,15 @@ func TestExecuteBasic(t *testing.T) {
4542
require.NoError(t, runner.Execute(context.Background(), func(script codersdk.WorkspaceAgentScript) bool {
4643
return true
4744
}))
48-
log := <-logs
49-
require.Equal(t, "hello", log.Logs[0].Output)
45+
log := testutil.RequireRecvCtx(ctx, t, fLogger.logs)
46+
require.Equal(t, "hello", log.Output)
5047
}
5148

5249
func TestEnv(t *testing.T) {
5350
t.Parallel()
54-
logs := make(chan agentsdk.PatchLogs, 2)
55-
runner := setup(t, func(ctx context.Context, req agentsdk.PatchLogs) error {
56-
select {
57-
case <-ctx.Done():
58-
case logs <- req:
59-
}
60-
return nil
51+
fLogger := newFakeScriptLogger()
52+
runner := setup(t, func(uuid2 uuid.UUID) agentscripts.ScriptLogger {
53+
return fLogger
6154
})
6255
defer runner.Close()
6356
id := uuid.New()
@@ -88,11 +81,9 @@ func TestEnv(t *testing.T) {
8881
select {
8982
case <-ctx.Done():
9083
require.Fail(t, "timed out waiting for logs")
91-
case l := <-logs:
92-
for _, l := range l.Logs {
93-
t.Logf("log: %s", l.Output)
94-
}
95-
log = append(log, l.Logs...)
84+
case l := <-fLogger.logs:
85+
t.Logf("log: %s", l.Output)
86+
log = append(log, l)
9687
}
9788
if len(log) >= 2 {
9889
break
@@ -124,12 +115,12 @@ func TestCronClose(t *testing.T) {
124115
require.NoError(t, runner.Close(), "close runner")
125116
}
126117

127-
func setup(t *testing.T, patchLogs func(ctx context.Context, req agentsdk.PatchLogs) error) *agentscripts.Runner {
118+
func setup(t *testing.T, getScriptLogger func(logSourceID uuid.UUID) agentscripts.ScriptLogger) *agentscripts.Runner {
128119
t.Helper()
129-
if patchLogs == nil {
120+
if getScriptLogger == nil {
130121
// noop
131-
patchLogs = func(ctx context.Context, req agentsdk.PatchLogs) error {
132-
return nil
122+
getScriptLogger = func(uuid uuid.UUID) agentscripts.ScriptLogger {
123+
return noopScriptLogger{}
133124
}
134125
}
135126
fs := afero.NewMemMapFs()
@@ -140,11 +131,45 @@ func setup(t *testing.T, patchLogs func(ctx context.Context, req agentsdk.PatchL
140131
_ = s.Close()
141132
})
142133
return agentscripts.New(agentscripts.Options{
143-
LogDir: t.TempDir(),
144-
DataDirBase: t.TempDir(),
145-
Logger: logger,
146-
SSHServer: s,
147-
Filesystem: fs,
148-
PatchLogs: patchLogs,
134+
LogDir: t.TempDir(),
135+
DataDirBase: t.TempDir(),
136+
Logger: logger,
137+
SSHServer: s,
138+
Filesystem: fs,
139+
GetScriptLogger: getScriptLogger,
149140
})
150141
}
142+
143+
type noopScriptLogger struct{}
144+
145+
func (noopScriptLogger) Send(context.Context, ...agentsdk.Log) error {
146+
return nil
147+
}
148+
149+
func (noopScriptLogger) Flush(context.Context) error {
150+
return nil
151+
}
152+
153+
type fakeScriptLogger struct {
154+
logs chan agentsdk.Log
155+
}
156+
157+
func (f *fakeScriptLogger) Send(ctx context.Context, logs ...agentsdk.Log) error {
158+
for _, log := range logs {
159+
select {
160+
case <-ctx.Done():
161+
return ctx.Err()
162+
case f.logs <- log:
163+
// OK!
164+
}
165+
}
166+
return nil
167+
}
168+
169+
func (*fakeScriptLogger) Flush(context.Context) error {
170+
return nil
171+
}
172+
173+
func newFakeScriptLogger() *fakeScriptLogger {
174+
return &fakeScriptLogger{make(chan agentsdk.Log, 100)}
175+
}

0 commit comments

Comments
 (0)