@@ -9,10 +9,10 @@ import (
9
9
"net/http"
10
10
"sort"
11
11
"strconv"
12
- "sync"
13
12
"time"
14
13
15
14
"github.com/google/uuid"
15
+ "go.uber.org/atomic"
16
16
"nhooyr.io/websocket"
17
17
18
18
"cdr.dev/slog"
@@ -377,26 +377,38 @@ func (api *API) followLogs(actor rbac.Subject, jobID uuid.UUID) (<-chan *databas
377
377
logger := api .Logger .With (slog .F ("job_id" , jobID ))
378
378
379
379
var (
380
- closed = make (chan struct {} )
381
- bufferedLogs = make ( chan * database. ProvisionerJobLog , 128 )
382
- logMut = & sync. Mutex {}
380
+ bufferedLogs = make (chan * database. ProvisionerJobLog , 128 )
381
+ endOfLogs atomic. Bool
382
+ lastSentLogID atomic. Int64
383
383
)
384
+
385
+ sendLog := func (log * database.ProvisionerJobLog ) {
386
+ select {
387
+ case bufferedLogs <- log :
388
+ logger .Debug (context .Background (), "subscribe buffered log" , slog .F ("stage" , log .Stage ))
389
+ lastSentLogID .Store (log .ID )
390
+ default :
391
+ // If this overflows users could miss logs streaming. This can happen
392
+ // we get a lot of logs and consumer isn't keeping up. We don't want to block the pubsub,
393
+ // so just drop them.
394
+ logger .Warn (context .Background (), "provisioner job log overflowing channel" )
395
+ }
396
+ }
397
+
384
398
closeSubscribe , err := api .Pubsub .Subscribe (
385
399
provisionerJobLogsChannel (jobID ),
386
400
func (ctx context.Context , message []byte ) {
387
- select {
388
- case <- closed :
401
+ if endOfLogs .Load () {
389
402
return
390
- default :
391
403
}
392
-
393
404
jlMsg := provisionerJobLogsMessage {}
394
405
err := json .Unmarshal (message , & jlMsg )
395
406
if err != nil {
396
407
logger .Warn (ctx , "invalid provisioner job log on channel" , slog .Error (err ))
397
408
return
398
409
}
399
410
411
+ // CreatedAfter is sent when logs are streaming!
400
412
if jlMsg .CreatedAfter != 0 {
401
413
logs , err := api .Database .GetProvisionerLogsByIDBetween (dbauthz .As (ctx , actor ), database.GetProvisionerLogsByIDBetweenParams {
402
414
JobID : jobID ,
@@ -406,54 +418,44 @@ func (api *API) followLogs(actor rbac.Subject, jobID uuid.UUID) (<-chan *databas
406
418
logger .Warn (ctx , "get provisioner logs" , slog .Error (err ))
407
419
return
408
420
}
409
-
410
421
for _ , log := range logs {
411
- // Sadly we have to use a mutex here because events may be
412
- // handled out of order due to golang goroutine scheduling
413
- // semantics (even though Postgres guarantees ordering of
414
- // notifications).
415
- logMut .Lock ()
416
- select {
417
- case <- closed :
418
- logMut .Unlock ()
422
+ if endOfLogs .Load () {
423
+ // An end of logs message came in while we were fetching
424
+ // logs or processing them!
419
425
return
420
- default :
421
426
}
422
427
log := log
423
- select {
424
- case bufferedLogs <- & log :
425
- logger .Debug (ctx , "subscribe buffered log" , slog .F ("stage" , log .Stage ))
426
- default :
427
- // If this overflows users could miss logs streaming. This can happen
428
- // we get a lot of logs and consumer isn't keeping up. We don't want to block the pubsub,
429
- // so just drop them.
430
- logger .Warn (ctx , "provisioner job log overflowing channel" )
431
- }
432
- logMut .Unlock ()
428
+ sendLog (& log )
433
429
}
434
430
}
435
431
432
+ // EndOfLogs is sent when logs are done streaming.
433
+ // We don't want to end the stream until we've sent all the logs,
434
+ // so we fetch logs after the last ID we've seen and send them!
436
435
if jlMsg .EndOfLogs {
437
- // This mutex is to guard double-closes.
438
- logMut .Lock ()
439
- select {
440
- case <- closed :
441
- logMut .Unlock ()
436
+ endOfLogs .Store (true )
437
+ logs , err := api .Database .GetProvisionerLogsByIDBetween (dbauthz .As (ctx , actor ), database.GetProvisionerLogsByIDBetweenParams {
438
+ JobID : jobID ,
439
+ CreatedAfter : lastSentLogID .Load (),
440
+ })
441
+ if err != nil {
442
+ logger .Warn (ctx , "get provisioner logs" , slog .Error (err ))
442
443
return
443
- default :
444
+ }
445
+ for _ , log := range logs {
446
+ log := log
447
+ sendLog (& log )
444
448
}
445
449
logger .Debug (ctx , "got End of Logs" )
446
450
bufferedLogs <- nil
447
- logMut .Unlock ()
448
451
}
452
+
453
+ lastSentLogID .Store (jlMsg .CreatedAfter )
449
454
},
450
455
)
451
456
if err != nil {
452
457
return nil , nil , err
453
458
}
454
- return bufferedLogs , func () {
455
- closeSubscribe ()
456
- close (closed )
457
- close (bufferedLogs )
458
- }, nil
459
+ // We don't need to close the bufferedLogs channel because it will be garbage collected!
460
+ return bufferedLogs , closeSubscribe , nil
459
461
}
0 commit comments