Skip to content

Commit e225434

Browse files
committed
refactor ticker and refresh
1 parent 0122e43 commit e225434

File tree

1 file changed

+40
-44
lines changed

1 file changed

+40
-44
lines changed

agent/agentcontainers/api.go

Lines changed: 40 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ type API struct {
3838
watcherDone chan struct{}
3939
updaterDone chan struct{}
4040
initialUpdateDone chan struct{} // Closed after first update in updaterLoop.
41-
refreshTrigger chan chan error // Channel to trigger manual refresh.
41+
updateTrigger chan chan error // Channel to trigger manual refresh.
4242
updateInterval time.Duration // Interval for periodic container updates.
4343
logger slog.Logger
4444
watcher watcher.Watcher
@@ -164,7 +164,7 @@ func NewAPI(logger slog.Logger, options ...Option) *API {
164164
watcherDone: make(chan struct{}),
165165
updaterDone: make(chan struct{}),
166166
initialUpdateDone: make(chan struct{}),
167-
refreshTrigger: make(chan chan error),
167+
updateTrigger: make(chan chan error),
168168
updateInterval: defaultUpdateInterval,
169169
logger: logger,
170170
clock: quartz.NewReal(),
@@ -247,27 +247,12 @@ func (api *API) updaterLoop() {
247247
defer api.logger.Debug(api.ctx, "updater loop stopped")
248248
api.logger.Debug(api.ctx, "updater loop started")
249249

250-
// Ensure that only once instance of the updateContainers is running
251-
// at a time. This is a workaround since quartz.Ticker does not
252-
// allow us to know if the routine has completed.
253-
sema := make(chan struct{}, 1)
254-
sema <- struct{}{}
255-
256-
// Ensure only one updateContainers is running at a time, others are
257-
// queued.
258-
doUpdate := func() error {
259-
select {
260-
case <-api.ctx.Done():
261-
return api.ctx.Err()
262-
case <-sema:
263-
}
264-
defer func() { sema <- struct{}{} }()
265-
266-
return api.updateContainers(api.ctx)
267-
}
268-
250+
// Perform an initial update to populate the container list, this
251+
// gives us a guarantee that the API has loaded the initial state
252+
// before returning any responses. This is useful for both tests
253+
// and anyone looking to interact with the API.
269254
api.logger.Debug(api.ctx, "performing initial containers update")
270-
if err := doUpdate(); err != nil {
255+
if err := api.updateContainers(api.ctx); err != nil {
271256
api.logger.Error(api.ctx, "initial containers update failed", slog.Error(err))
272257
} else {
273258
api.logger.Debug(api.ctx, "initial containers update complete")
@@ -276,34 +261,39 @@ func (api *API) updaterLoop() {
276261
// Other services can wait on this if they need the first data to be available.
277262
close(api.initialUpdateDone)
278263

279-
// Use a ticker func to ensure that doUpdate has run to completion
280-
// when advancing time.
281-
waiter := api.clock.TickerFunc(api.ctx, api.updateInterval, func() error {
282-
err := doUpdate()
283-
if err != nil {
284-
api.logger.Error(api.ctx, "periodic containers update failed", slog.Error(err))
264+
// We utilize a TickerFunc here instead of a regular Ticker so that
265+
// we can guarantee execution of the updateContainers method after
266+
// advancing the clock.
267+
ticker := api.clock.TickerFunc(api.ctx, api.updateInterval, func() error {
268+
done := make(chan error, 1)
269+
defer close(done)
270+
271+
select {
272+
case <-api.ctx.Done():
273+
return api.ctx.Err()
274+
case api.updateTrigger <- done:
275+
err := <-done
276+
if err != nil {
277+
api.logger.Error(api.ctx, "updater loop ticker failed", slog.Error(err))
278+
}
279+
default:
280+
api.logger.Debug(api.ctx, "updater loop ticker skipped, update in progress")
285281
}
286-
return nil // Always nil, keep going.
287-
})
282+
283+
return nil // Always nil to keep the ticker going.
284+
}, "updaterLoop")
288285
defer func() {
289-
if err := waiter.Wait(); err != nil {
286+
if err := ticker.Wait("updaterLoop"); err != nil && !errors.Is(err, context.Canceled) {
290287
api.logger.Error(api.ctx, "updater loop ticker failed", slog.Error(err))
291288
}
292289
}()
293290

294291
for {
295292
select {
296293
case <-api.ctx.Done():
297-
api.logger.Debug(api.ctx, "updater loop context canceled")
298294
return
299-
case ch := <-api.refreshTrigger:
300-
api.logger.Debug(api.ctx, "manual containers update triggered")
301-
err := doUpdate()
302-
if err != nil {
303-
api.logger.Error(api.ctx, "manual containers update failed", slog.Error(err))
304-
}
305-
ch <- err
306-
close(ch)
295+
case done := <-api.updateTrigger:
296+
done <- api.updateContainers(api.ctx)
307297
}
308298
}
309299
}
@@ -506,17 +496,23 @@ func (api *API) processUpdatedContainersLocked(ctx context.Context, updated code
506496

507497
// refreshContainers triggers an immediate update of the container list
508498
// and waits for it to complete.
509-
func (api *API) refreshContainers(ctx context.Context) error {
499+
func (api *API) refreshContainers(ctx context.Context) (err error) {
500+
defer func() {
501+
if err != nil {
502+
err = xerrors.Errorf("refresh containers failed: %w", err)
503+
}
504+
}()
505+
510506
done := make(chan error, 1)
511507
select {
512508
case <-api.ctx.Done():
513-
return xerrors.Errorf("API closed, cannot send refresh trigger: %w", api.ctx.Err())
509+
return xerrors.Errorf("API closed: %w", api.ctx.Err())
514510
case <-ctx.Done():
515511
return ctx.Err()
516-
case api.refreshTrigger <- done:
512+
case api.updateTrigger <- done:
517513
select {
518514
case <-api.ctx.Done():
519-
return xerrors.Errorf("API closed, cannot wait for refresh: %w", api.ctx.Err())
515+
return xerrors.Errorf("API closed: %w", api.ctx.Err())
520516
case <-ctx.Done():
521517
return ctx.Err()
522518
case err := <-done:

0 commit comments

Comments
 (0)