@@ -6,12 +6,9 @@ import (
	"fmt"
	"net"
	"net/http"
-	"net/http/httptest"
-	"net/url"
	"runtime"
	"strconv"
	"strings"
-	"sync"
	"sync/atomic"
	"testing"
	"time"
@@ -21,6 +18,7 @@ import (
	"github.com/stretchr/testify/require"
	"golang.org/x/xerrors"
	"google.golang.org/protobuf/types/known/timestamppb"
+	"nhooyr.io/websocket"
	"tailscale.com/tailcfg"

	"cdr.dev/slog"
@@ -43,6 +41,8 @@ import (
	"github.com/coder/coder/v2/codersdk/workspacesdk"
	"github.com/coder/coder/v2/provisioner/echo"
	"github.com/coder/coder/v2/provisionersdk/proto"
+	"github.com/coder/coder/v2/tailnet"
+	tailnetproto "github.com/coder/coder/v2/tailnet/proto"
	"github.com/coder/coder/v2/tailnet/tailnettest"
	"github.com/coder/coder/v2/testutil"
)
@@ -512,111 +512,138 @@ func TestWorkspaceAgentClientCoordinate_BadVersion(t *testing.T) {
	require.Equal(t, "version", sdkErr.Validations[0].Field)
}

+type resumeTokenTestFakeCoordinator struct {
+	tailnet.Coordinator
+	lastPeerID uuid.UUID
+}
+
+var _ tailnet.Coordinator = &resumeTokenTestFakeCoordinator{}
+
+func (c *resumeTokenTestFakeCoordinator) ServeClient(conn net.Conn, id uuid.UUID, agentID uuid.UUID) error {
+	c.lastPeerID = id
+	return c.Coordinator.ServeClient(conn, id, agentID)
+}
+
+func (c *resumeTokenTestFakeCoordinator) Coordinate(ctx context.Context, id uuid.UUID, name string, a tailnet.CoordinateeAuth) (chan<- *tailnetproto.CoordinateRequest, <-chan *tailnetproto.CoordinateResponse) {
+	c.lastPeerID = id
+	return c.Coordinator.Coordinate(ctx, id, name, a)
+}
+
func TestWorkspaceAgentClientCoordinate_ResumeToken(t *testing.T) {
	t.Parallel()

	logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug)
-
-	// We block direct in this test to ensure that even if there's no direct
-	// connection, no shenanigans happen with the peer IDs on either side.
-	dv := coderdtest.DeploymentValues(t)
-	err := dv.DERP.Config.BlockDirect.Set("true")
-	require.NoError(t, err)
+	coordinator := &resumeTokenTestFakeCoordinator{
+		Coordinator: tailnet.NewCoordinator(logger),
+	}
	client, closer, api := coderdtest.NewWithAPI(t, &coderdtest.Options{
-		DeploymentValues: dv,
+		Coordinator: coordinator,
	})
	defer closer.Close()
	user := coderdtest.CreateFirstUser(t, client)

-	// Change the DERP mapper to our custom one.
-	var currentDerpMap atomic.Pointer[tailcfg.DERPMap]
-	originalDerpMap, _ := tailnettest.RunDERPAndSTUN(t)
-	currentDerpMap.Store(originalDerpMap)
-	derpMapFn := func(_ *tailcfg.DERPMap) *tailcfg.DERPMap {
-		return currentDerpMap.Load().Clone()
-	}
-	api.DERPMapper.Store(&derpMapFn)
-
-	// Start workspace a workspace agent.
+	// Create a workspace with an agent. No need to connect it since clients can
+	// still connect to the coordinator while the agent isn't connected.
	r := dbfake.WorkspaceBuild(t, api.Database, database.Workspace{
		OrganizationID: user.OrganizationID,
		OwnerID:        user.UserID,
	}).WithAgent().Do()
-
-	agentCloser := agenttest.New(t, client.URL, r.AgentToken)
-	resources := coderdtest.AwaitWorkspaceAgents(t, client, r.Workspace.ID)
-	agentID := resources[0].Agents[0].ID
-
-	// Create a new "proxy" server that we can use to kill the connection
-	// whenever we want.
-	l, err := netListenDroppable("tcp", "localhost:0")
+	agentTokenUUID, err := uuid.Parse(r.AgentToken)
	require.NoError(t, err)
-	defer l.Close()
-	srv := &httptest.Server{
-		Listener: l,
-		//nolint:gosec
-		Config: &http.Server{Handler: api.RootHandler},
-	}
-	srv.Start()
-	proxyURL, err := url.Parse(srv.URL)
+	ctx := testutil.Context(t, testutil.WaitLong)
+	agentAndBuild, err := api.Database.GetWorkspaceAgentAndLatestBuildByAuthToken(dbauthz.AsSystemRestricted(ctx), agentTokenUUID) //nolint
	require.NoError(t, err)
-	proxyClient := codersdk.New(proxyURL)
-	proxyClient.SetSessionToken(client.SessionToken())
-
-	ctx, cancel := context.WithTimeout(context.Background(), testutil.WaitLong)
-	defer cancel()

-	// Connect from a client.
-	conn, err := workspacesdk.New(proxyClient).
-		DialAgent(ctx, agentID, &workspacesdk.DialAgentOptions{
-			Logger: logger.Named("client"),
-		})
+	// Connect with no resume token, and ensure that the peer ID is set to a
+	// random value.
+	coordinator.lastPeerID = uuid.Nil
+	originalResumeToken, err := connectToCoordinatorAndFetchResumeToken(ctx, logger, client, agentAndBuild.WorkspaceAgent.ID, "")
	require.NoError(t, err)
-	defer conn.Close()
+	originalPeerID := coordinator.lastPeerID
+	require.NotEqual(t, originalPeerID, uuid.Nil)

-	ok := conn.AwaitReachable(ctx)
-	require.True(t, ok)
-	originalAgentPeers := agentCloser.TailnetConn().GetKnownPeerIDs()
+	// Connect with a valid resume token, and ensure that the peer ID is set to
+	// the stored value.
+	coordinator.lastPeerID = uuid.Nil
+	newResumeToken, err := connectToCoordinatorAndFetchResumeToken(ctx, logger, client, agentAndBuild.WorkspaceAgent.ID, originalResumeToken)
+	require.NoError(t, err)
+	require.Equal(t, originalPeerID, coordinator.lastPeerID)
+	require.NotEqual(t, originalResumeToken, newResumeToken)

-	// Drop client conn's coordinator connection.
-	l.DropAllConns()
+	// Connect with an invalid resume token, and ensure that the request is
+	// rejected.
+	coordinator.lastPeerID = uuid.Nil
+	_, err = connectToCoordinatorAndFetchResumeToken(ctx, logger, client, agentAndBuild.WorkspaceAgent.ID, "invalid")
+	require.Error(t, err)
+	var sdkErr *codersdk.Error
+	require.ErrorAs(t, err, &sdkErr)
+	require.Equal(t, http.StatusUnauthorized, sdkErr.StatusCode())
+	require.Len(t, sdkErr.Validations, 1)
+	require.Equal(t, "resume_token", sdkErr.Validations[0].Field)
+	require.Equal(t, uuid.Nil, coordinator.lastPeerID)
+}

-	// HACK: Change the DERP map and add a second "marker" region so we know
-	// when the client has reconnected to the coordinator.
-	//
-	// With some refactoring of the client connection to expose the
-	// coordinator connection status, this wouldn't be needed, but this
-	// also works.
-	derpMap := currentDerpMap.Load().Clone()
-	newDerpMap, _ := tailnettest.RunDERPAndSTUN(t)
-	derpMap.Regions[2] = newDerpMap.Regions[1]
-	currentDerpMap.Store(derpMap)
+// connectToCoordinatorAndFetchResumeToken connects to the tailnet coordinator
+// with a given resume token. It returns an error if the connection is rejected.
+// If the connection is accepted, a fresh resume token is fetched, the
+// connection is closed, and the new token is returned.
+func connectToCoordinatorAndFetchResumeToken(ctx context.Context, logger slog.Logger, sdkClient *codersdk.Client, agentID uuid.UUID, resumeToken string) (string, error) {
+	u, err := sdkClient.URL.Parse(fmt.Sprintf("/api/v2/workspaceagents/%s/coordinate", agentID))
+	if err != nil {
+		return "", xerrors.Errorf("parse URL: %w", err)
+	}
+	q := u.Query()
+	q.Set("version", "2.0")
+	if resumeToken != "" {
+		q.Set("resume_token", resumeToken)
+	}
+	u.RawQuery = q.Encode()

-	// Wait for the agent's DERP map to be updated.
-	require.Eventually(t, func() bool {
-		conn := agentCloser.TailnetConn()
-		if conn == nil {
-			return false
+	//nolint:bodyclose
+	wsConn, resp, err := websocket.Dial(ctx, u.String(), &websocket.DialOptions{
+		HTTPHeader: http.Header{
+			"Coder-Session-Token": []string{sdkClient.SessionToken()},
+		},
+	})
+	if err != nil {
+		if resp.StatusCode != http.StatusSwitchingProtocols {
+			err = codersdk.ReadBodyAsError(resp)
		}
-		regionIDs := conn.DERPMap().RegionIDs()
-		return len(regionIDs) == 2 && regionIDs[1] == 2
-	}, testutil.WaitLong, testutil.IntervalFast)
-
-	// Wait for the DERP map to be updated on the client. This means that the
-	// client has reconnected to the coordinator.
-	require.Eventually(t, func() bool {
-		regionIDs := conn.Conn.DERPMap().RegionIDs()
-		return len(regionIDs) == 2 && regionIDs[1] == 2
-	}, testutil.WaitLong, testutil.IntervalFast)
+		return "", xerrors.Errorf("websocket dial: %w", err)
+	}
+	defer wsConn.Close(websocket.StatusNormalClosure, "done")
+
+	// Send a request to the server to ensure that we're plumbed all the way
+	// through.
+	rpcClient, err := tailnet.NewDRPCClient(
+		websocket.NetConn(ctx, wsConn, websocket.MessageBinary),
+		logger,
+	)
+	if err != nil {
+		return "", xerrors.Errorf("new dRPC client: %w", err)
+	}

-	// The first client should still be able to reach the agent.
-	ok = conn.AwaitReachable(ctx)
-	require.True(t, ok)
-	_, err = conn.ListeningPorts(ctx)
-	require.NoError(t, err)
+	// Send an empty coordination request. This will do nothing on the server,
+	// but ensures our wrapped coordinator can record the peer ID.
+	coordinateClient, err := rpcClient.Coordinate(ctx)
+	if err != nil {
+		return "", xerrors.Errorf("coordinate: %w", err)
+	}
+	err = coordinateClient.Send(&tailnetproto.CoordinateRequest{})
+	if err != nil {
+		return "", xerrors.Errorf("send empty coordination request: %w", err)
+	}
+	err = coordinateClient.Close()
+	if err != nil {
+		return "", xerrors.Errorf("close coordination request: %w", err)
+	}

-	// The agent should not see any new peers.
-	require.ElementsMatch(t, originalAgentPeers, agentCloser.TailnetConn().GetKnownPeerIDs())
+	// Fetch a resume token.
+	newResumeToken, err := rpcClient.RefreshResumeToken(ctx, &tailnetproto.RefreshResumeTokenRequest{})
+	if err != nil {
+		return "", xerrors.Errorf("fetch resume token: %w", err)
+	}
+	return newResumeToken.Token, nil
}

func TestWorkspaceAgentTailnetDirectDisabled(t *testing.T) {
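Aside: resumeTokenTestFakeCoordinator above uses Go's embed-and-override test-double pattern. Embedding the tailnet.Coordinator interface satisfies the interface wholesale, and redefining only ServeClient and Coordinate lets the test record the peer ID before delegating to the real coordinator. A minimal, self-contained sketch of the pattern (every identifier here is illustrative, not from the Coder codebase):

package main

import "fmt"

// Store is a stand-in interface under test.
type Store interface {
	Get(key string) string
}

// realStore is the "production" implementation.
type realStore struct{}

func (realStore) Get(key string) string { return "value-for-" + key }

// recordingStore embeds a Store to inherit all of its methods, then overrides
// Get to record its argument before delegating, the same shape as
// resumeTokenTestFakeCoordinator recording lastPeerID.
type recordingStore struct {
	Store
	lastKey string
}

func (r *recordingStore) Get(key string) string {
	r.lastKey = key
	return r.Store.Get(key)
}

func main() {
	s := &recordingStore{Store: realStore{}}
	fmt.Println(s.Get("a")) // value-for-a
	fmt.Println(s.lastKey)  // a
}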
@@ -1832,40 +1859,3 @@ func postStartup(ctx context.Context, t testing.TB, client agent.Client, startup
	_, err = aAPI.UpdateStartup(ctx, &agentproto.UpdateStartupRequest{Startup: startup})
	return err
}
-
-type droppableTCPListener struct {
-	net.Listener
-	mu    sync.Mutex
-	conns []net.Conn
-}
-
-var _ net.Listener = &droppableTCPListener{}
-
-func netListenDroppable(network, addr string) (*droppableTCPListener, error) {
-	l, err := net.Listen(network, addr)
-	if err != nil {
-		return nil, err
-	}
-	return &droppableTCPListener{Listener: l}, nil
-}
-
-func (l *droppableTCPListener) Accept() (net.Conn, error) {
-	conn, err := l.Listener.Accept()
-	if err != nil {
-		return nil, err
-	}
-
-	l.mu.Lock()
-	defer l.mu.Unlock()
-	l.conns = append(l.conns, conn)
-	return conn, nil
-}
-
-func (l *droppableTCPListener) DropAllConns() {
-	l.mu.Lock()
-	defer l.mu.Unlock()
-	for _, c := range l.conns {
-		_ = c.Close()
-	}
-	l.conns = nil
-}
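For context on the behavior the new test pins down: a resume token is an opaque credential that lets a reconnecting client reclaim its previous peer ID, an invalid token must be rejected with HTTP 401, and a refreshed token must differ from the one it replaces. One plausible way to mint and verify such tokens is an HMAC binding the token to the peer ID; the sketch below only illustrates that idea and is not Coder's actual implementation (notably, it omits the expiry/nonce a real token needs, which is also what makes refreshed tokens differ):

package main

import (
	"crypto/hmac"
	"crypto/sha256"
	"encoding/base64"
	"fmt"
	"strings"
)

// signingKey would be a per-deployment secret in a real server.
var signingKey = []byte("test-signing-key")

// mintToken binds a token to a peer ID: token = peerID + "." + HMAC(peerID).
func mintToken(peerID string) string {
	mac := hmac.New(sha256.New, signingKey)
	mac.Write([]byte(peerID))
	return peerID + "." + base64.RawURLEncoding.EncodeToString(mac.Sum(nil))
}

// verifyToken returns the peer ID a token was minted for, or false when the
// token is malformed or its signature does not match.
func verifyToken(token string) (string, bool) {
	peerID, sig, ok := strings.Cut(token, ".")
	if !ok {
		return "", false
	}
	mac := hmac.New(sha256.New, signingKey)
	mac.Write([]byte(peerID))
	want := base64.RawURLEncoding.EncodeToString(mac.Sum(nil))
	if !hmac.Equal([]byte(want), []byte(sig)) {
		return "", false
	}
	return peerID, true
}

func main() {
	tok := mintToken("peer-123")
	fmt.Println(verifyToken(tok))       // peer-123 true
	fmt.Println(verifyToken("invalid")) // false
}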