@@ -21,7 +21,6 @@ import (
 	"sort"
 	"strings"
 	"sync"
-	"sync/atomic"
 	"testing"
 	"time"
 
@@ -278,86 +277,122 @@ func TestBackpressure(t *testing.T) {
 		t.Skip("This test requires postgres; it relies on business-logic only implemented in the database")
 	}
 
-	// nolint:gocritic // Unit test.
-	ctx := dbauthz.AsSystemRestricted(testutil.Context(t, testutil.WaitSuperLong))
 	store, _ := dbtestutil.NewDB(t)
 	logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug)
+	// nolint:gocritic // Unit test.
+	ctx := dbauthz.AsSystemRestricted(testutil.Context(t, testutil.WaitShort))
 
-	// Mock server to simulate webhook endpoint.
-	var received atomic.Int32
-	server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
-		var payload dispatch.WebhookPayload
-		err := json.NewDecoder(r.Body).Decode(&payload)
-		assert.NoError(t, err)
-
-		w.WriteHeader(http.StatusOK)
-		_, err = w.Write([]byte("noted."))
-		assert.NoError(t, err)
-
-		received.Add(1)
-	}))
-	defer server.Close()
-
-	endpoint, err := url.Parse(server.URL)
-	require.NoError(t, err)
-
-	method := database.NotificationMethodWebhook
+	const method = database.NotificationMethodWebhook
 	cfg := defaultNotificationsConfig(method)
-	cfg.Webhook = codersdk.NotificationsWebhookConfig{
-		Endpoint: *serpent.URLOf(endpoint),
-	}
 
 	// Tune the queue to fetch often.
 	const fetchInterval = time.Millisecond * 200
 	const batchSize = 10
 	cfg.FetchInterval = serpent.Duration(fetchInterval)
 	cfg.LeaseCount = serpent.Int64(batchSize)
+	// never time out for this test
+	cfg.LeasePeriod = serpent.Duration(time.Hour)
+	cfg.DispatchTimeout = serpent.Duration(time.Hour - time.Millisecond)
 
 	// Shrink buffers down and increase flush interval to provoke backpressure.
 	// Flush buffers every 5 fetch intervals.
 	const syncInterval = time.Second
 	cfg.StoreSyncInterval = serpent.Duration(syncInterval)
 	cfg.StoreSyncBufferSize = serpent.Int64(2)
 
-	handler := newDispatchInterceptor(dispatch.NewWebhookHandler(cfg.Webhook, logger.Named("webhook")))
+	handler := &chanHandler{calls: make(chan dispatchCall)}
 
 	// Intercept calls to submit the buffered updates to the store.
 	storeInterceptor := &syncInterceptor{Store: store}
 
+	mClock := quartz.NewMock(t)
+	syncTrap := mClock.Trap().NewTicker("Manager", "storeSync")
+	defer syncTrap.Close()
+	fetchTrap := mClock.Trap().TickerFunc("notifier", "fetchInterval")
+	defer fetchTrap.Close()
+
 	// GIVEN: a notification manager whose updates will be intercepted
-	mgr, err := notifications.NewManager(cfg, storeInterceptor, defaultHelpers(), createMetrics(), logger.Named("manager"))
+	mgr, err := notifications.NewManager(cfg, storeInterceptor, defaultHelpers(), createMetrics(),
+		logger.Named("manager"), notifications.WithTestClock(mClock))
 	require.NoError(t, err)
 	mgr.WithHandlers(map[database.NotificationMethod]notifications.Handler{method: handler})
-	enq, err := notifications.NewStoreEnqueuer(cfg, store, defaultHelpers(), logger.Named("enqueuer"), quartz.NewReal())
+	enq, err := notifications.NewStoreEnqueuer(cfg, store, defaultHelpers(), logger.Named("enqueuer"), mClock)
 	require.NoError(t, err)
 
 	user := createSampleUser(t, store)
 
 	// WHEN: a set of notifications are enqueued, which causes backpressure due to the batchSize which can be processed per fetch
 	const totalMessages = 30
-	for i := 0; i < totalMessages; i++ {
+	for i := range totalMessages {
 		_, err = enq.Enqueue(ctx, user.ID, notifications.TemplateWorkspaceDeleted, map[string]string{"i": fmt.Sprintf("%d", i)}, "test")
 		require.NoError(t, err)
 	}
 
 	// Start the notifier.
 	mgr.Run(ctx)
+	syncTrap.MustWait(ctx).Release()
+	fetchTrap.MustWait(ctx).Release()
 
 	// THEN:
 
-	// Wait for 3 fetch intervals, then check progress.
-	time.Sleep(fetchInterval * 3)
+	// Trigger a fetch
+	w := mClock.Advance(fetchInterval)
+
+	// one batch of dispatches is sent
+	for range batchSize {
+		call := testutil.RequireRecvCtx(ctx, t, handler.calls)
+		testutil.RequireSendCtx(ctx, t, call.result, dispatchResult{
+			retryable: false,
+			err:       nil,
+		})
+	}
+
+	// The first fetch will not complete, because of the short sync buffer of 2. This is the
+	// backpressure.
+	select {
+	case <-time.After(testutil.IntervalMedium):
+		// success
+	case <-w.Done():
+		t.Fatal("fetch completed despite backpressure")
+	}
 
-	// We expect the notifier will have dispatched ONLY the initial batch of messages.
-	// In other words, the notifier should have dispatched 3 batches by now, but because the buffered updates have not
-	// been processed: there is backpressure.
-	require.EqualValues(t, batchSize, handler.sent.Load()+handler.err.Load())
 	// We expect that the store will have received NO updates.
 	require.EqualValues(t, 0, storeInterceptor.sent.Load()+storeInterceptor.failed.Load())
 
 	// However, when we Stop() the manager the backpressure will be relieved and the buffered updates will ALL be flushed,
 	// since all the goroutines that were blocked (on writing updates to the buffer) will be unblocked and will complete.
-	require.NoError(t, mgr.Stop(ctx))
+	// Stop() waits for the in-progress flush to complete, meaning we have to advance the time such that sync triggers
+	// a total of (batchSize/StoreSyncBufferSize)-1 times. The -1 is because once we run the penultimate sync, it
+	// clears space in the buffer for the last dispatches of the batch, which allows graceful shutdown to continue
+	// immediately, without waiting for the last trigger.
+	stopErr := make(chan error, 1)
+	go func() {
+		stopErr <- mgr.Stop(ctx)
+	}()
+	elapsed := fetchInterval
+	syncEnd := time.Duration(batchSize/cfg.StoreSyncBufferSize.Value()-1) * cfg.StoreSyncInterval.Value()
+	t.Logf("will advance until %dms have elapsed", syncEnd.Milliseconds())
+	for elapsed < syncEnd {
+		d, wt := mClock.AdvanceNext()
+		elapsed += d
+		t.Logf("elapsed: %dms", elapsed.Milliseconds())
+		// fetches complete immediately, since TickerFunc only allows one call to the callback in flight at a time.
+		wt.MustWait(ctx)
+		if elapsed%cfg.StoreSyncInterval.Value() == 0 {
+			numSent := cfg.StoreSyncBufferSize.Value() * int64(elapsed/cfg.StoreSyncInterval.Value())
+			t.Logf("waiting for %d messages", numSent)
+			require.Eventually(t, func() bool {
+				// need greater or equal because the last set of messages can come immediately due
+				// to graceful shut down
+				return int64(storeInterceptor.sent.Load()) >= numSent
+			}, testutil.WaitShort, testutil.IntervalFast)
+		}
+	}
+	t.Logf("done advancing")
+	// The batch completes
+	w.MustWait(ctx)
+
+	require.NoError(t, testutil.RequireRecvCtx(ctx, t, stopErr))
 	require.EqualValues(t, batchSize, storeInterceptor.sent.Load()+storeInterceptor.failed.Load())
 }
 
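Working through the shutdown arithmetic with the constants in this diff: batchSize = 10 and StoreSyncBufferSize = 2, so syncEnd = (10/2 - 1) * 1s = 4s. Starting from elapsed = 200ms (the one fetch already triggered), the AdvanceNext loop crosses the sync ticks at 1s, 2s, 3s and 4s, and the require.Eventually checks expect at least 2, 4, 6 and 8 stored updates at those points. The remaining 2 updates are flushed by the graceful shutdown that the in-flight Stop() is performing, which is why the closing assertion expects exactly batchSize (10) updates in the store interceptor.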
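The chanHandler test double (with its dispatchCall and dispatchResult types) that replaces the old httptest webhook server is defined elsewhere in the PR, not in this hunk. A minimal sketch of the idea, inferred only from how the test uses it: the field names come from the diff above, while the method shape and any adapter to notifications.Handler are assumptions.

```go
// Sketch only: the real definitions live alongside this test. Field names are
// taken from the diff above; the dispatch signature is an assumption.
package notificationstest

import "context"

// dispatchResult is the outcome the test injects for a single dispatch.
type dispatchResult struct {
	retryable bool
	err       error
}

// dispatchCall pairs one dispatch attempt with the channel on which the test
// sends back the result it wants the notifier to record.
type dispatchCall struct {
	result chan dispatchResult
}

// chanHandler surfaces every dispatch on calls and blocks until the test
// responds, which is what lets the test count exactly one batch and then hold
// updates back to create backpressure on the store-sync buffer.
type chanHandler struct {
	calls chan dispatchCall
}

// dispatch would be invoked (via whatever adapter notifications.Handler
// actually requires) once per message the notifier tries to deliver.
func (h *chanHandler) dispatch(ctx context.Context) (retryable bool, err error) {
	call := dispatchCall{result: make(chan dispatchResult)}
	select {
	case h.calls <- call:
	case <-ctx.Done():
		return false, ctx.Err()
	}
	select {
	case res := <-call.result:
		return res.retryable, res.err
	case <-ctx.Done():
		return false, ctx.Err()
	}
}
```

Paired with the quartz mock clock, this removes the real-time sleeps from the old test: the syncTrap/fetchTrap traps make sure the manager and notifier have actually registered their tickers before the test starts advancing time, so mClock.Advance(fetchInterval) deterministically fires exactly one fetch.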