@@ -221,13 +221,16 @@ func TestPendingUpdatesMetric(t *testing.T) {
221
221
222
222
// GIVEN: a notification manager whose store updates are intercepted so we can read the number of pending updates set in the metric
223
223
cfg := defaultNotificationsConfig (method )
224
- cfg .FetchInterval = serpent .Duration (time .Millisecond * 50 )
225
224
cfg .RetryInterval = serpent .Duration (time .Hour ) // Delay retries so they don't interfere.
226
225
cfg .StoreSyncInterval = serpent .Duration (time .Millisecond * 100 )
227
226
228
227
syncer := & syncInterceptor {Store : api .Database }
229
228
interceptor := newUpdateSignallingInterceptor (syncer )
230
- mgr , err := notifications .NewManager (cfg , interceptor , defaultHelpers (), metrics , api .Logger .Named ("manager" ))
229
+ mClock := quartz .NewMock (t )
230
+ trap := mClock .Trap ().NewTicker ("Manager" , "storeSync" )
231
+ defer trap .Close ()
232
+ mgr , err := notifications .NewManager (cfg , interceptor , defaultHelpers (), metrics , api .Logger .Named ("manager" ),
233
+ notifications .WithTestClock (mClock ))
231
234
require .NoError (t , err )
232
235
t .Cleanup (func () {
233
236
assert .NoError (t , mgr .Stop (ctx ))
@@ -249,6 +252,7 @@ func TestPendingUpdatesMetric(t *testing.T) {
249
252
require .NoError (t , err )
250
253
251
254
mgr .Run (ctx )
255
+ trap .MustWait (ctx ).Release () // ensures ticker has been set
252
256
253
257
// THEN:
254
258
// Wait until the handler has dispatched the given notifications.
@@ -259,17 +263,20 @@ func TestPendingUpdatesMetric(t *testing.T) {
259
263
return len (handler .succeeded ) == 1 && len (handler .failed ) == 1
260
264
}, testutil .WaitShort , testutil .IntervalFast )
261
265
262
- // Wait until we intercept the calls to sync the pending updates to the store.
263
- success := testutil .RequireRecvCtx (testutil .Context (t , testutil .WaitShort ), t , interceptor .updateSuccess )
264
- failure := testutil .RequireRecvCtx (testutil .Context (t , testutil .WaitShort ), t , interceptor .updateFailure )
265
-
266
- // Wait for the metric to be updated with the expected count of metrics.
266
+ // Both handler calls should be pending in the metrics.
267
267
require .Eventually (t , func () bool {
268
- return promtest .ToFloat64 (metrics .PendingUpdates ) == float64 (success + failure )
268
+ return promtest .ToFloat64 (metrics .PendingUpdates ) == float64 (2 )
269
269
}, testutil .WaitShort , testutil .IntervalFast )
270
270
271
- // Unpause the interceptor so the updates can proceed.
272
- interceptor .unpause ()
271
+ // THEN:
272
+ // Trigger syncing updates
273
+ mClock .Advance (cfg .StoreSyncInterval .Value ()).MustWait (ctx )
274
+
275
+ // Wait until we intercept the calls to sync the pending updates to the store.
276
+ success := testutil .RequireRecvCtx (testutil .Context (t , testutil .WaitShort ), t , interceptor .updateSuccess )
277
+ require .EqualValues (t , 1 , success )
278
+ failure := testutil .RequireRecvCtx (testutil .Context (t , testutil .WaitShort ), t , interceptor .updateFailure )
279
+ require .EqualValues (t , 1 , failure )
273
280
274
281
// Validate that the store synced the expected number of updates.
275
282
require .Eventually (t , func () bool {
@@ -464,43 +471,25 @@ func fingerprintLabels(lbs ...string) model.Fingerprint {
464
471
// signaled by the caller so it can continue.
465
472
type updateSignallingInterceptor struct {
466
473
notifications.Store
467
-
468
- pause chan any
469
-
470
474
updateSuccess chan int
471
475
updateFailure chan int
472
476
}
473
477
474
478
func newUpdateSignallingInterceptor (interceptor notifications.Store ) * updateSignallingInterceptor {
475
479
return & updateSignallingInterceptor {
476
- Store : interceptor ,
477
-
478
- pause : make (chan any , 1 ),
479
-
480
+ Store : interceptor ,
480
481
updateSuccess : make (chan int , 1 ),
481
482
updateFailure : make (chan int , 1 ),
482
483
}
483
484
}
484
485
485
- func (u * updateSignallingInterceptor ) unpause () {
486
- close (u .pause )
487
- }
488
-
489
486
func (u * updateSignallingInterceptor ) BulkMarkNotificationMessagesSent (ctx context.Context , arg database.BulkMarkNotificationMessagesSentParams ) (int64 , error ) {
490
487
u .updateSuccess <- len (arg .IDs )
491
-
492
- // Wait until signaled so we have a chance to read the number of pending updates.
493
- <- u .pause
494
-
495
488
return u .Store .BulkMarkNotificationMessagesSent (ctx , arg )
496
489
}
497
490
498
491
func (u * updateSignallingInterceptor ) BulkMarkNotificationMessagesFailed (ctx context.Context , arg database.BulkMarkNotificationMessagesFailedParams ) (int64 , error ) {
499
492
u .updateFailure <- len (arg .IDs )
500
-
501
- // Wait until signaled so we have a chance to read the number of pending updates.
502
- <- u .pause
503
-
504
493
return u .Store .BulkMarkNotificationMessagesFailed (ctx , arg )
505
494
}
506
495
0 commit comments