Skip to content

fix: avoid deleting peers on graceful close #14165

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 14 commits into from
Aug 14, 2024
7 changes: 7 additions & 0 deletions coderd/database/dbauthz/dbauthz.go
Original file line number Diff line number Diff line change
Expand Up @@ -3160,6 +3160,13 @@ func (q *querier) UpdateReplica(ctx context.Context, arg database.UpdateReplicaP
return q.db.UpdateReplica(ctx, arg)
}

func (q *querier) UpdateTailnetPeerStatusByCoordinator(ctx context.Context, arg database.UpdateTailnetPeerStatusByCoordinatorParams) error {
if err := q.authorizeContext(ctx, policy.ActionUpdate, rbac.ResourceTailnetCoordinator); err != nil {
return err
}
return q.db.UpdateTailnetPeerStatusByCoordinator(ctx, arg)
}

func (q *querier) UpdateTemplateACLByID(ctx context.Context, arg database.UpdateTemplateACLByIDParams) error {
fetch := func(ctx context.Context, arg database.UpdateTemplateACLByIDParams) (database.Template, error) {
return q.db.GetTemplateByID(ctx, arg.ID)
Expand Down
5 changes: 5 additions & 0 deletions coderd/database/dbauthz/dbauthz_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2053,6 +2053,11 @@ func (s *MethodTestSuite) TestTailnetFunctions() {
Asserts(rbac.ResourceTailnetCoordinator, policy.ActionCreate).
Errors(dbmem.ErrUnimplemented)
}))
s.Run("UpdateTailnetPeerStatusByCoordinator", s.Subtest(func(_ database.Store, check *expects) {
check.Args(database.UpdateTailnetPeerStatusByCoordinatorParams{}).
Asserts(rbac.ResourceTailnetCoordinator, policy.ActionUpdate).
Errors(dbmem.ErrUnimplemented)
}))
}

func (s *MethodTestSuite) TestDBCrypt() {
Expand Down
4 changes: 4 additions & 0 deletions coderd/database/dbmem/dbmem.go
Original file line number Diff line number Diff line change
Expand Up @@ -7759,6 +7759,10 @@ func (q *FakeQuerier) UpdateReplica(_ context.Context, arg database.UpdateReplic
return database.Replica{}, sql.ErrNoRows
}

func (*FakeQuerier) UpdateTailnetPeerStatusByCoordinator(context.Context, database.UpdateTailnetPeerStatusByCoordinatorParams) error {
return ErrUnimplemented
}

func (q *FakeQuerier) UpdateTemplateACLByID(_ context.Context, arg database.UpdateTemplateACLByIDParams) error {
if err := validateDatabaseType(arg); err != nil {
return err
Expand Down
7 changes: 7 additions & 0 deletions coderd/database/dbmetrics/dbmetrics.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 14 additions & 0 deletions coderd/database/dbmock/dbmock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 10 additions & 0 deletions coderd/database/dbtestutil/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ type options struct {
dumpOnFailure bool
returnSQLDB func(*sql.DB)
logger slog.Logger
url string
}

type Option func(*options)
Expand All @@ -59,6 +60,12 @@ func WithLogger(logger slog.Logger) Option {
}
}

func WithURL(u string) Option {
return func(o *options) {
o.url = u
}
}

func withReturnSQLDB(f func(*sql.DB)) Option {
return func(o *options) {
o.returnSQLDB = f
Expand Down Expand Up @@ -92,6 +99,9 @@ func NewDB(t testing.TB, opts ...Option) (database.Store, pubsub.Pubsub) {
ps := pubsub.NewInMemory()
if WillUsePostgres() {
connectionURL := os.Getenv("CODER_PG_CONNECTION_URL")
if connectionURL == "" && o.url != "" {
connectionURL = o.url
}
if connectionURL == "" {
var (
err error
Expand Down
1 change: 1 addition & 0 deletions coderd/database/querier.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

19 changes: 19 additions & 0 deletions coderd/database/queries.sql.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 8 additions & 0 deletions coderd/database/queries/tailnet.sql
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,14 @@ DO UPDATE SET
updated_at = now() at time zone 'utc'
RETURNING *;

-- name: UpdateTailnetPeerStatusByCoordinator :exec
UPDATE
tailnet_peers
SET
status = $2
WHERE
coordinator_id = $1;

-- name: DeleteTailnetPeer :one
DELETE
FROM tailnet_peers
Expand Down
58 changes: 31 additions & 27 deletions enterprise/tailnet/pgcoord.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,10 +144,6 @@ func newPGCoordInternal(
// signals when first heartbeat has been sent, so it's safe to start binding.
fHB := make(chan struct{})

// we need to arrange for the querier to stop _after_ the tunneler and binder, since we delete
// the coordinator when the querier stops (via the heartbeats). If the tunneler and binder are
// still running, they could run afoul of foreign key constraints.
querierCtx, querierCancel := context.WithCancel(dbauthz.As(context.Background(), pgCoordSubject))
c := &pgCoord{
ctx: ctx,
cancel: cancel,
Expand All @@ -163,18 +159,9 @@ func newPGCoordInternal(
handshaker: newHandshaker(ctx, logger, id, ps, rfhCh, fHB),
handshakerCh: rfhCh,
id: id,
querier: newQuerier(querierCtx, logger, id, ps, store, id, cCh, ccCh, numQuerierWorkers, fHB, clk),
querier: newQuerier(ctx, logger, id, ps, store, id, cCh, ccCh, numQuerierWorkers, fHB, clk),
closed: make(chan struct{}),
}
go func() {
// when the main context is canceled, or the coordinator closed, the binder, tunneler, and
// handshaker always eventually stop. Once they stop it's safe to cancel the querier context, which
// has the effect of deleting the coordinator from the database and ceasing heartbeats.
c.binder.workerWG.Wait()
c.tunneler.workerWG.Wait()
c.handshaker.workerWG.Wait()
querierCancel()
}()
logger.Info(ctx, "starting coordinator")
return c, nil
}
Expand Down Expand Up @@ -239,6 +226,9 @@ func (c *pgCoord) Close() error {
c.cancel()
c.closeOnce.Do(func() { close(c.closed) })
c.querier.wait()
c.binder.wait()
c.tunneler.workerWG.Wait()
c.handshaker.workerWG.Wait()
return nil
}

Expand Down Expand Up @@ -485,6 +475,7 @@ type binder struct {
workQ *workQ[bKey]

workerWG sync.WaitGroup
close chan struct{}
}

func newBinder(ctx context.Context,
Expand All @@ -502,6 +493,7 @@ func newBinder(ctx context.Context,
bindings: bindings,
latest: make(map[bKey]binding),
workQ: newWorkQ[bKey](ctx),
close: make(chan struct{}),
}
go b.handleBindings()
// add to the waitgroup immediately to avoid any races waiting for it before
Expand All @@ -513,14 +505,34 @@ func newBinder(ctx context.Context,
go b.worker()
}
}()

go func() {
defer close(b.close)
<-b.ctx.Done()
b.logger.Debug(b.ctx, "binder exiting, waiting for workers")

b.workerWG.Wait()

b.logger.Debug(b.ctx, "updating peers to lost")

ctx, cancel := context.WithTimeout(context.Background(), time.Second*15)
defer cancel()
err := b.store.UpdateTailnetPeerStatusByCoordinator(ctx, database.UpdateTailnetPeerStatusByCoordinatorParams{
CoordinatorID: b.coordinatorID,
Status: database.TailnetStatusLost,
})
if err != nil {
b.logger.Error(b.ctx, "update peer status to lost", slog.Error(err))
}
}()
return b
}

func (b *binder) handleBindings() {
for {
select {
case <-b.ctx.Done():
b.logger.Debug(b.ctx, "binder exiting", slog.Error(b.ctx.Err()))
b.logger.Debug(b.ctx, "binder exiting")
return
case bnd := <-b.bindings:
b.storeBinding(bnd)
Expand Down Expand Up @@ -632,6 +644,10 @@ func (b *binder) retrieveBinding(bk bKey) binding {
return bnd
}

func (b *binder) wait() {
<-b.close
}

// mapper tracks data sent to a peer, and sends updates based on changes read from the database.
type mapper struct {
ctx context.Context
Expand Down Expand Up @@ -1646,7 +1662,6 @@ func (h *heartbeats) sendBeats() {
// send an initial heartbeat so that other coordinators can start using our bindings right away.
h.sendBeat()
close(h.firstHeartbeat) // signal binder it can start writing
defer h.sendDelete()
tkr := h.clock.TickerFunc(h.ctx, HeartbeatPeriod, func() error {
h.sendBeat()
return nil
Expand Down Expand Up @@ -1677,17 +1692,6 @@ func (h *heartbeats) sendBeat() {
h.failedHeartbeats = 0
}

func (h *heartbeats) sendDelete() {
// here we don't want to use the main context, since it will have been canceled
ctx := dbauthz.As(context.Background(), pgCoordSubject)
err := h.store.DeleteCoordinator(ctx, h.self)
if err != nil {
h.logger.Error(h.ctx, "failed to send coordinator delete", slog.Error(err))
return
}
h.logger.Debug(h.ctx, "deleted coordinator")
}

func (h *heartbeats) cleanupLoop() {
defer h.wg.Done()
h.cleanup()
Expand Down
5 changes: 1 addition & 4 deletions enterprise/tailnet/pgcoord_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -396,17 +396,14 @@ func TestPGCoordinatorUnhealthy(t *testing.T) {
UpsertTailnetCoordinator(gomock.Any(), gomock.Any()).
Times(3).
Return(database.TailnetCoordinator{}, xerrors.New("badness"))
mStore.EXPECT().
DeleteCoordinator(gomock.Any(), gomock.Any()).
Times(1).
Return(nil)
// But, in particular we DO NOT want the coordinator to call DeleteTailnetPeer, as this is
// unnecessary and can spam the database. c.f. https://github.com/coder/coder/issues/12923

// these cleanup queries run, but we don't care for this test
mStore.EXPECT().CleanTailnetCoordinators(gomock.Any()).AnyTimes().Return(nil)
mStore.EXPECT().CleanTailnetLostPeers(gomock.Any()).AnyTimes().Return(nil)
mStore.EXPECT().CleanTailnetTunnels(gomock.Any()).AnyTimes().Return(nil)
mStore.EXPECT().UpdateTailnetPeerStatusByCoordinator(gomock.Any(), gomock.Any())

coordinator, err := newPGCoordInternal(ctx, logger, ps, mStore, mClock)
require.NoError(t, err)
Expand Down
Loading
Loading