From 295dbdddfacab429ac972cd2ae2790399431caf7 Mon Sep 17 00:00:00 2001 From: Mathias Fredriksson Date: Fri, 9 Jun 2023 13:16:49 +0000 Subject: [PATCH 1/4] test(coderd): Fix TestWorkspaceAgent_Metadata flake --- coderd/workspaceagents.go | 1 - coderd/workspaceagents_test.go | 22 ++++++++++++++++------ 2 files changed, 16 insertions(+), 7 deletions(-) diff --git a/coderd/workspaceagents.go b/coderd/workspaceagents.go index b881e337906ba..bdb26198b8261 100644 --- a/coderd/workspaceagents.go +++ b/coderd/workspaceagents.go @@ -1461,7 +1461,6 @@ func (api *API) watchWorkspaceAgentMetadata(rw http.ResponseWriter, r *http.Requ case <-senderClosed: return case <-refreshTicker.C: - break } // Avoid spamming the DB with reads we know there are no updates. We want diff --git a/coderd/workspaceagents_test.go b/coderd/workspaceagents_test.go index 73b540ea5ac68..727403007c22a 100644 --- a/coderd/workspaceagents_test.go +++ b/coderd/workspaceagents_test.go @@ -1268,11 +1268,6 @@ func TestWorkspaceAgent_Metadata(t *testing.T) { var update []codersdk.WorkspaceAgentMetadata - check := func(want codersdk.WorkspaceAgentMetadataResult, got codersdk.WorkspaceAgentMetadata) { - require.Equal(t, want.Value, got.Result.Value) - require.Equal(t, want.Error, got.Result.Error) - } - wantMetadata1 := codersdk.WorkspaceAgentMetadataResult{ CollectedAt: time.Now(), Value: "bar", @@ -1285,12 +1280,27 @@ func TestWorkspaceAgent_Metadata(t *testing.T) { recvUpdate := func() []codersdk.WorkspaceAgentMetadata { select { + case <-ctx.Done(): + t.Fatalf("context done: %v", ctx.Err()) case err := <-errors: t.Fatalf("error watching metadata: %v", err) - return nil case update := <-updates: return update } + return nil + } + + check := func(want codersdk.WorkspaceAgentMetadataResult, got codersdk.WorkspaceAgentMetadata) { + // We can't trust the order of the updates due to timers and debounces, + // so let's check a few times once more. + for i := 0; i < 2 && (want.Value != got.Result.Value || want.Error != got.Result.Error); i++ { + recvUpdate() + } + ok1 := assert.Equal(t, want.Value, got.Result.Value) + ok2 := assert.Equal(t, want.Error, got.Result.Error) + if !ok1 || !ok2 { + require.FailNow(t, "check failed") + } } update = recvUpdate() From 3e908d75b43bc4f7b615d69970ea4e62b42e2336 Mon Sep 17 00:00:00 2001 From: Mathias Fredriksson Date: Fri, 9 Jun 2023 14:33:35 +0000 Subject: [PATCH 2/4] Fix update of got --- coderd/workspaceagents_test.go | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/coderd/workspaceagents_test.go b/coderd/workspaceagents_test.go index 727403007c22a..8f62a619c37b1 100644 --- a/coderd/workspaceagents_test.go +++ b/coderd/workspaceagents_test.go @@ -1294,7 +1294,13 @@ func TestWorkspaceAgent_Metadata(t *testing.T) { // We can't trust the order of the updates due to timers and debounces, // so let's check a few times once more. for i := 0; i < 2 && (want.Value != got.Result.Value || want.Error != got.Result.Error); i++ { - recvUpdate() + update = recvUpdate() + for _, m := range update { + if m.Description.Key == got.Description.Key { + got = m + break + } + } } ok1 := assert.Equal(t, want.Value, got.Result.Value) ok2 := assert.Equal(t, want.Error, got.Result.Error) From 89f3db5e83a65669140cea741b250b04b461fe3a Mon Sep 17 00:00:00 2001 From: Mathias Fredriksson Date: Mon, 12 Jun 2023 10:29:40 +0000 Subject: [PATCH 3/4] fix(codersdk): Wait for subscription in WatchWorkspaceAgentMetadata --- coderd/workspaceagents_test.go | 3 +++ codersdk/workspaceagents.go | 19 +++++++++++++++++-- 2 files changed, 20 insertions(+), 2 deletions(-) diff --git a/coderd/workspaceagents_test.go b/coderd/workspaceagents_test.go index 8f62a619c37b1..e653e1e0c6d34 100644 --- a/coderd/workspaceagents_test.go +++ b/coderd/workspaceagents_test.go @@ -1335,6 +1335,9 @@ func TestWorkspaceAgent_Metadata(t *testing.T) { tooLongValueMetadata.CollectedAt = time.Now() post("foo3", tooLongValueMetadata) got := recvUpdate()[2] + for len(got.Result.Value) != maxValueLen { + got = recvUpdate()[2] + } require.Len(t, got.Result.Value, maxValueLen) require.NotEmpty(t, got.Result.Error) diff --git a/codersdk/workspaceagents.go b/codersdk/workspaceagents.go index b99185bd7cc8d..34397e89c43f7 100644 --- a/codersdk/workspaceagents.go +++ b/codersdk/workspaceagents.go @@ -304,6 +304,7 @@ func (c *Client) WatchWorkspaceAgentMetadata(ctx context.Context, id uuid.UUID) metadataChan := make(chan []WorkspaceAgentMetadata, 256) + ready := make(chan struct{}) watch := func() error { res, err := c.Request(ctx, http.MethodGet, fmt.Sprintf("/api/v2/workspaceagents/%s/watch-metadata", id), nil) if err != nil { @@ -316,12 +317,12 @@ func (c *Client) WatchWorkspaceAgentMetadata(ctx context.Context, id uuid.UUID) nextEvent := ServerSentEventReader(ctx, res.Body) defer res.Body.Close() + firstEvent := true for { select { case <-ctx.Done(): return ctx.Err() default: - break } sse, err := nextEvent() @@ -329,6 +330,11 @@ func (c *Client) WatchWorkspaceAgentMetadata(ctx context.Context, id uuid.UUID) return err } + if firstEvent { + close(ready) // Only close ready after the first event is received. + firstEvent = false + } + b, ok := sse.Data.([]byte) if !ok { return xerrors.Errorf("unexpected data type: %T", sse.Data) @@ -358,9 +364,18 @@ func (c *Client) WatchWorkspaceAgentMetadata(ctx context.Context, id uuid.UUID) errorChan := make(chan error, 1) go func() { defer close(errorChan) - errorChan <- watch() + err := watch() + select { + case <-ready: + default: + close(ready) // Error before first event. + } + errorChan <- err }() + // Wait until first event is received and the subscription is registered. + <-ready + return metadataChan, errorChan } From 2a53d6ac63a2ee222bcc1e38e4988e1a4b6cd0a4 Mon Sep 17 00:00:00 2001 From: Mathias Fredriksson Date: Mon, 12 Jun 2023 10:47:38 +0000 Subject: [PATCH 4/4] fix(coderd): Subscribe before sending initial metadata event --- coderd/workspaceagents.go | 9 +++++---- coderd/workspaceagents_test.go | 16 ++++++++-------- 2 files changed, 13 insertions(+), 12 deletions(-) diff --git a/coderd/workspaceagents.go b/coderd/workspaceagents.go index bdb26198b8261..f2d3acfb742dd 100644 --- a/coderd/workspaceagents.go +++ b/coderd/workspaceagents.go @@ -1434,9 +1434,6 @@ func (api *API) watchWorkspaceAgentMetadata(rw http.ResponseWriter, r *http.Requ }) } - // Send initial metadata. - sendMetadata(true) - // We debounce metadata updates to avoid overloading the frontend when // an agent is sending a lot of updates. pubsubDebounce := debounce.New(time.Second) @@ -1444,7 +1441,8 @@ func (api *API) watchWorkspaceAgentMetadata(rw http.ResponseWriter, r *http.Requ pubsubDebounce = debounce.New(time.Millisecond * 100) } - // Send metadata on updates. + // Send metadata on updates, we must ensure subscription before sending + // initial metadata to guarantee that events in-between are not missed. cancelSub, err := api.Pubsub.Subscribe(watchWorkspaceAgentMetadataChannel(workspaceAgent.ID), func(_ context.Context, _ []byte) { pubsubDebounce(func() { sendMetadata(true) @@ -1456,6 +1454,9 @@ func (api *API) watchWorkspaceAgentMetadata(rw http.ResponseWriter, r *http.Requ } defer cancelSub() + // Send initial metadata. + sendMetadata(true) + for { select { case <-senderClosed: diff --git a/coderd/workspaceagents_test.go b/coderd/workspaceagents_test.go index e653e1e0c6d34..2a514385d94df 100644 --- a/coderd/workspaceagents_test.go +++ b/coderd/workspaceagents_test.go @@ -1290,10 +1290,10 @@ func TestWorkspaceAgent_Metadata(t *testing.T) { return nil } - check := func(want codersdk.WorkspaceAgentMetadataResult, got codersdk.WorkspaceAgentMetadata) { + check := func(want codersdk.WorkspaceAgentMetadataResult, got codersdk.WorkspaceAgentMetadata, retry bool) { // We can't trust the order of the updates due to timers and debounces, - // so let's check a few times once more. - for i := 0; i < 2 && (want.Value != got.Result.Value || want.Error != got.Result.Error); i++ { + // so let's check a few times more. + for i := 0; retry && i < 2 && (want.Value != got.Result.Value || want.Error != got.Result.Error); i++ { update = recvUpdate() for _, m := range update { if m.Description.Key == got.Description.Key { @@ -1311,7 +1311,7 @@ func TestWorkspaceAgent_Metadata(t *testing.T) { update = recvUpdate() require.Len(t, update, 3) - check(wantMetadata1, update[0]) + check(wantMetadata1, update[0], false) // The second metadata result is not yet posted. require.Zero(t, update[1].Result.CollectedAt) @@ -1319,14 +1319,14 @@ func TestWorkspaceAgent_Metadata(t *testing.T) { post("foo2", wantMetadata2) update = recvUpdate() require.Len(t, update, 3) - check(wantMetadata1, update[0]) - check(wantMetadata2, update[1]) + check(wantMetadata1, update[0], true) + check(wantMetadata2, update[1], true) wantMetadata1.Error = "error" post("foo1", wantMetadata1) update = recvUpdate() require.Len(t, update, 3) - check(wantMetadata1, update[0]) + check(wantMetadata1, update[0], true) const maxValueLen = 32 << 10 tooLongValueMetadata := wantMetadata1 @@ -1335,7 +1335,7 @@ func TestWorkspaceAgent_Metadata(t *testing.T) { tooLongValueMetadata.CollectedAt = time.Now() post("foo3", tooLongValueMetadata) got := recvUpdate()[2] - for len(got.Result.Value) != maxValueLen { + for i := 0; i < 2 && len(got.Result.Value) != maxValueLen; i++ { got = recvUpdate()[2] } require.Len(t, got.Result.Value, maxValueLen)