From 3f95841aff3058b47f2e1495e8661c1b749b78a2 Mon Sep 17 00:00:00 2001 From: Danny Kopping Date: Fri, 11 Apr 2025 13:37:14 +0000 Subject: [PATCH 1/7] Backoff acquiring provisioner jobs when the database is unreachable Signed-off-by: Danny Kopping --- provisionerd/provisionerd.go | 21 +++++++++++++-------- 1 file changed, 13 insertions(+), 8 deletions(-) diff --git a/provisionerd/provisionerd.go b/provisionerd/provisionerd.go index b461bc593ee36..2df7f9e78db87 100644 --- a/provisionerd/provisionerd.go +++ b/provisionerd/provisionerd.go @@ -20,12 +20,13 @@ import ( "golang.org/x/xerrors" "cdr.dev/slog" + "github.com/coder/retry" + "github.com/coder/coder/v2/coderd/tracing" "github.com/coder/coder/v2/codersdk" "github.com/coder/coder/v2/provisionerd/proto" "github.com/coder/coder/v2/provisionerd/runner" sdkproto "github.com/coder/coder/v2/provisionersdk/proto" - "github.com/coder/retry" ) // Dialer represents the function to create a daemon client connection. @@ -290,7 +291,7 @@ func (p *Server) acquireLoop() { defer p.wg.Done() defer func() { close(p.acquireDoneCh) }() ctx := p.closeContext - for { + for retrier := retry.New(10*time.Millisecond, 1*time.Second); retrier.Wait(ctx); { if p.acquireExit() { return } @@ -299,7 +300,10 @@ func (p *Server) acquireLoop() { p.opts.Logger.Debug(ctx, "shut down before client (re) connected") return } - p.acquireAndRunOne(client) + err := p.acquireAndRunOne(client) + if err != nil && ctx.Err() == nil { // Only log if context is not done. + p.opts.Logger.Debug(ctx, "retrying to acquire job", slog.F("retry_in_ms", retrier.Delay.Milliseconds()), slog.Error(err)) + } } } @@ -318,7 +322,7 @@ func (p *Server) acquireExit() bool { return false } -func (p *Server) acquireAndRunOne(client proto.DRPCProvisionerDaemonClient) { +func (p *Server) acquireAndRunOne(client proto.DRPCProvisionerDaemonClient) error { ctx := p.closeContext p.opts.Logger.Debug(ctx, "start of acquireAndRunOne") job, err := p.acquireGraceful(client) @@ -327,15 +331,15 @@ func (p *Server) acquireAndRunOne(client proto.DRPCProvisionerDaemonClient) { if errors.Is(err, context.Canceled) || errors.Is(err, yamux.ErrSessionShutdown) || errors.Is(err, fasthttputil.ErrInmemoryListenerClosed) { - return + return err } p.opts.Logger.Warn(ctx, "provisionerd was unable to acquire job", slog.Error(err)) - return + return xerrors.Errorf("failed to acquire job: %w", err) } if job.JobId == "" { p.opts.Logger.Debug(ctx, "acquire job successfully canceled") - return + return xerrors.New("canceled") } if len(job.TraceMetadata) > 0 { @@ -392,7 +396,7 @@ func (p *Server) acquireAndRunOne(client proto.DRPCProvisionerDaemonClient) { if err != nil { p.opts.Logger.Error(ctx, "provisioner job failed", slog.F("job_id", job.JobId), slog.Error(err)) } - return + return xerrors.Errorf("provisioner job failed: %w", err) } p.mutex.Lock() @@ -416,6 +420,7 @@ func (p *Server) acquireAndRunOne(client proto.DRPCProvisionerDaemonClient) { p.mutex.Lock() p.activeJob = nil p.mutex.Unlock() + return nil } // acquireGraceful attempts to acquire a job from the server, handling canceling the acquisition if we gracefully shut From 0448a7427b1a8bb81bc225008717b2e61fde8ce7 Mon Sep 17 00:00:00 2001 From: Danny Kopping Date: Fri, 11 Apr 2025 14:02:56 +0000 Subject: [PATCH 2/7] Checking for, and specifically handling, database unreachability in tailnet control protocol dialer Signed-off-by: Danny Kopping --- coderd/coderd.go | 4 ++ coderd/tailnet.go | 19 ++++++-- coderd/tailnet_test.go | 83 +++++++++++++++++++++++++++++---- coderd/workspaceagents.go | 10 ++++ coderd/workspaceagents_test.go | 58 +++++++++++++++++++++++ codersdk/database.go | 7 +++ codersdk/workspacesdk/dialer.go | 15 ++++-- site/src/api/typesGenerated.ts | 3 ++ tailnet/controllers.go | 13 +++++- 9 files changed, 192 insertions(+), 20 deletions(-) create mode 100644 codersdk/database.go diff --git a/coderd/coderd.go b/coderd/coderd.go index 43caf8b344edc..a00b2b72f3cce 100644 --- a/coderd/coderd.go +++ b/coderd/coderd.go @@ -679,6 +679,10 @@ func New(options *Options) *API { DERPFn: api.DERPMap, Logger: options.Logger, ClientID: uuid.New(), + DatabaseHealthcheckFn: func(ctx context.Context) error { + _, err := api.Database.Ping(ctx) + return err + }, } stn, err := NewServerTailnet(api.ctx, options.Logger, diff --git a/coderd/tailnet.go b/coderd/tailnet.go index b06219db40a78..c6ff0b870d449 100644 --- a/coderd/tailnet.go +++ b/coderd/tailnet.go @@ -24,9 +24,11 @@ import ( "tailscale.com/tailcfg" "cdr.dev/slog" + "github.com/coder/coder/v2/coderd/tracing" "github.com/coder/coder/v2/coderd/workspaceapps" "github.com/coder/coder/v2/coderd/workspaceapps/appurl" + "github.com/coder/coder/v2/codersdk" "github.com/coder/coder/v2/codersdk/workspacesdk" "github.com/coder/coder/v2/site" "github.com/coder/coder/v2/tailnet" @@ -537,13 +539,20 @@ func NewMultiAgentController(ctx context.Context, logger slog.Logger, tracer tra // InmemTailnetDialer is a tailnet.ControlProtocolDialer that connects to a Coordinator and DERPMap // service running in the same memory space. type InmemTailnetDialer struct { - CoordPtr *atomic.Pointer[tailnet.Coordinator] - DERPFn func() *tailcfg.DERPMap - Logger slog.Logger - ClientID uuid.UUID + CoordPtr *atomic.Pointer[tailnet.Coordinator] + DERPFn func() *tailcfg.DERPMap + Logger slog.Logger + ClientID uuid.UUID + DatabaseHealthcheckFn func(ctx context.Context) error } -func (a *InmemTailnetDialer) Dial(_ context.Context, _ tailnet.ResumeTokenController) (tailnet.ControlProtocolClients, error) { +func (a *InmemTailnetDialer) Dial(ctx context.Context, _ tailnet.ResumeTokenController) (tailnet.ControlProtocolClients, error) { + if a.DatabaseHealthcheckFn != nil { + if err := a.DatabaseHealthcheckFn(ctx); err != nil { + return tailnet.ControlProtocolClients{}, xerrors.Errorf("%s: %w", codersdk.DatabaseNotReachable, err) + } + } + coord := a.CoordPtr.Load() if coord == nil { return tailnet.ControlProtocolClients{}, xerrors.Errorf("tailnet coordinator not initialized") diff --git a/coderd/tailnet_test.go b/coderd/tailnet_test.go index b0aaaedc769c0..652a28e30fe38 100644 --- a/coderd/tailnet_test.go +++ b/coderd/tailnet_test.go @@ -18,6 +18,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.opentelemetry.io/otel/trace" + "golang.org/x/xerrors" "tailscale.com/tailcfg" "github.com/coder/coder/v2/agent" @@ -56,8 +57,7 @@ func TestServerTailnet_AgentConn_NoSTUN(t *testing.T) { defer cancel() // Connect through the ServerTailnet - agents, serverTailnet := setupServerTailnetAgent(t, 1, - tailnettest.DisableSTUN, tailnettest.DERPIsEmbedded) + agents, serverTailnet := setupServerTailnetAgent(t, 1, withDERPAndStunOptions(tailnettest.DisableSTUN, tailnettest.DERPIsEmbedded)) a := agents[0] conn, release, err := serverTailnet.AgentConn(ctx, a.id) @@ -340,7 +340,7 @@ func TestServerTailnet_ReverseProxy(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), testutil.WaitLong) defer cancel() - agents, serverTailnet := setupServerTailnetAgent(t, 1, tailnettest.DisableSTUN) + agents, serverTailnet := setupServerTailnetAgent(t, 1, withDERPAndStunOptions(tailnettest.DisableSTUN)) a := agents[0] require.True(t, serverTailnet.Conn().GetBlockEndpoints(), "expected BlockEndpoints to be set") @@ -365,6 +365,43 @@ func TestServerTailnet_ReverseProxy(t *testing.T) { }) } +func TestServerTailnet_Healthcheck(t *testing.T) { + t.Parallel() + + // Verifies that a non-nil healthcheck which returns a non-error response behaves as expected. + t.Run("Passing", func(t *testing.T) { + t.Parallel() + + ctx := testutil.Context(t, testutil.WaitMedium) + fn := func(ctx context.Context) error { return nil } + + agents, serverTailnet := setupServerTailnetAgent(t, 1, withHealthcheckFn(fn)) + + a := agents[0] + conn, release, err := serverTailnet.AgentConn(ctx, a.id) + t.Cleanup(release) + require.NoError(t, err) + assert.True(t, conn.AwaitReachable(ctx)) + }) + + // If the healthcheck fails, we have no insight into this at this level. + // The dial against the control plane is retried, so we wait for the context to timeout as an indication that the + // healthcheck is performing as expected. + t.Run("Failing", func(t *testing.T) { + t.Parallel() + + ctx := testutil.Context(t, testutil.WaitMedium) + fn := func(ctx context.Context) error { return xerrors.Errorf("oops, db gone") } + + agents, serverTailnet := setupServerTailnetAgent(t, 1, withHealthcheckFn(fn)) + + a := agents[0] + _, release, err := serverTailnet.AgentConn(ctx, a.id) + require.Nil(t, release) + require.ErrorContains(t, err, "agent is unreachable") + }) +} + type wrappedListener struct { net.Listener dials int32 @@ -389,9 +426,36 @@ type agentWithID struct { agent.Agent } -func setupServerTailnetAgent(t *testing.T, agentNum int, opts ...tailnettest.DERPAndStunOption) ([]agentWithID, *coderd.ServerTailnet) { +type serverOption struct { + HealthcheckFn func(ctx context.Context) error + DERPAndStunOptions []tailnettest.DERPAndStunOption +} + +func withHealthcheckFn(fn func(ctx context.Context) error) serverOption { + return serverOption{ + HealthcheckFn: fn, + } +} + +func withDERPAndStunOptions(opts ...tailnettest.DERPAndStunOption) serverOption { + return serverOption{ + DERPAndStunOptions: opts, + } +} + +func setupServerTailnetAgent(t *testing.T, agentNum int, opts ...serverOption) ([]agentWithID, *coderd.ServerTailnet) { logger := testutil.Logger(t) - derpMap, derpServer := tailnettest.RunDERPAndSTUN(t, opts...) + + var healthcheckFn func(ctx context.Context) error + var derpAndStunOptions []tailnettest.DERPAndStunOption + for _, opt := range opts { + derpAndStunOptions = append(derpAndStunOptions, opt.DERPAndStunOptions...) + if opt.HealthcheckFn != nil { + healthcheckFn = opt.HealthcheckFn + } + } + + derpMap, derpServer := tailnettest.RunDERPAndSTUN(t, derpAndStunOptions...) coord := tailnet.NewCoordinator(logger) t.Cleanup(func() { @@ -431,10 +495,11 @@ func setupServerTailnetAgent(t *testing.T, agentNum int, opts ...tailnettest.DER } dialer := &coderd.InmemTailnetDialer{ - CoordPtr: &coordPtr, - DERPFn: func() *tailcfg.DERPMap { return derpMap }, - Logger: logger, - ClientID: uuid.UUID{5}, + CoordPtr: &coordPtr, + DERPFn: func() *tailcfg.DERPMap { return derpMap }, + Logger: logger, + ClientID: uuid.UUID{5}, + DatabaseHealthcheckFn: healthcheckFn, } serverTailnet, err := coderd.NewServerTailnet( context.Background(), diff --git a/coderd/workspaceagents.go b/coderd/workspaceagents.go index a4f8ed297b77a..4af12fa228713 100644 --- a/coderd/workspaceagents.go +++ b/coderd/workspaceagents.go @@ -997,6 +997,16 @@ func (api *API) derpMapUpdates(rw http.ResponseWriter, r *http.Request) { func (api *API) workspaceAgentClientCoordinate(rw http.ResponseWriter, r *http.Request) { ctx := r.Context() + // Ensure the database is reachable before proceeding. + _, err := api.Database.Ping(ctx) + if err != nil { + httpapi.Write(ctx, rw, http.StatusInternalServerError, codersdk.Response{ + Message: codersdk.DatabaseNotReachable, + Detail: err.Error(), + }) + return + } + // This route accepts user API key auth and workspace proxy auth. The moon actor has // full permissions so should be able to pass this authz check. workspace := httpmw.WorkspaceParam(r) diff --git a/coderd/workspaceagents_test.go b/coderd/workspaceagents_test.go index a8fe7718f4385..94c73da74816a 100644 --- a/coderd/workspaceagents_test.go +++ b/coderd/workspaceagents_test.go @@ -45,6 +45,7 @@ import ( "github.com/coder/coder/v2/coderd/database/dbfake" "github.com/coder/coder/v2/coderd/database/dbgen" "github.com/coder/coder/v2/coderd/database/dbmem" + "github.com/coder/coder/v2/coderd/database/dbtestutil" "github.com/coder/coder/v2/coderd/database/dbtime" "github.com/coder/coder/v2/coderd/database/pubsub" "github.com/coder/coder/v2/coderd/externalauth" @@ -537,6 +538,44 @@ func TestWorkspaceAgentTailnet(t *testing.T) { require.Equal(t, "test", strings.TrimSpace(string(output))) } +// TestWorkspaceAgentDialFailure validates that the tailnet controller will retry connecting to the control plane until +// its context times out, when the dialer fails its healthcheck. +func TestWorkspaceAgentDialFailure(t *testing.T) { + t.Parallel() + + store, ps := dbtestutil.NewDB(t) + + // Given: a database which will fail its Ping(ctx) call. + // NOTE: The Ping(ctx) call is made by the Dialer. + pdb := &pingFailingDB{ + Store: store, + } + client := coderdtest.New(t, &coderdtest.Options{ + Database: pdb, + Pubsub: ps, + IncludeProvisionerDaemon: true, + }) + user := coderdtest.CreateFirstUser(t, client) + + // When: a workspace agent is setup and we try dial it. + r := dbfake.WorkspaceBuild(t, pdb, database.WorkspaceTable{ + OrganizationID: user.OrganizationID, + OwnerID: user.UserID, + }).WithAgent().Do() + _ = agenttest.New(t, client.URL, r.AgentToken) + resources := coderdtest.AwaitWorkspaceAgents(t, client, r.Workspace.ID) + + // When: the db is marked as unhealthy (i.e. will fail its Ping). + // This needs to be done *after* the server "starts" otherwise it'll fail straight away when trying to initialize. + pdb.MarkUnhealthy() + + // Then: the tailnet controller will continually try to dial the coordination endpoint, exceeding its context timeout. + ctx := testutil.Context(t, testutil.WaitMedium) + conn, err := workspacesdk.New(client).DialAgent(ctx, resources[0].Agents[0].ID, nil) + require.ErrorContains(t, err, codersdk.DatabaseNotReachable) + require.Nil(t, conn) +} + func TestWorkspaceAgentClientCoordinate_BadVersion(t *testing.T) { t.Parallel() client, db := coderdtest.NewWithDatabase(t, nil) @@ -2591,3 +2630,22 @@ func TestAgentConnectionInfo(t *testing.T) { require.True(t, info.DisableDirectConnections) require.True(t, info.DERPForceWebSockets) } + +type pingFailingDB struct { + database.Store + + unhealthy bool +} + +func (p *pingFailingDB) Ping(context.Context) (time.Duration, error) { + if !p.unhealthy { + return time.Nanosecond, nil + } + + // Simulate a database connection error. + return 0, xerrors.New("oops") +} + +func (p *pingFailingDB) MarkUnhealthy() { + p.unhealthy = true +} diff --git a/codersdk/database.go b/codersdk/database.go new file mode 100644 index 0000000000000..1a33da6362e0d --- /dev/null +++ b/codersdk/database.go @@ -0,0 +1,7 @@ +package codersdk + +import "golang.org/x/xerrors" + +const DatabaseNotReachable = "database not reachable" + +var ErrDatabaseNotReachable = xerrors.New(DatabaseNotReachable) diff --git a/codersdk/workspacesdk/dialer.go b/codersdk/workspacesdk/dialer.go index 23d618761b807..0cc2708c1b830 100644 --- a/codersdk/workspacesdk/dialer.go +++ b/codersdk/workspacesdk/dialer.go @@ -11,17 +11,19 @@ import ( "golang.org/x/xerrors" "cdr.dev/slog" + "github.com/coder/websocket" + "github.com/coder/coder/v2/buildinfo" "github.com/coder/coder/v2/codersdk" "github.com/coder/coder/v2/tailnet" "github.com/coder/coder/v2/tailnet/proto" - "github.com/coder/websocket" ) var permanentErrorStatuses = []int{ - http.StatusConflict, // returned if client/agent connections disabled (browser only) - http.StatusBadRequest, // returned if API mismatch - http.StatusNotFound, // returned if user doesn't have permission or agent doesn't exist + http.StatusConflict, // returned if client/agent connections disabled (browser only) + http.StatusBadRequest, // returned if API mismatch + http.StatusNotFound, // returned if user doesn't have permission or agent doesn't exist + http.StatusInternalServerError, // returned if database is not reachable, } type WebsocketDialer struct { @@ -89,6 +91,11 @@ func (w *WebsocketDialer) Dial(ctx context.Context, r tailnet.ResumeTokenControl "Ensure your client release version (%s, different than the API version) matches the server release version", buildinfo.Version()) } + + if sdkErr.Message == codersdk.DatabaseNotReachable && + sdkErr.StatusCode() == http.StatusInternalServerError { + err = xerrors.Errorf("%s: %w", codersdk.DatabaseNotReachable, err) + } } w.connected <- err return tailnet.ControlProtocolClients{}, err diff --git a/site/src/api/typesGenerated.ts b/site/src/api/typesGenerated.ts index d1f38243988a3..efc54813c85a8 100644 --- a/site/src/api/typesGenerated.ts +++ b/site/src/api/typesGenerated.ts @@ -589,6 +589,9 @@ export interface DangerousConfig { readonly allow_all_cors: boolean; } +// From codersdk/database.go +export const DatabaseNotReachable = "database not reachable"; + // From healthsdk/healthsdk.go export interface DatabaseReport extends BaseReport { readonly healthy: boolean; diff --git a/tailnet/controllers.go b/tailnet/controllers.go index 1d2a348b985f3..c751f82477259 100644 --- a/tailnet/controllers.go +++ b/tailnet/controllers.go @@ -21,11 +21,12 @@ import ( "tailscale.com/util/dnsname" "cdr.dev/slog" + "github.com/coder/quartz" + "github.com/coder/retry" + "github.com/coder/coder/v2/coderd/util/ptr" "github.com/coder/coder/v2/codersdk" "github.com/coder/coder/v2/tailnet/proto" - "github.com/coder/quartz" - "github.com/coder/retry" ) // A Controller connects to the tailnet control plane, and then uses the control protocols to @@ -1381,6 +1382,14 @@ func (c *Controller) Run(ctx context.Context) { if xerrors.Is(err, context.Canceled) || xerrors.Is(err, context.DeadlineExceeded) { return } + + // If the database is unreachable by the control plane, there's not much we can do, so we'll just retry later. + if strings.Contains(err.Error(), codersdk.DatabaseNotReachable) { + c.logger.Warn(c.ctx, "control plane lost connection to database, retrying", + slog.Error(err), slog.F("retry_in_ms", retrier.Delay.Milliseconds())) + continue + } + errF := slog.Error(err) var sdkErr *codersdk.Error if xerrors.As(err, &sdkErr) { From 0136b70667cd2cf70c58347b18ab8ae7438928b3 Mon Sep 17 00:00:00 2001 From: Danny Kopping Date: Fri, 11 Apr 2025 14:49:29 +0000 Subject: [PATCH 3/7] Add len checks for returned resources Signed-off-by: Danny Kopping --- coderd/workspaceagents_test.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/coderd/workspaceagents_test.go b/coderd/workspaceagents_test.go index 94c73da74816a..9a0f1f858a283 100644 --- a/coderd/workspaceagents_test.go +++ b/coderd/workspaceagents_test.go @@ -564,6 +564,8 @@ func TestWorkspaceAgentDialFailure(t *testing.T) { }).WithAgent().Do() _ = agenttest.New(t, client.URL, r.AgentToken) resources := coderdtest.AwaitWorkspaceAgents(t, client, r.Workspace.ID) + require.Len(t, resources, 1) + require.Len(t, resources[0].Agents, 1) // When: the db is marked as unhealthy (i.e. will fail its Ping). // This needs to be done *after* the server "starts" otherwise it'll fail straight away when trying to initialize. From f92e85281333bb958e8500839c87069cdc8dbe7a Mon Sep 17 00:00:00 2001 From: Danny Kopping Date: Mon, 14 Apr 2025 09:20:26 +0000 Subject: [PATCH 4/7] Reset retrier after each successful job acquisition Signed-off-by: Danny Kopping --- provisionerd/provisionerd.go | 11 +++++++++-- tailnet/controllers.go | 2 +- 2 files changed, 10 insertions(+), 3 deletions(-) diff --git a/provisionerd/provisionerd.go b/provisionerd/provisionerd.go index 2df7f9e78db87..59c028ea204e5 100644 --- a/provisionerd/provisionerd.go +++ b/provisionerd/provisionerd.go @@ -301,8 +301,15 @@ func (p *Server) acquireLoop() { return } err := p.acquireAndRunOne(client) - if err != nil && ctx.Err() == nil { // Only log if context is not done. - p.opts.Logger.Debug(ctx, "retrying to acquire job", slog.F("retry_in_ms", retrier.Delay.Milliseconds()), slog.Error(err)) + if err != nil { // Only log if context is not done. + // Short-circuit: don't wait for the retry delay to exit, if required. + if p.acquireExit() { + return + } + p.opts.Logger.Warn(ctx, "failed to acquire job, retrying...", slog.F("delay", fmt.Sprintf("%vms", retrier.Delay.Milliseconds())), slog.Error(err)) + } else { + // Reset the retrier after each successful acquisition. + retrier.Reset() } } } diff --git a/tailnet/controllers.go b/tailnet/controllers.go index c751f82477259..cc29ceb134cd0 100644 --- a/tailnet/controllers.go +++ b/tailnet/controllers.go @@ -1386,7 +1386,7 @@ func (c *Controller) Run(ctx context.Context) { // If the database is unreachable by the control plane, there's not much we can do, so we'll just retry later. if strings.Contains(err.Error(), codersdk.DatabaseNotReachable) { c.logger.Warn(c.ctx, "control plane lost connection to database, retrying", - slog.Error(err), slog.F("retry_in_ms", retrier.Delay.Milliseconds())) + slog.Error(err), slog.F("delay", fmt.Sprintf("%vms", retrier.Delay.Milliseconds()))) continue } From 4b0fd6a6c68642a9ae49408a40154ef6c4b8471b Mon Sep 17 00:00:00 2001 From: Danny Kopping Date: Mon, 14 Apr 2025 09:35:26 +0000 Subject: [PATCH 5/7] Wrap received error into codersdk.ErrDatabaseNotReachable This has a downside of losing the details of the received error, but in this case it seems justified since we need to conditionalize responses based on codersdk.ErrDatabaseNotReachable Signed-off-by: Danny Kopping --- coderd/tailnet.go | 2 +- coderd/workspaceagents_test.go | 4 ++-- codersdk/workspacesdk/dialer.go | 2 +- tailnet/controllers.go | 3 ++- 4 files changed, 6 insertions(+), 5 deletions(-) diff --git a/coderd/tailnet.go b/coderd/tailnet.go index c6ff0b870d449..3a19cff9814bb 100644 --- a/coderd/tailnet.go +++ b/coderd/tailnet.go @@ -549,7 +549,7 @@ type InmemTailnetDialer struct { func (a *InmemTailnetDialer) Dial(ctx context.Context, _ tailnet.ResumeTokenController) (tailnet.ControlProtocolClients, error) { if a.DatabaseHealthcheckFn != nil { if err := a.DatabaseHealthcheckFn(ctx); err != nil { - return tailnet.ControlProtocolClients{}, xerrors.Errorf("%s: %w", codersdk.DatabaseNotReachable, err) + return tailnet.ControlProtocolClients{}, xerrors.Errorf("%w: %v", codersdk.ErrDatabaseNotReachable, err) } } diff --git a/coderd/workspaceagents_test.go b/coderd/workspaceagents_test.go index 9a0f1f858a283..740691dbcfb37 100644 --- a/coderd/workspaceagents_test.go +++ b/coderd/workspaceagents_test.go @@ -557,7 +557,7 @@ func TestWorkspaceAgentDialFailure(t *testing.T) { }) user := coderdtest.CreateFirstUser(t, client) - // When: a workspace agent is setup and we try dial it. + // Given: a workspace agent is setup. r := dbfake.WorkspaceBuild(t, pdb, database.WorkspaceTable{ OrganizationID: user.OrganizationID, OwnerID: user.UserID, @@ -574,7 +574,7 @@ func TestWorkspaceAgentDialFailure(t *testing.T) { // Then: the tailnet controller will continually try to dial the coordination endpoint, exceeding its context timeout. ctx := testutil.Context(t, testutil.WaitMedium) conn, err := workspacesdk.New(client).DialAgent(ctx, resources[0].Agents[0].ID, nil) - require.ErrorContains(t, err, codersdk.DatabaseNotReachable) + require.ErrorIs(t, err, codersdk.ErrDatabaseNotReachable) require.Nil(t, conn) } diff --git a/codersdk/workspacesdk/dialer.go b/codersdk/workspacesdk/dialer.go index 0cc2708c1b830..71cac0c5f04b1 100644 --- a/codersdk/workspacesdk/dialer.go +++ b/codersdk/workspacesdk/dialer.go @@ -94,7 +94,7 @@ func (w *WebsocketDialer) Dial(ctx context.Context, r tailnet.ResumeTokenControl if sdkErr.Message == codersdk.DatabaseNotReachable && sdkErr.StatusCode() == http.StatusInternalServerError { - err = xerrors.Errorf("%s: %w", codersdk.DatabaseNotReachable, err) + err = xerrors.Errorf("%w: %v", codersdk.ErrDatabaseNotReachable, err) } } w.connected <- err diff --git a/tailnet/controllers.go b/tailnet/controllers.go index cc29ceb134cd0..a257667fbe7a9 100644 --- a/tailnet/controllers.go +++ b/tailnet/controllers.go @@ -2,6 +2,7 @@ package tailnet import ( "context" + "errors" "fmt" "io" "maps" @@ -1384,7 +1385,7 @@ func (c *Controller) Run(ctx context.Context) { } // If the database is unreachable by the control plane, there's not much we can do, so we'll just retry later. - if strings.Contains(err.Error(), codersdk.DatabaseNotReachable) { + if errors.Is(err, codersdk.ErrDatabaseNotReachable) { c.logger.Warn(c.ctx, "control plane lost connection to database, retrying", slog.Error(err), slog.F("delay", fmt.Sprintf("%vms", retrier.Delay.Milliseconds()))) continue From 68867da2ecc58eb068b2cfd93e81c5be03084593 Mon Sep 17 00:00:00 2001 From: Danny Kopping Date: Mon, 14 Apr 2025 10:02:03 +0000 Subject: [PATCH 6/7] Replace DatabaseHealthcheckFn with interface Signed-off-by: Danny Kopping --- coderd/coderd.go | 13 +++++-------- coderd/tailnet.go | 19 ++++++++++++------- coderd/tailnet_test.go | 37 ++++++++++++++++++++++--------------- 3 files changed, 39 insertions(+), 30 deletions(-) diff --git a/coderd/coderd.go b/coderd/coderd.go index a00b2b72f3cce..a5886061ac4dc 100644 --- a/coderd/coderd.go +++ b/coderd/coderd.go @@ -675,14 +675,11 @@ func New(options *Options) *API { api.Auditor.Store(&options.Auditor) api.TailnetCoordinator.Store(&options.TailnetCoordinator) dialer := &InmemTailnetDialer{ - CoordPtr: &api.TailnetCoordinator, - DERPFn: api.DERPMap, - Logger: options.Logger, - ClientID: uuid.New(), - DatabaseHealthcheckFn: func(ctx context.Context) error { - _, err := api.Database.Ping(ctx) - return err - }, + CoordPtr: &api.TailnetCoordinator, + DERPFn: api.DERPMap, + Logger: options.Logger, + ClientID: uuid.New(), + DatabaseHealthCheck: api.Database, } stn, err := NewServerTailnet(api.ctx, options.Logger, diff --git a/coderd/tailnet.go b/coderd/tailnet.go index 3a19cff9814bb..cfdc667f4da0f 100644 --- a/coderd/tailnet.go +++ b/coderd/tailnet.go @@ -536,19 +536,24 @@ func NewMultiAgentController(ctx context.Context, logger slog.Logger, tracer tra return m } +type Pinger interface { + Ping(context.Context) (time.Duration, error) +} + // InmemTailnetDialer is a tailnet.ControlProtocolDialer that connects to a Coordinator and DERPMap // service running in the same memory space. type InmemTailnetDialer struct { - CoordPtr *atomic.Pointer[tailnet.Coordinator] - DERPFn func() *tailcfg.DERPMap - Logger slog.Logger - ClientID uuid.UUID - DatabaseHealthcheckFn func(ctx context.Context) error + CoordPtr *atomic.Pointer[tailnet.Coordinator] + DERPFn func() *tailcfg.DERPMap + Logger slog.Logger + ClientID uuid.UUID + // DatabaseHealthCheck is used to validate that the store is reachable. + DatabaseHealthCheck Pinger } func (a *InmemTailnetDialer) Dial(ctx context.Context, _ tailnet.ResumeTokenController) (tailnet.ControlProtocolClients, error) { - if a.DatabaseHealthcheckFn != nil { - if err := a.DatabaseHealthcheckFn(ctx); err != nil { + if a.DatabaseHealthCheck != nil { + if _, err := a.DatabaseHealthCheck.Ping(ctx); err != nil { return tailnet.ControlProtocolClients{}, xerrors.Errorf("%w: %v", codersdk.ErrDatabaseNotReachable, err) } } diff --git a/coderd/tailnet_test.go b/coderd/tailnet_test.go index 652a28e30fe38..37392cd70eae8 100644 --- a/coderd/tailnet_test.go +++ b/coderd/tailnet_test.go @@ -11,6 +11,7 @@ import ( "strconv" "sync/atomic" "testing" + "time" "github.com/google/uuid" "github.com/prometheus/client_golang/prometheus" @@ -365,6 +366,14 @@ func TestServerTailnet_ReverseProxy(t *testing.T) { }) } +type fakePing struct { + err error +} + +func (f *fakePing) Ping(context.Context) (time.Duration, error) { + return time.Duration(0), f.err +} + func TestServerTailnet_Healthcheck(t *testing.T) { t.Parallel() @@ -373,9 +382,8 @@ func TestServerTailnet_Healthcheck(t *testing.T) { t.Parallel() ctx := testutil.Context(t, testutil.WaitMedium) - fn := func(ctx context.Context) error { return nil } - agents, serverTailnet := setupServerTailnetAgent(t, 1, withHealthcheckFn(fn)) + agents, serverTailnet := setupServerTailnetAgent(t, 1, withHealthChecker(&fakePing{})) a := agents[0] conn, release, err := serverTailnet.AgentConn(ctx, a.id) @@ -391,9 +399,8 @@ func TestServerTailnet_Healthcheck(t *testing.T) { t.Parallel() ctx := testutil.Context(t, testutil.WaitMedium) - fn := func(ctx context.Context) error { return xerrors.Errorf("oops, db gone") } - agents, serverTailnet := setupServerTailnetAgent(t, 1, withHealthcheckFn(fn)) + agents, serverTailnet := setupServerTailnetAgent(t, 1, withHealthChecker(&fakePing{err: xerrors.New("oops")})) a := agents[0] _, release, err := serverTailnet.AgentConn(ctx, a.id) @@ -427,13 +434,13 @@ type agentWithID struct { } type serverOption struct { - HealthcheckFn func(ctx context.Context) error + HealthCheck coderd.Pinger DERPAndStunOptions []tailnettest.DERPAndStunOption } -func withHealthcheckFn(fn func(ctx context.Context) error) serverOption { +func withHealthChecker(p coderd.Pinger) serverOption { return serverOption{ - HealthcheckFn: fn, + HealthCheck: p, } } @@ -446,12 +453,12 @@ func withDERPAndStunOptions(opts ...tailnettest.DERPAndStunOption) serverOption func setupServerTailnetAgent(t *testing.T, agentNum int, opts ...serverOption) ([]agentWithID, *coderd.ServerTailnet) { logger := testutil.Logger(t) - var healthcheckFn func(ctx context.Context) error + var healthChecker coderd.Pinger var derpAndStunOptions []tailnettest.DERPAndStunOption for _, opt := range opts { derpAndStunOptions = append(derpAndStunOptions, opt.DERPAndStunOptions...) - if opt.HealthcheckFn != nil { - healthcheckFn = opt.HealthcheckFn + if opt.HealthCheck != nil { + healthChecker = opt.HealthCheck } } @@ -495,11 +502,11 @@ func setupServerTailnetAgent(t *testing.T, agentNum int, opts ...serverOption) ( } dialer := &coderd.InmemTailnetDialer{ - CoordPtr: &coordPtr, - DERPFn: func() *tailcfg.DERPMap { return derpMap }, - Logger: logger, - ClientID: uuid.UUID{5}, - DatabaseHealthcheckFn: healthcheckFn, + CoordPtr: &coordPtr, + DERPFn: func() *tailcfg.DERPMap { return derpMap }, + Logger: logger, + ClientID: uuid.UUID{5}, + DatabaseHealthCheck: healthChecker, } serverTailnet, err := coderd.NewServerTailnet( context.Background(), From 6f60cbc1398d884a90836a8acbae368cebd6c4cc Mon Sep 17 00:00:00 2001 From: Danny Kopping Date: Mon, 14 Apr 2025 11:59:50 +0000 Subject: [PATCH 7/7] Review suggestions Signed-off-by: Danny Kopping --- coderd/tailnet_test.go | 107 ++++++++------------- coderd/workspaceagents_test.go | 60 ------------ codersdk/workspacesdk/workspacesdk_test.go | 35 +++++++ provisionerd/provisionerd.go | 10 +- 4 files changed, 78 insertions(+), 134 deletions(-) diff --git a/coderd/tailnet_test.go b/coderd/tailnet_test.go index 37392cd70eae8..28265404c3eae 100644 --- a/coderd/tailnet_test.go +++ b/coderd/tailnet_test.go @@ -27,6 +27,7 @@ import ( "github.com/coder/coder/v2/agent/proto" "github.com/coder/coder/v2/coderd" "github.com/coder/coder/v2/coderd/workspaceapps/appurl" + "github.com/coder/coder/v2/codersdk" "github.com/coder/coder/v2/codersdk/agentsdk" "github.com/coder/coder/v2/codersdk/workspacesdk" "github.com/coder/coder/v2/tailnet" @@ -58,7 +59,8 @@ func TestServerTailnet_AgentConn_NoSTUN(t *testing.T) { defer cancel() // Connect through the ServerTailnet - agents, serverTailnet := setupServerTailnetAgent(t, 1, withDERPAndStunOptions(tailnettest.DisableSTUN, tailnettest.DERPIsEmbedded)) + agents, serverTailnet := setupServerTailnetAgent(t, 1, + tailnettest.DisableSTUN, tailnettest.DERPIsEmbedded) a := agents[0] conn, release, err := serverTailnet.AgentConn(ctx, a.id) @@ -341,7 +343,7 @@ func TestServerTailnet_ReverseProxy(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), testutil.WaitLong) defer cancel() - agents, serverTailnet := setupServerTailnetAgent(t, 1, withDERPAndStunOptions(tailnettest.DisableSTUN)) + agents, serverTailnet := setupServerTailnetAgent(t, 1, tailnettest.DisableSTUN) a := agents[0] require.True(t, serverTailnet.Conn().GetBlockEndpoints(), "expected BlockEndpoints to be set") @@ -366,47 +368,42 @@ func TestServerTailnet_ReverseProxy(t *testing.T) { }) } -type fakePing struct { - err error -} - -func (f *fakePing) Ping(context.Context) (time.Duration, error) { - return time.Duration(0), f.err -} - -func TestServerTailnet_Healthcheck(t *testing.T) { +func TestDialFailure(t *testing.T) { t.Parallel() - // Verifies that a non-nil healthcheck which returns a non-error response behaves as expected. - t.Run("Passing", func(t *testing.T) { - t.Parallel() - - ctx := testutil.Context(t, testutil.WaitMedium) - - agents, serverTailnet := setupServerTailnetAgent(t, 1, withHealthChecker(&fakePing{})) + // Setup. + ctx := testutil.Context(t, testutil.WaitShort) + logger := testutil.Logger(t) - a := agents[0] - conn, release, err := serverTailnet.AgentConn(ctx, a.id) - t.Cleanup(release) - require.NoError(t, err) - assert.True(t, conn.AwaitReachable(ctx)) + // Given: a tailnet coordinator. + coord := tailnet.NewCoordinator(logger) + t.Cleanup(func() { + _ = coord.Close() }) + coordPtr := atomic.Pointer[tailnet.Coordinator]{} + coordPtr.Store(&coord) - // If the healthcheck fails, we have no insight into this at this level. - // The dial against the control plane is retried, so we wait for the context to timeout as an indication that the - // healthcheck is performing as expected. - t.Run("Failing", func(t *testing.T) { - t.Parallel() + // Given: a fake DB healthchecker which will always fail. + fch := &failingHealthcheck{} + + // When: dialing the in-memory coordinator. + dialer := &coderd.InmemTailnetDialer{ + CoordPtr: &coordPtr, + Logger: logger, + ClientID: uuid.UUID{5}, + DatabaseHealthCheck: fch, + } + _, err := dialer.Dial(ctx, nil) - ctx := testutil.Context(t, testutil.WaitMedium) + // Then: the error returned reflects the database has failed its healthcheck. + require.ErrorIs(t, err, codersdk.ErrDatabaseNotReachable) +} - agents, serverTailnet := setupServerTailnetAgent(t, 1, withHealthChecker(&fakePing{err: xerrors.New("oops")})) +type failingHealthcheck struct{} - a := agents[0] - _, release, err := serverTailnet.AgentConn(ctx, a.id) - require.Nil(t, release) - require.ErrorContains(t, err, "agent is unreachable") - }) +func (failingHealthcheck) Ping(context.Context) (time.Duration, error) { + // Simulate a database connection error. + return 0, xerrors.New("oops") } type wrappedListener struct { @@ -433,36 +430,9 @@ type agentWithID struct { agent.Agent } -type serverOption struct { - HealthCheck coderd.Pinger - DERPAndStunOptions []tailnettest.DERPAndStunOption -} - -func withHealthChecker(p coderd.Pinger) serverOption { - return serverOption{ - HealthCheck: p, - } -} - -func withDERPAndStunOptions(opts ...tailnettest.DERPAndStunOption) serverOption { - return serverOption{ - DERPAndStunOptions: opts, - } -} - -func setupServerTailnetAgent(t *testing.T, agentNum int, opts ...serverOption) ([]agentWithID, *coderd.ServerTailnet) { +func setupServerTailnetAgent(t *testing.T, agentNum int, opts ...tailnettest.DERPAndStunOption) ([]agentWithID, *coderd.ServerTailnet) { logger := testutil.Logger(t) - - var healthChecker coderd.Pinger - var derpAndStunOptions []tailnettest.DERPAndStunOption - for _, opt := range opts { - derpAndStunOptions = append(derpAndStunOptions, opt.DERPAndStunOptions...) - if opt.HealthCheck != nil { - healthChecker = opt.HealthCheck - } - } - - derpMap, derpServer := tailnettest.RunDERPAndSTUN(t, derpAndStunOptions...) + derpMap, derpServer := tailnettest.RunDERPAndSTUN(t, opts...) coord := tailnet.NewCoordinator(logger) t.Cleanup(func() { @@ -502,11 +472,10 @@ func setupServerTailnetAgent(t *testing.T, agentNum int, opts ...serverOption) ( } dialer := &coderd.InmemTailnetDialer{ - CoordPtr: &coordPtr, - DERPFn: func() *tailcfg.DERPMap { return derpMap }, - Logger: logger, - ClientID: uuid.UUID{5}, - DatabaseHealthCheck: healthChecker, + CoordPtr: &coordPtr, + DERPFn: func() *tailcfg.DERPMap { return derpMap }, + Logger: logger, + ClientID: uuid.UUID{5}, } serverTailnet, err := coderd.NewServerTailnet( context.Background(), diff --git a/coderd/workspaceagents_test.go b/coderd/workspaceagents_test.go index 740691dbcfb37..a8fe7718f4385 100644 --- a/coderd/workspaceagents_test.go +++ b/coderd/workspaceagents_test.go @@ -45,7 +45,6 @@ import ( "github.com/coder/coder/v2/coderd/database/dbfake" "github.com/coder/coder/v2/coderd/database/dbgen" "github.com/coder/coder/v2/coderd/database/dbmem" - "github.com/coder/coder/v2/coderd/database/dbtestutil" "github.com/coder/coder/v2/coderd/database/dbtime" "github.com/coder/coder/v2/coderd/database/pubsub" "github.com/coder/coder/v2/coderd/externalauth" @@ -538,46 +537,6 @@ func TestWorkspaceAgentTailnet(t *testing.T) { require.Equal(t, "test", strings.TrimSpace(string(output))) } -// TestWorkspaceAgentDialFailure validates that the tailnet controller will retry connecting to the control plane until -// its context times out, when the dialer fails its healthcheck. -func TestWorkspaceAgentDialFailure(t *testing.T) { - t.Parallel() - - store, ps := dbtestutil.NewDB(t) - - // Given: a database which will fail its Ping(ctx) call. - // NOTE: The Ping(ctx) call is made by the Dialer. - pdb := &pingFailingDB{ - Store: store, - } - client := coderdtest.New(t, &coderdtest.Options{ - Database: pdb, - Pubsub: ps, - IncludeProvisionerDaemon: true, - }) - user := coderdtest.CreateFirstUser(t, client) - - // Given: a workspace agent is setup. - r := dbfake.WorkspaceBuild(t, pdb, database.WorkspaceTable{ - OrganizationID: user.OrganizationID, - OwnerID: user.UserID, - }).WithAgent().Do() - _ = agenttest.New(t, client.URL, r.AgentToken) - resources := coderdtest.AwaitWorkspaceAgents(t, client, r.Workspace.ID) - require.Len(t, resources, 1) - require.Len(t, resources[0].Agents, 1) - - // When: the db is marked as unhealthy (i.e. will fail its Ping). - // This needs to be done *after* the server "starts" otherwise it'll fail straight away when trying to initialize. - pdb.MarkUnhealthy() - - // Then: the tailnet controller will continually try to dial the coordination endpoint, exceeding its context timeout. - ctx := testutil.Context(t, testutil.WaitMedium) - conn, err := workspacesdk.New(client).DialAgent(ctx, resources[0].Agents[0].ID, nil) - require.ErrorIs(t, err, codersdk.ErrDatabaseNotReachable) - require.Nil(t, conn) -} - func TestWorkspaceAgentClientCoordinate_BadVersion(t *testing.T) { t.Parallel() client, db := coderdtest.NewWithDatabase(t, nil) @@ -2632,22 +2591,3 @@ func TestAgentConnectionInfo(t *testing.T) { require.True(t, info.DisableDirectConnections) require.True(t, info.DERPForceWebSockets) } - -type pingFailingDB struct { - database.Store - - unhealthy bool -} - -func (p *pingFailingDB) Ping(context.Context) (time.Duration, error) { - if !p.unhealthy { - return time.Nanosecond, nil - } - - // Simulate a database connection error. - return 0, xerrors.New("oops") -} - -func (p *pingFailingDB) MarkUnhealthy() { - p.unhealthy = true -} diff --git a/codersdk/workspacesdk/workspacesdk_test.go b/codersdk/workspacesdk/workspacesdk_test.go index 317db4471319f..e7ccd96e208fa 100644 --- a/codersdk/workspacesdk/workspacesdk_test.go +++ b/codersdk/workspacesdk/workspacesdk_test.go @@ -1,13 +1,21 @@ package workspacesdk_test import ( + "net/http" + "net/http/httptest" "net/url" "testing" "github.com/stretchr/testify/require" "tailscale.com/tailcfg" + "github.com/coder/websocket" + + "github.com/coder/coder/v2/coderd/httpapi" + "github.com/coder/coder/v2/codersdk" "github.com/coder/coder/v2/codersdk/agentsdk" + "github.com/coder/coder/v2/codersdk/workspacesdk" + "github.com/coder/coder/v2/testutil" ) func TestWorkspaceRewriteDERPMap(t *testing.T) { @@ -37,3 +45,30 @@ func TestWorkspaceRewriteDERPMap(t *testing.T) { require.Equal(t, "coconuts.org", node.HostName) require.Equal(t, 44558, node.DERPPort) } + +func TestWorkspaceDialerFailure(t *testing.T) { + t.Parallel() + + // Setup. + ctx := testutil.Context(t, testutil.WaitShort) + logger := testutil.Logger(t) + + // Given: a mock HTTP server which mimicks an unreachable database when calling the coordination endpoint. + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + httpapi.Write(ctx, w, http.StatusInternalServerError, codersdk.Response{ + Message: codersdk.DatabaseNotReachable, + Detail: "oops", + }) + })) + t.Cleanup(srv.Close) + + u, err := url.Parse(srv.URL) + require.NoError(t, err) + + // When: calling the coordination endpoint. + dialer := workspacesdk.NewWebsocketDialer(logger, u, &websocket.DialOptions{}) + _, err = dialer.Dial(ctx, nil) + + // Then: an error indicating a database issue is returned, to conditionalize the behavior of the caller. + require.ErrorIs(t, err, codersdk.ErrDatabaseNotReachable) +} diff --git a/provisionerd/provisionerd.go b/provisionerd/provisionerd.go index 59c028ea204e5..9adf9951fa488 100644 --- a/provisionerd/provisionerd.go +++ b/provisionerd/provisionerd.go @@ -301,12 +301,12 @@ func (p *Server) acquireLoop() { return } err := p.acquireAndRunOne(client) - if err != nil { // Only log if context is not done. + if err != nil && ctx.Err() == nil { // Only log if context is not done. // Short-circuit: don't wait for the retry delay to exit, if required. if p.acquireExit() { return } - p.opts.Logger.Warn(ctx, "failed to acquire job, retrying...", slog.F("delay", fmt.Sprintf("%vms", retrier.Delay.Milliseconds())), slog.Error(err)) + p.opts.Logger.Warn(ctx, "failed to acquire job, retrying", slog.F("delay", fmt.Sprintf("%vms", retrier.Delay.Milliseconds())), slog.Error(err)) } else { // Reset the retrier after each successful acquisition. retrier.Reset() @@ -346,7 +346,7 @@ func (p *Server) acquireAndRunOne(client proto.DRPCProvisionerDaemonClient) erro } if job.JobId == "" { p.opts.Logger.Debug(ctx, "acquire job successfully canceled") - return xerrors.New("canceled") + return nil } if len(job.TraceMetadata) > 0 { @@ -401,9 +401,9 @@ func (p *Server) acquireAndRunOne(client proto.DRPCProvisionerDaemonClient) erro Error: fmt.Sprintf("failed to connect to provisioner: %s", resp.Error), }) if err != nil { - p.opts.Logger.Error(ctx, "provisioner job failed", slog.F("job_id", job.JobId), slog.Error(err)) + p.opts.Logger.Error(ctx, "failed to report provisioner job failed", slog.F("job_id", job.JobId), slog.Error(err)) } - return xerrors.Errorf("provisioner job failed: %w", err) + return xerrors.Errorf("failed to report provisioner job failed: %w", err) } p.mutex.Lock()