9
9
"net/http"
10
10
"sort"
11
11
"strconv"
12
+ "sync"
12
13
"time"
13
14
14
15
"github.com/google/uuid"
@@ -371,6 +372,7 @@ func (api *API) followLogs(jobID uuid.UUID) (<-chan database.ProvisionerJobLog,
371
372
var (
372
373
closed = make (chan struct {})
373
374
bufferedLogs = make (chan database.ProvisionerJobLog , 128 )
375
+ logMut = & sync.Mutex {}
374
376
)
375
377
closeSubscribe , err := api .Pubsub .Subscribe (
376
378
provisionerJobLogsChannel (jobID ),
@@ -380,12 +382,14 @@ func (api *API) followLogs(jobID uuid.UUID) (<-chan database.ProvisionerJobLog,
380
382
return
381
383
default :
382
384
}
385
+
383
386
jlMsg := provisionerJobLogsMessage {}
384
387
err := json .Unmarshal (message , & jlMsg )
385
388
if err != nil {
386
389
logger .Warn (ctx , "invalid provisioner job log on channel" , slog .Error (err ))
387
390
return
388
391
}
392
+
389
393
if jlMsg .CreatedAfter != 0 {
390
394
logs , err := api .Database .GetProvisionerLogsByIDBetween (ctx , database.GetProvisionerLogsByIDBetweenParams {
391
395
JobID : jobID ,
@@ -397,6 +401,18 @@ func (api *API) followLogs(jobID uuid.UUID) (<-chan database.ProvisionerJobLog,
397
401
}
398
402
399
403
for _ , log := range logs {
404
+ // Sadly we have to use a mutex here because events may be
405
+ // handled out of order due to golang goroutine scheduling
406
+ // semantics (even though Postgres guarantees ordering of
407
+ // notifications).
408
+ logMut .Lock ()
409
+ select {
410
+ case <- closed :
411
+ logMut .Unlock ()
412
+ return
413
+ default :
414
+ }
415
+
400
416
select {
401
417
case bufferedLogs <- log :
402
418
logger .Debug (ctx , "subscribe buffered log" , slog .F ("stage" , log .Stage ))
@@ -406,12 +422,24 @@ func (api *API) followLogs(jobID uuid.UUID) (<-chan database.ProvisionerJobLog,
406
422
// so just drop them.
407
423
logger .Warn (ctx , "provisioner job log overflowing channel" )
408
424
}
425
+ logMut .Unlock ()
409
426
}
410
427
}
428
+
411
429
if jlMsg .EndOfLogs {
430
+ // This mutex is to guard double-closes.
431
+ logMut .Lock ()
432
+ select {
433
+ case <- closed :
434
+ logMut .Unlock ()
435
+ return
436
+ default :
437
+ }
412
438
logger .Debug (ctx , "got End of Logs" )
439
+
413
440
close (closed )
414
441
close (bufferedLogs )
442
+ logMut .Unlock ()
415
443
}
416
444
},
417
445
)
0 commit comments