
Commit c0a01ec

Authored by Spike Curtis
fix: fix TestPGCoordinatorDual_Mainline flake (coder#8228)
* fix TestPGCoordinatorDual_Mainline flake
* use slices.Contains instead of local function

Signed-off-by: Spike Curtis <spike@coder.com>
1 parent df95cf7 commit c0a01ec
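
The flake appears to stem from asserting on exactly the next update a test connection receives: with more than one coordinator in play, a client or agent can receive an intermediate node update before the one the test expects. The diff below therefore replaces the one-shot recvNodes + assertHasDERPs pairs with an assertEventuallyHasDERPs helper that keeps receiving until the expected set of DERP IDs shows up. Below is a minimal, self-contained sketch of that receive-until-match pattern; it is not the commit's code, and the channel of DERP-ID slices is a hypothetical stand-in for the test's recvNodes plumbing.

```go
// Sketch only -- not code from this commit. It illustrates the
// receive-until-match pattern behind assertEventuallyHasDERPs, using a
// hypothetical channel of DERP-ID slices in place of recvNodes.
package main

import (
	"context"
	"fmt"
	"slices" // the real helper uses golang.org/x/exp/slices
	"time"
)

// eventuallyHasDERPs drains updates until one contains exactly the expected
// DERP IDs, or the context expires. Intermediate or stale updates are
// skipped rather than failing the check, which is what removes the flake.
func eventuallyHasDERPs(ctx context.Context, updates <-chan []int, expected ...int) error {
	for {
		select {
		case <-ctx.Done():
			return fmt.Errorf("timed out waiting for DERPs %v: %w", expected, ctx.Err())
		case derps := <-updates:
			if len(derps) != len(expected) {
				continue // not the state we want yet; keep receiving
			}
			ok := true
			for _, e := range expected {
				ok = ok && slices.Contains(derps, e)
			}
			if ok {
				return nil
			}
		}
	}
}

func main() {
	updates := make(chan []int, 2)
	updates <- []int{11}     // intermediate update that would fail a one-shot assert
	updates <- []int{21, 11} // the state the test actually expects

	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	fmt.Println(eventuallyHasDERPs(ctx, updates, 21, 11)) // <nil>
}
```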

File tree

2 files changed: +47 −59 lines


enterprise/tailnet/pgcoord_test.go

Lines changed: 43 additions & 56 deletions
@@ -14,6 +14,7 @@ import (
     "github.com/stretchr/testify/assert"
     "github.com/stretchr/testify/require"
     "go.uber.org/goleak"
+    "golang.org/x/exp/slices"
     "golang.org/x/xerrors"
 
     "cdr.dev/slog"
@@ -203,11 +204,9 @@ func TestPGCoordinatorSingle_MissedHeartbeats(t *testing.T) {
     client := newTestClient(t, coordinator, agent.id)
     defer client.close()
 
-    nodes := client.recvNodes(ctx, t)
-    assertHasDERPs(t, nodes, 10)
+    assertEventuallyHasDERPs(ctx, t, client, 10)
     client.sendNode(&agpl.Node{PreferredDERP: 11})
-    nodes = agent.recvNodes(ctx, t)
-    assertHasDERPs(t, nodes, 11)
+    assertEventuallyHasDERPs(ctx, t, agent, 11)
 
     // simulate a second coordinator via DB calls only --- our goal is to test broken heart-beating, so we can't use a
     // real coordinator
@@ -233,8 +232,7 @@ func TestPGCoordinatorSingle_MissedHeartbeats(t *testing.T) {
     }()
     fCoord2.heartbeat()
     fCoord2.agentNode(agent.id, &agpl.Node{PreferredDERP: 12})
-    nodes = client.recvNodes(ctx, t)
-    assertHasDERPs(t, nodes, 12)
+    assertEventuallyHasDERPs(ctx, t, client, 12)
 
     fCoord3 := &fakeCoordinator{
         ctx: ctx,
@@ -245,24 +243,20 @@ func TestPGCoordinatorSingle_MissedHeartbeats(t *testing.T) {
     start := time.Now()
     fCoord3.heartbeat()
     fCoord3.agentNode(agent.id, &agpl.Node{PreferredDERP: 13})
-    nodes = client.recvNodes(ctx, t)
-    assertHasDERPs(t, nodes, 13)
+    assertEventuallyHasDERPs(ctx, t, client, 13)
 
     // when the fCoord3 misses enough heartbeats, the real coordinator should send an update with the
     // node from fCoord2 for the agent.
-    nodes = client.recvNodes(ctx, t)
+    assertEventuallyHasDERPs(ctx, t, client, 12)
     assert.Greater(t, time.Since(start), tailnet.HeartbeatPeriod*tailnet.MissedHeartbeats)
-    assertHasDERPs(t, nodes, 12)
 
     // stop fCoord2 heartbeats, which should cause us to revert to the original agent mapping
     cancel2()
-    nodes = client.recvNodes(ctx, t)
-    assertHasDERPs(t, nodes, 10)
+    assertEventuallyHasDERPs(ctx, t, client, 10)
 
     // send fCoord3 heartbeat, which should trigger us to consider that mapping valid again.
     fCoord3.heartbeat()
-    nodes = client.recvNodes(ctx, t)
-    assertHasDERPs(t, nodes, 13)
+    assertEventuallyHasDERPs(ctx, t, client, 13)
 
     err = agent.close()
     require.NoError(t, err)
@@ -358,33 +352,24 @@ func TestPGCoordinatorDual_Mainline(t *testing.T) {
     defer client22.close()
 
     client11.sendNode(&agpl.Node{PreferredDERP: 11})
-    nodes := agent1.recvNodes(ctx, t)
-    assert.Len(t, nodes, 1)
-    assertHasDERPs(t, nodes, 11)
+    assertEventuallyHasDERPs(ctx, t, agent1, 11)
 
     client21.sendNode(&agpl.Node{PreferredDERP: 21})
-    nodes = agent1.recvNodes(ctx, t)
-    assertHasDERPs(t, nodes, 21, 11)
+    assertEventuallyHasDERPs(ctx, t, agent1, 21, 11)
 
     client22.sendNode(&agpl.Node{PreferredDERP: 22})
-    nodes = agent2.recvNodes(ctx, t)
-    assertHasDERPs(t, nodes, 22)
+    assertEventuallyHasDERPs(ctx, t, agent2, 22)
 
     agent2.sendNode(&agpl.Node{PreferredDERP: 2})
-    nodes = client22.recvNodes(ctx, t)
-    assertHasDERPs(t, nodes, 2)
-    nodes = client12.recvNodes(ctx, t)
-    assertHasDERPs(t, nodes, 2)
+    assertEventuallyHasDERPs(ctx, t, client22, 2)
+    assertEventuallyHasDERPs(ctx, t, client12, 2)
 
     client12.sendNode(&agpl.Node{PreferredDERP: 12})
-    nodes = agent2.recvNodes(ctx, t)
-    assertHasDERPs(t, nodes, 12, 22)
+    assertEventuallyHasDERPs(ctx, t, agent2, 12, 22)
 
     agent1.sendNode(&agpl.Node{PreferredDERP: 1})
-    nodes = client21.recvNodes(ctx, t)
-    assertHasDERPs(t, nodes, 1)
-    nodes = client11.recvNodes(ctx, t)
-    assertHasDERPs(t, nodes, 1)
+    assertEventuallyHasDERPs(ctx, t, client21, 1)
+    assertEventuallyHasDERPs(ctx, t, client11, 1)
 
     // let's close coord2
     err = coord2.Close()
@@ -402,8 +387,7 @@ func TestPGCoordinatorDual_Mainline(t *testing.T) {
     // In this case the update is superfluous because client11's node hasn't changed, and agents don't deprogram clients
    // from the dataplane even if they are missing. Suppressing this kind of update would require the coordinator to
    // store all the data its sent to each connection, so we don't bother.
-    nodes = agent1.recvNodes(ctx, t)
-    assertHasDERPs(t, nodes, 11)
+    assertEventuallyHasDERPs(ctx, t, agent1, 11)
 
     // note that although agent2 is disconnected, client12 does NOT get an update because we suppress empty updates.
     // (Its easy to tell these are superfluous.)
@@ -492,36 +476,29 @@ func TestPGCoordinator_MultiAgent(t *testing.T) {
     defer client.close()
 
     client.sendNode(&agpl.Node{PreferredDERP: 3})
-    nodes := agent1.recvNodes(ctx, t)
-    assertHasDERPs(t, nodes, 3)
-    nodes = agent2.recvNodes(ctx, t)
-    assertHasDERPs(t, nodes, 3)
+    assertEventuallyHasDERPs(ctx, t, agent1, 3)
+    assertEventuallyHasDERPs(ctx, t, agent2, 3)
 
     agent1.sendNode(&agpl.Node{PreferredDERP: 1})
-    nodes = client.recvNodes(ctx, t)
-    assertHasDERPs(t, nodes, 1)
+    assertEventuallyHasDERPs(ctx, t, client, 1)
 
     // agent2's update overrides agent1 because it is newer
     agent2.sendNode(&agpl.Node{PreferredDERP: 2})
-    nodes = client.recvNodes(ctx, t)
-    assertHasDERPs(t, nodes, 2)
+    assertEventuallyHasDERPs(ctx, t, client, 2)
 
     // agent2 disconnects, and we should revert back to agent1
     err = agent2.close()
     require.NoError(t, err)
     err = agent2.recvErr(ctx, t)
     require.ErrorIs(t, err, io.ErrClosedPipe)
     agent2.waitForClose(ctx, t)
-    nodes = client.recvNodes(ctx, t)
-    assertHasDERPs(t, nodes, 1)
+    assertEventuallyHasDERPs(ctx, t, client, 1)
 
     agent1.sendNode(&agpl.Node{PreferredDERP: 11})
-    nodes = client.recvNodes(ctx, t)
-    assertHasDERPs(t, nodes, 11)
+    assertEventuallyHasDERPs(ctx, t, client, 11)
 
     client.sendNode(&agpl.Node{PreferredDERP: 31})
-    nodes = agent1.recvNodes(ctx, t)
-    assertHasDERPs(t, nodes, 31)
+    assertEventuallyHasDERPs(ctx, t, agent1, 31)
 
     err = agent1.close()
     require.NoError(t, err)
@@ -625,17 +602,27 @@ func newTestClient(t *testing.T, coord agpl.Coordinator, agentID uuid.UUID, id .
     return c
 }
 
-func assertHasDERPs(t *testing.T, nodes []*agpl.Node, expected ...int) {
-    if !assert.Len(t, nodes, len(expected), "expected %d node(s), got %d", len(expected), len(nodes)) {
+func assertEventuallyHasDERPs(ctx context.Context, t *testing.T, c *testConn, expected ...int) {
+    t.Helper()
+    for {
+        nodes := c.recvNodes(ctx, t)
+        if len(nodes) != len(expected) {
+            t.Logf("expected %d, got %d nodes", len(expected), len(nodes))
+            continue
+        }
+
+        derps := make([]int, 0, len(nodes))
+        for _, n := range nodes {
+            derps = append(derps, n.PreferredDERP)
+        }
+        for _, e := range expected {
+            if !slices.Contains(derps, e) {
+                t.Logf("expected DERP %d to be in %v", e, derps)
+                continue
+            }
+        }
         return
     }
-    derps := make([]int, 0, len(nodes))
-    for _, n := range nodes {
-        derps = append(derps, n.PreferredDERP)
-    }
-    for _, e := range expected {
-        assert.Contains(t, derps, e, "expected DERP %v, got %v", e, derps)
-    }
 }
 
 func assertEventuallyNoAgents(ctx context.Context, t *testing.T, store database.Store, agentID uuid.UUID) {
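
The commit message's second item, "use slices.Contains instead of local function", refers to the containment check inside the new helper above: golang.org/x/exp/slices provides a generic Contains(s []E, v E) bool, so no hand-rolled lookup (or testify assert.Contains, which would fail the test rather than retry) is needed. A trivial illustration of the call with hypothetical values; building it requires the golang.org/x/exp module:

```go
// Quick illustration of the slices.Contains call used by the new helper;
// the DERP values here are hypothetical.
package main

import (
	"fmt"

	"golang.org/x/exp/slices"
)

func main() {
	derps := []int{21, 11} // PreferredDERP values collected from received nodes

	fmt.Println(slices.Contains(derps, 21)) // true
	fmt.Println(slices.Contains(derps, 12)) // false
}
```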

tailnet/coordinator.go

Lines changed: 4 additions & 3 deletions
@@ -227,7 +227,7 @@ func (t *TrackedConn) SendUpdates() {
            return
        }
        if bytes.Equal(t.lastData, data) {
-            t.logger.Debug(t.ctx, "skipping duplicate update", slog.F("nodes", nodes))
+            t.logger.Debug(t.ctx, "skipping duplicate update", slog.F("nodes", string(data)))
            continue
        }
 
@@ -243,11 +243,12 @@
        _, err = t.conn.Write(data)
        if err != nil {
            // often, this is just because the connection is closed/broken, so only log at debug.
-            t.logger.Debug(t.ctx, "could not write nodes to connection", slog.Error(err), slog.F("nodes", nodes))
+            t.logger.Debug(t.ctx, "could not write nodes to connection",
+                slog.Error(err), slog.F("nodes", string(data)))
            _ = t.Close()
            return
        }
-        t.logger.Debug(t.ctx, "wrote nodes", slog.F("nodes", nodes))
+        t.logger.Debug(t.ctx, "wrote nodes", slog.F("nodes", string(data)))
 
        // nhooyr.io/websocket has a bugged implementation of deadlines on a websocket net.Conn. What they are
        // *supposed* to do is set a deadline for any subsequent writes to complete, otherwise the call to Write()
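
The tailnet/coordinator.go change switches the three debug messages from logging the nodes slice to logging string(data), the exact JSON bytes that are compared against t.lastData and written to the connection, presumably so the log shows precisely the payload the peer received (or would have received). A small standard-library sketch of the difference, using a simplified hypothetical Node type rather than the coordinator's:

```go
// Sketch only: why logging the marshaled payload can be more useful than
// logging the slice value itself. Node here is a simplified hypothetical
// stand-in, not the coordinator's type.
package main

import (
	"encoding/json"
	"fmt"
	"log"
)

type Node struct {
	PreferredDERP int `json:"preferred_derp"`
}

func main() {
	nodes := []*Node{{PreferredDERP: 11}}

	data, err := json.Marshal(nodes)
	if err != nil {
		log.Fatal(err)
	}

	// A %v of a slice of pointers prints addresses, e.g. [0xc000012345].
	fmt.Printf("nodes=%v\n", nodes)
	// The marshaled bytes show the payload that actually goes over the wire.
	fmt.Printf("nodes=%s\n", string(data))
}
```

Whatever a structured logger's encoder would do with the slice value, logging string(data) records the wire bytes verbatim.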

0 commit comments