@@ -24,6 +24,7 @@ import (
24
24
25
25
"github.com/coder/coder/v2/agent/proto"
26
26
"github.com/coder/coder/v2/apiversion"
27
+ "github.com/coder/coder/v2/coderd/httpapi"
27
28
"github.com/coder/coder/v2/codersdk"
28
29
drpcsdk "github.com/coder/coder/v2/codersdk/drpc"
29
30
tailnetproto "github.com/coder/coder/v2/tailnet/proto"
@@ -730,8 +731,86 @@ func (c *Client) WaitForReinit(ctx context.Context) (*ReinitializationEvent, err
730
731
return nil , codersdk .ReadBodyAsError (res )
731
732
}
732
733
733
- nextEvent := codersdk .ServerSentEventReader (ctx , res .Body )
734
+ reinitEvent , err := NewSSEAgentReinitReceiver (res .Body ).Receive (ctx )
735
+ if err != nil {
736
+ return nil , xerrors .Errorf ("listening for reinitialization events: %w" , err )
737
+ }
738
+ return reinitEvent , nil
739
+ }
740
+
741
+ func WaitForReinitLoop (ctx context.Context , logger slog.Logger , client * Client ) <- chan ReinitializationEvent {
742
+ reinitEvents := make (chan ReinitializationEvent )
743
+
744
+ go func () {
745
+ for retrier := retry .New (100 * time .Millisecond , 10 * time .Second ); retrier .Wait (ctx ); {
746
+ logger .Debug (ctx , "waiting for agent reinitialization instructions" )
747
+ reinitEvent , err := client .WaitForReinit (ctx )
748
+ if err != nil {
749
+ logger .Error (ctx , "failed to wait for agent reinitialization instructions" , slog .Error (err ))
750
+ continue
751
+ }
752
+ select {
753
+ case <- ctx .Done ():
754
+ close (reinitEvents )
755
+ return
756
+ case reinitEvents <- * reinitEvent :
757
+ }
758
+ }
759
+ }()
760
+
761
+ return reinitEvents
762
+ }
763
+
764
+ func NewSSEAgentReinitTransmitter (logger slog.Logger , rw http.ResponseWriter , r * http.Request ) * SSEAgentReinitTransmitter {
765
+ return & SSEAgentReinitTransmitter {logger : logger , rw : rw , r : r }
766
+ }
767
+
768
+ type SSEAgentReinitTransmitter struct {
769
+ rw http.ResponseWriter
770
+ r * http.Request
771
+ logger slog.Logger
772
+ }
773
+
774
+ func (s * SSEAgentReinitTransmitter ) Transmit (ctx context.Context , reinitEvents <- chan ReinitializationEvent ) error {
775
+ select {
776
+ case <- ctx .Done ():
777
+ return ctx .Err ()
778
+ default :
779
+ }
780
+
781
+ sseSendEvent , sseSenderClosed , err := httpapi .ServerSentEventSender (s .rw , s .r )
782
+ if err != nil {
783
+ return xerrors .Errorf ("failed to create sse transmitter: %w" , err )
784
+ }
734
785
786
+ for {
787
+ select {
788
+ case <- ctx .Done ():
789
+ return ctx .Err ()
790
+ case <- sseSenderClosed :
791
+ return xerrors .New ("sse connection closed" )
792
+ case reinitEvent := <- reinitEvents :
793
+ err := sseSendEvent (codersdk.ServerSentEvent {
794
+ Type : codersdk .ServerSentEventTypeData ,
795
+ Data : reinitEvent ,
796
+ })
797
+ if err != nil {
798
+ s .logger .Warn (ctx , "failed to send SSE response to trigger reinit" , slog .Error (err ))
799
+ }
800
+ }
801
+ }
802
+ }
803
+
804
+ func NewSSEAgentReinitReceiver (r io.ReadCloser ) * SSEAgentReinitReceiver {
805
+ return & SSEAgentReinitReceiver {r : r }
806
+ }
807
+
808
+ type SSEAgentReinitReceiver struct {
809
+ r io.ReadCloser
810
+ }
811
+
812
+ func (s * SSEAgentReinitReceiver ) Receive (ctx context.Context ) (* ReinitializationEvent , error ) {
813
+ nextEvent := codersdk .ServerSentEventReader (ctx , s .r )
735
814
for {
736
815
select {
737
816
case <- ctx .Done ():
@@ -763,26 +842,3 @@ func (c *Client) WaitForReinit(ctx context.Context) (*ReinitializationEvent, err
763
842
}
764
843
}
765
844
}
766
-
767
- func WaitForReinitLoop (ctx context.Context , logger slog.Logger , client * Client ) <- chan ReinitializationEvent {
768
- reinitEvents := make (chan ReinitializationEvent )
769
-
770
- go func () {
771
- for retrier := retry .New (100 * time .Millisecond , 10 * time .Second ); retrier .Wait (ctx ); {
772
- logger .Debug (ctx , "waiting for agent reinitialization instructions" )
773
- reinitEvent , err := client .WaitForReinit (ctx )
774
- if err != nil {
775
- logger .Error (ctx , "failed to wait for agent reinitialization instructions" , slog .Error (err ))
776
- continue
777
- }
778
- select {
779
- case <- ctx .Done ():
780
- close (reinitEvents )
781
- return
782
- case reinitEvents <- * reinitEvent :
783
- }
784
- }
785
- }()
786
-
787
- return reinitEvents
788
- }
0 commit comments