@@ -911,122 +911,141 @@ func (a *agent) runScript(ctx context.Context, lifecycle, script string) (err er
911
911
return nil
912
912
}
913
913
914
- func (a * agent ) trackScriptLogs (ctx context.Context , reader io.Reader ) (chan struct {}, error ) {
915
- // Initialize variables for log management
916
- queuedLogs := make ([]agentsdk.StartupLog , 0 )
917
- var flushLogsTimer * time.Timer
918
- var logMutex sync.Mutex
919
- logsFlushed := sync .NewCond (& sync.Mutex {})
920
- var logsSending bool
921
- defer func () {
922
- logMutex .Lock ()
923
- if flushLogsTimer != nil {
924
- flushLogsTimer .Stop ()
925
- }
926
- logMutex .Unlock ()
927
- }()
914
+ func (a * agent ) trackScriptLogs (ctx context.Context , reader io.ReadCloser ) (chan struct {}, error ) {
915
+ // Synchronous sender, there can only be one outbound send at a time.
916
+ //
917
+ // It's important that we either flush or drop all logs before returning
918
+ // because the startup state is reported after flush.
919
+ sendDone := make (chan struct {})
920
+ send := make (chan []agentsdk.StartupLog , 1 )
921
+ go func () {
922
+ // Set flushTimeout and backlogLimit so that logs are uploaded
923
+ // once every 250ms or when 100 logs have been added to the
924
+ // backlog, whichever comes first.
925
+ flushTimeout := 250 * time .Millisecond
926
+ backlogLimit := 100
928
927
929
- // sendLogs function uploads the queued logs to the server
930
- sendLogs := func () {
931
- // Lock logMutex and check if logs are already being sent
932
- logMutex .Lock ()
933
- if logsSending {
934
- logMutex .Unlock ()
935
- return
936
- }
937
- if flushLogsTimer != nil {
938
- flushLogsTimer .Stop ()
939
- }
940
- if len (queuedLogs ) == 0 {
941
- logMutex .Unlock ()
942
- return
943
- }
944
- // Move the current queued logs to logsToSend and clear the queue
945
- logsToSend := queuedLogs
946
- logsSending = true
947
- queuedLogs = make ([]agentsdk.StartupLog , 0 )
948
- logMutex .Unlock ()
949
-
950
- // Retry uploading logs until successful or a specific error occurs
951
- for r := retry .New (time .Second , 5 * time .Second ); r .Wait (ctx ); {
952
- err := a .client .PatchStartupLogs (ctx , agentsdk.PatchStartupLogs {
953
- Logs : logsToSend ,
954
- })
955
- if err == nil {
956
- break
928
+ flush := time .NewTicker (flushTimeout )
929
+
930
+ var backlog []agentsdk.StartupLog
931
+ defer func () {
932
+ flush .Stop ()
933
+ _ = reader .Close () // Ensure read routine is closed.
934
+ if len (backlog ) > 0 {
935
+ a .logger .Debug (ctx , "track script logs sender exiting, discarding logs" , slog .F ("discarded_logs_count" , len (backlog )))
957
936
}
958
- var sdkErr * codersdk.Error
959
- if errors .As (err , & sdkErr ) {
960
- if sdkErr .StatusCode () == http .StatusRequestEntityTooLarge {
961
- a .logger .Warn (ctx , "startup logs too large, dropping logs" )
962
- break
937
+ a .logger .Debug (ctx , "track script logs sender exited" )
938
+ close (sendDone )
939
+ }()
940
+
941
+ done := false
942
+ for {
943
+ flushed := false
944
+ select {
945
+ case <- ctx .Done ():
946
+ return
947
+ case <- a .closed :
948
+ return
949
+ // Close (!ok) can be triggered by the reader closing due to
950
+ // EOF or due to agent closing, when this happens we attempt
951
+ // a final flush. If the context is canceled this will be a
952
+ // no-op.
953
+ case logs , ok := <- send :
954
+ done = ! ok
955
+ if ok {
956
+ backlog = append (backlog , logs ... )
957
+ flushed = len (backlog ) >= backlogLimit
963
958
}
959
+ case <- flush .C :
960
+ flushed = true
961
+ }
962
+
963
+ if (done || flushed ) && len (backlog ) > 0 {
964
+ flush .Stop () // Lower the chance of a double flush.
965
+
966
+ // Retry uploading logs until successful or a specific
967
+ // error occurs.
968
+ for r := retry .New (time .Second , 5 * time .Second ); r .Wait (ctx ); {
969
+ err := a .client .PatchStartupLogs (ctx , agentsdk.PatchStartupLogs {
970
+ Logs : backlog ,
971
+ })
972
+ if err == nil {
973
+ break
974
+ }
975
+
976
+ if errors .Is (err , context .Canceled ) {
977
+ return
978
+ }
979
+ var sdkErr * codersdk.Error
980
+ if errors .As (err , & sdkErr ) {
981
+ if sdkErr .StatusCode () == http .StatusRequestEntityTooLarge {
982
+ a .logger .Warn (ctx , "startup logs too large, dropping logs" )
983
+ break
984
+ }
985
+ }
986
+ a .logger .Error (ctx , "upload startup logs failed" , slog .Error (err ), slog .F ("to_send" , backlog ))
987
+ }
988
+ if ctx .Err () != nil {
989
+ return
990
+ }
991
+ backlog = nil
992
+
993
+ // Anchor flush to the last log upload.
994
+ flush .Reset (flushTimeout )
995
+ }
996
+ if done {
997
+ return
964
998
}
965
- a .logger .Error (ctx , "upload startup logs" , slog .Error (err ), slog .F ("to_send" , logsToSend ))
966
- }
967
- // Reset logsSending flag
968
- logMutex .Lock ()
969
- logsSending = false
970
- flushLogsTimer .Reset (100 * time .Millisecond )
971
- logMutex .Unlock ()
972
- logsFlushed .Broadcast ()
973
- }
974
- // queueLog function appends a log to the queue and triggers sendLogs if necessary
975
- queueLog := func (log agentsdk.StartupLog ) {
976
- logMutex .Lock ()
977
- defer logMutex .Unlock ()
978
-
979
- // Append log to the queue
980
- queuedLogs = append (queuedLogs , log )
981
-
982
- // If there are more than 100 logs, send them immediately
983
- if len (queuedLogs ) > 100 {
984
- // Don't early return after this, because we still want
985
- // to reset the timer just in case logs come in while
986
- // we're sending.
987
- go sendLogs ()
988
- }
989
- // Reset or set the flushLogsTimer to trigger sendLogs after 100 milliseconds
990
- if flushLogsTimer != nil {
991
- flushLogsTimer .Reset (100 * time .Millisecond )
992
- return
993
999
}
994
- flushLogsTimer = time .AfterFunc (100 * time .Millisecond , sendLogs )
995
- }
1000
+ }()
996
1001
997
- // It's important that we either flush or drop all logs before returning
998
- // because the startup state is reported after flush .
1002
+ // Forward read lines to the sender or queue them for when the
1003
+ // sender is ready to process them .
999
1004
//
1000
- // It'd be weird for the startup state to be ready, but logs are still
1001
- // coming in .
1002
- logsFinished := make (chan struct {})
1005
+ // We only need to track this goroutine since it will ensure that
1006
+ // the sender has closed before returning .
1007
+ logsDone := make (chan struct {})
1003
1008
err := a .trackConnGoroutine (func () {
1004
- scanner := bufio .NewScanner (reader )
1005
- for scanner .Scan () {
1006
- queueLog (agentsdk.StartupLog {
1009
+ defer func () {
1010
+ close (send )
1011
+ <- sendDone
1012
+ a .logger .Debug (ctx , "track script logs reader exited" )
1013
+ close (logsDone )
1014
+ }()
1015
+
1016
+ var queue []agentsdk.StartupLog
1017
+
1018
+ s := bufio .NewScanner (reader )
1019
+ for s .Scan () {
1020
+ select {
1021
+ case <- ctx .Done ():
1022
+ return
1023
+ case <- a .closed :
1024
+ return
1025
+ case queue = <- send :
1026
+ // Not captured by sender yet, re-use.
1027
+ default :
1028
+ }
1029
+
1030
+ queue = append (queue , agentsdk.StartupLog {
1007
1031
CreatedAt : database .Now (),
1008
- Output : scanner .Text (),
1032
+ Output : s .Text (),
1009
1033
})
1034
+ send <- queue
1035
+ queue = nil
1010
1036
}
1011
- if err := scanner .Err (); err != nil {
1012
- a .logger .Error (ctx , "scan startup logs" , slog .Error (err ))
1013
- }
1014
- defer close (logsFinished )
1015
- logsFlushed .L .Lock ()
1016
- for {
1017
- logMutex .Lock ()
1018
- if len (queuedLogs ) == 0 {
1019
- logMutex .Unlock ()
1020
- break
1021
- }
1022
- logMutex .Unlock ()
1023
- logsFlushed .Wait ()
1037
+ if err := s .Err (); err != nil {
1038
+ a .logger .Warn (ctx , "scan startup logs ended unexpectedly" , slog .Error (err ))
1024
1039
}
1025
1040
})
1026
1041
if err != nil {
1027
- return nil , xerrors .Errorf ("track conn goroutine: %w" , err )
1042
+ close (send )
1043
+ <- sendDone
1044
+ close (logsDone )
1045
+ return logsDone , err
1028
1046
}
1029
- return logsFinished , nil
1047
+
1048
+ return logsDone , nil
1030
1049
}
1031
1050
1032
1051
func (a * agent ) handleReconnectingPTY (ctx context.Context , logger slog.Logger , msg codersdk.WorkspaceAgentReconnectingPTYInit , conn net.Conn ) (retErr error ) {
0 commit comments