Skip to content
Prev Previous commit
Next Next commit
wait for workers to exit
  • Loading branch information
sreya committed Aug 13, 2024
commit eb52aa21423a2b75ddcfb5a1cee6f1ccb5f3c6ce
29 changes: 16 additions & 13 deletions enterprise/tailnet/pgcoord.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,11 +164,7 @@ func newPGCoordInternal(
closed: make(chan struct{}),
}
go func() {
// when the main context is canceled, or the coordinator closed, the binder, tunneler, and
// handshaker always eventually stop. When the
// binder stops it updates all the peers handled
// by this coordinator to LOST.
c.binder.workerWG.Wait()
c.binder.wait()
c.tunneler.workerWG.Wait()
c.handshaker.workerWG.Wait()
querierCancel()
Expand Down Expand Up @@ -518,14 +514,7 @@ func (b *binder) handleBindings() {
for {
select {
case <-b.ctx.Done():
b.logger.Debug(b.ctx, "binder exiting, updating peers to lost", slog.Error(b.ctx.Err()))
err := b.store.UpdateTailnetPeerStatusByCoordinator(context.Background(), database.UpdateTailnetPeerStatusByCoordinatorParams{
CoordinatorID: b.coordinatorID,
Status: database.TailnetStatusLost,
})
if err != nil {
b.logger.Error(b.ctx, "update peer status to lost", slog.Error(err))
}
b.logger.Debug(b.ctx, "binder exiting")
return
case bnd := <-b.bindings:
b.storeBinding(bnd)
Expand Down Expand Up @@ -637,6 +626,20 @@ func (b *binder) retrieveBinding(bk bKey) binding {
return bnd
}

func (b *binder) wait() {
b.workerWG.Wait()

b.logger.Debug(b.ctx, "binder exiting, updating peers to lost", slog.Error(b.ctx.Err()))

err := b.store.UpdateTailnetPeerStatusByCoordinator(context.Background(), database.UpdateTailnetPeerStatusByCoordinatorParams{
CoordinatorID: b.coordinatorID,
Status: database.TailnetStatusLost,
})
if err != nil {
b.logger.Error(b.ctx, "update peer status to lost", slog.Error(err))
}
}

// mapper tracks data sent to a peer, and sends updates based on changes read from the database.
type mapper struct {
ctx context.Context
Expand Down
Loading