Skip to content

fix: avoid deleting peers on graceful close #14165

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 14 commits into from
Aug 14, 2024
7 changes: 7 additions & 0 deletions coderd/database/dbauthz/dbauthz.go
Original file line number Diff line number Diff line change
Expand Up @@ -3160,6 +3160,13 @@ func (q *querier) UpdateReplica(ctx context.Context, arg database.UpdateReplicaP
return q.db.UpdateReplica(ctx, arg)
}

func (q *querier) UpdateTailnetPeerStatusByCoordinator(ctx context.Context, arg database.UpdateTailnetPeerStatusByCoordinatorParams) error {
if err := q.authorizeContext(ctx, policy.ActionUpdate, rbac.ResourceTailnetCoordinator); err != nil {
return err
}
return q.db.UpdateTailnetPeerStatusByCoordinator(ctx, arg)
}

func (q *querier) UpdateTemplateACLByID(ctx context.Context, arg database.UpdateTemplateACLByIDParams) error {
fetch := func(ctx context.Context, arg database.UpdateTemplateACLByIDParams) (database.Template, error) {
return q.db.GetTemplateByID(ctx, arg.ID)
Expand Down
5 changes: 5 additions & 0 deletions coderd/database/dbauthz/dbauthz_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2053,6 +2053,11 @@ func (s *MethodTestSuite) TestTailnetFunctions() {
Asserts(rbac.ResourceTailnetCoordinator, policy.ActionCreate).
Errors(dbmem.ErrUnimplemented)
}))
s.Run("UpdateTailnetPeerStatusByCoordinator", s.Subtest(func(_ database.Store, check *expects) {
check.Args(database.UpdateTailnetPeerStatusByCoordinatorParams{}).
Asserts(rbac.ResourceTailnetCoordinator, policy.ActionUpdate).
Errors(dbmem.ErrUnimplemented)
}))
}

func (s *MethodTestSuite) TestDBCrypt() {
Expand Down
4 changes: 4 additions & 0 deletions coderd/database/dbmem/dbmem.go
Original file line number Diff line number Diff line change
Expand Up @@ -7759,6 +7759,10 @@ func (q *FakeQuerier) UpdateReplica(_ context.Context, arg database.UpdateReplic
return database.Replica{}, sql.ErrNoRows
}

func (*FakeQuerier) UpdateTailnetPeerStatusByCoordinator(context.Context, database.UpdateTailnetPeerStatusByCoordinatorParams) error {
return ErrUnimplemented
}

func (q *FakeQuerier) UpdateTemplateACLByID(_ context.Context, arg database.UpdateTemplateACLByIDParams) error {
if err := validateDatabaseType(arg); err != nil {
return err
Expand Down
7 changes: 7 additions & 0 deletions coderd/database/dbmetrics/dbmetrics.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 14 additions & 0 deletions coderd/database/dbmock/dbmock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 10 additions & 0 deletions coderd/database/dbtestutil/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ type options struct {
dumpOnFailure bool
returnSQLDB func(*sql.DB)
logger slog.Logger
url string
}

type Option func(*options)
Expand All @@ -59,6 +60,12 @@ func WithLogger(logger slog.Logger) Option {
}
}

func WithURL(u string) Option {
return func(o *options) {
o.url = u
}
}

func withReturnSQLDB(f func(*sql.DB)) Option {
return func(o *options) {
o.returnSQLDB = f
Expand Down Expand Up @@ -92,6 +99,9 @@ func NewDB(t testing.TB, opts ...Option) (database.Store, pubsub.Pubsub) {
ps := pubsub.NewInMemory()
if WillUsePostgres() {
connectionURL := os.Getenv("CODER_PG_CONNECTION_URL")
if connectionURL == "" && o.url != "" {
connectionURL = o.url
}
if connectionURL == "" {
var (
err error
Expand Down
1 change: 1 addition & 0 deletions coderd/database/querier.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

19 changes: 19 additions & 0 deletions coderd/database/queries.sql.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 8 additions & 0 deletions coderd/database/queries/tailnet.sql
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,14 @@ DO UPDATE SET
updated_at = now() at time zone 'utc'
RETURNING *;

-- name: UpdateTailnetPeerStatusByCoordinator :exec
UPDATE
tailnet_peers
SET
status = $2
WHERE
coordinator_id = $1;

-- name: DeleteTailnetPeer :one
DELETE
FROM tailnet_peers
Expand Down
29 changes: 11 additions & 18 deletions enterprise/tailnet/pgcoord.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,9 +144,6 @@ func newPGCoordInternal(
// signals when first heartbeat has been sent, so it's safe to start binding.
fHB := make(chan struct{})

// we need to arrange for the querier to stop _after_ the tunneler and binder, since we delete
// the coordinator when the querier stops (via the heartbeats). If the tunneler and binder are
// still running, they could run afoul of foreign key constraints.
querierCtx, querierCancel := context.WithCancel(dbauthz.As(context.Background(), pgCoordSubject))
c := &pgCoord{
ctx: ctx,
Expand All @@ -168,8 +165,9 @@ func newPGCoordInternal(
}
go func() {
// when the main context is canceled, or the coordinator closed, the binder, tunneler, and
// handshaker always eventually stop. Once they stop it's safe to cancel the querier context, which
// has the effect of deleting the coordinator from the database and ceasing heartbeats.
// handshaker always eventually stop. When the
// binder stops it updates all the peers handled
// by this coordinator to LOST.
c.binder.workerWG.Wait()
c.tunneler.workerWG.Wait()
c.handshaker.workerWG.Wait()
Expand Down Expand Up @@ -520,7 +518,14 @@ func (b *binder) handleBindings() {
for {
select {
case <-b.ctx.Done():
b.logger.Debug(b.ctx, "binder exiting", slog.Error(b.ctx.Err()))
b.logger.Debug(b.ctx, "binder exiting, updating peers to lost", slog.Error(b.ctx.Err()))
err := b.store.UpdateTailnetPeerStatusByCoordinator(context.Background(), database.UpdateTailnetPeerStatusByCoordinatorParams{
CoordinatorID: b.coordinatorID,
Status: database.TailnetStatusLost,
})
if err != nil {
b.logger.Error(b.ctx, "update peer status to lost", slog.Error(err))
}
return
case bnd := <-b.bindings:
b.storeBinding(bnd)
Expand Down Expand Up @@ -1646,7 +1651,6 @@ func (h *heartbeats) sendBeats() {
// send an initial heartbeat so that other coordinators can start using our bindings right away.
h.sendBeat()
close(h.firstHeartbeat) // signal binder it can start writing
defer h.sendDelete()
tkr := h.clock.TickerFunc(h.ctx, HeartbeatPeriod, func() error {
h.sendBeat()
return nil
Expand Down Expand Up @@ -1677,17 +1681,6 @@ func (h *heartbeats) sendBeat() {
h.failedHeartbeats = 0
}

func (h *heartbeats) sendDelete() {
// here we don't want to use the main context, since it will have been canceled
ctx := dbauthz.As(context.Background(), pgCoordSubject)
err := h.store.DeleteCoordinator(ctx, h.self)
if err != nil {
h.logger.Error(h.ctx, "failed to send coordinator delete", slog.Error(err))
return
}
h.logger.Debug(h.ctx, "deleted coordinator")
}

func (h *heartbeats) cleanupLoop() {
defer h.wg.Done()
h.cleanup()
Expand Down
9 changes: 5 additions & 4 deletions enterprise/tailnet/pgcoord_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -396,10 +396,6 @@ func TestPGCoordinatorUnhealthy(t *testing.T) {
UpsertTailnetCoordinator(gomock.Any(), gomock.Any()).
Times(3).
Return(database.TailnetCoordinator{}, xerrors.New("badness"))
mStore.EXPECT().
DeleteCoordinator(gomock.Any(), gomock.Any()).
Times(1).
Return(nil)
// But, in particular we DO NOT want the coordinator to call DeleteTailnetPeer, as this is
// unnecessary and can spam the database. c.f. https://github.com/coder/coder/issues/12923

Expand All @@ -411,6 +407,11 @@ func TestPGCoordinatorUnhealthy(t *testing.T) {
coordinator, err := newPGCoordInternal(ctx, logger, ps, mStore, mClock)
require.NoError(t, err)

mStore.EXPECT().UpdateTailnetPeerStatusByCoordinator(gomock.Any(), database.UpdateTailnetPeerStatusByCoordinatorParams{
CoordinatorID: coordinator.id,
Status: database.TailnetStatusLost,
})

expectedPeriod := HeartbeatPeriod
tfCall, err := tfTrap.Wait(ctx)
require.NoError(t, err)
Expand Down
Loading
Loading