Skip to content

Commit 530dd9d

Browse files
authored
fix(coderd): subscribe to workspace when streaming agent logs to detect outdated build (#9729)
Fixes #9721
1 parent 87d50f1 commit 530dd9d

File tree

3 files changed

+48
-25
lines changed

3 files changed

+48
-25
lines changed

coderd/workspaceagents.go

+46-3
Original file line numberDiff line numberDiff line change
@@ -570,11 +570,27 @@ func (api *API) workspaceAgentLogs(rw http.ResponseWriter, r *http.Request) {
570570
lastSentLogID = logs[len(logs)-1].ID
571571
}
572572

573+
workspaceNotifyCh := make(chan struct{}, 1)
573574
notifyCh := make(chan struct{}, 1)
574575
// Allow us to immediately check if we missed any logs
575576
// between initial fetch and subscribe.
576577
notifyCh <- struct{}{}
577578

579+
// Subscribe to workspace to detect new builds.
580+
closeSubscribeWorkspace, err := api.Pubsub.Subscribe(codersdk.WorkspaceNotifyChannel(workspace.ID), func(_ context.Context, _ []byte) {
581+
select {
582+
case workspaceNotifyCh <- struct{}{}:
583+
default:
584+
}
585+
})
586+
if err != nil {
587+
httpapi.Write(ctx, rw, http.StatusInternalServerError, codersdk.Response{
588+
Message: "Failed to subscribe to workspace for log streaming.",
589+
Detail: err.Error(),
590+
})
591+
return
592+
}
593+
defer closeSubscribeWorkspace()
578594
// Subscribe early to prevent missing log events.
579595
closeSubscribe, err := api.Pubsub.Subscribe(agentsdk.LogsNotifyChannel(workspaceAgent.ID), func(_ context.Context, _ []byte) {
580596
// The message is not important, we're tracking lastSentLogID manually.
@@ -585,7 +601,7 @@ func (api *API) workspaceAgentLogs(rw http.ResponseWriter, r *http.Request) {
585601
})
586602
if err != nil {
587603
httpapi.Write(ctx, rw, http.StatusInternalServerError, codersdk.Response{
588-
Message: "Failed to subscribe to logs.",
604+
Message: "Failed to subscribe to agent for log streaming.",
589605
Detail: err.Error(),
590606
})
591607
return
@@ -600,20 +616,33 @@ func (api *API) workspaceAgentLogs(rw http.ResponseWriter, r *http.Request) {
600616
defer t.Stop()
601617

602618
go func() {
603-
defer close(bufferedLogs)
619+
defer func() {
620+
logger.Debug(ctx, "end log streaming loop")
621+
close(bufferedLogs)
622+
}()
623+
logger.Debug(ctx, "start log streaming loop", slog.F("last_sent_log_id", lastSentLogID))
604624

605625
keepGoing := true
606626
for keepGoing {
627+
var (
628+
debugTriggeredBy string
629+
onlyCheckLatestBuild bool
630+
)
607631
select {
608632
case <-ctx.Done():
609633
return
610634
case <-t.C:
635+
debugTriggeredBy = "timer"
636+
case <-workspaceNotifyCh:
637+
debugTriggeredBy = "workspace"
638+
onlyCheckLatestBuild = true
611639
case <-notifyCh:
640+
debugTriggeredBy = "log"
612641
t.Reset(recheckInterval)
613642
}
614643

615644
agents, err := api.Database.GetWorkspaceAgentsInLatestBuildByWorkspaceID(ctx, workspace.ID)
616-
if err != nil {
645+
if err != nil && !xerrors.Is(err, sql.ErrNoRows) {
617646
if xerrors.Is(err, context.Canceled) {
618647
return
619648
}
@@ -624,6 +653,20 @@ func (api *API) workspaceAgentLogs(rw http.ResponseWriter, r *http.Request) {
624653
// checking once.
625654
keepGoing = slices.ContainsFunc(agents, func(agent database.WorkspaceAgent) bool { return agent.ID == workspaceAgent.ID })
626655

656+
logger.Debug(
657+
ctx,
658+
"checking for new logs",
659+
slog.F("triggered_by", debugTriggeredBy),
660+
slog.F("only_check_latest_build", onlyCheckLatestBuild),
661+
slog.F("keep_going", keepGoing),
662+
slog.F("last_sent_log_id", lastSentLogID),
663+
slog.F("workspace_has_agents", len(agents) > 0),
664+
)
665+
666+
if onlyCheckLatestBuild && keepGoing {
667+
continue
668+
}
669+
627670
logs, err := api.Database.GetWorkspaceAgentLogsAfter(ctx, database.GetWorkspaceAgentLogsAfterParams{
628671
AgentID: workspaceAgent.ID,
629672
CreatedAfter: lastSentLogID,

coderd/workspaceagents_test.go

+1-21
Original file line numberDiff line numberDiff line change
@@ -385,34 +385,14 @@ func TestWorkspaceAgentStartupLogs(t *testing.T) {
385385
_ = closer.Close()
386386
}()
387387

388-
first := make(chan struct{})
389-
go func() {
390-
select {
391-
case <-ctx.Done():
392-
assert.Fail(t, "context done while waiting in goroutine")
393-
case <-logs:
394-
close(first)
395-
}
396-
}()
397388
select {
398389
case <-ctx.Done():
399390
require.FailNow(t, "context done while waiting for first log")
400-
case <-first:
391+
case <-logs:
401392
}
402393

403394
_ = coderdtest.CreateWorkspaceBuild(t, client, workspace, database.WorkspaceTransitionStart)
404395

405-
// Send a new log message to trigger a re-check.
406-
err = agentClient.PatchLogs(ctx, agentsdk.PatchLogs{
407-
Logs: []agentsdk.Log{
408-
{
409-
CreatedAt: dbtime.Now(),
410-
Output: "testing2",
411-
},
412-
},
413-
})
414-
require.NoError(t, err)
415-
416396
select {
417397
case <-ctx.Done():
418398
require.FailNow(t, "context done while waiting for logs close")

codersdk/workspaceagents.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -703,7 +703,7 @@ func (c *Client) WorkspaceAgentLogsAfter(ctx context.Context, agentID uuid.UUID,
703703
}
704704
return nil, nil, ReadBodyAsError(res)
705705
}
706-
logChunks := make(chan []WorkspaceAgentLog)
706+
logChunks := make(chan []WorkspaceAgentLog, 1)
707707
closed := make(chan struct{})
708708
ctx, wsNetConn := websocketNetConn(ctx, conn, websocket.MessageText)
709709
decoder := json.NewDecoder(wsNetConn)

0 commit comments

Comments
 (0)