@@ -9,10 +9,10 @@ import (
9
9
"net/http"
10
10
"sort"
11
11
"strconv"
12
- "sync"
13
12
"time"
14
13
15
14
"github.com/google/uuid"
15
+ "go.uber.org/atomic"
16
16
"nhooyr.io/websocket"
17
17
18
18
"cdr.dev/slog"
@@ -374,26 +374,38 @@ func (api *API) followLogs(actor rbac.Subject, jobID uuid.UUID) (<-chan *databas
374
374
logger := api .Logger .With (slog .F ("job_id" , jobID ))
375
375
376
376
var (
377
- closed = make (chan struct {} )
378
- bufferedLogs = make ( chan * database. ProvisionerJobLog , 128 )
379
- logMut = & sync. Mutex {}
377
+ bufferedLogs = make (chan * database. ProvisionerJobLog , 128 )
378
+ endOfLogs atomic. Bool
379
+ lastSentLogID atomic. Int64
380
380
)
381
+
382
+ sendLog := func (log * database.ProvisionerJobLog ) {
383
+ select {
384
+ case bufferedLogs <- log :
385
+ logger .Debug (context .Background (), "subscribe buffered log" , slog .F ("stage" , log .Stage ))
386
+ lastSentLogID .Store (log .ID )
387
+ default :
388
+ // If this overflows users could miss logs streaming. This can happen
389
+ // we get a lot of logs and consumer isn't keeping up. We don't want to block the pubsub,
390
+ // so just drop them.
391
+ logger .Warn (context .Background (), "provisioner job log overflowing channel" )
392
+ }
393
+ }
394
+
381
395
closeSubscribe , err := api .Pubsub .Subscribe (
382
396
provisionerJobLogsChannel (jobID ),
383
397
func (ctx context.Context , message []byte ) {
384
- select {
385
- case <- closed :
398
+ if endOfLogs .Load () {
386
399
return
387
- default :
388
400
}
389
-
390
401
jlMsg := provisionerJobLogsMessage {}
391
402
err := json .Unmarshal (message , & jlMsg )
392
403
if err != nil {
393
404
logger .Warn (ctx , "invalid provisioner job log on channel" , slog .Error (err ))
394
405
return
395
406
}
396
407
408
+ // CreatedAfter is sent when logs are streaming!
397
409
if jlMsg .CreatedAfter != 0 {
398
410
logs , err := api .Database .GetProvisionerLogsByIDBetween (dbauthz .As (ctx , actor ), database.GetProvisionerLogsByIDBetweenParams {
399
411
JobID : jobID ,
@@ -403,54 +415,44 @@ func (api *API) followLogs(actor rbac.Subject, jobID uuid.UUID) (<-chan *databas
403
415
logger .Warn (ctx , "get provisioner logs" , slog .Error (err ))
404
416
return
405
417
}
406
-
407
418
for _ , log := range logs {
408
- // Sadly we have to use a mutex here because events may be
409
- // handled out of order due to golang goroutine scheduling
410
- // semantics (even though Postgres guarantees ordering of
411
- // notifications).
412
- logMut .Lock ()
413
- select {
414
- case <- closed :
415
- logMut .Unlock ()
419
+ if endOfLogs .Load () {
420
+ // An end of logs message came in while we were fetching
421
+ // logs or processing them!
416
422
return
417
- default :
418
423
}
419
424
log := log
420
- select {
421
- case bufferedLogs <- & log :
422
- logger .Debug (ctx , "subscribe buffered log" , slog .F ("stage" , log .Stage ))
423
- default :
424
- // If this overflows users could miss logs streaming. This can happen
425
- // we get a lot of logs and consumer isn't keeping up. We don't want to block the pubsub,
426
- // so just drop them.
427
- logger .Warn (ctx , "provisioner job log overflowing channel" )
428
- }
429
- logMut .Unlock ()
425
+ sendLog (& log )
430
426
}
431
427
}
432
428
429
+ // EndOfLogs is sent when logs are done streaming.
430
+ // We don't want to end the stream until we've sent all the logs,
431
+ // so we fetch logs after the last ID we've seen and send them!
433
432
if jlMsg .EndOfLogs {
434
- // This mutex is to guard double-closes.
435
- logMut .Lock ()
436
- select {
437
- case <- closed :
438
- logMut .Unlock ()
433
+ endOfLogs .Store (true )
434
+ logs , err := api .Database .GetProvisionerLogsByIDBetween (dbauthz .As (ctx , actor ), database.GetProvisionerLogsByIDBetweenParams {
435
+ JobID : jobID ,
436
+ CreatedAfter : lastSentLogID .Load (),
437
+ })
438
+ if err != nil {
439
+ logger .Warn (ctx , "get provisioner logs" , slog .Error (err ))
439
440
return
440
- default :
441
+ }
442
+ for _ , log := range logs {
443
+ log := log
444
+ sendLog (& log )
441
445
}
442
446
logger .Debug (ctx , "got End of Logs" )
443
447
bufferedLogs <- nil
444
- logMut .Unlock ()
445
448
}
449
+
450
+ lastSentLogID .Store (jlMsg .CreatedAfter )
446
451
},
447
452
)
448
453
if err != nil {
449
454
return nil , nil , err
450
455
}
451
- return bufferedLogs , func () {
452
- closeSubscribe ()
453
- close (closed )
454
- close (bufferedLogs )
455
- }, nil
456
+ // We don't need to close the bufferedLogs channel because it will be garbage collected!
457
+ return bufferedLogs , closeSubscribe , nil
456
458
}
0 commit comments