diff --git a/coderd/workspaceagents.go b/coderd/workspaceagents.go index d92f614766c4b..82778ca119e12 100644 --- a/coderd/workspaceagents.go +++ b/coderd/workspaceagents.go @@ -570,11 +570,27 @@ func (api *API) workspaceAgentLogs(rw http.ResponseWriter, r *http.Request) { lastSentLogID = logs[len(logs)-1].ID } + workspaceNotifyCh := make(chan struct{}, 1) notifyCh := make(chan struct{}, 1) // Allow us to immediately check if we missed any logs // between initial fetch and subscribe. notifyCh <- struct{}{} + // Subscribe to workspace to detect new builds. + closeSubscribeWorkspace, err := api.Pubsub.Subscribe(codersdk.WorkspaceNotifyChannel(workspace.ID), func(_ context.Context, _ []byte) { + select { + case workspaceNotifyCh <- struct{}{}: + default: + } + }) + if err != nil { + httpapi.Write(ctx, rw, http.StatusInternalServerError, codersdk.Response{ + Message: "Failed to subscribe to workspace for log streaming.", + Detail: err.Error(), + }) + return + } + defer closeSubscribeWorkspace() // Subscribe early to prevent missing log events. closeSubscribe, err := api.Pubsub.Subscribe(agentsdk.LogsNotifyChannel(workspaceAgent.ID), func(_ context.Context, _ []byte) { // The message is not important, we're tracking lastSentLogID manually. @@ -585,7 +601,7 @@ func (api *API) workspaceAgentLogs(rw http.ResponseWriter, r *http.Request) { }) if err != nil { httpapi.Write(ctx, rw, http.StatusInternalServerError, codersdk.Response{ - Message: "Failed to subscribe to logs.", + Message: "Failed to subscribe to agent for log streaming.", Detail: err.Error(), }) return @@ -600,20 +616,33 @@ func (api *API) workspaceAgentLogs(rw http.ResponseWriter, r *http.Request) { defer t.Stop() go func() { - defer close(bufferedLogs) + defer func() { + logger.Debug(ctx, "end log streaming loop") + close(bufferedLogs) + }() + logger.Debug(ctx, "start log streaming loop", slog.F("last_sent_log_id", lastSentLogID)) keepGoing := true for keepGoing { + var ( + debugTriggeredBy string + onlyCheckLatestBuild bool + ) select { case <-ctx.Done(): return case <-t.C: + debugTriggeredBy = "timer" + case <-workspaceNotifyCh: + debugTriggeredBy = "workspace" + onlyCheckLatestBuild = true case <-notifyCh: + debugTriggeredBy = "log" t.Reset(recheckInterval) } agents, err := api.Database.GetWorkspaceAgentsInLatestBuildByWorkspaceID(ctx, workspace.ID) - if err != nil { + if err != nil && !xerrors.Is(err, sql.ErrNoRows) { if xerrors.Is(err, context.Canceled) { return } @@ -624,6 +653,20 @@ func (api *API) workspaceAgentLogs(rw http.ResponseWriter, r *http.Request) { // checking once. keepGoing = slices.ContainsFunc(agents, func(agent database.WorkspaceAgent) bool { return agent.ID == workspaceAgent.ID }) + logger.Debug( + ctx, + "checking for new logs", + slog.F("triggered_by", debugTriggeredBy), + slog.F("only_check_latest_build", onlyCheckLatestBuild), + slog.F("keep_going", keepGoing), + slog.F("last_sent_log_id", lastSentLogID), + slog.F("workspace_has_agents", len(agents) > 0), + ) + + if onlyCheckLatestBuild && keepGoing { + continue + } + logs, err := api.Database.GetWorkspaceAgentLogsAfter(ctx, database.GetWorkspaceAgentLogsAfterParams{ AgentID: workspaceAgent.ID, CreatedAfter: lastSentLogID, diff --git a/coderd/workspaceagents_test.go b/coderd/workspaceagents_test.go index d900ead4175cf..1a0e5e2cd7b54 100644 --- a/coderd/workspaceagents_test.go +++ b/coderd/workspaceagents_test.go @@ -385,34 +385,14 @@ func TestWorkspaceAgentStartupLogs(t *testing.T) { _ = closer.Close() }() - first := make(chan struct{}) - go func() { - select { - case <-ctx.Done(): - assert.Fail(t, "context done while waiting in goroutine") - case <-logs: - close(first) - } - }() select { case <-ctx.Done(): require.FailNow(t, "context done while waiting for first log") - case <-first: + case <-logs: } _ = coderdtest.CreateWorkspaceBuild(t, client, workspace, database.WorkspaceTransitionStart) - // Send a new log message to trigger a re-check. - err = agentClient.PatchLogs(ctx, agentsdk.PatchLogs{ - Logs: []agentsdk.Log{ - { - CreatedAt: dbtime.Now(), - Output: "testing2", - }, - }, - }) - require.NoError(t, err) - select { case <-ctx.Done(): require.FailNow(t, "context done while waiting for logs close") diff --git a/codersdk/workspaceagents.go b/codersdk/workspaceagents.go index 846134ba89f45..7f5caac54f657 100644 --- a/codersdk/workspaceagents.go +++ b/codersdk/workspaceagents.go @@ -703,7 +703,7 @@ func (c *Client) WorkspaceAgentLogsAfter(ctx context.Context, agentID uuid.UUID, } return nil, nil, ReadBodyAsError(res) } - logChunks := make(chan []WorkspaceAgentLog) + logChunks := make(chan []WorkspaceAgentLog, 1) closed := make(chan struct{}) ctx, wsNetConn := websocketNetConn(ctx, conn, websocket.MessageText) decoder := json.NewDecoder(wsNetConn)