@@ -4,27 +4,37 @@ import (
4
4
"bytes"
5
5
"context"
6
6
"fmt"
7
+ "io"
8
+ "net"
7
9
"net/http"
8
10
"net/http/httptest"
11
+ "net/url"
9
12
"reflect"
10
13
"strings"
14
+ "sync"
11
15
"testing"
12
16
"time"
13
17
14
18
"github.com/google/uuid"
19
+ "github.com/moby/moby/pkg/namesgenerator"
15
20
"github.com/stretchr/testify/assert"
16
21
"github.com/stretchr/testify/require"
17
22
"go.uber.org/goleak"
18
23
24
+ "cdr.dev/slog"
19
25
"cdr.dev/slog/sloggers/slogtest"
26
+ "github.com/coder/coder/v2/agent"
27
+ "github.com/coder/coder/v2/agent/agenttest"
20
28
"github.com/coder/coder/v2/coderd/httpapi"
21
29
"github.com/coder/coder/v2/coderd/rbac/policy"
30
+ "github.com/coder/coder/v2/coderd/util/ptr"
22
31
"github.com/coder/coder/v2/tailnet/tailnettest"
23
32
24
33
agplaudit "github.com/coder/coder/v2/coderd/audit"
25
34
"github.com/coder/coder/v2/coderd/coderdtest"
26
35
"github.com/coder/coder/v2/coderd/database"
27
36
"github.com/coder/coder/v2/coderd/database/dbauthz"
37
+ "github.com/coder/coder/v2/coderd/database/dbfake"
28
38
"github.com/coder/coder/v2/coderd/database/dbmem"
29
39
"github.com/coder/coder/v2/coderd/database/dbtestutil"
30
40
"github.com/coder/coder/v2/coderd/database/dbtime"
@@ -522,3 +532,330 @@ func testDBAuthzRole(ctx context.Context) context.Context {
522
532
Scope : rbac .ScopeAll ,
523
533
})
524
534
}
535
+
536
+ // restartableListener is a TCP listener that can have all of it's connections
537
+ // severed on demand.
538
+ type restartableListener struct {
539
+ net.Listener
540
+ mu sync.Mutex
541
+ conns []net.Conn
542
+ }
543
+
544
+ func (l * restartableListener ) Accept () (net.Conn , error ) {
545
+ conn , err := l .Listener .Accept ()
546
+ if err != nil {
547
+ return nil , err
548
+ }
549
+ l .mu .Lock ()
550
+ l .conns = append (l .conns , conn )
551
+ l .mu .Unlock ()
552
+ return conn , nil
553
+ }
554
+
555
+ func (l * restartableListener ) CloseConnections () {
556
+ l .mu .Lock ()
557
+ defer l .mu .Unlock ()
558
+ for _ , conn := range l .conns {
559
+ _ = conn .Close ()
560
+ }
561
+ l .conns = nil
562
+ }
563
+
564
+ type restartableTestServer struct {
565
+ options * coderdenttest.Options
566
+ rl * restartableListener
567
+
568
+ mu sync.Mutex
569
+ api * coderd.API
570
+ closer io.Closer
571
+ }
572
+
573
+ func newRestartableTestServer (t * testing.T , options * coderdenttest.Options ) (* codersdk.Client , codersdk.CreateFirstUserResponse , * restartableTestServer ) {
574
+ t .Helper ()
575
+ if options == nil {
576
+ options = & coderdenttest.Options {}
577
+ }
578
+
579
+ s := & restartableTestServer {
580
+ options : options ,
581
+ }
582
+ srv := httptest .NewUnstartedServer (http .HandlerFunc (func (w http.ResponseWriter , r * http.Request ) {
583
+ s .mu .Lock ()
584
+ api := s .api
585
+ s .mu .Unlock ()
586
+
587
+ if api == nil {
588
+ w .WriteHeader (http .StatusBadGateway )
589
+ _ , _ = w .Write ([]byte ("server is not started" ))
590
+ return
591
+ }
592
+ api .AGPL .RootHandler .ServeHTTP (w , r )
593
+ }))
594
+ s .rl = & restartableListener {Listener : srv .Listener }
595
+ srv .Listener = s .rl
596
+ srv .Start ()
597
+ t .Cleanup (srv .Close )
598
+
599
+ u , err := url .Parse (srv .URL )
600
+ require .NoError (t , err , "failed to parse server URL" )
601
+ s .options .AccessURL = u
602
+
603
+ client , firstUser := s .startWithFirstUser (t )
604
+ client .URL = u
605
+ return client , firstUser , s
606
+ }
607
+
608
+ func (s * restartableTestServer ) Stop (t * testing.T ) {
609
+ t .Helper ()
610
+
611
+ s .mu .Lock ()
612
+ closer := s .closer
613
+ s .closer = nil
614
+ api := s .api
615
+ s .api = nil
616
+ s .mu .Unlock ()
617
+
618
+ if closer != nil {
619
+ err := closer .Close ()
620
+ require .NoError (t , err )
621
+ }
622
+ if api != nil {
623
+ err := api .Close ()
624
+ require .NoError (t , err )
625
+ }
626
+
627
+ s .rl .CloseConnections ()
628
+ }
629
+
630
+ func (s * restartableTestServer ) Start (t * testing.T ) {
631
+ t .Helper ()
632
+ _ , _ = s .startWithFirstUser (t )
633
+ }
634
+
635
+ func (s * restartableTestServer ) startWithFirstUser (t * testing.T ) (client * codersdk.Client , firstUser codersdk.CreateFirstUserResponse ) {
636
+ t .Helper ()
637
+ s .mu .Lock ()
638
+ defer s .mu .Unlock ()
639
+
640
+ if s .closer != nil || s .api != nil {
641
+ t .Fatal ("server already started, close must be called first" )
642
+ }
643
+ // This creates it's own TCP listener unfortunately, but it's not being
644
+ // used in this test.
645
+ client , s .closer , s .api , firstUser = coderdenttest .NewWithAPI (t , s .options )
646
+
647
+ // Never add the first user or license on subsequent restarts.
648
+ s .options .DontAddFirstUser = true
649
+ s .options .DontAddLicense = true
650
+
651
+ return client , firstUser
652
+ }
653
+
654
+ // Test_CoordinatorRollingRestart tests that two peers can maintain a connection
655
+ // without forgetting about each other when a HA coordinator does a rolling
656
+ // restart.
657
+ //
658
+ // We had a few issues with this in the past:
659
+ // 1. We didn't allow clients to maintain their peer ID after a reconnect,
660
+ // which resulted in the other peer thinking the client was a new peer.
661
+ // (This is fixed and independently tested in AGPL code)
662
+ // 2. HA coordinators would delete all peers (via FK constraints) when they
663
+ // were closed, which meant tunnels would be deleted and peers would be
664
+ // notified that the other peer was permanently gone.
665
+ // (This is fixed and independently tested above)
666
+ //
667
+ // This test uses a real server and real clients.
668
+ func TestConn_CoordinatorRollingRestart (t * testing.T ) {
669
+ t .Parallel ()
670
+
671
+ if ! dbtestutil .WillUsePostgres () {
672
+ t .Skip ("test only with postgres" )
673
+ }
674
+
675
+ // Although DERP will have connection issues until the connection is
676
+ // reestablished, any open connections should be maintained.
677
+ //
678
+ // Direct connections should be able to transmit packets throughout the
679
+ // restart without issue.
680
+ for _ , direct := range []bool {true , false } {
681
+ direct := direct
682
+ name := "DERP"
683
+ if direct {
684
+ name = "Direct"
685
+ }
686
+
687
+ t .Run (name , func (t * testing.T ) {
688
+ t .Parallel ()
689
+
690
+ store , ps := dbtestutil .NewDB (t )
691
+ dv := coderdtest .DeploymentValues (t , func (dv * codersdk.DeploymentValues ) {
692
+ dv .DERP .Config .BlockDirect = serpent .Bool (! direct )
693
+ })
694
+ logger := slogtest .Make (t , nil ).Leveled (slog .LevelDebug )
695
+
696
+ // Create two restartable test servers with the same database.
697
+ client1 , user , s1 := newRestartableTestServer (t , & coderdenttest.Options {
698
+ DontAddFirstUser : false ,
699
+ DontAddLicense : false ,
700
+ Options : & coderdtest.Options {
701
+ Logger : ptr .Ref (logger .Named ("server1" )),
702
+ Database : store ,
703
+ Pubsub : ps ,
704
+ DeploymentValues : dv ,
705
+ IncludeProvisionerDaemon : true ,
706
+ },
707
+ LicenseOptions : & coderdenttest.LicenseOptions {
708
+ Features : license.Features {
709
+ codersdk .FeatureHighAvailability : 1 ,
710
+ },
711
+ },
712
+ })
713
+ client2 , _ , s2 := newRestartableTestServer (t , & coderdenttest.Options {
714
+ DontAddFirstUser : true ,
715
+ DontAddLicense : true ,
716
+ Options : & coderdtest.Options {
717
+ Logger : ptr .Ref (logger .Named ("server2" )),
718
+ Database : store ,
719
+ Pubsub : ps ,
720
+ DeploymentValues : dv ,
721
+ },
722
+ })
723
+ client2 .SetSessionToken (client1 .SessionToken ())
724
+
725
+ workspace := dbfake .WorkspaceBuild (t , store , database.Workspace {
726
+ OrganizationID : user .OrganizationID ,
727
+ OwnerID : user .UserID ,
728
+ }).WithAgent ().Do ()
729
+
730
+ // Agent connects via the first coordinator.
731
+ _ = agenttest .New (t , client1 .URL , workspace .AgentToken , func (o * agent.Options ) {
732
+ o .Logger = logger .Named ("agent1" )
733
+ })
734
+ resources := coderdtest .NewWorkspaceAgentWaiter (t , client1 , workspace .Workspace .ID ).Wait ()
735
+
736
+ agentID := uuid .Nil
737
+ for _ , r := range resources {
738
+ for _ , a := range r .Agents {
739
+ agentID = a .ID
740
+ break
741
+ }
742
+ }
743
+ require .NotEqual (t , uuid .Nil , agentID )
744
+
745
+ // Client connects via the second coordinator.
746
+ ctx := testutil .Context (t , testutil .WaitSuperLong )
747
+ workspaceClient2 := workspacesdk .New (client2 )
748
+ conn , err := workspaceClient2 .DialAgent (ctx , agentID , & workspacesdk.DialAgentOptions {
749
+ Logger : logger .Named ("client" ),
750
+ })
751
+ require .NoError (t , err )
752
+ defer conn .Close ()
753
+
754
+ require .Eventually (t , func () bool {
755
+ _ , p2p , _ , err := conn .Ping (ctx )
756
+ assert .NoError (t , err )
757
+ return p2p == direct
758
+ }, testutil .WaitShort , testutil .IntervalFast )
759
+
760
+ // Open a TCP server and connection to it through the tunnel that
761
+ // should be maintained throughout the restart.
762
+ tcpServerAddr := tcpEchoServer (t )
763
+ tcpConn , err := conn .DialContext (ctx , "tcp" , tcpServerAddr )
764
+ require .NoError (t , err )
765
+ defer tcpConn .Close ()
766
+ writeReadEcho (t , ctx , tcpConn )
767
+
768
+ // Stop the first server.
769
+ logger .Info (ctx , "test: stopping server 1" )
770
+ s1 .Stop (t )
771
+
772
+ // Pings should fail on DERP but succeed on direct connections.
773
+ pingCtx , pingCancel := context .WithTimeout (ctx , 2 * time .Second ) //nolint:gocritic // it's going to hang and timeout for DERP, so this needs to be short
774
+ defer pingCancel ()
775
+ _ , p2p , _ , err := conn .Ping (pingCtx )
776
+ if direct {
777
+ require .NoError (t , err )
778
+ require .True (t , p2p , "expected direct connection" )
779
+ } else {
780
+ require .ErrorIs (t , err , context .DeadlineExceeded )
781
+ }
782
+
783
+ // The existing TCP connection should still be working if we're
784
+ // using direct connections.
785
+ if direct {
786
+ writeReadEcho (t , ctx , tcpConn )
787
+ }
788
+
789
+ // Start the first server again.
790
+ logger .Info (ctx , "test: starting server 1" )
791
+ s1 .Start (t )
792
+
793
+ // Restart the second server.
794
+ logger .Info (ctx , "test: stopping server 2" )
795
+ s2 .Stop (t )
796
+ logger .Info (ctx , "test: starting server 2" )
797
+ s2 .Start (t )
798
+
799
+ // Pings should eventually succeed on both DERP and direct
800
+ // connections.
801
+ require .True (t , conn .AwaitReachable (ctx ))
802
+ _ , p2p , _ , err = conn .Ping (ctx )
803
+ require .NoError (t , err )
804
+ require .Equal (t , direct , p2p , "mismatched p2p state" )
805
+
806
+ // The existing TCP connection should still be working.
807
+ writeReadEcho (t , ctx , tcpConn )
808
+ })
809
+ }
810
+ }
811
+
812
+ func tcpEchoServer (t * testing.T ) string {
813
+ var listenerWg sync.WaitGroup
814
+ tcpListener , err := net .Listen ("tcp" , "127.0.0.1:0" )
815
+ require .NoError (t , err )
816
+ t .Cleanup (func () {
817
+ _ = tcpListener .Close ()
818
+ listenerWg .Wait ()
819
+ })
820
+ listenerWg .Add (1 )
821
+ go func () {
822
+ defer listenerWg .Done ()
823
+ for {
824
+ conn , err := tcpListener .Accept ()
825
+ if err != nil {
826
+ return
827
+ }
828
+ listenerWg .Add (1 )
829
+ go func () {
830
+ defer listenerWg .Done ()
831
+ defer conn .Close ()
832
+ _ , _ = io .Copy (conn , conn )
833
+ }()
834
+ }
835
+ }()
836
+
837
+ return tcpListener .Addr ().String ()
838
+ }
839
+
840
+ // nolint:revive // t takes precedence.
841
+ func writeReadEcho (t * testing.T , ctx context.Context , conn net.Conn ) {
842
+ msg := namesgenerator .GetRandomName (0 )
843
+
844
+ deadline , ok := ctx .Deadline ()
845
+ if ok {
846
+ _ = conn .SetWriteDeadline (deadline )
847
+ defer conn .SetWriteDeadline (time.Time {})
848
+ _ = conn .SetReadDeadline (deadline )
849
+ defer conn .SetReadDeadline (time.Time {})
850
+ }
851
+
852
+ // Write a message
853
+ _ , err := conn .Write ([]byte (msg ))
854
+ require .NoError (t , err )
855
+
856
+ // Read the message back
857
+ buf := make ([]byte , 1024 )
858
+ n , err := conn .Read (buf )
859
+ require .NoError (t , err )
860
+ require .Equal (t , msg , string (buf [:n ]))
861
+ }
0 commit comments