@@ -40,10 +40,11 @@ type StoreReconciler struct {
40
40
registerer prometheus.Registerer
41
41
metrics * MetricsCollector
42
42
43
- cancelFn context.CancelCauseFunc
44
- running atomic.Bool
45
- stopped atomic.Bool
46
- done chan struct {}
43
+ cancelFn context.CancelCauseFunc
44
+ running atomic.Bool
45
+ stopped atomic.Bool
46
+ done chan struct {}
47
+ provisionNotifyCh chan * database.ProvisionerJob
47
48
}
48
49
49
50
var _ prebuilds.ReconciliationOrchestrator = & StoreReconciler {}
@@ -56,13 +57,14 @@ func NewStoreReconciler(store database.Store,
56
57
registerer prometheus.Registerer ,
57
58
) * StoreReconciler {
58
59
reconciler := & StoreReconciler {
59
- store : store ,
60
- pubsub : ps ,
61
- logger : logger ,
62
- cfg : cfg ,
63
- clock : clock ,
64
- registerer : registerer ,
65
- done : make (chan struct {}, 1 ),
60
+ store : store ,
61
+ pubsub : ps ,
62
+ logger : logger ,
63
+ cfg : cfg ,
64
+ clock : clock ,
65
+ registerer : registerer ,
66
+ done : make (chan struct {}, 1 ),
67
+ provisionNotifyCh : make (chan * database.ProvisionerJob , 100 ),
66
68
}
67
69
68
70
reconciler .metrics = NewMetricsCollector (store , logger , reconciler )
@@ -100,6 +102,29 @@ func (c *StoreReconciler) Run(ctx context.Context) {
100
102
// NOTE: without this atomic bool, Stop might race with Run for the c.cancelFn above.
101
103
c .running .Store (true )
102
104
105
+ // Publish provisioning jobs outside of database transactions.
106
+ // PGPubsub tries to acquire a new connection on Publish. A connection is held while a database transaction is active,
107
+ // so we can exhaust available connections.
108
+ go func () {
109
+ for {
110
+ select {
111
+ case <- c .done :
112
+ return
113
+ case <- ctx .Done ():
114
+ return
115
+ case job := <- c .provisionNotifyCh :
116
+ if job == nil {
117
+ continue
118
+ }
119
+
120
+ err := provisionerjobs .PostJob (c .pubsub , * job )
121
+ if err != nil {
122
+ c .logger .Error (ctx , "failed to post provisioner job to pubsub" , slog .Error (err ))
123
+ }
124
+ }
125
+ }
126
+ }()
127
+
103
128
for {
104
129
select {
105
130
// TODO: implement pubsub listener to allow reconciling a specific template imperatively once it has been changed,
@@ -571,10 +596,12 @@ func (c *StoreReconciler) provision(
571
596
return xerrors .Errorf ("provision workspace: %w" , err )
572
597
}
573
598
574
- err = provisionerjobs .PostJob (c .pubsub , * provisionerJob )
575
- if err != nil {
576
- // Client probably doesn't care about this error, so just log it.
577
- c .logger .Error (ctx , "failed to post provisioner job to pubsub" , slog .Error (err ))
599
+ // Publish provisioner job event outside of transaction.
600
+ select {
601
+ case c .provisionNotifyCh <- provisionerJob :
602
+ default : // channel full, drop the message; provisioner will pick this job up later with its periodic check, though.
603
+ c .logger .Warn (ctx , "provisioner job notification queue full, dropping" ,
604
+ slog .F ("job_id" , provisionerJob .ID ), slog .F ("prebuild_id" , prebuildID .String ()))
578
605
}
579
606
580
607
c .logger .Info (ctx , "prebuild job scheduled" , slog .F ("transition" , transition ),
0 commit comments