Skip to content

Commit 5bd19f8

Browse files
authored
fix: fix flake in TestWorkspaceAgentClientCoordinate_ResumeToken (#14642)
Fixes #14365. I bet what's going on is that in `connectToCoordinatorAndFetchResumeToken()` we call `Coordinate()`, send a message on the `Coordinate` client, and then close it in rapid succession. We don't wait around for a response from the coordinator, so dRPC is likely aborting the `Coordinate()` call in the backend because the stream is closed before the call even gets a chance to run. Instead of using the Coordinator to record the peer ID assigned on the API call, we can wrap the resume token provider, since we call that API _and_ wait for a response. This also affords the opportunity to directly assert that the provider gets called with the right token.
1 parent 1b5f341 commit 5bd19f8

File tree

3 files changed

+44
-42
lines changed

3 files changed

+44
-42
lines changed

coderd/workspaceagents.go

+2
Original file line numberDiff line numberDiff line change
@@ -864,6 +864,8 @@ func (api *API) workspaceAgentClientCoordinate(rw http.ResponseWriter, r *http.R
864864
})
865865
return
866866
}
867+
api.Logger.Debug(ctx, "accepted coordinate resume token for peer",
868+
slog.F("peer_id", peerID.String()))
867869
}
868870

869871
api.WebsocketWaitMutex.Lock()

coderd/workspaceagents_test.go

+40-42
Original file line numberDiff line numberDiff line change
@@ -513,30 +513,42 @@ func TestWorkspaceAgentClientCoordinate_BadVersion(t *testing.T) {
513513
require.Equal(t, "version", sdkErr.Validations[0].Field)
514514
}
515515

516-
type resumeTokenTestFakeCoordinator struct {
517-
tailnet.Coordinator
518-
t testing.TB
519-
peerIDCh chan uuid.UUID
516+
type resumeTokenRecordingProvider struct {
517+
tailnet.ResumeTokenProvider
518+
t testing.TB
519+
generateCalls chan uuid.UUID
520+
verifyCalls chan string
520521
}
521522

522-
var _ tailnet.Coordinator = &resumeTokenTestFakeCoordinator{}
523+
var _ tailnet.ResumeTokenProvider = &resumeTokenRecordingProvider{}
523524

524-
func (c *resumeTokenTestFakeCoordinator) storeID(id uuid.UUID) {
525-
select {
526-
case c.peerIDCh <- id:
527-
default:
528-
c.t.Fatal("peer ID channel full")
525+
func newResumeTokenRecordingProvider(t testing.TB, underlying tailnet.ResumeTokenProvider) *resumeTokenRecordingProvider {
526+
return &resumeTokenRecordingProvider{
527+
ResumeTokenProvider: underlying,
528+
t: t,
529+
generateCalls: make(chan uuid.UUID, 1),
530+
verifyCalls: make(chan string, 1),
529531
}
530532
}
531533

532-
func (c *resumeTokenTestFakeCoordinator) ServeClient(conn net.Conn, id uuid.UUID, agentID uuid.UUID) error {
533-
c.storeID(id)
534-
return c.Coordinator.ServeClient(conn, id, agentID)
534+
func (r *resumeTokenRecordingProvider) GenerateResumeToken(peerID uuid.UUID) (*tailnetproto.RefreshResumeTokenResponse, error) {
535+
select {
536+
case r.generateCalls <- peerID:
537+
return r.ResumeTokenProvider.GenerateResumeToken(peerID)
538+
default:
539+
r.t.Error("generateCalls full")
540+
return nil, xerrors.New("generateCalls full")
541+
}
535542
}
536543

537-
func (c *resumeTokenTestFakeCoordinator) Coordinate(ctx context.Context, id uuid.UUID, name string, a tailnet.CoordinateeAuth) (chan<- *tailnetproto.CoordinateRequest, <-chan *tailnetproto.CoordinateResponse) {
538-
c.storeID(id)
539-
return c.Coordinator.Coordinate(ctx, id, name, a)
544+
func (r *resumeTokenRecordingProvider) VerifyResumeToken(token string) (uuid.UUID, error) {
545+
select {
546+
case r.verifyCalls <- token:
547+
return r.ResumeTokenProvider.VerifyResumeToken(token)
548+
default:
549+
r.t.Error("verifyCalls full")
550+
return uuid.Nil, xerrors.New("verifyCalls full")
551+
}
540552
}
541553

542554
func TestWorkspaceAgentClientCoordinate_ResumeToken(t *testing.T) {
@@ -546,15 +558,12 @@ func TestWorkspaceAgentClientCoordinate_ResumeToken(t *testing.T) {
546558
clock := quartz.NewMock(t)
547559
resumeTokenSigningKey, err := tailnet.GenerateResumeTokenSigningKey()
548560
require.NoError(t, err)
549-
resumeTokenProvider := tailnet.NewResumeTokenKeyProvider(resumeTokenSigningKey, clock, time.Hour)
550-
coordinator := &resumeTokenTestFakeCoordinator{
551-
Coordinator: tailnet.NewCoordinator(logger),
552-
t: t,
553-
peerIDCh: make(chan uuid.UUID, 1),
554-
}
555-
defer close(coordinator.peerIDCh)
561+
resumeTokenProvider := newResumeTokenRecordingProvider(
562+
t,
563+
tailnet.NewResumeTokenKeyProvider(resumeTokenSigningKey, clock, time.Hour),
564+
)
556565
client, closer, api := coderdtest.NewWithAPI(t, &coderdtest.Options{
557-
Coordinator: coordinator,
566+
Coordinator: tailnet.NewCoordinator(logger),
558567
CoordinatorResumeTokenProvider: resumeTokenProvider,
559568
})
560569
defer closer.Close()
@@ -576,15 +585,17 @@ func TestWorkspaceAgentClientCoordinate_ResumeToken(t *testing.T) {
576585
// random value.
577586
originalResumeToken, err := connectToCoordinatorAndFetchResumeToken(ctx, logger, client, agentAndBuild.WorkspaceAgent.ID, "")
578587
require.NoError(t, err)
579-
originalPeerID := testutil.RequireRecvCtx(ctx, t, coordinator.peerIDCh)
588+
originalPeerID := testutil.RequireRecvCtx(ctx, t, resumeTokenProvider.generateCalls)
580589
require.NotEqual(t, originalPeerID, uuid.Nil)
581590

582591
// Connect with a valid resume token, and ensure that the peer ID is set to
583592
// the stored value.
584593
clock.Advance(time.Second)
585594
newResumeToken, err := connectToCoordinatorAndFetchResumeToken(ctx, logger, client, agentAndBuild.WorkspaceAgent.ID, originalResumeToken)
586595
require.NoError(t, err)
587-
newPeerID := testutil.RequireRecvCtx(ctx, t, coordinator.peerIDCh)
596+
verifiedToken := testutil.RequireRecvCtx(ctx, t, resumeTokenProvider.verifyCalls)
597+
require.Equal(t, originalResumeToken, verifiedToken)
598+
newPeerID := testutil.RequireRecvCtx(ctx, t, resumeTokenProvider.generateCalls)
588599
require.Equal(t, originalPeerID, newPeerID)
589600
require.NotEqual(t, originalResumeToken, newResumeToken)
590601

@@ -598,9 +609,11 @@ func TestWorkspaceAgentClientCoordinate_ResumeToken(t *testing.T) {
598609
require.Equal(t, http.StatusUnauthorized, sdkErr.StatusCode())
599610
require.Len(t, sdkErr.Validations, 1)
600611
require.Equal(t, "resume_token", sdkErr.Validations[0].Field)
612+
verifiedToken = testutil.RequireRecvCtx(ctx, t, resumeTokenProvider.verifyCalls)
613+
require.Equal(t, "invalid", verifiedToken)
601614

602615
select {
603-
case <-coordinator.peerIDCh:
616+
case <-resumeTokenProvider.generateCalls:
604617
t.Fatal("unexpected peer ID in channel")
605618
default:
606619
}
@@ -646,21 +659,6 @@ func connectToCoordinatorAndFetchResumeToken(ctx context.Context, logger slog.Lo
646659
return "", xerrors.Errorf("new dRPC client: %w", err)
647660
}
648661

649-
// Send an empty coordination request. This will do nothing on the server,
650-
// but ensures our wrapped coordinator can record the peer ID.
651-
coordinateClient, err := rpcClient.Coordinate(ctx)
652-
if err != nil {
653-
return "", xerrors.Errorf("coordinate: %w", err)
654-
}
655-
err = coordinateClient.Send(&tailnetproto.CoordinateRequest{})
656-
if err != nil {
657-
return "", xerrors.Errorf("send empty coordination request: %w", err)
658-
}
659-
err = coordinateClient.Close()
660-
if err != nil {
661-
return "", xerrors.Errorf("close coordination request: %w", err)
662-
}
663-
664662
// Fetch a resume token.
665663
newResumeToken, err := rpcClient.RefreshResumeToken(ctx, &tailnetproto.RefreshResumeTokenRequest{})
666664
if err != nil {

tailnet/service.go

+2
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,8 @@ func (s ClientService) ServeConnV2(ctx context.Context, conn net.Conn, streamID
119119
return xerrors.Errorf("yamux init failed: %w", err)
120120
}
121121
ctx = WithStreamID(ctx, streamID)
122+
s.Logger.Debug(ctx, "serving dRPC tailnet v2 API session",
123+
slog.F("peer_id", streamID.ID.String()))
122124
return s.drpc.Serve(ctx, session)
123125
}
124126

0 commit comments

Comments
 (0)