diff --git a/coderd/database/pubsub.go b/coderd/database/pubsub.go index 3a2be7d79cd58..1995cd7203510 100644 --- a/coderd/database/pubsub.go +++ b/coderd/database/pubsub.go @@ -185,6 +185,13 @@ func (p *pgPubsub) SubscribeWithErr(event string, listener ListenerWithErr) (can func (p *pgPubsub) subscribeQueue(event string, newQ *msgQueue) (cancel func(), err error) { p.mut.Lock() defer p.mut.Unlock() + defer func() { + if err != nil { + // if we hit an error, we need to close the queue so we don't + // leak its goroutine. + newQ.close() + } + }() err = p.pgListener.Listen(event) if errors.Is(err, pq.ErrChannelAlreadyOpen) { diff --git a/enterprise/replicasync/replicasync_test.go b/enterprise/replicasync/replicasync_test.go index 49890524c9d78..eb3a7175bee6e 100644 --- a/enterprise/replicasync/replicasync_test.go +++ b/enterprise/replicasync/replicasync_test.go @@ -38,7 +38,9 @@ func TestReplica(t *testing.T) { }) require.NoError(t, err) defer cancel() - server, err := replicasync.New(context.Background(), slogtest.Make(t, nil), db, pubsub, nil) + ctx, cancelCtx := context.WithCancel(context.Background()) + defer cancelCtx() + server, err := replicasync.New(ctx, slogtest.Make(t, nil), db, pubsub, nil) require.NoError(t, err) <-closeChan _ = server.Close() @@ -62,7 +64,9 @@ func TestReplica(t *testing.T) { RelayAddress: srv.URL, }) require.NoError(t, err) - server, err := replicasync.New(context.Background(), slogtest.Make(t, nil), db, pubsub, &replicasync.Options{ + ctx, cancelCtx := context.WithCancel(context.Background()) + defer cancelCtx() + server, err := replicasync.New(ctx, slogtest.Make(t, nil), db, pubsub, &replicasync.Options{ RelayAddress: "http://169.254.169.254", }) require.NoError(t, err) @@ -102,7 +106,9 @@ func TestReplica(t *testing.T) { RelayAddress: srv.URL, }) require.NoError(t, err) - server, err := replicasync.New(context.Background(), slogtest.Make(t, nil), db, pubsub, &replicasync.Options{ + ctx, cancelCtx := context.WithCancel(context.Background()) + defer cancelCtx() + server, err := replicasync.New(ctx, slogtest.Make(t, nil), db, pubsub, &replicasync.Options{ RelayAddress: "http://169.254.169.254", TLSConfig: tlsConfig, }) @@ -125,7 +131,9 @@ func TestReplica(t *testing.T) { RelayAddress: "http://127.0.0.1:1", }) require.NoError(t, err) - server, err := replicasync.New(context.Background(), slogtest.Make(t, nil), db, pubsub, &replicasync.Options{ + ctx, cancelCtx := context.WithCancel(context.Background()) + defer cancelCtx() + server, err := replicasync.New(ctx, slogtest.Make(t, nil), db, pubsub, &replicasync.Options{ PeerTimeout: 1 * time.Millisecond, RelayAddress: "http://127.0.0.1:1", }) @@ -140,13 +148,15 @@ func TestReplica(t *testing.T) { // Refresh when a new replica appears! t.Parallel() db, pubsub := dbtestutil.NewDB(t) - server, err := replicasync.New(context.Background(), slogtest.Make(t, nil), db, pubsub, nil) + ctx, cancelCtx := context.WithCancel(context.Background()) + defer cancelCtx() + server, err := replicasync.New(ctx, slogtest.Make(t, nil), db, pubsub, nil) require.NoError(t, err) srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusOK) })) defer srv.Close() - peer, err := db.InsertReplica(context.Background(), database.InsertReplicaParams{ + peer, err := db.InsertReplica(ctx, database.InsertReplicaParams{ ID: uuid.New(), RelayAddress: srv.URL, UpdatedAt: database.Now(), @@ -170,7 +180,9 @@ func TestReplica(t *testing.T) { UpdatedAt: database.Now().Add(-time.Hour), }) require.NoError(t, err) - server, err := replicasync.New(context.Background(), slogtest.Make(t, nil), db, pubsub, &replicasync.Options{ + ctx, cancelCtx := context.WithCancel(context.Background()) + defer cancelCtx() + server, err := replicasync.New(ctx, slogtest.Make(t, nil), db, pubsub, &replicasync.Options{ RelayAddress: "google.com", CleanupInterval: time.Millisecond, }) @@ -184,6 +196,8 @@ func TestReplica(t *testing.T) { // Ensures that twenty concurrent replicas can spawn and all // discover each other in parallel! t.Parallel() + ctx, cancelCtx := context.WithCancel(context.Background()) + defer cancelCtx() // This doesn't use the database fake because creating // this many PostgreSQL connections takes some // configuration tweaking. @@ -198,7 +212,7 @@ func TestReplica(t *testing.T) { count := 20 wg.Add(count) for i := 0; i < count; i++ { - server, err := replicasync.New(context.Background(), logger, db, pubsub, &replicasync.Options{ + server, err := replicasync.New(ctx, logger, db, pubsub, &replicasync.Options{ RelayAddress: srv.URL, }) require.NoError(t, err)