Skip to content

Commit 98164f6

Browse files
kylecarbs and mafredri authored
fix!: remove startup logs eof for streaming (coder#8528)
* fix: remove startup logs eof for streaming

  We have external utilities like logstream-kube that may send logs after an agent shuts down unexpectedly to report additional information. In a recent change we stopped accepting these logs, which broke these utilities. In the future we'll rename startup logs to agent logs or something more generalized so this is less confusing.

* fix(cli/cliui): handle never ending startup log stream in Agent

---------

Co-authored-by: Mathias Fredriksson <mafredri@gmail.com>
1 parent 5826588 commit 98164f6

File tree

5 files changed

+82
-239
lines changed

5 files changed

+82
-239
lines changed

cli/cliui/agent.go

+23-5
Original file line numberDiff line numberDiff line change
@@ -137,26 +137,44 @@ func Agent(ctx context.Context, writer io.Writer, agentID uuid.UUID, opts AgentO
137137
}
138138
defer logsCloser.Close()
139139

140+
var lastLog codersdk.WorkspaceAgentStartupLog
141+
fetchedAgentWhileFollowing := fetchedAgent
142+
if !follow {
143+
fetchedAgentWhileFollowing = nil
144+
}
140145
for {
141146
// This select is essentially an inline `fetch()`.
142147
select {
143148
case <-ctx.Done():
144149
return ctx.Err()
145-
case f := <-fetchedAgent:
150+
case f := <-fetchedAgentWhileFollowing:
146151
if f.err != nil {
147152
return xerrors.Errorf("fetch: %w", f.err)
148153
}
149-
// We could handle changes in the agent status here, like
150-
// if the agent becomes disconnected, we may want to stop.
151-
// But for now, we'll just keep going, hopefully the agent
152-
// will reconnect and update its status.
153154
agent = f.agent
155+
156+
// If the agent is no longer starting, stop following
157+
// logs because FetchLogs will keep streaming forever.
158+
// We do one last non-follow request to ensure we have
159+
// fetched all logs.
160+
if !agent.LifecycleState.Starting() {
161+
_ = logsCloser.Close()
162+
fetchedAgentWhileFollowing = nil
163+
164+
logStream, logsCloser, err = opts.FetchLogs(ctx, agent.ID, lastLog.ID, false)
165+
if err != nil {
166+
return xerrors.Errorf("fetch workspace agent startup logs: %w", err)
167+
}
168+
// Logs are already primed, so we can call close.
169+
_ = logsCloser.Close()
170+
}
154171
case logs, ok := <-logStream:
155172
if !ok {
156173
return nil
157174
}
158175
for _, log := range logs {
159176
sw.Log(log.CreatedAt, log.Level, log.Output)
177+
lastLog = log
160178
}
161179
}
162180
}

cli/cliui/agent_test.go

+15-12
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,6 @@ func TestAgent(t *testing.T) {
4646
func(_ context.Context, agent *codersdk.WorkspaceAgent, logs chan []codersdk.WorkspaceAgentStartupLog) error {
4747
agent.Status = codersdk.WorkspaceAgentConnected
4848
agent.FirstConnectedAt = ptr.Ref(time.Now())
49-
close(logs)
5049
return nil
5150
},
5251
},
@@ -79,7 +78,6 @@ func TestAgent(t *testing.T) {
7978
agent.FirstConnectedAt = ptr.Ref(time.Now())
8079
agent.LifecycleState = codersdk.WorkspaceAgentLifecycleReady
8180
agent.ReadyAt = ptr.Ref(time.Now())
82-
close(logs)
8381
return nil
8482
},
8583
},
@@ -113,10 +111,6 @@ func TestAgent(t *testing.T) {
113111
agent.LastConnectedAt = ptr.Ref(time.Now())
114112
return nil
115113
},
116-
func(_ context.Context, _ *codersdk.WorkspaceAgent, logs chan []codersdk.WorkspaceAgentStartupLog) error {
117-
close(logs)
118-
return nil
119-
},
120114
},
121115
want: []string{
122116
"⧗ The workspace agent lost connection",
@@ -154,7 +148,6 @@ func TestAgent(t *testing.T) {
154148
Output: "Bye now",
155149
},
156150
}
157-
close(logs)
158151
return nil
159152
},
160153
},
@@ -184,7 +177,6 @@ func TestAgent(t *testing.T) {
184177
Output: "Hello world",
185178
},
186179
}
187-
close(logs)
188180
return nil
189181
},
190182
},
@@ -205,7 +197,6 @@ func TestAgent(t *testing.T) {
205197
func(_ context.Context, agent *codersdk.WorkspaceAgent, logs chan []codersdk.WorkspaceAgentStartupLog) error {
206198
agent.Status = codersdk.WorkspaceAgentDisconnected
207199
agent.LifecycleState = codersdk.WorkspaceAgentLifecycleOff
208-
close(logs)
209200
return nil
210201
},
211202
},
@@ -234,7 +225,6 @@ func TestAgent(t *testing.T) {
234225
func(_ context.Context, agent *codersdk.WorkspaceAgent, logs chan []codersdk.WorkspaceAgentStartupLog) error {
235226
agent.ReadyAt = ptr.Ref(time.Now())
236227
agent.LifecycleState = codersdk.WorkspaceAgentLifecycleShuttingDown
237-
close(logs)
238228
return nil
239229
},
240230
},
@@ -316,8 +306,21 @@ func TestAgent(t *testing.T) {
316306
}
317307
return agent, err
318308
}
319-
tc.opts.FetchLogs = func(_ context.Context, _ uuid.UUID, _ int64, _ bool) (<-chan []codersdk.WorkspaceAgentStartupLog, io.Closer, error) {
320-
return logs, closeFunc(func() error { return nil }), nil
309+
tc.opts.FetchLogs = func(ctx context.Context, _ uuid.UUID, _ int64, follow bool) (<-chan []codersdk.WorkspaceAgentStartupLog, io.Closer, error) {
310+
if follow {
311+
return logs, closeFunc(func() error { return nil }), nil
312+
}
313+
314+
fetchLogs := make(chan []codersdk.WorkspaceAgentStartupLog, 1)
315+
select {
316+
case <-ctx.Done():
317+
return nil, nil, ctx.Err()
318+
case l := <-logs:
319+
fetchLogs <- l
320+
default:
321+
}
322+
close(fetchLogs)
323+
return fetchLogs, closeFunc(func() error { return nil }), nil
321324
}
322325
err := cliui.Agent(inv.Context(), &buf, uuid.Nil, tc.opts)
323326
return err

coderd/workspaceagents.go

+44-97
Original file line numberDiff line numberDiff line change
@@ -280,81 +280,61 @@ func (api *API) patchWorkspaceAgentStartupLogs(rw http.ResponseWriter, r *http.R
280280
level = append(level, parsedLevel)
281281
}
282282

283-
var logs []database.WorkspaceAgentStartupLog
284-
// Ensure logs are not written after script ended.
285-
scriptEndedError := xerrors.New("startup script has ended")
286-
err := api.Database.InTx(func(db database.Store) error {
287-
state, err := db.GetWorkspaceAgentLifecycleStateByID(ctx, workspaceAgent.ID)
288-
if err != nil {
289-
return xerrors.Errorf("workspace agent startup script status: %w", err)
283+
logs, err := api.Database.InsertWorkspaceAgentStartupLogs(ctx, database.InsertWorkspaceAgentStartupLogsParams{
284+
AgentID: workspaceAgent.ID,
285+
CreatedAt: createdAt,
286+
Output: output,
287+
Level: level,
288+
OutputLength: int32(outputLength),
289+
})
290+
if err != nil {
291+
if !database.IsStartupLogsLimitError(err) {
292+
httpapi.Write(ctx, rw, http.StatusInternalServerError, codersdk.Response{
293+
Message: "Failed to upload startup logs",
294+
Detail: err.Error(),
295+
})
296+
return
290297
}
291-
292-
if state.ReadyAt.Valid {
293-
// The agent startup script has already ended, so we don't want to
294-
// process any more logs.
295-
return scriptEndedError
298+
if workspaceAgent.StartupLogsOverflowed {
299+
httpapi.Write(ctx, rw, http.StatusRequestEntityTooLarge, codersdk.Response{
300+
Message: "Startup logs limit exceeded",
301+
Detail: err.Error(),
302+
})
303+
return
296304
}
297-
298-
logs, err = db.InsertWorkspaceAgentStartupLogs(ctx, database.InsertWorkspaceAgentStartupLogsParams{
299-
AgentID: workspaceAgent.ID,
300-
CreatedAt: createdAt,
301-
Output: output,
302-
Level: level,
303-
OutputLength: int32(outputLength),
305+
err := api.Database.UpdateWorkspaceAgentStartupLogOverflowByID(ctx, database.UpdateWorkspaceAgentStartupLogOverflowByIDParams{
306+
ID: workspaceAgent.ID,
307+
StartupLogsOverflowed: true,
304308
})
305-
return err
306-
}, nil)
307-
if err != nil {
308-
if errors.Is(err, scriptEndedError) {
309-
httpapi.Write(ctx, rw, http.StatusConflict, codersdk.Response{
310-
Message: "Failed to upload logs, startup script has already ended.",
309+
if err != nil {
310+
// We don't want to return here, because the agent will retry
311+
// on failure and this isn't a huge deal. The overflow state
312+
// is just a hint to the user that the logs are incomplete.
313+
api.Logger.Warn(ctx, "failed to update workspace agent startup log overflow", slog.Error(err))
314+
}
315+
316+
resource, err := api.Database.GetWorkspaceResourceByID(ctx, workspaceAgent.ResourceID)
317+
if err != nil {
318+
httpapi.Write(ctx, rw, http.StatusBadRequest, codersdk.Response{
319+
Message: "Failed to get workspace resource.",
311320
Detail: err.Error(),
312321
})
313322
return
314323
}
315-
if database.IsStartupLogsLimitError(err) {
316-
if !workspaceAgent.StartupLogsOverflowed {
317-
err := api.Database.UpdateWorkspaceAgentStartupLogOverflowByID(ctx, database.UpdateWorkspaceAgentStartupLogOverflowByIDParams{
318-
ID: workspaceAgent.ID,
319-
StartupLogsOverflowed: true,
320-
})
321-
if err != nil {
322-
// We don't want to return here, because the agent will retry
323-
// on failure and this isn't a huge deal. The overflow state
324-
// is just a hint to the user that the logs are incomplete.
325-
api.Logger.Warn(ctx, "failed to update workspace agent startup log overflow", slog.Error(err))
326-
}
327324

328-
resource, err := api.Database.GetWorkspaceResourceByID(ctx, workspaceAgent.ResourceID)
329-
if err != nil {
330-
httpapi.Write(ctx, rw, http.StatusBadRequest, codersdk.Response{
331-
Message: "Failed to get workspace resource.",
332-
Detail: err.Error(),
333-
})
334-
return
335-
}
336-
337-
build, err := api.Database.GetWorkspaceBuildByJobID(ctx, resource.JobID)
338-
if err != nil {
339-
httpapi.Write(ctx, rw, http.StatusBadRequest, codersdk.Response{
340-
Message: "Internal error fetching workspace build job.",
341-
Detail: err.Error(),
342-
})
343-
return
344-
}
345-
346-
api.publishWorkspaceUpdate(ctx, build.WorkspaceID)
347-
}
348-
349-
httpapi.Write(ctx, rw, http.StatusRequestEntityTooLarge, codersdk.Response{
350-
Message: "Startup logs limit exceeded",
325+
build, err := api.Database.GetWorkspaceBuildByJobID(ctx, resource.JobID)
326+
if err != nil {
327+
httpapi.Write(ctx, rw, http.StatusBadRequest, codersdk.Response{
328+
Message: "Internal error fetching workspace build job.",
351329
Detail: err.Error(),
352330
})
353331
return
354332
}
355-
httpapi.Write(ctx, rw, http.StatusInternalServerError, codersdk.Response{
356-
Message: "Failed to upload startup logs",
357-
Detail: err.Error(),
333+
334+
api.publishWorkspaceUpdate(ctx, build.WorkspaceID)
335+
336+
httpapi.Write(ctx, rw, http.StatusRequestEntityTooLarge, codersdk.Response{
337+
Message: "Startup logs limit exceeded",
358338
})
359339
return
360340
}
@@ -497,18 +477,6 @@ func (api *API) workspaceAgentStartupLogs(rw http.ResponseWriter, r *http.Reques
497477
return
498478
}
499479

500-
if workspaceAgent.ReadyAt.Valid {
501-
// Fast path, the startup script has finished running, so we can close
502-
// the connection.
503-
return
504-
}
505-
if !codersdk.WorkspaceAgentLifecycle(workspaceAgent.LifecycleState).Starting() {
506-
// Backwards compatibility: Avoid waiting forever in case this agent is
507-
// older than the current release and has already reported the ready
508-
// state.
509-
return
510-
}
511-
512480
lastSentLogID := after
513481
if len(logs) > 0 {
514482
lastSentLogID = logs[len(logs)-1].ID
@@ -543,11 +511,9 @@ func (api *API) workspaceAgentStartupLogs(rw http.ResponseWriter, r *http.Reques
543511
t := time.NewTicker(recheckInterval)
544512
defer t.Stop()
545513

546-
var state database.GetWorkspaceAgentLifecycleStateByIDRow
547514
go func() {
548515
defer close(bufferedLogs)
549516

550-
var err error
551517
for {
552518
select {
553519
case <-ctx.Done():
@@ -557,17 +523,6 @@ func (api *API) workspaceAgentStartupLogs(rw http.ResponseWriter, r *http.Reques
557523
t.Reset(recheckInterval)
558524
}
559525

560-
if !state.ReadyAt.Valid {
561-
state, err = api.Database.GetWorkspaceAgentLifecycleStateByID(ctx, workspaceAgent.ID)
562-
if err != nil {
563-
if xerrors.Is(err, context.Canceled) {
564-
return
565-
}
566-
logger.Warn(ctx, "failed to get workspace agent lifecycle state", slog.Error(err))
567-
continue
568-
}
569-
}
570-
571526
logs, err := api.Database.GetWorkspaceAgentStartupLogsAfter(ctx, database.GetWorkspaceAgentStartupLogsAfterParams{
572527
AgentID: workspaceAgent.ID,
573528
CreatedAfter: lastSentLogID,
@@ -580,9 +535,7 @@ func (api *API) workspaceAgentStartupLogs(rw http.ResponseWriter, r *http.Reques
580535
continue
581536
}
582537
if len(logs) == 0 {
583-
if state.ReadyAt.Valid {
584-
return
585-
}
538+
// Just keep listening - more logs might come in the future!
586539
continue
587540
}
588541

@@ -1689,12 +1642,6 @@ func (api *API) workspaceAgentReportLifecycle(rw http.ResponseWriter, r *http.Re
16891642
return
16901643
}
16911644

1692-
if readyAt.Valid {
1693-
api.publishWorkspaceAgentStartupLogsUpdate(ctx, workspaceAgent.ID, agentsdk.StartupLogsNotifyMessage{
1694-
EndOfLogs: true,
1695-
})
1696-
}
1697-
16981645
api.publishWorkspaceUpdate(ctx, workspace.ID)
16991646

17001647
httpapi.Write(ctx, rw, http.StatusNoContent, nil)

0 commit comments

Comments
 (0)