@@ -38,7 +38,7 @@ type API struct {
38
38
watcherDone chan struct {}
39
39
updaterDone chan struct {}
40
40
initialUpdateDone chan struct {} // Closed after first update in updaterLoop.
41
- refreshTrigger chan chan error // Channel to trigger manual refresh.
41
+ updateTrigger chan chan error // Channel to trigger manual refresh.
42
42
updateInterval time.Duration // Interval for periodic container updates.
43
43
logger slog.Logger
44
44
watcher watcher.Watcher
@@ -164,7 +164,7 @@ func NewAPI(logger slog.Logger, options ...Option) *API {
164
164
watcherDone : make (chan struct {}),
165
165
updaterDone : make (chan struct {}),
166
166
initialUpdateDone : make (chan struct {}),
167
- refreshTrigger : make (chan chan error ),
167
+ updateTrigger : make (chan chan error ),
168
168
updateInterval : defaultUpdateInterval ,
169
169
logger : logger ,
170
170
clock : quartz .NewReal (),
@@ -247,27 +247,12 @@ func (api *API) updaterLoop() {
247
247
defer api .logger .Debug (api .ctx , "updater loop stopped" )
248
248
api .logger .Debug (api .ctx , "updater loop started" )
249
249
250
- // Ensure that only once instance of the updateContainers is running
251
- // at a time. This is a workaround since quartz.Ticker does not
252
- // allow us to know if the routine has completed.
253
- sema := make (chan struct {}, 1 )
254
- sema <- struct {}{}
255
-
256
- // Ensure only one updateContainers is running at a time, others are
257
- // queued.
258
- doUpdate := func () error {
259
- select {
260
- case <- api .ctx .Done ():
261
- return api .ctx .Err ()
262
- case <- sema :
263
- }
264
- defer func () { sema <- struct {}{} }()
265
-
266
- return api .updateContainers (api .ctx )
267
- }
268
-
250
+ // Perform an initial update to populate the container list, this
251
+ // gives us a guarantee that the API has loaded the initial state
252
+ // before returning any responses. This is useful for both tests
253
+ // and anyone looking to interact with the API.
269
254
api .logger .Debug (api .ctx , "performing initial containers update" )
270
- if err := doUpdate ( ); err != nil {
255
+ if err := api . updateContainers ( api . ctx ); err != nil {
271
256
api .logger .Error (api .ctx , "initial containers update failed" , slog .Error (err ))
272
257
} else {
273
258
api .logger .Debug (api .ctx , "initial containers update complete" )
@@ -276,34 +261,39 @@ func (api *API) updaterLoop() {
276
261
// Other services can wait on this if they need the first data to be available.
277
262
close (api .initialUpdateDone )
278
263
279
- // Use a ticker func to ensure that doUpdate has run to completion
280
- // when advancing time.
281
- waiter := api .clock .TickerFunc (api .ctx , api .updateInterval , func () error {
282
- err := doUpdate ()
283
- if err != nil {
284
- api .logger .Error (api .ctx , "periodic containers update failed" , slog .Error (err ))
264
+ // We utilize a TickerFunc here instead of a regular Ticker so that
265
+ // we can guarantee execution of the updateContainers method after
266
+ // advancing the clock.
267
+ ticker := api .clock .TickerFunc (api .ctx , api .updateInterval , func () error {
268
+ done := make (chan error , 1 )
269
+ defer close (done )
270
+
271
+ select {
272
+ case <- api .ctx .Done ():
273
+ return api .ctx .Err ()
274
+ case api .updateTrigger <- done :
275
+ err := <- done
276
+ if err != nil {
277
+ api .logger .Error (api .ctx , "updater loop ticker failed" , slog .Error (err ))
278
+ }
279
+ default :
280
+ api .logger .Debug (api .ctx , "updater loop ticker skipped, update in progress" )
285
281
}
286
- return nil // Always nil, keep going.
287
- })
282
+
283
+ return nil // Always nil to keep the ticker going.
284
+ }, "updaterLoop" )
288
285
defer func () {
289
- if err := waiter .Wait (); err != nil {
286
+ if err := ticker .Wait ("updaterLoop" ); err != nil && ! errors . Is ( err , context . Canceled ) {
290
287
api .logger .Error (api .ctx , "updater loop ticker failed" , slog .Error (err ))
291
288
}
292
289
}()
293
290
294
291
for {
295
292
select {
296
293
case <- api .ctx .Done ():
297
- api .logger .Debug (api .ctx , "updater loop context canceled" )
298
294
return
299
- case ch := <- api .refreshTrigger :
300
- api .logger .Debug (api .ctx , "manual containers update triggered" )
301
- err := doUpdate ()
302
- if err != nil {
303
- api .logger .Error (api .ctx , "manual containers update failed" , slog .Error (err ))
304
- }
305
- ch <- err
306
- close (ch )
295
+ case done := <- api .updateTrigger :
296
+ done <- api .updateContainers (api .ctx )
307
297
}
308
298
}
309
299
}
@@ -506,17 +496,23 @@ func (api *API) processUpdatedContainersLocked(ctx context.Context, updated code
506
496
507
497
// refreshContainers triggers an immediate update of the container list
508
498
// and waits for it to complete.
509
- func (api * API ) refreshContainers (ctx context.Context ) error {
499
+ func (api * API ) refreshContainers (ctx context.Context ) (err error ) {
500
+ defer func () {
501
+ if err != nil {
502
+ err = xerrors .Errorf ("refresh containers failed: %w" , err )
503
+ }
504
+ }()
505
+
510
506
done := make (chan error , 1 )
511
507
select {
512
508
case <- api .ctx .Done ():
513
- return xerrors .Errorf ("API closed, cannot send refresh trigger : %w" , api .ctx .Err ())
509
+ return xerrors .Errorf ("API closed: %w" , api .ctx .Err ())
514
510
case <- ctx .Done ():
515
511
return ctx .Err ()
516
- case api .refreshTrigger <- done :
512
+ case api .updateTrigger <- done :
517
513
select {
518
514
case <- api .ctx .Done ():
519
- return xerrors .Errorf ("API closed, cannot wait for refresh : %w" , api .ctx .Err ())
515
+ return xerrors .Errorf ("API closed: %w" , api .ctx .Err ())
520
516
case <- ctx .Done ():
521
517
return ctx .Err ()
522
518
case err := <- done :
0 commit comments