@@ -912,121 +912,124 @@ func (a *agent) runScript(ctx context.Context, lifecycle, script string) (err er
912
912
}
913
913
914
914
func (a * agent ) trackScriptLogs (ctx context.Context , reader io.Reader ) (chan struct {}, error ) {
915
- // Initialize variables for log management
916
- queuedLogs := make ([]agentsdk.StartupLog , 0 )
917
- var flushLogsTimer * time.Timer
918
- var logMutex sync.Mutex
919
- logsFlushed := sync .NewCond (& sync.Mutex {})
920
- var logsSending bool
921
- defer func () {
922
- logMutex .Lock ()
923
- if flushLogsTimer != nil {
924
- flushLogsTimer .Stop ()
925
- }
926
- logMutex .Unlock ()
927
- }()
915
+ // Synchronous sender, there can only be one outbound send at a time.
916
+ //
917
+ // It's important that we either flush or drop all logs before returning
918
+ // because the startup state is reported after flush.
919
+ sendDone := make (chan struct {})
920
+ send := make (chan []agentsdk.StartupLog , 1 )
921
+ go func () {
922
+ flushTimeout := 100 * time .Millisecond
923
+ flush := time .NewTimer (flushTimeout )
924
+ flush .Stop ()
928
925
929
- // sendLogs function uploads the queued logs to the server
930
- sendLogs := func () {
931
- // Lock logMutex and check if logs are already being sent
932
- logMutex .Lock ()
933
- if logsSending {
934
- logMutex .Unlock ()
935
- return
936
- }
937
- if flushLogsTimer != nil {
938
- flushLogsTimer .Stop ()
939
- }
940
- if len (queuedLogs ) == 0 {
941
- logMutex .Unlock ()
942
- return
943
- }
944
- // Move the current queued logs to logsToSend and clear the queue
945
- logsToSend := queuedLogs
946
- logsSending = true
947
- queuedLogs = make ([]agentsdk.StartupLog , 0 )
948
- logMutex .Unlock ()
949
-
950
- // Retry uploading logs until successful or a specific error occurs
951
- for r := retry .New (time .Second , 5 * time .Second ); r .Wait (ctx ); {
952
- err := a .client .PatchStartupLogs (ctx , agentsdk.PatchStartupLogs {
953
- Logs : logsToSend ,
954
- })
955
- if err == nil {
956
- break
926
+ defer func () {
927
+ flush .Stop ()
928
+ a .logger .Debug (ctx , "track script logs sender exited" )
929
+ close (sendDone )
930
+ }()
931
+
932
+ var backlog []agentsdk.StartupLog
933
+ done := false
934
+ for {
935
+ flushed := false
936
+ select {
937
+ // Close (!ok) can be triggered by the reader closing due to
938
+ // EOF or due to agent closing, when this happens we attempt
939
+ // a final flush. If the context is canceled this will be a
940
+ // no-op.
941
+ case logs , ok := <- send :
942
+ done = ! ok
943
+ if ok {
944
+ backlog = append (backlog , logs ... )
945
+ flushed = len (backlog ) >= 100
946
+ }
947
+ case <- flush .C :
948
+ flushed = true
957
949
}
958
- var sdkErr * codersdk.Error
959
- if errors .As (err , & sdkErr ) {
960
- if sdkErr .StatusCode () == http .StatusRequestEntityTooLarge {
961
- a .logger .Warn (ctx , "startup logs too large, dropping logs" )
962
- break
950
+
951
+ if (done || flushed ) && len (backlog ) > 0 {
952
+ // Retry uploading logs until successful or a specific
953
+ // error occurs.
954
+ for r := retry .New (time .Second , 5 * time .Second ); r .Wait (ctx ); {
955
+ err := a .client .PatchStartupLogs (ctx , agentsdk.PatchStartupLogs {
956
+ Logs : backlog ,
957
+ })
958
+ if err == nil {
959
+ break
960
+ }
961
+
962
+ if errors .Is (err , context .Canceled ) {
963
+ return
964
+ }
965
+ var sdkErr * codersdk.Error
966
+ if errors .As (err , & sdkErr ) {
967
+ if sdkErr .StatusCode () == http .StatusRequestEntityTooLarge {
968
+ a .logger .Warn (ctx , "startup logs too large, dropping logs" )
969
+ break
970
+ }
971
+ }
972
+ a .logger .Error (ctx , "upload startup logs failed" , slog .Error (err ), slog .F ("to_send" , backlog ))
973
+ }
974
+ if ctx .Err () != nil {
975
+ return
963
976
}
977
+ backlog = nil
964
978
}
965
- a .logger .Error (ctx , "upload startup logs" , slog .Error (err ), slog .F ("to_send" , logsToSend ))
966
- }
967
- // Reset logsSending flag
968
- logMutex .Lock ()
969
- logsSending = false
970
- flushLogsTimer .Reset (100 * time .Millisecond )
971
- logMutex .Unlock ()
972
- logsFlushed .Broadcast ()
973
- }
974
- // queueLog function appends a log to the queue and triggers sendLogs if necessary
975
- queueLog := func (log agentsdk.StartupLog ) {
976
- logMutex .Lock ()
977
- defer logMutex .Unlock ()
978
-
979
- // Append log to the queue
980
- queuedLogs = append (queuedLogs , log )
981
-
982
- // If there are more than 100 logs, send them immediately
983
- if len (queuedLogs ) > 100 {
984
- // Don't early return after this, because we still want
985
- // to reset the timer just in case logs come in while
986
- // we're sending.
987
- go sendLogs ()
988
- }
989
- // Reset or set the flushLogsTimer to trigger sendLogs after 100 milliseconds
990
- if flushLogsTimer != nil {
991
- flushLogsTimer .Reset (100 * time .Millisecond )
992
- return
979
+ if done {
980
+ return
981
+ }
982
+
983
+ flush .Reset (flushTimeout )
993
984
}
994
- flushLogsTimer = time .AfterFunc (100 * time .Millisecond , sendLogs )
995
- }
985
+ }()
996
986
997
- // It's important that we either flush or drop all logs before returning
998
- // because the startup state is reported after flush .
987
+ // Forward read lines to the sender or queue them for when the
988
+ // sender is ready to process them .
999
989
//
1000
- // It'd be weird for the startup state to be ready, but logs are still
1001
- // coming in .
1002
- logsFinished := make (chan struct {})
990
+ // We only need to track this goroutine since it will ensure that
991
+ // the sender has closed before returning .
992
+ logsDone := make (chan struct {})
1003
993
err := a .trackConnGoroutine (func () {
1004
- scanner := bufio .NewScanner (reader )
1005
- for scanner .Scan () {
1006
- queueLog (agentsdk.StartupLog {
994
+ defer func () {
995
+ close (send )
996
+ <- sendDone
997
+ a .logger .Debug (ctx , "track script logs reader exited" )
998
+ close (logsDone )
999
+ }()
1000
+
1001
+ var queue []agentsdk.StartupLog
1002
+
1003
+ s := bufio .NewScanner (reader )
1004
+ for s .Scan () {
1005
+ select {
1006
+ case <- ctx .Done ():
1007
+ return
1008
+ case <- a .closed :
1009
+ return
1010
+ case queue = <- send :
1011
+ // Not captured by sender yet, re-use.
1012
+ default :
1013
+ }
1014
+
1015
+ queue = append (queue , agentsdk.StartupLog {
1007
1016
CreatedAt : database .Now (),
1008
- Output : scanner .Text (),
1017
+ Output : s .Text (),
1009
1018
})
1019
+ send <- queue
1020
+ queue = nil
1010
1021
}
1011
- if err := scanner .Err (); err != nil {
1012
- a .logger .Error (ctx , "scan startup logs" , slog .Error (err ))
1013
- }
1014
- defer close (logsFinished )
1015
- logsFlushed .L .Lock ()
1016
- for {
1017
- logMutex .Lock ()
1018
- if len (queuedLogs ) == 0 {
1019
- logMutex .Unlock ()
1020
- break
1021
- }
1022
- logMutex .Unlock ()
1023
- logsFlushed .Wait ()
1022
+ if err := s .Err (); err != nil {
1023
+ a .logger .Error (ctx , "scan startup logs failed" , slog .Error (err ))
1024
1024
}
1025
1025
})
1026
1026
if err != nil {
1027
- return nil , xerrors .Errorf ("track conn goroutine: %w" , err )
1027
+ close (send )
1028
+ <- sendDone
1029
+ return nil , err
1028
1030
}
1029
- return logsFinished , nil
1031
+
1032
+ return logsDone , nil
1030
1033
}
1031
1034
1032
1035
func (a * agent ) handleReconnectingPTY (ctx context.Context , logger slog.Logger , msg codersdk.WorkspaceAgentReconnectingPTYInit , conn net.Conn ) (retErr error ) {
0 commit comments