Skip to content

Commit ba2f90a

Browse files
committed
fix: move pubsub publishing out of database transactions to avoid conn starvation
Signed-off-by: Danny Kopping <dannykopping@gmail.com>
1 parent b7e08ba commit ba2f90a

File tree

1 file changed

+42
-15
lines changed

1 file changed

+42
-15
lines changed

enterprise/coderd/prebuilds/reconcile.go

+42 -15
Original file line number | Diff line number | Diff line change
@@ -40,10 +40,11 @@ type StoreReconciler struct {
4040
registerer prometheus.Registerer
4141
metrics *MetricsCollector
4242

43-
cancelFn context.CancelCauseFunc
44-
running atomic.Bool
45-
stopped atomic.Bool
46-
done chan struct{}
43+
cancelFn context.CancelCauseFunc
44+
running atomic.Bool
45+
stopped atomic.Bool
46+
done chan struct{}
47+
provisionNotifyCh chan *database.ProvisionerJob
4748
}
4849

4950
var _ prebuilds.ReconciliationOrchestrator = &StoreReconciler{}
@@ -56,13 +57,14 @@ func NewStoreReconciler(store database.Store,
5657
registerer prometheus.Registerer,
5758
) *StoreReconciler {
5859
reconciler := &StoreReconciler{
59-
store: store,
60-
pubsub: ps,
61-
logger: logger,
62-
cfg: cfg,
63-
clock: clock,
64-
registerer: registerer,
65-
done: make(chan struct{}, 1),
60+
store: store,
61+
pubsub: ps,
62+
logger: logger,
63+
cfg: cfg,
64+
clock: clock,
65+
registerer: registerer,
66+
done: make(chan struct{}, 1),
67+
provisionNotifyCh: make(chan *database.ProvisionerJob, 100),
6668
}
6769

6870
reconciler.metrics = NewMetricsCollector(store, logger, reconciler)
@@ -100,6 +102,29 @@ func (c *StoreReconciler) Run(ctx context.Context) {
100102
// NOTE: without this atomic bool, Stop might race with Run for the c.cancelFn above.
101103
c.running.Store(true)
102104

105+
// Publish provisioning jobs outside of database transactions.
106+
// PGPubsub tries to acquire a new connection on Publish. A connection is held while a database transaction is active,
107+
// so we can exhaust available connections.
108+
go func() {
109+
for {
110+
select {
111+
case <-c.done:
112+
return
113+
case <-ctx.Done():
114+
return
115+
case job := <-c.provisionNotifyCh:
116+
if job == nil {
117+
continue
118+
}
119+
120+
err := provisionerjobs.PostJob(c.pubsub, *job)
121+
if err != nil {
122+
c.logger.Error(ctx, "failed to post provisioner job to pubsub", slog.Error(err))
123+
}
124+
}
125+
}
126+
}()
127+
103128
for {
104129
select {
105130
// TODO: implement pubsub listener to allow reconciling a specific template imperatively once it has been changed,
@@ -571,10 +596,12 @@ func (c *StoreReconciler) provision(
571596
return xerrors.Errorf("provision workspace: %w", err)
572597
}
573598

574-
err = provisionerjobs.PostJob(c.pubsub, *provisionerJob)
575-
if err != nil {
576-
// Client probably doesn't care about this error, so just log it.
577-
c.logger.Error(ctx, "failed to post provisioner job to pubsub", slog.Error(err))
599+
// Publish provisioner job event outside of transaction.
600+
select {
601+
case c.provisionNotifyCh <- provisionerJob:
602+
default: // channel full, drop the message; provisioner will pick this job up later with its periodic check, though.
603+
c.logger.Warn(ctx, "provisioner job notification queue full, dropping",
604+
slog.F("job_id", provisionerJob.ID), slog.F("prebuild_id", prebuildID.String()))
578605
}
579606

580607
c.logger.Info(ctx, "prebuild job scheduled", slog.F("transition", transition),

0 commit comments

Comments
 (0)