diff --git a/enterprise/tailnet/pgcoord.go b/enterprise/tailnet/pgcoord.go
index c8cd4604869d6..d9cd8d37b30a6 100644
--- a/enterprise/tailnet/pgcoord.go
+++ b/enterprise/tailnet/pgcoord.go
@@ -129,6 +129,10 @@ func newPGCoordInternal(
 	// signals when first heartbeat has been sent, so it's safe to start binding.
 	fHB := make(chan struct{})
 
+	// we need to arrange for the querier to stop _after_ the tunneler and binder, since we delete
+	// the coordinator when the querier stops (via the heartbeats). If the tunneler and binder are
+	// still running, they could run afoul of foreign key constraints.
+	querierCtx, querierCancel := context.WithCancel(dbauthz.As(context.Background(), pgCoordSubject))
 	c := &pgCoord{
 		ctx:        ctx,
 		cancel:     cancel,
@@ -142,9 +146,17 @@ func newPGCoordInternal(
 		tunneler:   newTunneler(ctx, logger, id, store, sCh, fHB),
 		tunnelerCh: sCh,
 		id:         id,
-		querier:    newQuerier(ctx, logger, id, ps, store, id, cCh, ccCh, numQuerierWorkers, fHB),
+		querier:    newQuerier(querierCtx, logger, id, ps, store, id, cCh, ccCh, numQuerierWorkers, fHB),
 		closed:     make(chan struct{}),
 	}
+	go func() {
+		// when the main context is canceled, or the coordinator closed, the binder and tunneler
+		// always eventually stop. Once they stop it's safe to cancel the querier context, which
+		// has the effect of deleting the coordinator from the database and ceasing heartbeats.
+		c.binder.workerWG.Wait()
+		c.tunneler.workerWG.Wait()
+		querierCancel()
+	}()
 	logger.Info(ctx, "starting coordinator")
 	return c, nil
 }
@@ -255,6 +267,8 @@ type tunneler struct {
 	mu     sync.Mutex
 	latest map[uuid.UUID]map[uuid.UUID]tunnel
 	workQ  *workQ[tKey]
+
+	workerWG sync.WaitGroup
 }
 
 func newTunneler(ctx context.Context,
@@ -274,6 +288,9 @@ func newTunneler(ctx context.Context,
 		workQ:  newWorkQ[tKey](ctx),
 	}
 	go s.handle()
+	// add to the waitgroup immediately to avoid any races waiting for it before
+	// the workers start.
+	s.workerWG.Add(numTunnelerWorkers)
 	go func() {
 		<-startWorkers
 		for i := 0; i < numTunnelerWorkers; i++ {
@@ -297,6 +314,7 @@ func (t *tunneler) handle() {
 }
 
 func (t *tunneler) worker() {
+	defer t.workerWG.Done()
 	eb := backoff.NewExponentialBackOff()
 	eb.MaxElapsedTime = 0 // retry indefinitely
 	eb.MaxInterval = dbMaxBackoff
@@ -435,6 +453,8 @@ type binder struct {
 	mu     sync.Mutex
 	latest map[bKey]binding
 	workQ  *workQ[bKey]
+
+	workerWG sync.WaitGroup
 }
 
 func newBinder(ctx context.Context,
@@ -454,6 +474,9 @@ func newBinder(ctx context.Context,
 		workQ:  newWorkQ[bKey](ctx),
 	}
 	go b.handleBindings()
+	// add to the waitgroup immediately to avoid any races waiting for it before
+	// the workers start.
+	b.workerWG.Add(numBinderWorkers)
 	go func() {
 		<-startWorkers
 		for i := 0; i < numBinderWorkers; i++ {
@@ -477,6 +500,7 @@ func (b *binder) handleBindings() {
 }
 
 func (b *binder) worker() {
+	defer b.workerWG.Done()
 	eb := backoff.NewExponentialBackOff()
 	eb.MaxElapsedTime = 0 // retry indefinitely
 	eb.MaxInterval = dbMaxBackoff
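
The diff relies on two ideas: calling `WaitGroup.Add` before the worker goroutines are launched (so a concurrent `Wait` can never observe a zero counter and return too early), and cancelling the dependent component's context only after the producers' WaitGroups drain, which enforces a strict shutdown order. Below is a minimal, self-contained sketch of that pattern; the names (`cleanupCtx`, `numWorkers`, etc.) are illustrative and not part of pgcoord.go, which wires the binder/tunneler WaitGroups to the querier's context instead.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

func main() {
	// Main context: cancelling it tells the workers to stop.
	ctx, cancel := context.WithCancel(context.Background())

	// Separate context for the cleanup step (the "querier" analogue), so it
	// can outlive the workers and run only after they have all exited.
	cleanupCtx, cleanupCancel := context.WithCancel(context.Background())

	var workerWG sync.WaitGroup
	const numWorkers = 3

	// Add before the goroutines start, so a concurrent Wait() cannot race
	// ahead of the workers being registered.
	workerWG.Add(numWorkers)
	for i := 0; i < numWorkers; i++ {
		go func(i int) {
			defer workerWG.Done()
			<-ctx.Done() // workers stop when the main context is canceled
			fmt.Printf("worker %d stopped\n", i)
		}(i)
	}

	// Only cancel the cleanup context once every worker has exited,
	// guaranteeing the ordering: workers first, cleanup last.
	go func() {
		workerWG.Wait()
		cleanupCancel()
	}()

	cancel() // simulate coordinator shutdown

	<-cleanupCtx.Done()
	fmt.Println("cleanup: safe to delete shared state now")
}
```

In the PR itself the "cleanup" is the querier's heartbeat loop deleting the coordinator row, which must not happen while the binder or tunneler could still insert rows referencing it via foreign keys.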