Skip to content

Commit 4fc0479

Browse files
authored
fix: avoid deleting peers on graceful close (coder#14165)
* fix: avoid deleting peers on graceful close - Fixes an issue where a coordinator deletes all its peers on shutdown. This can cause disconnects whenever a coderd is redeployed.
1 parent 6f1951e commit 4fc0479

File tree

13 files changed

+328
-102
lines changed

13 files changed

+328
-102
lines changed

coderd/database/dbauthz/dbauthz.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3324,6 +3324,13 @@ func (q *querier) UpdateReplica(ctx context.Context, arg database.UpdateReplicaP
33243324
return q.db.UpdateReplica(ctx, arg)
33253325
}
33263326

3327+
func (q *querier) UpdateTailnetPeerStatusByCoordinator(ctx context.Context, arg database.UpdateTailnetPeerStatusByCoordinatorParams) error {
3328+
if err := q.authorizeContext(ctx, policy.ActionUpdate, rbac.ResourceTailnetCoordinator); err != nil {
3329+
return err
3330+
}
3331+
return q.db.UpdateTailnetPeerStatusByCoordinator(ctx, arg)
3332+
}
3333+
33273334
func (q *querier) UpdateTemplateACLByID(ctx context.Context, arg database.UpdateTemplateACLByIDParams) error {
33283335
fetch := func(ctx context.Context, arg database.UpdateTemplateACLByIDParams) (database.Template, error) {
33293336
return q.db.GetTemplateByID(ctx, arg.ID)

coderd/database/dbauthz/dbauthz_test.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2152,6 +2152,11 @@ func (s *MethodTestSuite) TestTailnetFunctions() {
21522152
Asserts(rbac.ResourceTailnetCoordinator, policy.ActionCreate).
21532153
Errors(dbmem.ErrUnimplemented)
21542154
}))
2155+
s.Run("UpdateTailnetPeerStatusByCoordinator", s.Subtest(func(_ database.Store, check *expects) {
2156+
check.Args(database.UpdateTailnetPeerStatusByCoordinatorParams{}).
2157+
Asserts(rbac.ResourceTailnetCoordinator, policy.ActionUpdate).
2158+
Errors(dbmem.ErrUnimplemented)
2159+
}))
21552160
}
21562161

21572162
func (s *MethodTestSuite) TestDBCrypt() {

coderd/database/dbmem/dbmem.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7917,6 +7917,10 @@ func (q *FakeQuerier) UpdateReplica(_ context.Context, arg database.UpdateReplic
79177917
return database.Replica{}, sql.ErrNoRows
79187918
}
79197919

7920+
func (*FakeQuerier) UpdateTailnetPeerStatusByCoordinator(context.Context, database.UpdateTailnetPeerStatusByCoordinatorParams) error {
7921+
return ErrUnimplemented
7922+
}
7923+
79207924
func (q *FakeQuerier) UpdateTemplateACLByID(_ context.Context, arg database.UpdateTemplateACLByIDParams) error {
79217925
if err := validateDatabaseType(arg); err != nil {
79227926
return err

coderd/database/dbmetrics/dbmetrics.go

Lines changed: 7 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

coderd/database/dbmock/dbmock.go

Lines changed: 14 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

coderd/database/dbtestutil/db.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ type options struct {
3535
dumpOnFailure bool
3636
returnSQLDB func(*sql.DB)
3737
logger slog.Logger
38+
url string
3839
}
3940

4041
type Option func(*options)
@@ -59,6 +60,12 @@ func WithLogger(logger slog.Logger) Option {
5960
}
6061
}
6162

63+
func WithURL(u string) Option {
64+
return func(o *options) {
65+
o.url = u
66+
}
67+
}
68+
6269
func withReturnSQLDB(f func(*sql.DB)) Option {
6370
return func(o *options) {
6471
o.returnSQLDB = f
@@ -92,6 +99,9 @@ func NewDB(t testing.TB, opts ...Option) (database.Store, pubsub.Pubsub) {
9299
ps := pubsub.NewInMemory()
93100
if WillUsePostgres() {
94101
connectionURL := os.Getenv("CODER_PG_CONNECTION_URL")
102+
if connectionURL == "" && o.url != "" {
103+
connectionURL = o.url
104+
}
95105
if connectionURL == "" {
96106
var (
97107
err error

coderd/database/querier.go

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

coderd/database/queries.sql.go

Lines changed: 19 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

coderd/database/queries/tailnet.sql

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -149,6 +149,14 @@ DO UPDATE SET
149149
updated_at = now() at time zone 'utc'
150150
RETURNING *;
151151

152+
-- name: UpdateTailnetPeerStatusByCoordinator :exec
153+
UPDATE
154+
tailnet_peers
155+
SET
156+
status = $2
157+
WHERE
158+
coordinator_id = $1;
159+
152160
-- name: DeleteTailnetPeer :one
153161
DELETE
154162
FROM tailnet_peers

enterprise/tailnet/pgcoord.go

Lines changed: 31 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -144,10 +144,6 @@ func newPGCoordInternal(
144144
// signals when first heartbeat has been sent, so it's safe to start binding.
145145
fHB := make(chan struct{})
146146

147-
// we need to arrange for the querier to stop _after_ the tunneler and binder, since we delete
148-
// the coordinator when the querier stops (via the heartbeats). If the tunneler and binder are
149-
// still running, they could run afoul of foreign key constraints.
150-
querierCtx, querierCancel := context.WithCancel(dbauthz.As(context.Background(), pgCoordSubject))
151147
c := &pgCoord{
152148
ctx: ctx,
153149
cancel: cancel,
@@ -163,18 +159,9 @@ func newPGCoordInternal(
163159
handshaker: newHandshaker(ctx, logger, id, ps, rfhCh, fHB),
164160
handshakerCh: rfhCh,
165161
id: id,
166-
querier: newQuerier(querierCtx, logger, id, ps, store, id, cCh, ccCh, numQuerierWorkers, fHB, clk),
162+
querier: newQuerier(ctx, logger, id, ps, store, id, cCh, ccCh, numQuerierWorkers, fHB, clk),
167163
closed: make(chan struct{}),
168164
}
169-
go func() {
170-
// when the main context is canceled, or the coordinator closed, the binder, tunneler, and
171-
// handshaker always eventually stop. Once they stop it's safe to cancel the querier context, which
172-
// has the effect of deleting the coordinator from the database and ceasing heartbeats.
173-
c.binder.workerWG.Wait()
174-
c.tunneler.workerWG.Wait()
175-
c.handshaker.workerWG.Wait()
176-
querierCancel()
177-
}()
178165
logger.Info(ctx, "starting coordinator")
179166
return c, nil
180167
}
@@ -239,6 +226,9 @@ func (c *pgCoord) Close() error {
239226
c.cancel()
240227
c.closeOnce.Do(func() { close(c.closed) })
241228
c.querier.wait()
229+
c.binder.wait()
230+
c.tunneler.workerWG.Wait()
231+
c.handshaker.workerWG.Wait()
242232
return nil
243233
}
244234

@@ -485,6 +475,7 @@ type binder struct {
485475
workQ *workQ[bKey]
486476

487477
workerWG sync.WaitGroup
478+
close chan struct{}
488479
}
489480

490481
func newBinder(ctx context.Context,
@@ -502,6 +493,7 @@ func newBinder(ctx context.Context,
502493
bindings: bindings,
503494
latest: make(map[bKey]binding),
504495
workQ: newWorkQ[bKey](ctx),
496+
close: make(chan struct{}),
505497
}
506498
go b.handleBindings()
507499
// add to the waitgroup immediately to avoid any races waiting for it before
@@ -513,14 +505,34 @@ func newBinder(ctx context.Context,
513505
go b.worker()
514506
}
515507
}()
508+
509+
go func() {
510+
defer close(b.close)
511+
<-b.ctx.Done()
512+
b.logger.Debug(b.ctx, "binder exiting, waiting for workers")
513+
514+
b.workerWG.Wait()
515+
516+
b.logger.Debug(b.ctx, "updating peers to lost")
517+
518+
ctx, cancel := context.WithTimeout(context.Background(), time.Second*15)
519+
defer cancel()
520+
err := b.store.UpdateTailnetPeerStatusByCoordinator(ctx, database.UpdateTailnetPeerStatusByCoordinatorParams{
521+
CoordinatorID: b.coordinatorID,
522+
Status: database.TailnetStatusLost,
523+
})
524+
if err != nil {
525+
b.logger.Error(b.ctx, "update peer status to lost", slog.Error(err))
526+
}
527+
}()
516528
return b
517529
}
518530

519531
func (b *binder) handleBindings() {
520532
for {
521533
select {
522534
case <-b.ctx.Done():
523-
b.logger.Debug(b.ctx, "binder exiting", slog.Error(b.ctx.Err()))
535+
b.logger.Debug(b.ctx, "binder exiting")
524536
return
525537
case bnd := <-b.bindings:
526538
b.storeBinding(bnd)
@@ -632,6 +644,10 @@ func (b *binder) retrieveBinding(bk bKey) binding {
632644
return bnd
633645
}
634646

647+
func (b *binder) wait() {
648+
<-b.close
649+
}
650+
635651
// mapper tracks data sent to a peer, and sends updates based on changes read from the database.
636652
type mapper struct {
637653
ctx context.Context
@@ -1646,7 +1662,6 @@ func (h *heartbeats) sendBeats() {
16461662
// send an initial heartbeat so that other coordinators can start using our bindings right away.
16471663
h.sendBeat()
16481664
close(h.firstHeartbeat) // signal binder it can start writing
1649-
defer h.sendDelete()
16501665
tkr := h.clock.TickerFunc(h.ctx, HeartbeatPeriod, func() error {
16511666
h.sendBeat()
16521667
return nil
@@ -1677,17 +1692,6 @@ func (h *heartbeats) sendBeat() {
16771692
h.failedHeartbeats = 0
16781693
}
16791694

1680-
func (h *heartbeats) sendDelete() {
1681-
// here we don't want to use the main context, since it will have been canceled
1682-
ctx := dbauthz.As(context.Background(), pgCoordSubject)
1683-
err := h.store.DeleteCoordinator(ctx, h.self)
1684-
if err != nil {
1685-
h.logger.Error(h.ctx, "failed to send coordinator delete", slog.Error(err))
1686-
return
1687-
}
1688-
h.logger.Debug(h.ctx, "deleted coordinator")
1689-
}
1690-
16911695
func (h *heartbeats) cleanupLoop() {
16921696
defer h.wg.Done()
16931697
h.cleanup()

enterprise/tailnet/pgcoord_internal_test.go

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -396,17 +396,14 @@ func TestPGCoordinatorUnhealthy(t *testing.T) {
396396
UpsertTailnetCoordinator(gomock.Any(), gomock.Any()).
397397
Times(3).
398398
Return(database.TailnetCoordinator{}, xerrors.New("badness"))
399-
mStore.EXPECT().
400-
DeleteCoordinator(gomock.Any(), gomock.Any()).
401-
Times(1).
402-
Return(nil)
403399
// But, in particular we DO NOT want the coordinator to call DeleteTailnetPeer, as this is
404400
// unnecessary and can spam the database. c.f. https://github.com/coder/coder/issues/12923
405401

406402
// these cleanup queries run, but we don't care for this test
407403
mStore.EXPECT().CleanTailnetCoordinators(gomock.Any()).AnyTimes().Return(nil)
408404
mStore.EXPECT().CleanTailnetLostPeers(gomock.Any()).AnyTimes().Return(nil)
409405
mStore.EXPECT().CleanTailnetTunnels(gomock.Any()).AnyTimes().Return(nil)
406+
mStore.EXPECT().UpdateTailnetPeerStatusByCoordinator(gomock.Any(), gomock.Any())
410407

411408
coordinator, err := newPGCoordInternal(ctx, logger, ps, mStore, mClock)
412409
require.NoError(t, err)

0 commit comments

Comments
 (0)