Commit 627232e

fix: fix pgcoord to delete coordinator row last (coder#12155)
Fixes coder#12141
Fixes coder#11750

PGCoord shutdown was uncoordinated, so an update sent at an inopportune moment during shutdown could be rejected because the coordinator row had already been deleted. This PR ensures that the PGCoord subcomponents that write updates are shut down before we take down the heartbeats subcomponent, which is responsible for deleting the coordinator row.
Parent: 7a45360
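The ordering the commit message describes boils down to: components that write rows referencing the coordinator must drain before the component whose shutdown deletes the coordinator row. Below is a minimal, self-contained Go sketch of that pattern; the `writer`/deleter names are hypothetical stand-ins, not the actual pgcoord types.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// writer stands in for the binder/tunneler: worker goroutines that write rows
// which reference the coordinator row via foreign keys.
type writer struct {
	wg sync.WaitGroup
}

func newWriter(ctx context.Context, name string, workers int) *writer {
	w := &writer{}
	// Register workers up front so a concurrent Wait cannot return before
	// the workers have even started.
	w.wg.Add(workers)
	for i := 0; i < workers; i++ {
		go func(n int) {
			defer w.wg.Done()
			<-ctx.Done() // keep writing updates until shutdown begins
			fmt.Printf("%s worker %d stopped\n", name, n)
		}(i)
	}
	return w
}

func main() {
	// Main context: canceled when the coordinator is closed.
	ctx, cancel := context.WithCancel(context.Background())

	// The deleter gets its own context that is NOT derived from ctx, so it
	// keeps running (and keeps the coordinator row alive) until we decide
	// the writers are finished.
	deleterCtx, deleterCancel := context.WithCancel(context.Background())
	deleterDone := make(chan struct{})
	go func() {
		defer close(deleterDone)
		<-deleterCtx.Done()
		fmt.Println("deleter stopped: coordinator row deleted last")
	}()

	binder := newWriter(ctx, "binder", 2)
	tunneler := newWriter(ctx, "tunneler", 2)

	go func() {
		// Only after every writer goroutine has exited is it safe to cancel
		// the deleter, whose shutdown removes the coordinator row.
		binder.wg.Wait()
		tunneler.wg.Wait()
		deleterCancel()
	}()

	cancel()      // begin shutdown
	<-deleterDone // the deleter is guaranteed to stop after all writers
}
```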

File tree

1 file changed: enterprise/tailnet/pgcoord.go (+25, -1)
@@ -129,6 +129,10 @@ func newPGCoordInternal(
 	// signals when first heartbeat has been sent, so it's safe to start binding.
 	fHB := make(chan struct{})
 
+	// we need to arrange for the querier to stop _after_ the tunneler and binder, since we delete
+	// the coordinator when the querier stops (via the heartbeats). If the tunneler and binder are
+	// still running, they could run afoul of foreign key constraints.
+	querierCtx, querierCancel := context.WithCancel(dbauthz.As(context.Background(), pgCoordSubject))
 	c := &pgCoord{
 		ctx:    ctx,
 		cancel: cancel,
@@ -142,9 +146,17 @@
 		tunneler:   newTunneler(ctx, logger, id, store, sCh, fHB),
 		tunnelerCh: sCh,
 		id:         id,
-		querier:    newQuerier(ctx, logger, id, ps, store, id, cCh, ccCh, numQuerierWorkers, fHB),
+		querier:    newQuerier(querierCtx, logger, id, ps, store, id, cCh, ccCh, numQuerierWorkers, fHB),
 		closed:     make(chan struct{}),
 	}
+	go func() {
+		// when the main context is canceled, or the coordinator closed, the binder and tunneler
+		// always eventually stop. Once they stop it's safe to cancel the querier context, which
+		// has the effect of deleting the coordinator from the database and ceasing heartbeats.
+		c.binder.workerWG.Wait()
+		c.tunneler.workerWG.Wait()
+		querierCancel()
+	}()
 	logger.Info(ctx, "starting coordinator")
 	return c, nil
 }
@@ -255,6 +267,8 @@ type tunneler struct {
 	mu     sync.Mutex
 	latest map[uuid.UUID]map[uuid.UUID]tunnel
 	workQ  *workQ[tKey]
+
+	workerWG sync.WaitGroup
 }
 
 func newTunneler(ctx context.Context,
@@ -274,6 +288,9 @@ func newTunneler(ctx context.Context,
 		workQ: newWorkQ[tKey](ctx),
 	}
 	go s.handle()
+	// add to the waitgroup immediately to avoid any races waiting for it before
+	// the workers start.
+	s.workerWG.Add(numTunnelerWorkers)
 	go func() {
 		<-startWorkers
 		for i := 0; i < numTunnelerWorkers; i++ {
@@ -297,6 +314,7 @@ func (t *tunneler) handle() {
 }
 
 func (t *tunneler) worker() {
+	defer t.workerWG.Done()
 	eb := backoff.NewExponentialBackOff()
 	eb.MaxElapsedTime = 0 // retry indefinitely
 	eb.MaxInterval = dbMaxBackoff
@@ -435,6 +453,8 @@ type binder struct {
 	mu     sync.Mutex
 	latest map[bKey]binding
 	workQ  *workQ[bKey]
+
+	workerWG sync.WaitGroup
 }
 
 func newBinder(ctx context.Context,
@@ -454,6 +474,9 @@ func newBinder(ctx context.Context,
 		workQ: newWorkQ[bKey](ctx),
 	}
 	go b.handleBindings()
+	// add to the waitgroup immediately to avoid any races waiting for it before
+	// the workers start.
+	b.workerWG.Add(numBinderWorkers)
 	go func() {
 		<-startWorkers
 		for i := 0; i < numBinderWorkers; i++ {
@@ -477,6 +500,7 @@ func (b *binder) handleBindings() {
 }
 
 func (b *binder) worker() {
+	defer b.workerWG.Done()
 	eb := backoff.NewExponentialBackOff()
 	eb.MaxElapsedTime = 0 // retry indefinitely
 	eb.MaxInterval = dbMaxBackoff
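A note on the `workerWG.Add(...)` placement above: the counter is incremented before the goroutine that waits on `startWorkers` launches anything, so a `Wait` racing with startup cannot observe a zero counter and return early. A rough sketch of that shape, using placeholder names (`pool`, `newPool`) rather than the real pgcoord constructors:

```go
package main

import (
	"context"
	"sync"
)

type pool struct {
	workerWG sync.WaitGroup
}

// newPool mirrors the shape of newTunneler/newBinder: workers only start once
// startWorkers is closed, but they are registered with the WaitGroup first.
func newPool(ctx context.Context, startWorkers <-chan struct{}, n int) *pool {
	p := &pool{}
	// If this Add ran inside the goroutine below (after <-startWorkers), a
	// caller invoking p.workerWG.Wait() during startup could see a zero
	// counter and return before any worker existed.
	p.workerWG.Add(n)
	go func() {
		<-startWorkers
		for i := 0; i < n; i++ {
			go func() {
				defer p.workerWG.Done()
				<-ctx.Done() // placeholder for the real worker loop
			}()
		}
	}()
	return p
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	start := make(chan struct{})

	p := newPool(ctx, start, 3)

	close(start) // allow workers to start
	cancel()     // stop the workers
	p.workerWG.Wait()
}
```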
