
Commit 63e03b5 (1 parent: 7a42c42)

review p1

File tree: 2 files changed, +337 -348 lines


enterprise/coderd/coderd_test.go: 337 additions & 0 deletions
@@ -4,27 +4,37 @@ import (
 	"bytes"
 	"context"
 	"fmt"
+	"io"
+	"net"
 	"net/http"
 	"net/http/httptest"
+	"net/url"
 	"reflect"
 	"strings"
+	"sync"
 	"testing"
 	"time"
 
 	"github.com/google/uuid"
+	"github.com/moby/moby/pkg/namesgenerator"
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
 	"go.uber.org/goleak"
 
+	"cdr.dev/slog"
 	"cdr.dev/slog/sloggers/slogtest"
+	"github.com/coder/coder/v2/agent"
+	"github.com/coder/coder/v2/agent/agenttest"
 	"github.com/coder/coder/v2/coderd/httpapi"
 	"github.com/coder/coder/v2/coderd/rbac/policy"
+	"github.com/coder/coder/v2/coderd/util/ptr"
 	"github.com/coder/coder/v2/tailnet/tailnettest"
 
 	agplaudit "github.com/coder/coder/v2/coderd/audit"
 	"github.com/coder/coder/v2/coderd/coderdtest"
 	"github.com/coder/coder/v2/coderd/database"
 	"github.com/coder/coder/v2/coderd/database/dbauthz"
+	"github.com/coder/coder/v2/coderd/database/dbfake"
 	"github.com/coder/coder/v2/coderd/database/dbmem"
 	"github.com/coder/coder/v2/coderd/database/dbtestutil"
 	"github.com/coder/coder/v2/coderd/database/dbtime"
@@ -522,3 +532,330 @@ func testDBAuthzRole(ctx context.Context) context.Context {
 		Scope: rbac.ScopeAll,
 	})
 }
+
+// restartableListener is a TCP listener that can have all of its connections
+// severed on demand.
+type restartableListener struct {
+	net.Listener
+	mu    sync.Mutex
+	conns []net.Conn
+}
+
+func (l *restartableListener) Accept() (net.Conn, error) {
+	conn, err := l.Listener.Accept()
+	if err != nil {
+		return nil, err
+	}
+	l.mu.Lock()
+	l.conns = append(l.conns, conn)
+	l.mu.Unlock()
+	return conn, nil
+}
+
+func (l *restartableListener) CloseConnections() {
+	l.mu.Lock()
+	defer l.mu.Unlock()
+	for _, conn := range l.conns {
+		_ = conn.Close()
+	}
+	l.conns = nil
+}
+
+type restartableTestServer struct {
+	options *coderdenttest.Options
+	rl      *restartableListener
+
+	mu     sync.Mutex
+	api    *coderd.API
+	closer io.Closer
+}
+
+func newRestartableTestServer(t *testing.T, options *coderdenttest.Options) (*codersdk.Client, codersdk.CreateFirstUserResponse, *restartableTestServer) {
+	t.Helper()
+	if options == nil {
+		options = &coderdenttest.Options{}
+	}
+
+	s := &restartableTestServer{
+		options: options,
+	}
+	srv := httptest.NewUnstartedServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+		s.mu.Lock()
+		api := s.api
+		s.mu.Unlock()
+
+		if api == nil {
+			w.WriteHeader(http.StatusBadGateway)
+			_, _ = w.Write([]byte("server is not started"))
+			return
+		}
+		api.AGPL.RootHandler.ServeHTTP(w, r)
+	}))
+	s.rl = &restartableListener{Listener: srv.Listener}
+	srv.Listener = s.rl
+	srv.Start()
+	t.Cleanup(srv.Close)
+
+	u, err := url.Parse(srv.URL)
+	require.NoError(t, err, "failed to parse server URL")
+	s.options.AccessURL = u
+
+	client, firstUser := s.startWithFirstUser(t)
+	client.URL = u
+	return client, firstUser, s
+}
+
+func (s *restartableTestServer) Stop(t *testing.T) {
+	t.Helper()
+
+	s.mu.Lock()
+	closer := s.closer
+	s.closer = nil
+	api := s.api
+	s.api = nil
+	s.mu.Unlock()
+
+	if closer != nil {
+		err := closer.Close()
+		require.NoError(t, err)
+	}
+	if api != nil {
+		err := api.Close()
+		require.NoError(t, err)
+	}
+
+	s.rl.CloseConnections()
+}
+
+func (s *restartableTestServer) Start(t *testing.T) {
+	t.Helper()
+	_, _ = s.startWithFirstUser(t)
+}
+
+func (s *restartableTestServer) startWithFirstUser(t *testing.T) (client *codersdk.Client, firstUser codersdk.CreateFirstUserResponse) {
+	t.Helper()
+	s.mu.Lock()
+	defer s.mu.Unlock()
+
+	if s.closer != nil || s.api != nil {
+		t.Fatal("server already started, close must be called first")
+	}
+	// This creates its own TCP listener unfortunately, but it's not being
+	// used in this test.
+	client, s.closer, s.api, firstUser = coderdenttest.NewWithAPI(t, s.options)
+
+	// Never add the first user or license on subsequent restarts.
+	s.options.DontAddFirstUser = true
+	s.options.DontAddLicense = true
+
+	return client, firstUser
+}
+
+// TestConn_CoordinatorRollingRestart tests that two peers can maintain a
+// connection without forgetting about each other when an HA coordinator does a
+// rolling restart.
+//
+// We had a few issues with this in the past:
+//  1. We didn't allow clients to maintain their peer ID after a reconnect,
+//     which resulted in the other peer thinking the client was a new peer.
+//     (This is fixed and independently tested in AGPL code)
+//  2. HA coordinators would delete all peers (via FK constraints) when they
+//     were closed, which meant tunnels would be deleted and peers would be
+//     notified that the other peer was permanently gone.
+//     (This is fixed and independently tested above)
+//
+// This test uses a real server and real clients.
+func TestConn_CoordinatorRollingRestart(t *testing.T) {
+	t.Parallel()
+
+	if !dbtestutil.WillUsePostgres() {
+		t.Skip("test only with postgres")
+	}
+
+	// Although DERP will have connection issues until the connection is
+	// reestablished, any open connections should be maintained.
+	//
+	// Direct connections should be able to transmit packets throughout the
+	// restart without issue.
+	for _, direct := range []bool{true, false} {
+		direct := direct
+		name := "DERP"
+		if direct {
+			name = "Direct"
+		}
+
+		t.Run(name, func(t *testing.T) {
+			t.Parallel()
+
+			store, ps := dbtestutil.NewDB(t)
+			dv := coderdtest.DeploymentValues(t, func(dv *codersdk.DeploymentValues) {
+				dv.DERP.Config.BlockDirect = serpent.Bool(!direct)
+			})
+			logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug)
+
+			// Create two restartable test servers with the same database.
+			client1, user, s1 := newRestartableTestServer(t, &coderdenttest.Options{
+				DontAddFirstUser: false,
+				DontAddLicense:   false,
+				Options: &coderdtest.Options{
+					Logger:                   ptr.Ref(logger.Named("server1")),
+					Database:                 store,
+					Pubsub:                   ps,
+					DeploymentValues:         dv,
+					IncludeProvisionerDaemon: true,
+				},
+				LicenseOptions: &coderdenttest.LicenseOptions{
+					Features: license.Features{
+						codersdk.FeatureHighAvailability: 1,
+					},
+				},
+			})
+			client2, _, s2 := newRestartableTestServer(t, &coderdenttest.Options{
+				DontAddFirstUser: true,
+				DontAddLicense:   true,
+				Options: &coderdtest.Options{
+					Logger:           ptr.Ref(logger.Named("server2")),
+					Database:         store,
+					Pubsub:           ps,
+					DeploymentValues: dv,
+				},
+			})
+			client2.SetSessionToken(client1.SessionToken())
+
+			workspace := dbfake.WorkspaceBuild(t, store, database.Workspace{
+				OrganizationID: user.OrganizationID,
+				OwnerID:        user.UserID,
+			}).WithAgent().Do()
+
+			// Agent connects via the first coordinator.
+			_ = agenttest.New(t, client1.URL, workspace.AgentToken, func(o *agent.Options) {
+				o.Logger = logger.Named("agent1")
+			})
+			resources := coderdtest.NewWorkspaceAgentWaiter(t, client1, workspace.Workspace.ID).Wait()
+
+			agentID := uuid.Nil
+			for _, r := range resources {
+				for _, a := range r.Agents {
+					agentID = a.ID
+					break
+				}
+			}
+			require.NotEqual(t, uuid.Nil, agentID)
+
+			// Client connects via the second coordinator.
+			ctx := testutil.Context(t, testutil.WaitSuperLong)
+			workspaceClient2 := workspacesdk.New(client2)
+			conn, err := workspaceClient2.DialAgent(ctx, agentID, &workspacesdk.DialAgentOptions{
+				Logger: logger.Named("client"),
+			})
+			require.NoError(t, err)
+			defer conn.Close()
+
+			require.Eventually(t, func() bool {
+				_, p2p, _, err := conn.Ping(ctx)
+				assert.NoError(t, err)
+				return p2p == direct
+			}, testutil.WaitShort, testutil.IntervalFast)
+
+			// Open a TCP server and connection to it through the tunnel that
+			// should be maintained throughout the restart.
+			tcpServerAddr := tcpEchoServer(t)
+			tcpConn, err := conn.DialContext(ctx, "tcp", tcpServerAddr)
+			require.NoError(t, err)
+			defer tcpConn.Close()
+			writeReadEcho(t, ctx, tcpConn)
+
+			// Stop the first server.
+			logger.Info(ctx, "test: stopping server 1")
+			s1.Stop(t)
+
+			// Pings should fail on DERP but succeed on direct connections.
+			pingCtx, pingCancel := context.WithTimeout(ctx, 2*time.Second) //nolint:gocritic // it's going to hang and time out for DERP, so this needs to be short
+			defer pingCancel()
+			_, p2p, _, err := conn.Ping(pingCtx)
+			if direct {
+				require.NoError(t, err)
+				require.True(t, p2p, "expected direct connection")
+			} else {
+				require.ErrorIs(t, err, context.DeadlineExceeded)
+			}
+
+			// The existing TCP connection should still be working if we're
+			// using direct connections.
+			if direct {
+				writeReadEcho(t, ctx, tcpConn)
+			}
+
+			// Start the first server again.
+			logger.Info(ctx, "test: starting server 1")
+			s1.Start(t)
+
+			// Restart the second server.
+			logger.Info(ctx, "test: stopping server 2")
+			s2.Stop(t)
+			logger.Info(ctx, "test: starting server 2")
+			s2.Start(t)
+
+			// Pings should eventually succeed on both DERP and direct
+			// connections.
+			require.True(t, conn.AwaitReachable(ctx))
+			_, p2p, _, err = conn.Ping(ctx)
+			require.NoError(t, err)
+			require.Equal(t, direct, p2p, "mismatched p2p state")
+
+			// The existing TCP connection should still be working.
+			writeReadEcho(t, ctx, tcpConn)
+		})
+	}
+}
+
+func tcpEchoServer(t *testing.T) string {
+	var listenerWg sync.WaitGroup
+	tcpListener, err := net.Listen("tcp", "127.0.0.1:0")
+	require.NoError(t, err)
+	t.Cleanup(func() {
+		_ = tcpListener.Close()
+		listenerWg.Wait()
+	})
+	listenerWg.Add(1)
+	go func() {
+		defer listenerWg.Done()
+		for {
+			conn, err := tcpListener.Accept()
+			if err != nil {
+				return
+			}
+			listenerWg.Add(1)
+			go func() {
+				defer listenerWg.Done()
+				defer conn.Close()
+				_, _ = io.Copy(conn, conn)
+			}()
+		}
+	}()
+
+	return tcpListener.Addr().String()
+}
+
+//nolint:revive // t takes precedence.
+func writeReadEcho(t *testing.T, ctx context.Context, conn net.Conn) {
+	msg := namesgenerator.GetRandomName(0)
+
+	deadline, ok := ctx.Deadline()
+	if ok {
+		_ = conn.SetWriteDeadline(deadline)
+		defer conn.SetWriteDeadline(time.Time{})
+		_ = conn.SetReadDeadline(deadline)
+		defer conn.SetReadDeadline(time.Time{})
+	}
+
+	// Write a message
+	_, err := conn.Write([]byte(msg))
+	require.NoError(t, err)
+
+	// Read the message back
+	buf := make([]byte, 1024)
+	n, err := conn.Read(buf)
+	require.NoError(t, err)
+	require.Equal(t, msg, string(buf[:n]))
+}
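
For context, the core trick in newRestartableTestServer is swapping the httptest server's Listener for a wrapper that records every accepted connection, so a later Stop can sever them all and force clients to redial. Below is a minimal standalone sketch of that pattern, not part of the commit; the trackingListener name and the trivial handler are illustrative.

package main

import (
	"fmt"
	"io"
	"net"
	"net/http"
	"net/http/httptest"
	"sync"
)

// trackingListener mirrors restartableListener above: it wraps a
// net.Listener and remembers every accepted connection.
type trackingListener struct {
	net.Listener
	mu    sync.Mutex
	conns []net.Conn
}

func (l *trackingListener) Accept() (net.Conn, error) {
	conn, err := l.Listener.Accept()
	if err != nil {
		return nil, err
	}
	l.mu.Lock()
	l.conns = append(l.conns, conn)
	l.mu.Unlock()
	return conn, nil
}

// closeConnections severs every tracked connection, which looks like a
// server restart from the client's point of view.
func (l *trackingListener) closeConnections() {
	l.mu.Lock()
	defer l.mu.Unlock()
	for _, conn := range l.conns {
		_ = conn.Close()
	}
	l.conns = nil
}

func main() {
	srv := httptest.NewUnstartedServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
		_, _ = w.Write([]byte("ok"))
	}))
	// Swap the listener in before Start(), exactly as the test helper does.
	tl := &trackingListener{Listener: srv.Listener}
	srv.Listener = tl
	srv.Start()
	defer srv.Close()

	resp, err := http.Get(srv.URL)
	if err != nil {
		panic(err)
	}
	body, _ := io.ReadAll(resp.Body)
	_ = resp.Body.Close()
	fmt.Println(string(body)) // prints "ok"

	// Sever all open connections; the next request must redial.
	tl.closeConnections()
}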

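The tcpEchoServer/writeReadEcho pair is how the test proves the tunnel stayed alive: a byte written into the connection must come back unchanged. A standalone sketch of that round trip, outside the commit and with illustrative names, using a fixed deadline in place of the test context's deadline:

package main

import (
	"fmt"
	"io"
	"net"
	"time"
)

// echoServer mirrors tcpEchoServer above: it copies every byte it
// receives straight back to the sender.
func echoServer() (addr string, closeFn func(), err error) {
	ln, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		return "", nil, err
	}
	go func() {
		for {
			conn, err := ln.Accept()
			if err != nil {
				return
			}
			go func() {
				defer conn.Close()
				_, _ = io.Copy(conn, conn)
			}()
		}
	}()
	return ln.Addr().String(), func() { _ = ln.Close() }, nil
}

func main() {
	addr, closeFn, err := echoServer()
	if err != nil {
		panic(err)
	}
	defer closeFn()

	conn, err := net.Dial("tcp", addr)
	if err != nil {
		panic(err)
	}
	defer conn.Close()

	// Bound the round trip with deadlines, as writeReadEcho does.
	deadline := time.Now().Add(5 * time.Second)
	_ = conn.SetWriteDeadline(deadline)
	_ = conn.SetReadDeadline(deadline)

	msg := "hello"
	if _, err := conn.Write([]byte(msg)); err != nil {
		panic(err)
	}
	buf := make([]byte, 1024)
	n, err := conn.Read(buf)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(buf[:n])) // prints "hello"
}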