@@ -913,121 +913,124 @@ func (a *agent) runScript(ctx context.Context, lifecycle, script string) (err er
913
913
}
914
914
915
915
func (a * agent ) trackScriptLogs (ctx context.Context , reader io.Reader ) (chan struct {}, error ) {
916
- // Initialize variables for log management
917
- queuedLogs := make ([]agentsdk.StartupLog , 0 )
918
- var flushLogsTimer * time.Timer
919
- var logMutex sync.Mutex
920
- logsFlushed := sync .NewCond (& sync.Mutex {})
921
- var logsSending bool
922
- defer func () {
923
- logMutex .Lock ()
924
- if flushLogsTimer != nil {
925
- flushLogsTimer .Stop ()
926
- }
927
- logMutex .Unlock ()
928
- }()
916
+ // Synchronous sender, there can only be one outbound send at a time.
917
+ //
918
+ // It's important that we either flush or drop all logs before returning
919
+ // because the startup state is reported after flush.
920
+ sendDone := make (chan struct {})
921
+ send := make (chan []agentsdk.StartupLog , 1 )
922
+ go func () {
923
+ flushTimeout := 100 * time .Millisecond
924
+ flush := time .NewTimer (flushTimeout )
925
+ flush .Stop ()
929
926
930
- // sendLogs function uploads the queued logs to the server
931
- sendLogs := func () {
932
- // Lock logMutex and check if logs are already being sent
933
- logMutex .Lock ()
934
- if logsSending {
935
- logMutex .Unlock ()
936
- return
937
- }
938
- if flushLogsTimer != nil {
939
- flushLogsTimer .Stop ()
940
- }
941
- if len (queuedLogs ) == 0 {
942
- logMutex .Unlock ()
943
- return
944
- }
945
- // Move the current queued logs to logsToSend and clear the queue
946
- logsToSend := queuedLogs
947
- logsSending = true
948
- queuedLogs = make ([]agentsdk.StartupLog , 0 )
949
- logMutex .Unlock ()
950
-
951
- // Retry uploading logs until successful or a specific error occurs
952
- for r := retry .New (time .Second , 5 * time .Second ); r .Wait (ctx ); {
953
- err := a .client .PatchStartupLogs (ctx , agentsdk.PatchStartupLogs {
954
- Logs : logsToSend ,
955
- })
956
- if err == nil {
957
- break
927
+ defer func () {
928
+ flush .Stop ()
929
+ a .logger .Debug (ctx , "track script logs sender exited" )
930
+ close (sendDone )
931
+ }()
932
+
933
+ var backlog []agentsdk.StartupLog
934
+ done := false
935
+ for {
936
+ flushed := false
937
+ select {
938
+ // Close (!ok) can be triggered by the reader closing due to
939
+ // EOF or due to agent closing, when this happens we attempt
940
+ // a final flush. If the context is canceled this will be a
941
+ // no-op.
942
+ case logs , ok := <- send :
943
+ done = ! ok
944
+ if ok {
945
+ backlog = append (backlog , logs ... )
946
+ flushed = len (backlog ) >= 100
947
+ }
948
+ case <- flush .C :
949
+ flushed = true
958
950
}
959
- var sdkErr * codersdk.Error
960
- if errors .As (err , & sdkErr ) {
961
- if sdkErr .StatusCode () == http .StatusRequestEntityTooLarge {
962
- a .logger .Warn (ctx , "startup logs too large, dropping logs" )
963
- break
951
+
952
+ if (done || flushed ) && len (backlog ) > 0 {
953
+ // Retry uploading logs until successful or a specific
954
+ // error occurs.
955
+ for r := retry .New (time .Second , 5 * time .Second ); r .Wait (ctx ); {
956
+ err := a .client .PatchStartupLogs (ctx , agentsdk.PatchStartupLogs {
957
+ Logs : backlog ,
958
+ })
959
+ if err == nil {
960
+ break
961
+ }
962
+
963
+ if errors .Is (err , context .Canceled ) {
964
+ return
965
+ }
966
+ var sdkErr * codersdk.Error
967
+ if errors .As (err , & sdkErr ) {
968
+ if sdkErr .StatusCode () == http .StatusRequestEntityTooLarge {
969
+ a .logger .Warn (ctx , "startup logs too large, dropping logs" )
970
+ break
971
+ }
972
+ }
973
+ a .logger .Error (ctx , "upload startup logs failed" , slog .Error (err ), slog .F ("to_send" , backlog ))
974
+ }
975
+ if ctx .Err () != nil {
976
+ return
964
977
}
978
+ backlog = nil
965
979
}
966
- a .logger .Error (ctx , "upload startup logs" , slog .Error (err ), slog .F ("to_send" , logsToSend ))
967
- }
968
- // Reset logsSending flag
969
- logMutex .Lock ()
970
- logsSending = false
971
- flushLogsTimer .Reset (100 * time .Millisecond )
972
- logMutex .Unlock ()
973
- logsFlushed .Broadcast ()
974
- }
975
- // queueLog function appends a log to the queue and triggers sendLogs if necessary
976
- queueLog := func (log agentsdk.StartupLog ) {
977
- logMutex .Lock ()
978
- defer logMutex .Unlock ()
979
-
980
- // Append log to the queue
981
- queuedLogs = append (queuedLogs , log )
982
-
983
- // If there are more than 100 logs, send them immediately
984
- if len (queuedLogs ) > 100 {
985
- // Don't early return after this, because we still want
986
- // to reset the timer just in case logs come in while
987
- // we're sending.
988
- go sendLogs ()
989
- }
990
- // Reset or set the flushLogsTimer to trigger sendLogs after 100 milliseconds
991
- if flushLogsTimer != nil {
992
- flushLogsTimer .Reset (100 * time .Millisecond )
993
- return
980
+ if done {
981
+ return
982
+ }
983
+
984
+ flush .Reset (flushTimeout )
994
985
}
995
- flushLogsTimer = time .AfterFunc (100 * time .Millisecond , sendLogs )
996
- }
986
+ }()
997
987
998
- // It's important that we either flush or drop all logs before returning
999
- // because the startup state is reported after flush .
988
+ // Forward read lines to the sender or queue them for when the
989
+ // sender is ready to process them .
1000
990
//
1001
- // It'd be weird for the startup state to be ready, but logs are still
1002
- // coming in .
1003
- logsFinished := make (chan struct {})
991
+ // We only need to track this goroutine since it will ensure that
992
+ // the sender has closed before returning .
993
+ logsDone := make (chan struct {})
1004
994
err := a .trackConnGoroutine (func () {
1005
- scanner := bufio .NewScanner (reader )
1006
- for scanner .Scan () {
1007
- queueLog (agentsdk.StartupLog {
995
+ defer func () {
996
+ close (send )
997
+ <- sendDone
998
+ a .logger .Debug (ctx , "track script logs reader exited" )
999
+ close (logsDone )
1000
+ }()
1001
+
1002
+ var queue []agentsdk.StartupLog
1003
+
1004
+ s := bufio .NewScanner (reader )
1005
+ for s .Scan () {
1006
+ select {
1007
+ case <- ctx .Done ():
1008
+ return
1009
+ case <- a .closed :
1010
+ return
1011
+ case queue = <- send :
1012
+ // Not captured by sender yet, re-use.
1013
+ default :
1014
+ }
1015
+
1016
+ queue = append (queue , agentsdk.StartupLog {
1008
1017
CreatedAt : database .Now (),
1009
- Output : scanner .Text (),
1018
+ Output : s .Text (),
1010
1019
})
1020
+ send <- queue
1021
+ queue = nil
1011
1022
}
1012
- if err := scanner .Err (); err != nil {
1013
- a .logger .Error (ctx , "scan startup logs" , slog .Error (err ))
1014
- }
1015
- defer close (logsFinished )
1016
- logsFlushed .L .Lock ()
1017
- for {
1018
- logMutex .Lock ()
1019
- if len (queuedLogs ) == 0 {
1020
- logMutex .Unlock ()
1021
- break
1022
- }
1023
- logMutex .Unlock ()
1024
- logsFlushed .Wait ()
1023
+ if err := s .Err (); err != nil {
1024
+ a .logger .Error (ctx , "scan startup logs failed" , slog .Error (err ))
1025
1025
}
1026
1026
})
1027
1027
if err != nil {
1028
- return nil , xerrors .Errorf ("track conn goroutine: %w" , err )
1028
+ close (send )
1029
+ <- sendDone
1030
+ return nil , err
1029
1031
}
1030
- return logsFinished , nil
1032
+
1033
+ return logsDone , nil
1031
1034
}
1032
1035
1033
1036
func (a * agent ) handleReconnectingPTY (ctx context.Context , logger slog.Logger , msg codersdk.WorkspaceAgentReconnectingPTYInit , conn net.Conn ) (retErr error ) {
0 commit comments