@@ -13,12 +13,8 @@ import (
13
13
"github.com/hashicorp/yamux"
14
14
"github.com/stretchr/testify/assert"
15
15
"github.com/stretchr/testify/require"
16
- "golang.org/x/xerrors"
17
- "google.golang.org/protobuf/types/known/durationpb"
18
- "google.golang.org/protobuf/types/known/timestamppb"
19
16
"nhooyr.io/websocket"
20
17
"storj.io/drpc"
21
- "storj.io/drpc/drpcerr"
22
18
"tailscale.com/tailcfg"
23
19
24
20
"cdr.dev/slog"
@@ -385,7 +381,12 @@ func TestTailnetAPIConnector_TelemetrySuccess(t *testing.T) {
385
381
DERPMapUpdateFrequency : time .Millisecond ,
386
382
DERPMapFn : func () * tailcfg.DERPMap { return <- derpMapCh },
387
383
NetworkTelemetryHandler : func (batch []* proto.TelemetryEvent ) {
388
- testutil .RequireSendCtx (ctx , t , eventCh , batch )
384
+ select {
385
+ case <- ctx .Done ():
386
+ t .Error ("timeout sending telemetry event" )
387
+ case eventCh <- batch :
388
+ t .Log ("sent telemetry batch" )
389
+ }
389
390
},
390
391
ResumeTokenProvider : tailnet .NewInsecureTestResumeTokenProvider (),
391
392
})
@@ -409,11 +410,10 @@ func TestTailnetAPIConnector_TelemetrySuccess(t *testing.T) {
409
410
410
411
uut := newTailnetAPIConnector (ctx , logger , agentID , svr .URL , quartz .NewReal (), & websocket.DialOptions {})
411
412
uut .runConnector (fConn )
412
- require .Eventually (t , func () bool {
413
- uut .clientMu .Lock ()
414
- defer uut .clientMu .Unlock ()
415
- return uut .client != nil
416
- }, testutil .WaitShort , testutil .IntervalFast )
413
+ // Coordinate calls happen _after_ telemetry is connected up, so we use this
414
+ // to ensure telemetry is connected before sending our event
415
+ cc := testutil .RequireRecvCtx (ctx , t , fCoord .CoordinateCalls )
416
+ defer close (cc .Resps )
417
417
418
418
uut .SendTelemetryEvent (& proto.TelemetryEvent {
419
419
Id : []byte ("test event" ),
@@ -425,86 +425,6 @@ func TestTailnetAPIConnector_TelemetrySuccess(t *testing.T) {
425
425
require .Equal (t , []byte ("test event" ), testEvents [0 ].Id )
426
426
}
427
427
428
- func TestTailnetAPIConnector_TelemetryUnimplemented (t * testing.T ) {
429
- t .Parallel ()
430
- ctx := testutil .Context (t , testutil .WaitShort )
431
- logger := slogtest .Make (t , nil ).Leveled (slog .LevelDebug )
432
- agentID := uuid.UUID {0x55 }
433
- fConn := newFakeTailnetConn ()
434
-
435
- fakeDRPCClient := newFakeDRPCClient ()
436
- uut := & tailnetAPIConnector {
437
- ctx : ctx ,
438
- logger : logger ,
439
- agentID : agentID ,
440
- coordinateURL : "" ,
441
- clock : quartz .NewReal (),
442
- dialOptions : & websocket.DialOptions {},
443
- connected : make (chan error , 1 ),
444
- closed : make (chan struct {}),
445
- customDialFn : func () (proto.DRPCTailnetClient , error ) {
446
- return fakeDRPCClient , nil
447
- },
448
- }
449
- uut .runConnector (fConn )
450
- require .Eventually (t , func () bool {
451
- uut .clientMu .Lock ()
452
- defer uut .clientMu .Unlock ()
453
- return uut .client != nil
454
- }, testutil .WaitShort , testutil .IntervalFast )
455
-
456
- fakeDRPCClient .telemetryError = drpcerr .WithCode (xerrors .New ("Unimplemented" ), 0 )
457
- uut .SendTelemetryEvent (& proto.TelemetryEvent {})
458
- require .False (t , uut .telemetryUnavailable .Load ())
459
- require .Equal (t , int64 (1 ), atomic .LoadInt64 (& fakeDRPCClient .postTelemetryCalls ))
460
-
461
- fakeDRPCClient .telemetryError = drpcerr .WithCode (xerrors .New ("Unimplemented" ), drpcerr .Unimplemented )
462
- uut .SendTelemetryEvent (& proto.TelemetryEvent {})
463
- require .True (t , uut .telemetryUnavailable .Load ())
464
- uut .SendTelemetryEvent (& proto.TelemetryEvent {})
465
- require .Equal (t , int64 (2 ), atomic .LoadInt64 (& fakeDRPCClient .postTelemetryCalls ))
466
- }
467
-
468
- func TestTailnetAPIConnector_TelemetryNotRecognised (t * testing.T ) {
469
- t .Parallel ()
470
- ctx := testutil .Context (t , testutil .WaitShort )
471
- logger := slogtest .Make (t , nil ).Leveled (slog .LevelDebug )
472
- agentID := uuid.UUID {0x55 }
473
- fConn := newFakeTailnetConn ()
474
-
475
- fakeDRPCClient := newFakeDRPCClient ()
476
- uut := & tailnetAPIConnector {
477
- ctx : ctx ,
478
- logger : logger ,
479
- agentID : agentID ,
480
- coordinateURL : "" ,
481
- clock : quartz .NewReal (),
482
- dialOptions : & websocket.DialOptions {},
483
- connected : make (chan error , 1 ),
484
- closed : make (chan struct {}),
485
- customDialFn : func () (proto.DRPCTailnetClient , error ) {
486
- return fakeDRPCClient , nil
487
- },
488
- }
489
- uut .runConnector (fConn )
490
- require .Eventually (t , func () bool {
491
- uut .clientMu .Lock ()
492
- defer uut .clientMu .Unlock ()
493
- return uut .client != nil
494
- }, testutil .WaitShort , testutil .IntervalFast )
495
-
496
- fakeDRPCClient .telemetryError = drpc .ProtocolError .New ("Protocol Error" )
497
- uut .SendTelemetryEvent (& proto.TelemetryEvent {})
498
- require .False (t , uut .telemetryUnavailable .Load ())
499
- require .Equal (t , int64 (1 ), atomic .LoadInt64 (& fakeDRPCClient .postTelemetryCalls ))
500
-
501
- fakeDRPCClient .telemetryError = drpc .ProtocolError .New ("unknown rpc: /coder.tailnet.v2.Tailnet/PostTelemetry" )
502
- uut .SendTelemetryEvent (& proto.TelemetryEvent {})
503
- require .True (t , uut .telemetryUnavailable .Load ())
504
- uut .SendTelemetryEvent (& proto.TelemetryEvent {})
505
- require .Equal (t , int64 (2 ), atomic .LoadInt64 (& fakeDRPCClient .postTelemetryCalls ))
506
- }
507
-
508
428
type fakeTailnetConn struct {}
509
429
510
430
func (* fakeTailnetConn ) UpdatePeers ([]* proto.CoordinateResponse_PeerUpdate ) error {
@@ -524,65 +444,6 @@ func newFakeTailnetConn() *fakeTailnetConn {
524
444
return & fakeTailnetConn {}
525
445
}
526
446
527
- type fakeDRPCClient struct {
528
- postTelemetryCalls int64
529
- refreshTokenFn func (context.Context , * proto.RefreshResumeTokenRequest ) (* proto.RefreshResumeTokenResponse , error )
530
- telemetryError error
531
- fakeDRPPCMapStream
532
- }
533
-
534
- var _ proto.DRPCTailnetClient = & fakeDRPCClient {}
535
-
536
- func newFakeDRPCClient () * fakeDRPCClient {
537
- return & fakeDRPCClient {
538
- postTelemetryCalls : 0 ,
539
- fakeDRPPCMapStream : fakeDRPPCMapStream {
540
- fakeDRPCStream : fakeDRPCStream {
541
- ch : make (chan struct {}),
542
- },
543
- },
544
- }
545
- }
546
-
547
- // Coordinate implements proto.DRPCTailnetClient.
548
- func (f * fakeDRPCClient ) Coordinate (_ context.Context ) (proto.DRPCTailnet_CoordinateClient , error ) {
549
- return & f .fakeDRPCStream , nil
550
- }
551
-
552
- // DRPCConn implements proto.DRPCTailnetClient.
553
- func (* fakeDRPCClient ) DRPCConn () drpc.Conn {
554
- return & fakeDRPCConn {}
555
- }
556
-
557
- // PostTelemetry implements proto.DRPCTailnetClient.
558
- func (f * fakeDRPCClient ) PostTelemetry (_ context.Context , _ * proto.TelemetryRequest ) (* proto.TelemetryResponse , error ) {
559
- atomic .AddInt64 (& f .postTelemetryCalls , 1 )
560
- return nil , f .telemetryError
561
- }
562
-
563
- // StreamDERPMaps implements proto.DRPCTailnetClient.
564
- func (f * fakeDRPCClient ) StreamDERPMaps (_ context.Context , _ * proto.StreamDERPMapsRequest ) (proto.DRPCTailnet_StreamDERPMapsClient , error ) {
565
- return & f .fakeDRPPCMapStream , nil
566
- }
567
-
568
- // RefreshResumeToken implements proto.DRPCTailnetClient.
569
- func (f * fakeDRPCClient ) RefreshResumeToken (_ context.Context , _ * proto.RefreshResumeTokenRequest ) (* proto.RefreshResumeTokenResponse , error ) {
570
- if f .refreshTokenFn != nil {
571
- return f .refreshTokenFn (context .Background (), nil )
572
- }
573
-
574
- return & proto.RefreshResumeTokenResponse {
575
- Token : "test" ,
576
- RefreshIn : durationpb .New (30 * time .Minute ),
577
- ExpiresAt : timestamppb .New (time .Now ().Add (time .Hour )),
578
- }, nil
579
- }
580
-
581
- // WorkspaceUpdates implements proto.DRPCTailnetClient.
582
- func (* fakeDRPCClient ) WorkspaceUpdates (context.Context , * proto.WorkspaceUpdatesRequest ) (proto.DRPCTailnet_WorkspaceUpdatesClient , error ) {
583
- panic ("unimplemented" )
584
- }
585
-
586
447
type fakeDRPCConn struct {}
587
448
588
449
var _ drpc.Conn = & fakeDRPCConn {}
0 commit comments