From 96a912c237c48e69c721c1630306ea99698f0152 Mon Sep 17 00:00:00 2001 From: Mathias Fredriksson Date: Mon, 18 Sep 2023 11:19:37 +0000 Subject: [PATCH 1/3] fix(coderd): Subscribe to workspace when streaming agent logs to detect outdated build Fixes #9721 --- coderd/workspaceagents.go | 26 +++++++++++++++++++++++++- coderd/workspaceagents_test.go | 22 +--------------------- codersdk/workspaceagents.go | 2 +- 3 files changed, 27 insertions(+), 23 deletions(-) diff --git a/coderd/workspaceagents.go b/coderd/workspaceagents.go index d92f614766c4b..1944f579b5cc0 100644 --- a/coderd/workspaceagents.go +++ b/coderd/workspaceagents.go @@ -570,11 +570,28 @@ func (api *API) workspaceAgentLogs(rw http.ResponseWriter, r *http.Request) { lastSentLogID = logs[len(logs)-1].ID } + workspaceNotifyCh := make(chan struct{}, 1) notifyCh := make(chan struct{}, 1) // Allow us to immediately check if we missed any logs // between initial fetch and subscribe. notifyCh <- struct{}{} + // Subscribe to workspace to detect new builds. + closeSubscribeWorkspace, err := api.Pubsub.Subscribe(codersdk.WorkspaceNotifyChannel(workspace.ID), func(_ context.Context, _ []byte) { + // The message is not important, we're tracking lastSentLogID manually. + select { + case workspaceNotifyCh <- struct{}{}: + default: + } + }) + if err != nil { + httpapi.Write(ctx, rw, http.StatusInternalServerError, codersdk.Response{ + Message: "Failed to subscribe to workspace for log streaming.", + Detail: err.Error(), + }) + return + } + defer closeSubscribeWorkspace() // Subscribe early to prevent missing log events. closeSubscribe, err := api.Pubsub.Subscribe(agentsdk.LogsNotifyChannel(workspaceAgent.ID), func(_ context.Context, _ []byte) { // The message is not important, we're tracking lastSentLogID manually. @@ -585,7 +602,7 @@ func (api *API) workspaceAgentLogs(rw http.ResponseWriter, r *http.Request) { }) if err != nil { httpapi.Write(ctx, rw, http.StatusInternalServerError, codersdk.Response{ - Message: "Failed to subscribe to logs.", + Message: "Failed to subscribe to agent for log streaming.", Detail: err.Error(), }) return @@ -604,10 +621,13 @@ func (api *API) workspaceAgentLogs(rw http.ResponseWriter, r *http.Request) { keepGoing := true for keepGoing { + onlyCheckLatestBuild := false select { case <-ctx.Done(): return case <-t.C: + case <-workspaceNotifyCh: + onlyCheckLatestBuild = true case <-notifyCh: t.Reset(recheckInterval) } @@ -624,6 +644,10 @@ func (api *API) workspaceAgentLogs(rw http.ResponseWriter, r *http.Request) { // checking once. keepGoing = slices.ContainsFunc(agents, func(agent database.WorkspaceAgent) bool { return agent.ID == workspaceAgent.ID }) + if onlyCheckLatestBuild && keepGoing { + continue + } + logs, err := api.Database.GetWorkspaceAgentLogsAfter(ctx, database.GetWorkspaceAgentLogsAfterParams{ AgentID: workspaceAgent.ID, CreatedAfter: lastSentLogID, diff --git a/coderd/workspaceagents_test.go b/coderd/workspaceagents_test.go index d900ead4175cf..1a0e5e2cd7b54 100644 --- a/coderd/workspaceagents_test.go +++ b/coderd/workspaceagents_test.go @@ -385,34 +385,14 @@ func TestWorkspaceAgentStartupLogs(t *testing.T) { _ = closer.Close() }() - first := make(chan struct{}) - go func() { - select { - case <-ctx.Done(): - assert.Fail(t, "context done while waiting in goroutine") - case <-logs: - close(first) - } - }() select { case <-ctx.Done(): require.FailNow(t, "context done while waiting for first log") - case <-first: + case <-logs: } _ = coderdtest.CreateWorkspaceBuild(t, client, workspace, database.WorkspaceTransitionStart) - // Send a new log message to trigger a re-check. - err = agentClient.PatchLogs(ctx, agentsdk.PatchLogs{ - Logs: []agentsdk.Log{ - { - CreatedAt: dbtime.Now(), - Output: "testing2", - }, - }, - }) - require.NoError(t, err) - select { case <-ctx.Done(): require.FailNow(t, "context done while waiting for logs close") diff --git a/codersdk/workspaceagents.go b/codersdk/workspaceagents.go index 846134ba89f45..7f5caac54f657 100644 --- a/codersdk/workspaceagents.go +++ b/codersdk/workspaceagents.go @@ -703,7 +703,7 @@ func (c *Client) WorkspaceAgentLogsAfter(ctx context.Context, agentID uuid.UUID, } return nil, nil, ReadBodyAsError(res) } - logChunks := make(chan []WorkspaceAgentLog) + logChunks := make(chan []WorkspaceAgentLog, 1) closed := make(chan struct{}) ctx, wsNetConn := websocketNetConn(ctx, conn, websocket.MessageText) decoder := json.NewDecoder(wsNetConn) From 2de6c3a3df6f4df65cc97ab1406ad24f7a986aef Mon Sep 17 00:00:00 2001 From: Mathias Fredriksson Date: Mon, 18 Sep 2023 14:30:17 +0300 Subject: [PATCH 2/3] Update workspaceagents.go --- coderd/workspaceagents.go | 1 - 1 file changed, 1 deletion(-) diff --git a/coderd/workspaceagents.go b/coderd/workspaceagents.go index 1944f579b5cc0..b2af79e92240b 100644 --- a/coderd/workspaceagents.go +++ b/coderd/workspaceagents.go @@ -578,7 +578,6 @@ func (api *API) workspaceAgentLogs(rw http.ResponseWriter, r *http.Request) { // Subscribe to workspace to detect new builds. closeSubscribeWorkspace, err := api.Pubsub.Subscribe(codersdk.WorkspaceNotifyChannel(workspace.ID), func(_ context.Context, _ []byte) { - // The message is not important, we're tracking lastSentLogID manually. select { case workspaceNotifyCh <- struct{}{}: default: From e0c0a705e075e81535ae2c44e9dd5f5f368826b4 Mon Sep 17 00:00:00 2001 From: Mathias Fredriksson Date: Tue, 19 Sep 2023 12:36:09 +0000 Subject: [PATCH 3/3] add debug logs and sql no rows handling --- coderd/workspaceagents.go | 26 +++++++++++++++++++++++--- 1 file changed, 23 insertions(+), 3 deletions(-) diff --git a/coderd/workspaceagents.go b/coderd/workspaceagents.go index b2af79e92240b..82778ca119e12 100644 --- a/coderd/workspaceagents.go +++ b/coderd/workspaceagents.go @@ -616,23 +616,33 @@ func (api *API) workspaceAgentLogs(rw http.ResponseWriter, r *http.Request) { defer t.Stop() go func() { - defer close(bufferedLogs) + defer func() { + logger.Debug(ctx, "end log streaming loop") + close(bufferedLogs) + }() + logger.Debug(ctx, "start log streaming loop", slog.F("last_sent_log_id", lastSentLogID)) keepGoing := true for keepGoing { - onlyCheckLatestBuild := false + var ( + debugTriggeredBy string + onlyCheckLatestBuild bool + ) select { case <-ctx.Done(): return case <-t.C: + debugTriggeredBy = "timer" case <-workspaceNotifyCh: + debugTriggeredBy = "workspace" onlyCheckLatestBuild = true case <-notifyCh: + debugTriggeredBy = "log" t.Reset(recheckInterval) } agents, err := api.Database.GetWorkspaceAgentsInLatestBuildByWorkspaceID(ctx, workspace.ID) - if err != nil { + if err != nil && !xerrors.Is(err, sql.ErrNoRows) { if xerrors.Is(err, context.Canceled) { return } @@ -643,6 +653,16 @@ func (api *API) workspaceAgentLogs(rw http.ResponseWriter, r *http.Request) { // checking once. keepGoing = slices.ContainsFunc(agents, func(agent database.WorkspaceAgent) bool { return agent.ID == workspaceAgent.ID }) + logger.Debug( + ctx, + "checking for new logs", + slog.F("triggered_by", debugTriggeredBy), + slog.F("only_check_latest_build", onlyCheckLatestBuild), + slog.F("keep_going", keepGoing), + slog.F("last_sent_log_id", lastSentLogID), + slog.F("workspace_has_agents", len(agents) > 0), + ) + if onlyCheckLatestBuild && keepGoing { continue }