Skip to content

fix: remove TestPendingUpdatesMetric flake #13944

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jul 19, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 9 additions & 12 deletions coderd/notifications/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package notifications_test

import (
"context"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -260,7 +259,7 @@ func TestPendingUpdatesMetric(t *testing.T) {
require.EqualValues(t, pending, success+failure)

// Unpause the interceptor so the updates can proceed.
interceptor.proceed.Broadcast()
interceptor.unpause()

// Validate that the store synced the expected number of updates.
require.Eventually(t, func() bool {
Expand Down Expand Up @@ -376,7 +375,7 @@ func fingerprintLabels(lbs ...string) model.Fingerprint {
type updateSignallingInterceptor struct {
notifications.Store

proceed *sync.Cond
pause chan any

updateSuccess chan int
updateFailure chan int
Expand All @@ -386,33 +385,31 @@ func newUpdateSignallingInterceptor(interceptor notifications.Store) *updateSign
return &updateSignallingInterceptor{
Store: interceptor,

proceed: sync.NewCond(&sync.Mutex{}),
pause: make(chan any, 1),

updateSuccess: make(chan int, 1),
updateFailure: make(chan int, 1),
}
}

func (u *updateSignallingInterceptor) unpause() {
close(u.pause)
}

func (u *updateSignallingInterceptor) BulkMarkNotificationMessagesSent(ctx context.Context, arg database.BulkMarkNotificationMessagesSentParams) (int64, error) {
u.updateSuccess <- len(arg.IDs)

u.proceed.L.Lock()
defer u.proceed.L.Unlock()

// Wait until signaled so we have a chance to read the number of pending updates.
u.proceed.Wait()
<-u.pause

return u.Store.BulkMarkNotificationMessagesSent(ctx, arg)
}

func (u *updateSignallingInterceptor) BulkMarkNotificationMessagesFailed(ctx context.Context, arg database.BulkMarkNotificationMessagesFailedParams) (int64, error) {
u.updateFailure <- len(arg.IDs)

u.proceed.L.Lock()
defer u.proceed.L.Unlock()

// Wait until signaled so we have a chance to read the number of pending updates.
u.proceed.Wait()
<-u.pause

return u.Store.BulkMarkNotificationMessagesFailed(ctx, arg)
}
Expand Down
Loading