
fix: fix pgcoord to delete coordinator row last #12155


Merged 1 commit on Feb 15, 2024
26 changes: 25 additions & 1 deletion enterprise/tailnet/pgcoord.go
@@ -129,6 +129,10 @@ func newPGCoordInternal(
// signals when first heartbeat has been sent, so it's safe to start binding.
fHB := make(chan struct{})

// we need to arrange for the querier to stop _after_ the tunneler and binder, since we delete
// the coordinator when the querier stops (via the heartbeats). If the tunneler and binder are
// still running, they could run afoul of foreign key constraints.
querierCtx, querierCancel := context.WithCancel(dbauthz.As(context.Background(), pgCoordSubject))
c := &pgCoord{
ctx: ctx,
cancel: cancel,
@@ -142,9 +146,17 @@ func newPGCoordInternal(
tunneler: newTunneler(ctx, logger, id, store, sCh, fHB),
tunnelerCh: sCh,
id: id,
querier: newQuerier(ctx, logger, id, ps, store, id, cCh, ccCh, numQuerierWorkers, fHB),
querier: newQuerier(querierCtx, logger, id, ps, store, id, cCh, ccCh, numQuerierWorkers, fHB),
closed: make(chan struct{}),
}
go func() {
// when the main context is canceled, or the coordinator closed, the binder and tunneler
// always eventually stop. Once they stop it's safe to cancel the querier context, which
// has the effect of deleting the coordinator from the database and ceasing heartbeats.
c.binder.workerWG.Wait()
c.tunneler.workerWG.Wait()
querierCancel()
}()
logger.Info(ctx, "starting coordinator")
return c, nil
}
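
For readers of the diff, here is a minimal standalone sketch of the shutdown ordering the new comment describes (all names are illustrative, not from pgcoord): the binder- and tunneler-like workers are waited on first, and only then is the querier-like component's context canceled, which is the point at which the coordinator row would be deleted.

package main

import (
    "context"
    "fmt"
    "sync"
)

func main() {
    ctx, cancel := context.WithCancel(context.Background())

    // Separate context for the querier-like component so it can outlive the workers.
    qCtx, qCancel := context.WithCancel(context.Background())

    var binderWG, tunnelerWG sync.WaitGroup
    // Add before the goroutines start so a concurrent Wait cannot race with Add.
    binderWG.Add(1)
    tunnelerWG.Add(1)

    go func() { defer binderWG.Done(); <-ctx.Done(); fmt.Println("binder stopped") }()
    go func() { defer tunnelerWG.Done(); <-ctx.Done(); fmt.Println("tunneler stopped") }()

    done := make(chan struct{})
    go func() {
        <-qCtx.Done()
        fmt.Println("querier stopped: only now would the coordinator row be deleted")
        close(done)
    }()

    go func() {
        // Cancel the querier only after binder and tunneler have fully stopped,
        // mirroring the ordering this PR introduces.
        binderWG.Wait()
        tunnelerWG.Wait()
        qCancel()
    }()

    cancel() // shut down binder and tunneler first
    <-done   // the querier, and the row deletion, come last
}

The separate context is the key piece: canceling the main ctx stops the workers, but the row-deleting component keeps running until both WaitGroups drain.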
@@ -255,6 +267,8 @@ type tunneler struct {
mu sync.Mutex
latest map[uuid.UUID]map[uuid.UUID]tunnel
workQ *workQ[tKey]

workerWG sync.WaitGroup
}

func newTunneler(ctx context.Context,
@@ -274,6 +288,9 @@ func newTunneler(ctx context.Context,
workQ: newWorkQ[tKey](ctx),
}
go s.handle()
// add to the waitgroup immediately to avoid any races waiting for it before
// the workers start.
s.workerWG.Add(numTunnelerWorkers)
go func() {
<-startWorkers
for i := 0; i < numTunnelerWorkers; i++ {
@@ -297,6 +314,7 @@ func (t *tunneler) handle() {
}

func (t *tunneler) worker() {
defer t.workerWG.Done()
eb := backoff.NewExponentialBackOff()
eb.MaxElapsedTime = 0 // retry indefinitely
eb.MaxInterval = dbMaxBackoff
@@ -435,6 +453,8 @@ type binder struct {
mu sync.Mutex
latest map[bKey]binding
workQ *workQ[bKey]

workerWG sync.WaitGroup
}

func newBinder(ctx context.Context,
@@ -454,6 +474,9 @@ func newBinder(ctx context.Context,
workQ: newWorkQ[bKey](ctx),
}
go b.handleBindings()
// add to the waitgroup immediately to avoid any races waiting for it before
// the workers start.
b.workerWG.Add(numBinderWorkers)
Member
Is there a chance that <-startWorkers below (i.e. fHB) doesn't get closed (e.g. some error during startup), and thus these waitgroups never resolve?

(I didn't try to dig into how or where fHB is closed, as it's not obvious from this PR.)

Contributor Author

It gets closed unconditionally after we send the first heartbeat (success or fail).
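
A minimal sketch of the behavior described in that reply, before the diff continues below (sendHeartbeat and the interval are hypothetical stand-ins, not the real pgcoord heartbeat code): the channel is closed after the first heartbeat attempt whether it succeeds or fails, so nothing waiting on it can block forever.

package main

import (
    "context"
    "log"
    "time"
)

// sendHeartbeat is a hypothetical stand-in for the real heartbeat query.
func sendHeartbeat(ctx context.Context) error { return nil }

func main() {
    ctx, cancel := context.WithTimeout(context.Background(), time.Second)
    defer cancel()

    // fHB signals that the first heartbeat attempt has completed, success or fail.
    fHB := make(chan struct{})
    go func() {
        first := true
        ticker := time.NewTicker(100 * time.Millisecond)
        defer ticker.Stop()
        for {
            if err := sendHeartbeat(ctx); err != nil {
                log.Printf("heartbeat failed: %v", err) // failure does not block startup
            }
            if first {
                close(fHB) // closed unconditionally after the first attempt
                first = false
            }
            select {
            case <-ctx.Done():
                return
            case <-ticker.C:
            }
        }
    }()

    <-fHB // workers may now start; they never wait on heartbeat success
    log.Println("first heartbeat attempted; starting workers")
}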

go func() {
<-startWorkers
for i := 0; i < numBinderWorkers; i++ {
@@ -477,6 +500,7 @@ func (b *binder) handleBindings() {
}

func (b *binder) worker() {
defer b.workerWG.Done()
eb := backoff.NewExponentialBackOff()
eb.MaxElapsedTime = 0 // retry indefinitely
eb.MaxInterval = dbMaxBackoff
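
Finally, a standalone sketch of why both newTunneler and newBinder call workerWG.Add before the goroutine that waits for startWorkers (names here are illustrative): if Add ran inside each worker goroutine instead, a concurrent Wait could observe a zero counter and return before any worker had started.

package main

import (
    "fmt"
    "sync"
)

func main() {
    const numWorkers = 4
    startWorkers := make(chan struct{})

    var wg sync.WaitGroup
    // Register the full count up front, before the goroutine that launches the
    // workers, so a concurrent wg.Wait() cannot return while the count is zero.
    wg.Add(numWorkers)

    go func() {
        <-startWorkers // e.g. the first heartbeat has been sent
        for i := 0; i < numWorkers; i++ {
            go func(i int) {
                defer wg.Done()
                fmt.Println("worker", i, "finished")
            }(i)
        }
    }()

    close(startWorkers)
    wg.Wait() // safe: the count was already numWorkers before Wait could run
    fmt.Println("all workers done")
}

Because the count is registered up front, the Wait calls in the shutdown path above cannot return early even if the workers have not yet been launched.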