diff --git a/coderd/workspaceagents.go b/coderd/workspaceagents.go index b881e337906ba..f2d3acfb742dd 100644 --- a/coderd/workspaceagents.go +++ b/coderd/workspaceagents.go @@ -1434,9 +1434,6 @@ func (api *API) watchWorkspaceAgentMetadata(rw http.ResponseWriter, r *http.Requ }) } - // Send initial metadata. - sendMetadata(true) - // We debounce metadata updates to avoid overloading the frontend when // an agent is sending a lot of updates. pubsubDebounce := debounce.New(time.Second) @@ -1444,7 +1441,8 @@ func (api *API) watchWorkspaceAgentMetadata(rw http.ResponseWriter, r *http.Requ pubsubDebounce = debounce.New(time.Millisecond * 100) } - // Send metadata on updates. + // Send metadata on updates, we must ensure subscription before sending + // initial metadata to guarantee that events in-between are not missed. cancelSub, err := api.Pubsub.Subscribe(watchWorkspaceAgentMetadataChannel(workspaceAgent.ID), func(_ context.Context, _ []byte) { pubsubDebounce(func() { sendMetadata(true) @@ -1456,12 +1454,14 @@ func (api *API) watchWorkspaceAgentMetadata(rw http.ResponseWriter, r *http.Requ } defer cancelSub() + // Send initial metadata. + sendMetadata(true) + for { select { case <-senderClosed: return case <-refreshTicker.C: - break } // Avoid spamming the DB with reads we know there are no updates. We want diff --git a/coderd/workspaceagents_test.go b/coderd/workspaceagents_test.go index 73b540ea5ac68..2a514385d94df 100644 --- a/coderd/workspaceagents_test.go +++ b/coderd/workspaceagents_test.go @@ -1268,11 +1268,6 @@ func TestWorkspaceAgent_Metadata(t *testing.T) { var update []codersdk.WorkspaceAgentMetadata - check := func(want codersdk.WorkspaceAgentMetadataResult, got codersdk.WorkspaceAgentMetadata) { - require.Equal(t, want.Value, got.Result.Value) - require.Equal(t, want.Error, got.Result.Error) - } - wantMetadata1 := codersdk.WorkspaceAgentMetadataResult{ CollectedAt: time.Now(), Value: "bar", @@ -1285,17 +1280,38 @@ func TestWorkspaceAgent_Metadata(t *testing.T) { recvUpdate := func() []codersdk.WorkspaceAgentMetadata { select { + case <-ctx.Done(): + t.Fatalf("context done: %v", ctx.Err()) case err := <-errors: t.Fatalf("error watching metadata: %v", err) - return nil case update := <-updates: return update } + return nil + } + + check := func(want codersdk.WorkspaceAgentMetadataResult, got codersdk.WorkspaceAgentMetadata, retry bool) { + // We can't trust the order of the updates due to timers and debounces, + // so let's check a few times more. + for i := 0; retry && i < 2 && (want.Value != got.Result.Value || want.Error != got.Result.Error); i++ { + update = recvUpdate() + for _, m := range update { + if m.Description.Key == got.Description.Key { + got = m + break + } + } + } + ok1 := assert.Equal(t, want.Value, got.Result.Value) + ok2 := assert.Equal(t, want.Error, got.Result.Error) + if !ok1 || !ok2 { + require.FailNow(t, "check failed") + } } update = recvUpdate() require.Len(t, update, 3) - check(wantMetadata1, update[0]) + check(wantMetadata1, update[0], false) // The second metadata result is not yet posted. require.Zero(t, update[1].Result.CollectedAt) @@ -1303,14 +1319,14 @@ func TestWorkspaceAgent_Metadata(t *testing.T) { post("foo2", wantMetadata2) update = recvUpdate() require.Len(t, update, 3) - check(wantMetadata1, update[0]) - check(wantMetadata2, update[1]) + check(wantMetadata1, update[0], true) + check(wantMetadata2, update[1], true) wantMetadata1.Error = "error" post("foo1", wantMetadata1) update = recvUpdate() require.Len(t, update, 3) - check(wantMetadata1, update[0]) + check(wantMetadata1, update[0], true) const maxValueLen = 32 << 10 tooLongValueMetadata := wantMetadata1 @@ -1319,6 +1335,9 @@ func TestWorkspaceAgent_Metadata(t *testing.T) { tooLongValueMetadata.CollectedAt = time.Now() post("foo3", tooLongValueMetadata) got := recvUpdate()[2] + for i := 0; i < 2 && len(got.Result.Value) != maxValueLen; i++ { + got = recvUpdate()[2] + } require.Len(t, got.Result.Value, maxValueLen) require.NotEmpty(t, got.Result.Error) diff --git a/codersdk/workspaceagents.go b/codersdk/workspaceagents.go index b99185bd7cc8d..34397e89c43f7 100644 --- a/codersdk/workspaceagents.go +++ b/codersdk/workspaceagents.go @@ -304,6 +304,7 @@ func (c *Client) WatchWorkspaceAgentMetadata(ctx context.Context, id uuid.UUID) metadataChan := make(chan []WorkspaceAgentMetadata, 256) + ready := make(chan struct{}) watch := func() error { res, err := c.Request(ctx, http.MethodGet, fmt.Sprintf("/api/v2/workspaceagents/%s/watch-metadata", id), nil) if err != nil { @@ -316,12 +317,12 @@ func (c *Client) WatchWorkspaceAgentMetadata(ctx context.Context, id uuid.UUID) nextEvent := ServerSentEventReader(ctx, res.Body) defer res.Body.Close() + firstEvent := true for { select { case <-ctx.Done(): return ctx.Err() default: - break } sse, err := nextEvent() @@ -329,6 +330,11 @@ func (c *Client) WatchWorkspaceAgentMetadata(ctx context.Context, id uuid.UUID) return err } + if firstEvent { + close(ready) // Only close ready after the first event is received. + firstEvent = false + } + b, ok := sse.Data.([]byte) if !ok { return xerrors.Errorf("unexpected data type: %T", sse.Data) @@ -358,9 +364,18 @@ func (c *Client) WatchWorkspaceAgentMetadata(ctx context.Context, id uuid.UUID) errorChan := make(chan error, 1) go func() { defer close(errorChan) - errorChan <- watch() + err := watch() + select { + case <-ready: + default: + close(ready) // Error before first event. + } + errorChan <- err }() + // Wait until first event is received and the subscription is registered. + <-ready + return metadataChan, errorChan }