@@ -41,6 +41,7 @@ import (
41
41
"cdr.dev/slog"
42
42
"github.com/coder/coder/agent/usershell"
43
43
"github.com/coder/coder/buildinfo"
44
+ "github.com/coder/coder/coderd/database"
44
45
"github.com/coder/coder/coderd/gitauth"
45
46
"github.com/coder/coder/codersdk"
46
47
"github.com/coder/coder/codersdk/agentsdk"
@@ -88,6 +89,7 @@ type Client interface {
88
89
PostLifecycle (ctx context.Context , state agentsdk.PostLifecycleRequest ) error
89
90
PostAppHealth (ctx context.Context , req agentsdk.PostAppHealthsRequest ) error
90
91
PostStartup (ctx context.Context , req agentsdk.PostStartupRequest ) error
92
+ PatchStartupLogs (ctx context.Context , req agentsdk.PatchStartupLogs ) error
91
93
}
92
94
93
95
func New (options Options ) io.Closer {
@@ -642,13 +644,32 @@ func (a *agent) runScript(ctx context.Context, lifecycle, script string) error {
642
644
}
643
645
644
646
a .logger .Info (ctx , "running script" , slog .F ("lifecycle" , lifecycle ), slog .F ("script" , script ))
645
- writer , err := a .filesystem .OpenFile (filepath .Join (a .logDir , fmt .Sprintf ("coder-%s-script.log" , lifecycle )), os .O_CREATE | os .O_RDWR , 0o600 )
647
+ fileWriter , err := a .filesystem .OpenFile (filepath .Join (a .logDir , fmt .Sprintf ("coder-%s-script.log" , lifecycle )), os .O_CREATE | os .O_RDWR , 0o600 )
646
648
if err != nil {
647
649
return xerrors .Errorf ("open %s script log file: %w" , lifecycle , err )
648
650
}
649
651
defer func () {
650
- _ = writer .Close ()
652
+ _ = fileWriter .Close ()
651
653
}()
654
+
655
+ var writer io.Writer = fileWriter
656
+ if lifecycle == "startup" {
657
+ // Create pipes for startup logs reader and writer
658
+ logsReader , logsWriter := io .Pipe ()
659
+ defer func () {
660
+ _ = logsReader .Close ()
661
+ }()
662
+ writer = io .MultiWriter (fileWriter , logsWriter )
663
+ flushedLogs , err := a .trackScriptLogs (ctx , logsReader )
664
+ if err != nil {
665
+ return xerrors .Errorf ("track script logs: %w" , err )
666
+ }
667
+ defer func () {
668
+ _ = logsWriter .Close ()
669
+ <- flushedLogs
670
+ }()
671
+ }
672
+
652
673
cmd , err := a .createCommand (ctx , script , nil )
653
674
if err != nil {
654
675
return xerrors .Errorf ("create command: %w" , err )
@@ -664,10 +685,124 @@ func (a *agent) runScript(ctx context.Context, lifecycle, script string) error {
664
685
665
686
return xerrors .Errorf ("run: %w" , err )
666
687
}
667
-
668
688
return nil
669
689
}
670
690
691
+ func (a * agent ) trackScriptLogs (ctx context.Context , reader io.Reader ) (chan struct {}, error ) {
692
+ // Initialize variables for log management
693
+ queuedLogs := make ([]agentsdk.StartupLog , 0 )
694
+ var flushLogsTimer * time.Timer
695
+ var logMutex sync.Mutex
696
+ logsFlushed := sync .NewCond (& sync.Mutex {})
697
+ var logsSending bool
698
+ defer func () {
699
+ logMutex .Lock ()
700
+ if flushLogsTimer != nil {
701
+ flushLogsTimer .Stop ()
702
+ }
703
+ logMutex .Unlock ()
704
+ }()
705
+
706
+ // sendLogs function uploads the queued logs to the server
707
+ sendLogs := func () {
708
+ // Lock logMutex and check if logs are already being sent
709
+ logMutex .Lock ()
710
+ if logsSending {
711
+ logMutex .Unlock ()
712
+ return
713
+ }
714
+ if flushLogsTimer != nil {
715
+ flushLogsTimer .Stop ()
716
+ }
717
+ if len (queuedLogs ) == 0 {
718
+ logMutex .Unlock ()
719
+ return
720
+ }
721
+ // Move the current queued logs to logsToSend and clear the queue
722
+ logsToSend := queuedLogs
723
+ logsSending = true
724
+ queuedLogs = make ([]agentsdk.StartupLog , 0 )
725
+ logMutex .Unlock ()
726
+
727
+ // Retry uploading logs until successful or a specific error occurs
728
+ for r := retry .New (time .Second , 5 * time .Second ); r .Wait (ctx ); {
729
+ err := a .client .PatchStartupLogs (ctx , agentsdk.PatchStartupLogs {
730
+ Logs : logsToSend ,
731
+ })
732
+ if err == nil {
733
+ break
734
+ }
735
+ var sdkErr * codersdk.Error
736
+ if errors .As (err , & sdkErr ) {
737
+ if sdkErr .StatusCode () == http .StatusRequestEntityTooLarge {
738
+ a .logger .Warn (ctx , "startup logs too large, dropping logs" )
739
+ break
740
+ }
741
+ }
742
+ a .logger .Error (ctx , "upload startup logs" , slog .Error (err ), slog .F ("to_send" , logsToSend ))
743
+ }
744
+ // Reset logsSending flag
745
+ logMutex .Lock ()
746
+ logsSending = false
747
+ flushLogsTimer .Reset (100 * time .Millisecond )
748
+ logMutex .Unlock ()
749
+ logsFlushed .Broadcast ()
750
+ }
751
+ // queueLog function appends a log to the queue and triggers sendLogs if necessary
752
+ queueLog := func (log agentsdk.StartupLog ) {
753
+ logMutex .Lock ()
754
+ defer logMutex .Unlock ()
755
+
756
+ // Append log to the queue
757
+ queuedLogs = append (queuedLogs , log )
758
+
759
+ // If there are more than 100 logs, send them immediately
760
+ if len (queuedLogs ) > 100 {
761
+ // Don't early return after this, because we still want
762
+ // to reset the timer just in case logs come in while
763
+ // we're sending.
764
+ go sendLogs ()
765
+ }
766
+ // Reset or set the flushLogsTimer to trigger sendLogs after 100 milliseconds
767
+ if flushLogsTimer != nil {
768
+ flushLogsTimer .Reset (100 * time .Millisecond )
769
+ return
770
+ }
771
+ flushLogsTimer = time .AfterFunc (100 * time .Millisecond , sendLogs )
772
+ }
773
+
774
+ // It's important that we either flush or drop all logs before returning
775
+ // because the startup state is reported after flush.
776
+ //
777
+ // It'd be weird for the startup state to be ready, but logs are still
778
+ // coming in.
779
+ logsFinished := make (chan struct {})
780
+ err := a .trackConnGoroutine (func () {
781
+ scanner := bufio .NewScanner (reader )
782
+ for scanner .Scan () {
783
+ queueLog (agentsdk.StartupLog {
784
+ CreatedAt : database .Now (),
785
+ Output : scanner .Text (),
786
+ })
787
+ }
788
+ defer close (logsFinished )
789
+ logsFlushed .L .Lock ()
790
+ for {
791
+ logMutex .Lock ()
792
+ if len (queuedLogs ) == 0 {
793
+ logMutex .Unlock ()
794
+ break
795
+ }
796
+ logMutex .Unlock ()
797
+ logsFlushed .Wait ()
798
+ }
799
+ })
800
+ if err != nil {
801
+ return nil , xerrors .Errorf ("track conn goroutine: %w" , err )
802
+ }
803
+ return logsFinished , nil
804
+ }
805
+
671
806
func (a * agent ) init (ctx context.Context ) {
672
807
// Clients' should ignore the host key when connecting.
673
808
// The agent needs to authenticate with coderd to SSH,
@@ -709,6 +844,7 @@ func (a *agent) init(ctx context.Context) {
709
844
_ = session .Exit (MagicSessionErrorCode )
710
845
return
711
846
}
847
+ _ = session .Exit (0 )
712
848
},
713
849
HostSigners : []ssh.Signer {randomSigner },
714
850
LocalPortForwardingCallback : func (ctx ssh.Context , destinationHost string , destinationPort uint32 ) bool {
@@ -965,7 +1101,9 @@ func (a *agent) handleSSHSession(session ssh.Session) (retErr error) {
965
1101
if err != nil {
966
1102
return xerrors .Errorf ("start command: %w" , err )
967
1103
}
1104
+ var wg sync.WaitGroup
968
1105
defer func () {
1106
+ defer wg .Wait ()
969
1107
closeErr := ptty .Close ()
970
1108
if closeErr != nil {
971
1109
a .logger .Warn (ctx , "failed to close tty" , slog .Error (closeErr ))
@@ -982,10 +1120,16 @@ func (a *agent) handleSSHSession(session ssh.Session) (retErr error) {
982
1120
}
983
1121
}
984
1122
}()
1123
+ // We don't add input copy to wait group because
1124
+ // it won't return until the session is closed.
985
1125
go func () {
986
1126
_ , _ = io .Copy (ptty .Input (), session )
987
1127
}()
1128
+ wg .Add (1 )
988
1129
go func () {
1130
+ // Ensure data is flushed to session on command exit, if we
1131
+ // close the session too soon, we might lose data.
1132
+ defer wg .Done ()
989
1133
_ , _ = io .Copy (session , ptty .Output ())
990
1134
}()
991
1135
err = process .Wait ()
0 commit comments