@@ -161,11 +161,12 @@ func newPGCoordInternal(
161
161
closed : make (chan struct {}),
162
162
}
163
163
go func () {
164
- // when the main context is canceled, or the coordinator closed, the binder and tunneler
165
- // always eventually stop. Once they stop it's safe to cancel the querier context, which
164
+ // when the main context is canceled, or the coordinator closed, the binder, tunneler, and
165
+ // handshaker always eventually stop. Once they stop it's safe to cancel the querier context, which
166
166
// has the effect of deleting the coordinator from the database and ceasing heartbeats.
167
167
c .binder .workerWG .Wait ()
168
168
c .tunneler .workerWG .Wait ()
169
+ c .handshaker .workerWG .Wait ()
169
170
querierCancel ()
170
171
}()
171
172
logger .Info (ctx , "starting coordinator" )
@@ -231,6 +232,7 @@ func (c *pgCoord) Close() error {
231
232
c .logger .Info (c .ctx , "closing coordinator" )
232
233
c .cancel ()
233
234
c .closeOnce .Do (func () { close (c .closed ) })
235
+ c .querier .wait ()
234
236
return nil
235
237
}
236
238
@@ -795,6 +797,8 @@ type querier struct {
795
797
796
798
workQ * workQ [querierWorkKey ]
797
799
800
+ wg sync.WaitGroup
801
+
798
802
heartbeats * heartbeats
799
803
updates <- chan hbUpdate
800
804
@@ -831,6 +835,7 @@ func newQuerier(ctx context.Context,
831
835
}
832
836
q .subscribe ()
833
837
838
+ q .wg .Add (2 + numWorkers )
834
839
go func () {
835
840
<- firstHeartbeat
836
841
go q .handleIncoming ()
@@ -842,7 +847,13 @@ func newQuerier(ctx context.Context,
842
847
return q
843
848
}
844
849
850
+ func (q * querier ) wait () {
851
+ q .wg .Wait ()
852
+ q .heartbeats .wg .Wait ()
853
+ }
854
+
845
855
func (q * querier ) handleIncoming () {
856
+ defer q .wg .Done ()
846
857
for {
847
858
select {
848
859
case <- q .ctx .Done ():
@@ -919,6 +930,7 @@ func (q *querier) cleanupConn(c *connIO) {
919
930
}
920
931
921
932
func (q * querier ) worker () {
933
+ defer q .wg .Done ()
922
934
eb := backoff .NewExponentialBackOff ()
923
935
eb .MaxElapsedTime = 0 // retry indefinitely
924
936
eb .MaxInterval = dbMaxBackoff
@@ -1204,6 +1216,7 @@ func (q *querier) resyncPeerMappings() {
1204
1216
}
1205
1217
1206
1218
func (q * querier ) handleUpdates () {
1219
+ defer q .wg .Done ()
1207
1220
for {
1208
1221
select {
1209
1222
case <- q .ctx .Done ():
@@ -1451,6 +1464,8 @@ type heartbeats struct {
1451
1464
coordinators map [uuid.UUID ]time.Time
1452
1465
timer * time.Timer
1453
1466
1467
+ wg sync.WaitGroup
1468
+
1454
1469
// overwritten in tests, but otherwise constant
1455
1470
cleanupPeriod time.Duration
1456
1471
}
@@ -1472,6 +1487,7 @@ func newHeartbeats(
1472
1487
coordinators : make (map [uuid.UUID ]time.Time ),
1473
1488
cleanupPeriod : cleanupPeriod ,
1474
1489
}
1490
+ h .wg .Add (3 )
1475
1491
go h .subscribe ()
1476
1492
go h .sendBeats ()
1477
1493
go h .cleanupLoop ()
@@ -1502,6 +1518,7 @@ func (h *heartbeats) filter(mappings []mapping) []mapping {
1502
1518
}
1503
1519
1504
1520
func (h * heartbeats ) subscribe () {
1521
+ defer h .wg .Done ()
1505
1522
eb := backoff .NewExponentialBackOff ()
1506
1523
eb .MaxElapsedTime = 0 // retry indefinitely
1507
1524
eb .MaxInterval = dbMaxBackoff
@@ -1611,6 +1628,7 @@ func (h *heartbeats) checkExpiry() {
1611
1628
}
1612
1629
1613
1630
func (h * heartbeats ) sendBeats () {
1631
+ defer h .wg .Done ()
1614
1632
// send an initial heartbeat so that other coordinators can start using our bindings right away.
1615
1633
h .sendBeat ()
1616
1634
close (h .firstHeartbeat ) // signal binder it can start writing
@@ -1662,6 +1680,7 @@ func (h *heartbeats) sendDelete() {
1662
1680
}
1663
1681
1664
1682
func (h * heartbeats ) cleanupLoop () {
1683
+ defer h .wg .Done ()
1665
1684
h .cleanup ()
1666
1685
tkr := time .NewTicker (h .cleanupPeriod )
1667
1686
defer tkr .Stop ()
0 commit comments