fix: reduce excessive logging when database is unreachable #17363

Merged
merged 8 commits on Apr 15, 2025
4 changes: 4 additions & 0 deletions coderd/coderd.go
@@ -679,6 +679,10 @@ func New(options *Options) *API {
DERPFn: api.DERPMap,
Logger: options.Logger,
ClientID: uuid.New(),
DatabaseHealthcheckFn: func(ctx context.Context) error {
_, err := api.Database.Ping(ctx)
return err
},
}
stn, err := NewServerTailnet(api.ctx,
options.Logger,
19 changes: 14 additions & 5 deletions coderd/tailnet.go
@@ -24,9 +24,11 @@ import (
"tailscale.com/tailcfg"

"cdr.dev/slog"

"github.com/coder/coder/v2/coderd/tracing"
"github.com/coder/coder/v2/coderd/workspaceapps"
"github.com/coder/coder/v2/coderd/workspaceapps/appurl"
"github.com/coder/coder/v2/codersdk"
"github.com/coder/coder/v2/codersdk/workspacesdk"
"github.com/coder/coder/v2/site"
"github.com/coder/coder/v2/tailnet"
@@ -537,13 +539,20 @@ func NewMultiAgentController(ctx context.Context, logger slog.Logger, tracer tra
// InmemTailnetDialer is a tailnet.ControlProtocolDialer that connects to a Coordinator and DERPMap
// service running in the same memory space.
type InmemTailnetDialer struct {
CoordPtr *atomic.Pointer[tailnet.Coordinator]
DERPFn func() *tailcfg.DERPMap
Logger slog.Logger
ClientID uuid.UUID
CoordPtr *atomic.Pointer[tailnet.Coordinator]
DERPFn func() *tailcfg.DERPMap
Logger slog.Logger
ClientID uuid.UUID
DatabaseHealthcheckFn func(ctx context.Context) error
}

func (a *InmemTailnetDialer) Dial(_ context.Context, _ tailnet.ResumeTokenController) (tailnet.ControlProtocolClients, error) {
func (a *InmemTailnetDialer) Dial(ctx context.Context, _ tailnet.ResumeTokenController) (tailnet.ControlProtocolClients, error) {
if a.DatabaseHealthcheckFn != nil {
if err := a.DatabaseHealthcheckFn(ctx); err != nil {
return tailnet.ControlProtocolClients{}, xerrors.Errorf("%s: %w", codersdk.DatabaseNotReachable, err)
}
}

coord := a.CoordPtr.Load()
if coord == nil {
return tailnet.ControlProtocolClients{}, xerrors.Errorf("tailnet coordinator not initialized")
83 changes: 74 additions & 9 deletions coderd/tailnet_test.go
@@ -18,6 +18,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/otel/trace"
"golang.org/x/xerrors"
"tailscale.com/tailcfg"

"github.com/coder/coder/v2/agent"
@@ -56,8 +57,7 @@ func TestServerTailnet_AgentConn_NoSTUN(t *testing.T) {
defer cancel()

// Connect through the ServerTailnet
agents, serverTailnet := setupServerTailnetAgent(t, 1,
tailnettest.DisableSTUN, tailnettest.DERPIsEmbedded)
agents, serverTailnet := setupServerTailnetAgent(t, 1, withDERPAndStunOptions(tailnettest.DisableSTUN, tailnettest.DERPIsEmbedded))
a := agents[0]

conn, release, err := serverTailnet.AgentConn(ctx, a.id)
@@ -340,7 +340,7 @@ func TestServerTailnet_ReverseProxy(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), testutil.WaitLong)
defer cancel()

agents, serverTailnet := setupServerTailnetAgent(t, 1, tailnettest.DisableSTUN)
agents, serverTailnet := setupServerTailnetAgent(t, 1, withDERPAndStunOptions(tailnettest.DisableSTUN))
a := agents[0]

require.True(t, serverTailnet.Conn().GetBlockEndpoints(), "expected BlockEndpoints to be set")
@@ -365,6 +365,43 @@
})
}

func TestServerTailnet_Healthcheck(t *testing.T) {
t.Parallel()

// Verifies that a non-nil healthcheck which returns a non-error response allows the dial to succeed.
t.Run("Passing", func(t *testing.T) {
t.Parallel()

ctx := testutil.Context(t, testutil.WaitMedium)
fn := func(ctx context.Context) error { return nil }

agents, serverTailnet := setupServerTailnetAgent(t, 1, withHealthcheckFn(fn))

a := agents[0]
conn, release, err := serverTailnet.AgentConn(ctx, a.id)
t.Cleanup(release)
require.NoError(t, err)
assert.True(t, conn.AwaitReachable(ctx))
})

// If the healthcheck fails, we have no insight into this at this level.
// The dial against the control plane is retried, so we wait for the context to time out as an indication that the
// healthcheck is performing as expected.
t.Run("Failing", func(t *testing.T) {
t.Parallel()

ctx := testutil.Context(t, testutil.WaitMedium)
fn := func(ctx context.Context) error { return xerrors.Errorf("oops, db gone") }

agents, serverTailnet := setupServerTailnetAgent(t, 1, withHealthcheckFn(fn))

a := agents[0]
_, release, err := serverTailnet.AgentConn(ctx, a.id)
require.Nil(t, release)
require.ErrorContains(t, err, "agent is unreachable")
})
}

type wrappedListener struct {
net.Listener
dials int32
@@ -389,9 +426,36 @@ type agentWithID struct {
agent.Agent
}

func setupServerTailnetAgent(t *testing.T, agentNum int, opts ...tailnettest.DERPAndStunOption) ([]agentWithID, *coderd.ServerTailnet) {
type serverOption struct {
HealthcheckFn func(ctx context.Context) error
DERPAndStunOptions []tailnettest.DERPAndStunOption
}

func withHealthcheckFn(fn func(ctx context.Context) error) serverOption {
return serverOption{
HealthcheckFn: fn,
}
}

func withDERPAndStunOptions(opts ...tailnettest.DERPAndStunOption) serverOption {
return serverOption{
DERPAndStunOptions: opts,
}
}

func setupServerTailnetAgent(t *testing.T, agentNum int, opts ...serverOption) ([]agentWithID, *coderd.ServerTailnet) {
logger := testutil.Logger(t)
derpMap, derpServer := tailnettest.RunDERPAndSTUN(t, opts...)

var healthcheckFn func(ctx context.Context) error
var derpAndStunOptions []tailnettest.DERPAndStunOption
for _, opt := range opts {
derpAndStunOptions = append(derpAndStunOptions, opt.DERPAndStunOptions...)
if opt.HealthcheckFn != nil {
healthcheckFn = opt.HealthcheckFn
}
}

derpMap, derpServer := tailnettest.RunDERPAndSTUN(t, derpAndStunOptions...)

coord := tailnet.NewCoordinator(logger)
t.Cleanup(func() {
@@ -431,10 +495,11 @@ func setupServerTailnetAgent(t *testing.T, agentNum int, opts ...tailnettest.DER
}

dialer := &coderd.InmemTailnetDialer{
CoordPtr: &coordPtr,
DERPFn: func() *tailcfg.DERPMap { return derpMap },
Logger: logger,
ClientID: uuid.UUID{5},
CoordPtr: &coordPtr,
DERPFn: func() *tailcfg.DERPMap { return derpMap },
Logger: logger,
ClientID: uuid.UUID{5},
DatabaseHealthcheckFn: healthcheckFn,
}
serverTailnet, err := coderd.NewServerTailnet(
context.Background(),
10 changes: 10 additions & 0 deletions coderd/workspaceagents.go
@@ -997,6 +997,16 @@ func (api *API) derpMapUpdates(rw http.ResponseWriter, r *http.Request) {
func (api *API) workspaceAgentClientCoordinate(rw http.ResponseWriter, r *http.Request) {
ctx := r.Context()

// Ensure the database is reachable before proceeding.
_, err := api.Database.Ping(ctx)
if err != nil {
httpapi.Write(ctx, rw, http.StatusInternalServerError, codersdk.Response{
Message: codersdk.DatabaseNotReachable,
Detail: err.Error(),
})
return
}

// This route accepts user API key auth and workspace proxy auth. The moon actor has
// full permissions so should be able to pass this authz check.
workspace := httpmw.WorkspaceParam(r)
60 changes: 60 additions & 0 deletions coderd/workspaceagents_test.go
@@ -45,6 +45,7 @@ import (
"github.com/coder/coder/v2/coderd/database/dbfake"
"github.com/coder/coder/v2/coderd/database/dbgen"
"github.com/coder/coder/v2/coderd/database/dbmem"
"github.com/coder/coder/v2/coderd/database/dbtestutil"
"github.com/coder/coder/v2/coderd/database/dbtime"
"github.com/coder/coder/v2/coderd/database/pubsub"
"github.com/coder/coder/v2/coderd/externalauth"
@@ -537,6 +538,46 @@ func TestWorkspaceAgentTailnet(t *testing.T) {
require.Equal(t, "test", strings.TrimSpace(string(output)))
}

// TestWorkspaceAgentDialFailure validates that the tailnet controller will retry connecting to the control plane until
// its context times out, when the dialer fails its healthcheck.
func TestWorkspaceAgentDialFailure(t *testing.T) {
t.Parallel()

store, ps := dbtestutil.NewDB(t)

// Given: a database which will fail its Ping(ctx) call.
// NOTE: The Ping(ctx) call is made by the Dialer.
pdb := &pingFailingDB{
Store: store,
}
client := coderdtest.New(t, &coderdtest.Options{
Database: pdb,
Pubsub: ps,
IncludeProvisionerDaemon: true,
})
user := coderdtest.CreateFirstUser(t, client)

// When: a workspace agent is set up and we try to dial it.
r := dbfake.WorkspaceBuild(t, pdb, database.WorkspaceTable{
OrganizationID: user.OrganizationID,
OwnerID: user.UserID,
}).WithAgent().Do()
_ = agenttest.New(t, client.URL, r.AgentToken)
resources := coderdtest.AwaitWorkspaceAgents(t, client, r.Workspace.ID)
require.Len(t, resources, 1)
require.Len(t, resources[0].Agents, 1)

// When: the db is marked as unhealthy (i.e. will fail its Ping).
// This needs to be done *after* the server "starts", otherwise it'll fail straight away when trying to initialize.
pdb.MarkUnhealthy()

// Then: the tailnet controller will continually try to dial the coordination endpoint, exceeding its context timeout.
Reviewer comment (Contributor):

This comment is wrong: we don't continually retry, because DialAgent only waits until we hit a dial error. Once the first error is returned the test is complete and we tear down the context.

Furthermore, I don't think the SDK DialAgent is really the thing that you care about testing here. It doesn't handle the retries anyway; tailnet does. Maybe simplify this and just use the WebsocketDialer and ensure it returns an error.
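A rough sketch of the simplification the reviewer is suggesting - drive the WebsocketDialer directly and assert that it surfaces the database error, rather than going through the SDK's DialAgent. The newDialerForTest helper and its wiring are assumptions for illustration, not APIs confirmed by this diff:

```go
// Sketch only: exercise the WebsocketDialer against a server whose database
// has been marked unhealthy, so the coordinate endpoint returns a 500 with
// the DatabaseNotReachable message.
func TestWebsocketDialer_DatabaseNotReachable(t *testing.T) {
	t.Parallel()

	ctx := testutil.Context(t, testutil.WaitMedium)

	// newDialerForTest is hypothetical: it would wire up a logger, the agent's
	// coordinate endpoint URL, and websocket options for the dialer.
	dialer := newDialerForTest(t)

	// No retries happen at this layer; a single Dial should return the
	// wrapped "database not reachable" error produced in dialer.go above.
	_, err := dialer.Dial(ctx, nil)
	require.ErrorContains(t, err, codersdk.DatabaseNotReachable)
}
```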

ctx := testutil.Context(t, testutil.WaitMedium)
conn, err := workspacesdk.New(client).DialAgent(ctx, resources[0].Agents[0].ID, nil)
require.ErrorContains(t, err, codersdk.DatabaseNotReachable)
require.Nil(t, conn)
}

func TestWorkspaceAgentClientCoordinate_BadVersion(t *testing.T) {
t.Parallel()
client, db := coderdtest.NewWithDatabase(t, nil)
@@ -2591,3 +2632,22 @@ func TestAgentConnectionInfo(t *testing.T) {
require.True(t, info.DisableDirectConnections)
require.True(t, info.DERPForceWebSockets)
}

type pingFailingDB struct {
database.Store

unhealthy bool
}

func (p *pingFailingDB) Ping(context.Context) (time.Duration, error) {
if !p.unhealthy {
return time.Nanosecond, nil
}

// Simulate a database connection error.
return 0, xerrors.New("oops")
}

func (p *pingFailingDB) MarkUnhealthy() {
p.unhealthy = true
}
7 changes: 7 additions & 0 deletions codersdk/database.go
@@ -0,0 +1,7 @@
package codersdk

import "golang.org/x/xerrors"

const DatabaseNotReachable = "database not reachable"

var ErrDatabaseNotReachable = xerrors.New(DatabaseNotReachable)
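As a usage sketch (not part of this diff): producers wrap a ping failure with the sentinel message, mirroring what InmemTailnetDialer.Dial does above; the pingOrError helper here is illustrative only.

```go
// Illustrative only: wrap a failed Ping with the shared sentinel message so
// callers across the HTTP boundary can recognize the condition.
func pingOrError(ctx context.Context, db database.Store) error {
	if _, err := db.Ping(ctx); err != nil {
		return xerrors.Errorf("%s: %w", codersdk.DatabaseNotReachable, err)
	}
	return nil
}
```

Note that it is the message constant, not the error value, that crosses the HTTP boundary, which is why the WebsocketDialer below compares sdkErr.Message against codersdk.DatabaseNotReachable rather than using errors.Is with ErrDatabaseNotReachable.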
15 changes: 11 additions & 4 deletions codersdk/workspacesdk/dialer.go
@@ -11,17 +11,19 @@ import (
"golang.org/x/xerrors"

"cdr.dev/slog"
"github.com/coder/websocket"

"github.com/coder/coder/v2/buildinfo"
"github.com/coder/coder/v2/codersdk"
"github.com/coder/coder/v2/tailnet"
"github.com/coder/coder/v2/tailnet/proto"
"github.com/coder/websocket"
)

var permanentErrorStatuses = []int{
http.StatusConflict, // returned if client/agent connections disabled (browser only)
http.StatusBadRequest, // returned if API mismatch
http.StatusNotFound, // returned if user doesn't have permission or agent doesn't exist
http.StatusConflict, // returned if client/agent connections disabled (browser only)
http.StatusBadRequest, // returned if API mismatch
http.StatusNotFound, // returned if user doesn't have permission or agent doesn't exist
http.StatusInternalServerError, // returned if database is not reachable
}

type WebsocketDialer struct {
@@ -89,6 +91,11 @@ func (w *WebsocketDialer) Dial(ctx context.Context, r tailnet.ResumeTokenControl
"Ensure your client release version (%s, different than the API version) matches the server release version",
buildinfo.Version())
}

if sdkErr.Message == codersdk.DatabaseNotReachable &&
sdkErr.StatusCode() == http.StatusInternalServerError {
err = xerrors.Errorf("%s: %w", codersdk.DatabaseNotReachable, err)
}
}
w.connected <- err
return tailnet.ControlProtocolClients{}, err
21 changes: 13 additions & 8 deletions provisionerd/provisionerd.go
@@ -20,12 +20,13 @@ import (
"golang.org/x/xerrors"

"cdr.dev/slog"
"github.com/coder/retry"

"github.com/coder/coder/v2/coderd/tracing"
"github.com/coder/coder/v2/codersdk"
"github.com/coder/coder/v2/provisionerd/proto"
"github.com/coder/coder/v2/provisionerd/runner"
sdkproto "github.com/coder/coder/v2/provisionersdk/proto"
"github.com/coder/retry"
)

// Dialer represents the function to create a daemon client connection.
@@ -290,7 +291,7 @@ func (p *Server) acquireLoop() {
defer p.wg.Done()
defer func() { close(p.acquireDoneCh) }()
ctx := p.closeContext
for {
for retrier := retry.New(10*time.Millisecond, 1*time.Second); retrier.Wait(ctx); {
if p.acquireExit() {
return
}
Expand All @@ -299,7 +300,10 @@ func (p *Server) acquireLoop() {
p.opts.Logger.Debug(ctx, "shut down before client (re) connected")
return
}
p.acquireAndRunOne(client)
err := p.acquireAndRunOne(client)
if err != nil && ctx.Err() == nil { // Only log if context is not done.
p.opts.Logger.Debug(ctx, "retrying to acquire job", slog.F("retry_in_ms", retrier.Delay.Milliseconds()), slog.Error(err))
PR author comment (Contributor):

Self-review: acquireAndRunOne already logs its own warning - specifically, the "provisionerd was unable to acquire job" message is logged when the db is unreachable - so Debug is what felt most appropriate to me.

}
}
}
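For context, the retrier introduced above follows the usual github.com/coder/retry shape: New takes floor and ceiling delays, Wait blocks for the current backoff (returning false once the context is done), and Delay exposes the interval, exactly as acquireLoop uses them. A minimal standalone sketch of that loop, with doWork standing in for acquireAndRunOne (an assumption for illustration):

```go
package main

import (
	"context"
	"fmt"
	"log"
	"time"

	"github.com/coder/retry"
)

// doWork stands in for acquireAndRunOne; here it always fails, as it would
// while the database is unreachable.
func doWork(ctx context.Context) error {
	return fmt.Errorf("db unreachable")
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	// Same shape as acquireLoop: Wait sleeps for the current backoff delay,
	// growing from 10ms toward the 1s ceiling, and returns false once ctx ends.
	for r := retry.New(10*time.Millisecond, time.Second); r.Wait(ctx); {
		if err := doWork(ctx); err != nil {
			log.Printf("retrying in %dms: %v", r.Delay.Milliseconds(), err)
			continue
		}
		return // success
	}
}
```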

@@ -318,7 +322,7 @@ func (p *Server) acquireExit() bool {
return false
}

func (p *Server) acquireAndRunOne(client proto.DRPCProvisionerDaemonClient) {
func (p *Server) acquireAndRunOne(client proto.DRPCProvisionerDaemonClient) error {
ctx := p.closeContext
p.opts.Logger.Debug(ctx, "start of acquireAndRunOne")
job, err := p.acquireGraceful(client)
@@ -327,15 +331,15 @@ func (p *Server) acquireAndRunOne(client proto.DRPCProvisionerDaemonClient) {
if errors.Is(err, context.Canceled) ||
errors.Is(err, yamux.ErrSessionShutdown) ||
errors.Is(err, fasthttputil.ErrInmemoryListenerClosed) {
return
return err
}

p.opts.Logger.Warn(ctx, "provisionerd was unable to acquire job", slog.Error(err))
return
return xerrors.Errorf("failed to acquire job: %w", err)
}
if job.JobId == "" {
p.opts.Logger.Debug(ctx, "acquire job successfully canceled")
return
return xerrors.New("canceled")
}

if len(job.TraceMetadata) > 0 {
@@ -392,7 +396,7 @@ func (p *Server) acquireAndRunOne(client proto.DRPCProvisionerDaemonClient) {
if err != nil {
p.opts.Logger.Error(ctx, "provisioner job failed", slog.F("job_id", job.JobId), slog.Error(err))
}
return
return xerrors.Errorf("provisioner job failed: %w", err)
}

p.mutex.Lock()
@@ -416,6 +420,7 @@ func (p *Server) acquireAndRunOne(client proto.DRPCProvisionerDaemonClient) {
p.mutex.Lock()
p.activeJob = nil
p.mutex.Unlock()
return nil
}

// acquireGraceful attempts to acquire a job from the server, handling canceling the acquisition if we gracefully shut