Skip to content

Commit 2471dd5

Browse files
committed
Merge branch 'main' into 6818-reverse-port-fwd
2 parents 0b53a5c + 164b816 commit 2471dd5

Some content is hidden

Large commits have some content hidden by default. Use the search box below to find content that may be hidden.

42 files changed

+1672
-817
lines changed

cli/cliui/agent.go

Lines changed: 23 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -137,26 +137,44 @@ func Agent(ctx context.Context, writer io.Writer, agentID uuid.UUID, opts AgentO
137137
}
138138
defer logsCloser.Close()
139139

140+
var lastLog codersdk.WorkspaceAgentStartupLog
141+
fetchedAgentWhileFollowing := fetchedAgent
142+
if !follow {
143+
fetchedAgentWhileFollowing = nil
144+
}
140145
for {
141146
// This select is essentially an inline `fetch()`.
142147
select {
143148
case <-ctx.Done():
144149
return ctx.Err()
145-
case f := <-fetchedAgent:
150+
case f := <-fetchedAgentWhileFollowing:
146151
if f.err != nil {
147152
return xerrors.Errorf("fetch: %w", f.err)
148153
}
149-
// We could handle changes in the agent status here, like
150-
// if the agent becomes disconnected, we may want to stop.
151-
// But for now, we'll just keep going, hopefully the agent
152-
// will reconnect and update its status.
153154
agent = f.agent
155+
156+
// If the agent is no longer starting, stop following
157+
// logs because FetchLogs will keep streaming forever.
158+
// We do one last non-follow request to ensure we have
159+
// fetched all logs.
160+
if !agent.LifecycleState.Starting() {
161+
_ = logsCloser.Close()
162+
fetchedAgentWhileFollowing = nil
163+
164+
logStream, logsCloser, err = opts.FetchLogs(ctx, agent.ID, lastLog.ID, false)
165+
if err != nil {
166+
return xerrors.Errorf("fetch workspace agent startup logs: %w", err)
167+
}
168+
// Logs are already primed, so we can call close.
169+
_ = logsCloser.Close()
170+
}
154171
case logs, ok := <-logStream:
155172
if !ok {
156173
return nil
157174
}
158175
for _, log := range logs {
159176
sw.Log(log.CreatedAt, log.Level, log.Output)
177+
lastLog = log
160178
}
161179
}
162180
}

cli/cliui/agent_test.go

Lines changed: 15 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,6 @@ func TestAgent(t *testing.T) {
4646
func(_ context.Context, agent *codersdk.WorkspaceAgent, logs chan []codersdk.WorkspaceAgentStartupLog) error {
4747
agent.Status = codersdk.WorkspaceAgentConnected
4848
agent.FirstConnectedAt = ptr.Ref(time.Now())
49-
close(logs)
5049
return nil
5150
},
5251
},
@@ -79,7 +78,6 @@ func TestAgent(t *testing.T) {
7978
agent.FirstConnectedAt = ptr.Ref(time.Now())
8079
agent.LifecycleState = codersdk.WorkspaceAgentLifecycleReady
8180
agent.ReadyAt = ptr.Ref(time.Now())
82-
close(logs)
8381
return nil
8482
},
8583
},
@@ -113,10 +111,6 @@ func TestAgent(t *testing.T) {
113111
agent.LastConnectedAt = ptr.Ref(time.Now())
114112
return nil
115113
},
116-
func(_ context.Context, _ *codersdk.WorkspaceAgent, logs chan []codersdk.WorkspaceAgentStartupLog) error {
117-
close(logs)
118-
return nil
119-
},
120114
},
121115
want: []string{
122116
"⧗ The workspace agent lost connection",
@@ -154,7 +148,6 @@ func TestAgent(t *testing.T) {
154148
Output: "Bye now",
155149
},
156150
}
157-
close(logs)
158151
return nil
159152
},
160153
},
@@ -184,7 +177,6 @@ func TestAgent(t *testing.T) {
184177
Output: "Hello world",
185178
},
186179
}
187-
close(logs)
188180
return nil
189181
},
190182
},
@@ -205,7 +197,6 @@ func TestAgent(t *testing.T) {
205197
func(_ context.Context, agent *codersdk.WorkspaceAgent, logs chan []codersdk.WorkspaceAgentStartupLog) error {
206198
agent.Status = codersdk.WorkspaceAgentDisconnected
207199
agent.LifecycleState = codersdk.WorkspaceAgentLifecycleOff
208-
close(logs)
209200
return nil
210201
},
211202
},
@@ -234,7 +225,6 @@ func TestAgent(t *testing.T) {
234225
func(_ context.Context, agent *codersdk.WorkspaceAgent, logs chan []codersdk.WorkspaceAgentStartupLog) error {
235226
agent.ReadyAt = ptr.Ref(time.Now())
236227
agent.LifecycleState = codersdk.WorkspaceAgentLifecycleShuttingDown
237-
close(logs)
238228
return nil
239229
},
240230
},
@@ -316,8 +306,21 @@ func TestAgent(t *testing.T) {
316306
}
317307
return agent, err
318308
}
319-
tc.opts.FetchLogs = func(_ context.Context, _ uuid.UUID, _ int64, _ bool) (<-chan []codersdk.WorkspaceAgentStartupLog, io.Closer, error) {
320-
return logs, closeFunc(func() error { return nil }), nil
309+
tc.opts.FetchLogs = func(ctx context.Context, _ uuid.UUID, _ int64, follow bool) (<-chan []codersdk.WorkspaceAgentStartupLog, io.Closer, error) {
310+
if follow {
311+
return logs, closeFunc(func() error { return nil }), nil
312+
}
313+
314+
fetchLogs := make(chan []codersdk.WorkspaceAgentStartupLog, 1)
315+
select {
316+
case <-ctx.Done():
317+
return nil, nil, ctx.Err()
318+
case l := <-logs:
319+
fetchLogs <- l
320+
default:
321+
}
322+
close(fetchLogs)
323+
return fetchLogs, closeFunc(func() error { return nil }), nil
321324
}
322325
err := cliui.Agent(inv.Context(), &buf, uuid.Nil, tc.opts)
323326
return err

cli/exp_scaletest.go

Lines changed: 16 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -848,6 +848,7 @@ func (r *RootCmd) scaletestWorkspaceTraffic() *clibase.Cmd {
848848
var (
849849
tickInterval time.Duration
850850
bytesPerTick int64
851+
ssh bool
851852
scaletestPrometheusAddress string
852853
scaletestPrometheusWait time.Duration
853854

@@ -938,20 +939,19 @@ func (r *RootCmd) scaletestWorkspaceTraffic() *clibase.Cmd {
938939

939940
// Setup our workspace agent connection.
940941
config := workspacetraffic.Config{
941-
AgentID: agentID,
942-
AgentName: agentName,
943-
BytesPerTick: bytesPerTick,
944-
Duration: strategy.timeout,
945-
TickInterval: tickInterval,
946-
WorkspaceName: ws.Name,
947-
WorkspaceOwner: ws.OwnerName,
948-
Registry: reg,
942+
AgentID: agentID,
943+
BytesPerTick: bytesPerTick,
944+
Duration: strategy.timeout,
945+
TickInterval: tickInterval,
946+
ReadMetrics: metrics.ReadMetrics(ws.OwnerName, ws.Name, agentName),
947+
WriteMetrics: metrics.WriteMetrics(ws.OwnerName, ws.Name, agentName),
948+
SSH: ssh,
949949
}
950950

951951
if err := config.Validate(); err != nil {
952952
return xerrors.Errorf("validate config: %w", err)
953953
}
954-
var runner harness.Runnable = workspacetraffic.NewRunner(client, config, metrics)
954+
var runner harness.Runnable = workspacetraffic.NewRunner(client, config)
955955
if tracingEnabled {
956956
runner = &runnableTraceWrapper{
957957
tracer: tracer,
@@ -1002,6 +1002,13 @@ func (r *RootCmd) scaletestWorkspaceTraffic() *clibase.Cmd {
10021002
Description: "How often to send traffic.",
10031003
Value: clibase.DurationOf(&tickInterval),
10041004
},
1005+
{
1006+
Flag: "ssh",
1007+
Env: "CODER_SCALETEST_WORKSPACE_TRAFFIC_SSH",
1008+
Default: "",
1009+
Description: "Send traffic over SSH.",
1010+
Value: clibase.BoolOf(&ssh),
1011+
},
10051012
{
10061013
Flag: "scaletest-prometheus-address",
10071014
Env: "CODER_SCALETEST_PROMETHEUS_ADDRESS",

cli/exp_scaletest_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@ func TestScaleTestWorkspaceTraffic(t *testing.T) {
6969
"--tick-interval", "100ms",
7070
"--scaletest-prometheus-address", "127.0.0.1:0",
7171
"--scaletest-prometheus-wait", "0s",
72+
"--ssh",
7273
)
7374
clitest.SetupConfig(t, client, root)
7475
var stdout, stderr bytes.Buffer

coderd/workspaceagents.go

Lines changed: 44 additions & 97 deletions
Original file line numberDiff line numberDiff line change
@@ -280,81 +280,61 @@ func (api *API) patchWorkspaceAgentStartupLogs(rw http.ResponseWriter, r *http.R
280280
level = append(level, parsedLevel)
281281
}
282282

283-
var logs []database.WorkspaceAgentStartupLog
284-
// Ensure logs are not written after script ended.
285-
scriptEndedError := xerrors.New("startup script has ended")
286-
err := api.Database.InTx(func(db database.Store) error {
287-
state, err := db.GetWorkspaceAgentLifecycleStateByID(ctx, workspaceAgent.ID)
288-
if err != nil {
289-
return xerrors.Errorf("workspace agent startup script status: %w", err)
283+
logs, err := api.Database.InsertWorkspaceAgentStartupLogs(ctx, database.InsertWorkspaceAgentStartupLogsParams{
284+
AgentID: workspaceAgent.ID,
285+
CreatedAt: createdAt,
286+
Output: output,
287+
Level: level,
288+
OutputLength: int32(outputLength),
289+
})
290+
if err != nil {
291+
if !database.IsStartupLogsLimitError(err) {
292+
httpapi.Write(ctx, rw, http.StatusInternalServerError, codersdk.Response{
293+
Message: "Failed to upload startup logs",
294+
Detail: err.Error(),
295+
})
296+
return
290297
}
291-
292-
if state.ReadyAt.Valid {
293-
// The agent startup script has already ended, so we don't want to
294-
// process any more logs.
295-
return scriptEndedError
298+
if workspaceAgent.StartupLogsOverflowed {
299+
httpapi.Write(ctx, rw, http.StatusRequestEntityTooLarge, codersdk.Response{
300+
Message: "Startup logs limit exceeded",
301+
Detail: err.Error(),
302+
})
303+
return
296304
}
297-
298-
logs, err = db.InsertWorkspaceAgentStartupLogs(ctx, database.InsertWorkspaceAgentStartupLogsParams{
299-
AgentID: workspaceAgent.ID,
300-
CreatedAt: createdAt,
301-
Output: output,
302-
Level: level,
303-
OutputLength: int32(outputLength),
305+
err := api.Database.UpdateWorkspaceAgentStartupLogOverflowByID(ctx, database.UpdateWorkspaceAgentStartupLogOverflowByIDParams{
306+
ID: workspaceAgent.ID,
307+
StartupLogsOverflowed: true,
304308
})
305-
return err
306-
}, nil)
307-
if err != nil {
308-
if errors.Is(err, scriptEndedError) {
309-
httpapi.Write(ctx, rw, http.StatusConflict, codersdk.Response{
310-
Message: "Failed to upload logs, startup script has already ended.",
309+
if err != nil {
310+
// We don't want to return here, because the agent will retry
311+
// on failure and this isn't a huge deal. The overflow state
312+
// is just a hint to the user that the logs are incomplete.
313+
api.Logger.Warn(ctx, "failed to update workspace agent startup log overflow", slog.Error(err))
314+
}
315+
316+
resource, err := api.Database.GetWorkspaceResourceByID(ctx, workspaceAgent.ResourceID)
317+
if err != nil {
318+
httpapi.Write(ctx, rw, http.StatusBadRequest, codersdk.Response{
319+
Message: "Failed to get workspace resource.",
311320
Detail: err.Error(),
312321
})
313322
return
314323
}
315-
if database.IsStartupLogsLimitError(err) {
316-
if !workspaceAgent.StartupLogsOverflowed {
317-
err := api.Database.UpdateWorkspaceAgentStartupLogOverflowByID(ctx, database.UpdateWorkspaceAgentStartupLogOverflowByIDParams{
318-
ID: workspaceAgent.ID,
319-
StartupLogsOverflowed: true,
320-
})
321-
if err != nil {
322-
// We don't want to return here, because the agent will retry
323-
// on failure and this isn't a huge deal. The overflow state
324-
// is just a hint to the user that the logs are incomplete.
325-
api.Logger.Warn(ctx, "failed to update workspace agent startup log overflow", slog.Error(err))
326-
}
327324

328-
resource, err := api.Database.GetWorkspaceResourceByID(ctx, workspaceAgent.ResourceID)
329-
if err != nil {
330-
httpapi.Write(ctx, rw, http.StatusBadRequest, codersdk.Response{
331-
Message: "Failed to get workspace resource.",
332-
Detail: err.Error(),
333-
})
334-
return
335-
}
336-
337-
build, err := api.Database.GetWorkspaceBuildByJobID(ctx, resource.JobID)
338-
if err != nil {
339-
httpapi.Write(ctx, rw, http.StatusBadRequest, codersdk.Response{
340-
Message: "Internal error fetching workspace build job.",
341-
Detail: err.Error(),
342-
})
343-
return
344-
}
345-
346-
api.publishWorkspaceUpdate(ctx, build.WorkspaceID)
347-
}
348-
349-
httpapi.Write(ctx, rw, http.StatusRequestEntityTooLarge, codersdk.Response{
350-
Message: "Startup logs limit exceeded",
325+
build, err := api.Database.GetWorkspaceBuildByJobID(ctx, resource.JobID)
326+
if err != nil {
327+
httpapi.Write(ctx, rw, http.StatusBadRequest, codersdk.Response{
328+
Message: "Internal error fetching workspace build job.",
351329
Detail: err.Error(),
352330
})
353331
return
354332
}
355-
httpapi.Write(ctx, rw, http.StatusInternalServerError, codersdk.Response{
356-
Message: "Failed to upload startup logs",
357-
Detail: err.Error(),
333+
334+
api.publishWorkspaceUpdate(ctx, build.WorkspaceID)
335+
336+
httpapi.Write(ctx, rw, http.StatusRequestEntityTooLarge, codersdk.Response{
337+
Message: "Startup logs limit exceeded",
358338
})
359339
return
360340
}
@@ -497,18 +477,6 @@ func (api *API) workspaceAgentStartupLogs(rw http.ResponseWriter, r *http.Reques
497477
return
498478
}
499479

500-
if workspaceAgent.ReadyAt.Valid {
501-
// Fast path, the startup script has finished running, so we can close
502-
// the connection.
503-
return
504-
}
505-
if !codersdk.WorkspaceAgentLifecycle(workspaceAgent.LifecycleState).Starting() {
506-
// Backwards compatibility: Avoid waiting forever in case this agent is
507-
// older than the current release and has already reported the ready
508-
// state.
509-
return
510-
}
511-
512480
lastSentLogID := after
513481
if len(logs) > 0 {
514482
lastSentLogID = logs[len(logs)-1].ID
@@ -543,11 +511,9 @@ func (api *API) workspaceAgentStartupLogs(rw http.ResponseWriter, r *http.Reques
543511
t := time.NewTicker(recheckInterval)
544512
defer t.Stop()
545513

546-
var state database.GetWorkspaceAgentLifecycleStateByIDRow
547514
go func() {
548515
defer close(bufferedLogs)
549516

550-
var err error
551517
for {
552518
select {
553519
case <-ctx.Done():
@@ -557,17 +523,6 @@ func (api *API) workspaceAgentStartupLogs(rw http.ResponseWriter, r *http.Reques
557523
t.Reset(recheckInterval)
558524
}
559525

560-
if !state.ReadyAt.Valid {
561-
state, err = api.Database.GetWorkspaceAgentLifecycleStateByID(ctx, workspaceAgent.ID)
562-
if err != nil {
563-
if xerrors.Is(err, context.Canceled) {
564-
return
565-
}
566-
logger.Warn(ctx, "failed to get workspace agent lifecycle state", slog.Error(err))
567-
continue
568-
}
569-
}
570-
571526
logs, err := api.Database.GetWorkspaceAgentStartupLogsAfter(ctx, database.GetWorkspaceAgentStartupLogsAfterParams{
572527
AgentID: workspaceAgent.ID,
573528
CreatedAfter: lastSentLogID,
@@ -580,9 +535,7 @@ func (api *API) workspaceAgentStartupLogs(rw http.ResponseWriter, r *http.Reques
580535
continue
581536
}
582537
if len(logs) == 0 {
583-
if state.ReadyAt.Valid {
584-
return
585-
}
538+
// Just keep listening - more logs might come in the future!
586539
continue
587540
}
588541

@@ -1689,12 +1642,6 @@ func (api *API) workspaceAgentReportLifecycle(rw http.ResponseWriter, r *http.Re
16891642
return
16901643
}
16911644

1692-
if readyAt.Valid {
1693-
api.publishWorkspaceAgentStartupLogsUpdate(ctx, workspaceAgent.ID, agentsdk.StartupLogsNotifyMessage{
1694-
EndOfLogs: true,
1695-
})
1696-
}
1697-
16981645
api.publishWorkspaceUpdate(ctx, workspace.ID)
16991646

17001647
httpapi.Write(ctx, rw, http.StatusNoContent, nil)

0 commit comments

Comments
 (0)