@@ -44,7 +44,7 @@ type StoreReconciler struct {
44
44
running atomic.Bool
45
45
stopped atomic.Bool
46
46
done chan struct {}
47
- provisionNotifyCh chan * database.ProvisionerJob
47
+ provisionNotifyCh chan database.ProvisionerJob
48
48
}
49
49
50
50
var _ prebuilds.ReconciliationOrchestrator = & StoreReconciler {}
@@ -64,7 +64,7 @@ func NewStoreReconciler(store database.Store,
64
64
clock : clock ,
65
65
registerer : registerer ,
66
66
done : make (chan struct {}, 1 ),
67
- provisionNotifyCh : make (chan * database.ProvisionerJob , 100 ),
67
+ provisionNotifyCh : make (chan database.ProvisionerJob , 10 ),
68
68
}
69
69
70
70
reconciler .metrics = NewMetricsCollector (store , logger , reconciler )
@@ -117,11 +117,7 @@ func (c *StoreReconciler) Run(ctx context.Context) {
117
117
case <- ctx .Done ():
118
118
return
119
119
case job := <- c .provisionNotifyCh :
120
- if job == nil {
121
- continue
122
- }
123
-
124
- err := provisionerjobs .PostJob (c .pubsub , * job )
120
+ err := provisionerjobs .PostJob (c .pubsub , job )
125
121
if err != nil {
126
122
c .logger .Error (ctx , "failed to post provisioner job to pubsub" , slog .Error (err ))
127
123
}
@@ -600,9 +596,13 @@ func (c *StoreReconciler) provision(
600
596
return xerrors .Errorf ("provision workspace: %w" , err )
601
597
}
602
598
599
+ if provisionerJob == nil {
600
+ return nil
601
+ }
602
+
603
603
// Publish provisioner job event outside of transaction.
604
604
select {
605
- case c .provisionNotifyCh <- provisionerJob :
605
+ case c .provisionNotifyCh <- * provisionerJob :
606
606
default : // channel full, drop the message; provisioner will pick this job up later with its periodic check, though.
607
607
c .logger .Warn (ctx , "provisioner job notification queue full, dropping" ,
608
608
slog .F ("job_id" , provisionerJob .ID ), slog .F ("prebuild_id" , prebuildID .String ()))
0 commit comments