@@ -14,6 +14,7 @@ import (
14
14
"github.com/stretchr/testify/assert"
15
15
"github.com/stretchr/testify/require"
16
16
"go.uber.org/goleak"
17
+ "golang.org/x/exp/slices"
17
18
"golang.org/x/xerrors"
18
19
19
20
"cdr.dev/slog"
@@ -203,11 +204,9 @@ func TestPGCoordinatorSingle_MissedHeartbeats(t *testing.T) {
203
204
client := newTestClient (t , coordinator , agent .id )
204
205
defer client .close ()
205
206
206
- nodes := client .recvNodes (ctx , t )
207
- assertHasDERPs (t , nodes , 10 )
207
+ assertEventuallyHasDERPs (ctx , t , client , 10 )
208
208
client .sendNode (& agpl.Node {PreferredDERP : 11 })
209
- nodes = agent .recvNodes (ctx , t )
210
- assertHasDERPs (t , nodes , 11 )
209
+ assertEventuallyHasDERPs (ctx , t , agent , 11 )
211
210
212
211
// simulate a second coordinator via DB calls only --- our goal is to test broken heart-beating, so we can't use a
213
212
// real coordinator
@@ -233,8 +232,7 @@ func TestPGCoordinatorSingle_MissedHeartbeats(t *testing.T) {
233
232
}()
234
233
fCoord2 .heartbeat ()
235
234
fCoord2 .agentNode (agent .id , & agpl.Node {PreferredDERP : 12 })
236
- nodes = client .recvNodes (ctx , t )
237
- assertHasDERPs (t , nodes , 12 )
235
+ assertEventuallyHasDERPs (ctx , t , client , 12 )
238
236
239
237
fCoord3 := & fakeCoordinator {
240
238
ctx : ctx ,
@@ -245,24 +243,20 @@ func TestPGCoordinatorSingle_MissedHeartbeats(t *testing.T) {
245
243
start := time .Now ()
246
244
fCoord3 .heartbeat ()
247
245
fCoord3 .agentNode (agent .id , & agpl.Node {PreferredDERP : 13 })
248
- nodes = client .recvNodes (ctx , t )
249
- assertHasDERPs (t , nodes , 13 )
246
+ assertEventuallyHasDERPs (ctx , t , client , 13 )
250
247
251
248
// when the fCoord3 misses enough heartbeats, the real coordinator should send an update with the
252
249
// node from fCoord2 for the agent.
253
- nodes = client . recvNodes (ctx , t )
250
+ assertEventuallyHasDERPs (ctx , t , client , 12 )
254
251
assert .Greater (t , time .Since (start ), tailnet .HeartbeatPeriod * tailnet .MissedHeartbeats )
255
- assertHasDERPs (t , nodes , 12 )
256
252
257
253
// stop fCoord2 heartbeats, which should cause us to revert to the original agent mapping
258
254
cancel2 ()
259
- nodes = client .recvNodes (ctx , t )
260
- assertHasDERPs (t , nodes , 10 )
255
+ assertEventuallyHasDERPs (ctx , t , client , 10 )
261
256
262
257
// send fCoord3 heartbeat, which should trigger us to consider that mapping valid again.
263
258
fCoord3 .heartbeat ()
264
- nodes = client .recvNodes (ctx , t )
265
- assertHasDERPs (t , nodes , 13 )
259
+ assertEventuallyHasDERPs (ctx , t , client , 13 )
266
260
267
261
err = agent .close ()
268
262
require .NoError (t , err )
@@ -358,33 +352,24 @@ func TestPGCoordinatorDual_Mainline(t *testing.T) {
358
352
defer client22 .close ()
359
353
360
354
client11 .sendNode (& agpl.Node {PreferredDERP : 11 })
361
- nodes := agent1 .recvNodes (ctx , t )
362
- assert .Len (t , nodes , 1 )
363
- assertHasDERPs (t , nodes , 11 )
355
+ assertEventuallyHasDERPs (ctx , t , agent1 , 11 )
364
356
365
357
client21 .sendNode (& agpl.Node {PreferredDERP : 21 })
366
- nodes = agent1 .recvNodes (ctx , t )
367
- assertHasDERPs (t , nodes , 21 , 11 )
358
+ assertEventuallyHasDERPs (ctx , t , agent1 , 21 , 11 )
368
359
369
360
client22 .sendNode (& agpl.Node {PreferredDERP : 22 })
370
- nodes = agent2 .recvNodes (ctx , t )
371
- assertHasDERPs (t , nodes , 22 )
361
+ assertEventuallyHasDERPs (ctx , t , agent2 , 22 )
372
362
373
363
agent2 .sendNode (& agpl.Node {PreferredDERP : 2 })
374
- nodes = client22 .recvNodes (ctx , t )
375
- assertHasDERPs (t , nodes , 2 )
376
- nodes = client12 .recvNodes (ctx , t )
377
- assertHasDERPs (t , nodes , 2 )
364
+ assertEventuallyHasDERPs (ctx , t , client22 , 2 )
365
+ assertEventuallyHasDERPs (ctx , t , client12 , 2 )
378
366
379
367
client12 .sendNode (& agpl.Node {PreferredDERP : 12 })
380
- nodes = agent2 .recvNodes (ctx , t )
381
- assertHasDERPs (t , nodes , 12 , 22 )
368
+ assertEventuallyHasDERPs (ctx , t , agent2 , 12 , 22 )
382
369
383
370
agent1 .sendNode (& agpl.Node {PreferredDERP : 1 })
384
- nodes = client21 .recvNodes (ctx , t )
385
- assertHasDERPs (t , nodes , 1 )
386
- nodes = client11 .recvNodes (ctx , t )
387
- assertHasDERPs (t , nodes , 1 )
371
+ assertEventuallyHasDERPs (ctx , t , client21 , 1 )
372
+ assertEventuallyHasDERPs (ctx , t , client11 , 1 )
388
373
389
374
// let's close coord2
390
375
err = coord2 .Close ()
@@ -402,8 +387,7 @@ func TestPGCoordinatorDual_Mainline(t *testing.T) {
402
387
// In this case the update is superfluous because client11's node hasn't changed, and agents don't deprogram clients
403
388
// from the dataplane even if they are missing. Suppressing this kind of update would require the coordinator to
404
389
// store all the data its sent to each connection, so we don't bother.
405
- nodes = agent1 .recvNodes (ctx , t )
406
- assertHasDERPs (t , nodes , 11 )
390
+ assertEventuallyHasDERPs (ctx , t , agent1 , 11 )
407
391
408
392
// note that although agent2 is disconnected, client12 does NOT get an update because we suppress empty updates.
409
393
// (Its easy to tell these are superfluous.)
@@ -492,36 +476,29 @@ func TestPGCoordinator_MultiAgent(t *testing.T) {
492
476
defer client .close ()
493
477
494
478
client .sendNode (& agpl.Node {PreferredDERP : 3 })
495
- nodes := agent1 .recvNodes (ctx , t )
496
- assertHasDERPs (t , nodes , 3 )
497
- nodes = agent2 .recvNodes (ctx , t )
498
- assertHasDERPs (t , nodes , 3 )
479
+ assertEventuallyHasDERPs (ctx , t , agent1 , 3 )
480
+ assertEventuallyHasDERPs (ctx , t , agent2 , 3 )
499
481
500
482
agent1 .sendNode (& agpl.Node {PreferredDERP : 1 })
501
- nodes = client .recvNodes (ctx , t )
502
- assertHasDERPs (t , nodes , 1 )
483
+ assertEventuallyHasDERPs (ctx , t , client , 1 )
503
484
504
485
// agent2's update overrides agent1 because it is newer
505
486
agent2 .sendNode (& agpl.Node {PreferredDERP : 2 })
506
- nodes = client .recvNodes (ctx , t )
507
- assertHasDERPs (t , nodes , 2 )
487
+ assertEventuallyHasDERPs (ctx , t , client , 2 )
508
488
509
489
// agent2 disconnects, and we should revert back to agent1
510
490
err = agent2 .close ()
511
491
require .NoError (t , err )
512
492
err = agent2 .recvErr (ctx , t )
513
493
require .ErrorIs (t , err , io .ErrClosedPipe )
514
494
agent2 .waitForClose (ctx , t )
515
- nodes = client .recvNodes (ctx , t )
516
- assertHasDERPs (t , nodes , 1 )
495
+ assertEventuallyHasDERPs (ctx , t , client , 1 )
517
496
518
497
agent1 .sendNode (& agpl.Node {PreferredDERP : 11 })
519
- nodes = client .recvNodes (ctx , t )
520
- assertHasDERPs (t , nodes , 11 )
498
+ assertEventuallyHasDERPs (ctx , t , client , 11 )
521
499
522
500
client .sendNode (& agpl.Node {PreferredDERP : 31 })
523
- nodes = agent1 .recvNodes (ctx , t )
524
- assertHasDERPs (t , nodes , 31 )
501
+ assertEventuallyHasDERPs (ctx , t , agent1 , 31 )
525
502
526
503
err = agent1 .close ()
527
504
require .NoError (t , err )
@@ -625,17 +602,27 @@ func newTestClient(t *testing.T, coord agpl.Coordinator, agentID uuid.UUID, id .
625
602
return c
626
603
}
627
604
628
- func assertHasDERPs (t * testing.T , nodes []* agpl.Node , expected ... int ) {
629
- if ! assert .Len (t , nodes , len (expected ), "expected %d node(s), got %d" , len (expected ), len (nodes )) {
605
+ func assertEventuallyHasDERPs (ctx context.Context , t * testing.T , c * testConn , expected ... int ) {
606
+ t .Helper ()
607
+ for {
608
+ nodes := c .recvNodes (ctx , t )
609
+ if len (nodes ) != len (expected ) {
610
+ t .Logf ("expected %d, got %d nodes" , len (expected ), len (nodes ))
611
+ continue
612
+ }
613
+
614
+ derps := make ([]int , 0 , len (nodes ))
615
+ for _ , n := range nodes {
616
+ derps = append (derps , n .PreferredDERP )
617
+ }
618
+ for _ , e := range expected {
619
+ if ! slices .Contains (derps , e ) {
620
+ t .Logf ("expected DERP %d to be in %v" , e , derps )
621
+ continue
622
+ }
623
+ }
630
624
return
631
625
}
632
- derps := make ([]int , 0 , len (nodes ))
633
- for _ , n := range nodes {
634
- derps = append (derps , n .PreferredDERP )
635
- }
636
- for _ , e := range expected {
637
- assert .Contains (t , derps , e , "expected DERP %v, got %v" , e , derps )
638
- }
639
626
}
640
627
641
628
func assertEventuallyNoAgents (ctx context.Context , t * testing.T , store database.Store , agentID uuid.UUID ) {
0 commit comments