@@ -129,6 +129,10 @@ func newPGCoordInternal(
 	// signals when first heartbeat has been sent, so it's safe to start binding.
 	fHB := make(chan struct{})
 
+	// we need to arrange for the querier to stop _after_ the tunneler and binder, since we delete
+	// the coordinator when the querier stops (via the heartbeats). If the tunneler and binder are
+	// still running, they could run afoul of foreign key constraints.
+	querierCtx, querierCancel := context.WithCancel(dbauthz.As(context.Background(), pgCoordSubject))
 	c := &pgCoord{
 		ctx:    ctx,
 		cancel: cancel,
@@ -142,9 +146,17 @@ func newPGCoordInternal(
 		tunneler:   newTunneler(ctx, logger, id, store, sCh, fHB),
 		tunnelerCh: sCh,
 		id:         id,
-		querier:    newQuerier(ctx, logger, id, ps, store, id, cCh, ccCh, numQuerierWorkers, fHB),
+		querier:    newQuerier(querierCtx, logger, id, ps, store, id, cCh, ccCh, numQuerierWorkers, fHB),
 		closed:     make(chan struct{}),
 	}
+	go func() {
+		// when the main context is canceled, or the coordinator closed, the binder and tunneler
+		// always eventually stop. Once they stop it's safe to cancel the querier context, which
+		// has the effect of deleting the coordinator from the database and ceasing heartbeats.
+		c.binder.workerWG.Wait()
+		c.tunneler.workerWG.Wait()
+		querierCancel()
+	}()
 	logger.Info(ctx, "starting coordinator")
 	return c, nil
 }
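For context on the shutdown ordering this hunk sets up: the querier's heartbeats keep the coordinator row alive, and cancelling its context deletes that row, so the binder and tunneler (which write rows referencing the coordinator) must drain first. Below is a minimal, self-contained sketch of the pattern; the names are illustrative stand-ins, not the coordinator's real types:

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

func main() {
	// The "querier" gets its own context so it can outlive the parent:
	// cancelling it is what deletes the shared coordinator row.
	querierCtx, querierCancel := context.WithCancel(context.Background())
	parentCtx, parentCancel := context.WithCancel(context.Background())

	var workers sync.WaitGroup
	workers.Add(2) // stands in for the binder and tunneler workers

	for i := 0; i < 2; i++ {
		go func(id int) {
			defer workers.Done()
			<-parentCtx.Done() // writers drain when the parent is canceled
			fmt.Printf("writer %d stopped\n", id)
		}(i)
	}

	go func() {
		// Only after every writer has stopped is it safe to cancel the
		// querier and delete the row the writers reference.
		workers.Wait()
		querierCancel()
	}()

	parentCancel()
	<-querierCtx.Done()
	fmt.Println("querier stopped last")
}
```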
@@ -255,6 +267,8 @@ type tunneler struct {
 	mu     sync.Mutex
 	latest map[uuid.UUID]map[uuid.UUID]tunnel
 	workQ  *workQ[tKey]
+
+	workerWG sync.WaitGroup
 }
 
 func newTunneler(ctx context.Context,
@@ -274,6 +288,9 @@ func newTunneler(ctx context.Context,
 		workQ:  newWorkQ[tKey](ctx),
 	}
 	go s.handle()
+	// add to the waitgroup immediately to avoid any races waiting for it before
+	// the workers start.
+	s.workerWG.Add(numTunnelerWorkers)
 	go func() {
 		<-startWorkers
 		for i := 0; i < numTunnelerWorkers; i++ {
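The placement of `workerWG.Add` here is the subtle part: it must execute before any goroutine can reach `workerWG.Wait`, otherwise `Wait` could observe a zero counter and return before the lazily-launched workers register themselves. A small illustrative sketch of the safe ordering (names are stand-ins for the real fields):

```go
package main

import (
	"fmt"
	"sync"
)

func main() {
	var wg sync.WaitGroup
	start := make(chan struct{})

	const numWorkers = 4
	// Count the workers up front. If this Add ran inside the goroutine
	// below, a concurrent wg.Wait() could return before any worker started.
	wg.Add(numWorkers)

	go func() {
		<-start // workers launch lazily, as in newTunneler/newBinder
		for i := 0; i < numWorkers; i++ {
			go func(id int) {
				defer wg.Done()
				fmt.Printf("worker %d finished\n", id)
			}(i)
		}
	}()

	close(start)
	wg.Wait() // safe: returns only after all numWorkers call Done
}
```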
@@ -297,6 +314,7 @@ func (t *tunneler) handle() {
 }
 
 func (t *tunneler) worker() {
+	defer t.workerWG.Done()
 	eb := backoff.NewExponentialBackOff()
 	eb.MaxElapsedTime = 0 // retry indefinitely
 	eb.MaxInterval = dbMaxBackoff
@@ -435,6 +453,8 @@ type binder struct {
 	mu     sync.Mutex
 	latest map[bKey]binding
 	workQ  *workQ[bKey]
+
+	workerWG sync.WaitGroup
 }
 
 func newBinder(ctx context.Context,
@@ -454,6 +474,9 @@ func newBinder(ctx context.Context,
 		workQ:  newWorkQ[bKey](ctx),
 	}
 	go b.handleBindings()
+	// add to the waitgroup immediately to avoid any races waiting for it before
+	// the workers start.
+	b.workerWG.Add(numBinderWorkers)
 	go func() {
 		<-startWorkers
 		for i := 0; i < numBinderWorkers; i++ {
@@ -477,6 +500,7 @@ func (b *binder) handleBindings() {
 }
 
 func (b *binder) worker() {
+	defer b.workerWG.Done()
 	eb := backoff.NewExponentialBackOff()
 	eb.MaxElapsedTime = 0 // retry indefinitely
 	eb.MaxInterval = dbMaxBackoff
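For readers unfamiliar with the backoff configuration repeated in both workers: assuming these calls come from `github.com/cenkalti/backoff/v4`, setting `MaxElapsedTime = 0` disables the library's default give-up deadline so retries continue until something else stops them, and `MaxInterval` caps the delay between attempts. A minimal sketch under those assumptions, with an illustrative `dbMaxBackoff` value rather than the coordinator's real constant:

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"

	"github.com/cenkalti/backoff/v4"
)

// dbMaxBackoff is an illustrative cap, not the coordinator's real constant.
const dbMaxBackoff = 10 * time.Second

func main() {
	eb := backoff.NewExponentialBackOff()
	eb.MaxElapsedTime = 0         // 0 disables the elapsed-time cutoff: retry indefinitely
	eb.MaxInterval = dbMaxBackoff // but never wait longer than this between attempts

	// Bound the demo with a context; in the real workers the coordinator's
	// context plays this role.
	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
	defer cancel()

	attempts := 0
	err := backoff.Retry(func() error {
		attempts++
		return errors.New("db still unavailable") // always fails in this sketch
	}, backoff.WithContext(eb, ctx))

	// With MaxElapsedTime == 0, Retry stops only because the context ended,
	// never by attempt count or elapsed time.
	fmt.Printf("stopped after %d attempts: %v\n", attempts, err)
}
```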