@@ -23,6 +23,7 @@ import (
23
23
"github.com/go-chi/chi/v5"
24
24
"github.com/google/uuid"
25
25
"golang.org/x/exp/maps"
26
+ "golang.org/x/exp/slices"
26
27
"golang.org/x/mod/semver"
27
28
"golang.org/x/sync/errgroup"
28
29
"golang.org/x/xerrors"
@@ -481,6 +482,15 @@ func (api *API) workspaceAgentLogs(rw http.ResponseWriter, r *http.Request) {
481
482
return
482
483
}
483
484
485
+ workspace , err := api .Database .GetWorkspaceByAgentID (ctx , workspaceAgent .ID )
486
+ if err != nil {
487
+ httpapi .Write (ctx , rw , http .StatusInternalServerError , codersdk.Response {
488
+ Message : "Internal error fetching workspace by agent id." ,
489
+ Detail : err .Error (),
490
+ })
491
+ return
492
+ }
493
+
484
494
api .WebsocketWaitMutex .Lock ()
485
495
api .WebsocketWaitGroup .Add (1 )
486
496
api .WebsocketWaitMutex .Unlock ()
@@ -556,7 +566,8 @@ func (api *API) workspaceAgentLogs(rw http.ResponseWriter, r *http.Request) {
556
566
go func () {
557
567
defer close (bufferedLogs )
558
568
559
- for {
569
+ keepGoing := true
570
+ for keepGoing {
560
571
select {
561
572
case <- ctx .Done ():
562
573
return
@@ -565,6 +576,18 @@ func (api *API) workspaceAgentLogs(rw http.ResponseWriter, r *http.Request) {
565
576
t .Reset (recheckInterval )
566
577
}
567
578
579
+ agents , err := api .Database .GetWorkspaceAgentsInLatestBuildByWorkspaceID (ctx , workspace .ID )
580
+ if err != nil {
581
+ if xerrors .Is (err , context .Canceled ) {
582
+ return
583
+ }
584
+ logger .Warn (ctx , "failed to get workspace agents in latest build" , slog .Error (err ))
585
+ continue
586
+ }
587
+ // If the agent is no longer in the latest build, we can stop after
588
+ // checking once.
589
+ keepGoing = slices .ContainsFunc (agents , func (agent database.WorkspaceAgent ) bool { return agent .ID == workspaceAgent .ID })
590
+
568
591
logs , err := api .Database .GetWorkspaceAgentLogsAfter (ctx , database.GetWorkspaceAgentLogsAfterParams {
569
592
AgentID : workspaceAgent .ID ,
570
593
CreatedAfter : lastSentLogID ,
@@ -878,13 +901,15 @@ func (api *API) derpMapUpdates(rw http.ResponseWriter, r *http.Request) {
878
901
})
879
902
return
880
903
}
881
- nconn := websocket . NetConn (ctx , ws , websocket .MessageBinary )
904
+ ctx , nconn := websocketNetConn (ctx , ws , websocket .MessageBinary )
882
905
defer nconn .Close ()
883
906
884
907
// Slurp all packets from the connection into io.Discard so pongs get sent
885
- // by the websocket package.
908
+ // by the websocket package. We don't do any reads ourselves so this is
909
+ // necessary.
886
910
go func () {
887
911
_ , _ = io .Copy (io .Discard , nconn )
912
+ _ = nconn .Close ()
888
913
}()
889
914
890
915
go func (ctx context.Context ) {
@@ -899,13 +924,11 @@ func (api *API) derpMapUpdates(rw http.ResponseWriter, r *http.Request) {
899
924
return
900
925
}
901
926
902
- // We don't need a context that times out here because the ping will
903
- // eventually go through. If the context times out, then other
904
- // websocket read operations will receive an error, obfuscating the
905
- // actual problem.
927
+ ctx , cancel := context .WithTimeout (ctx , 30 * time .Second )
906
928
err := ws .Ping (ctx )
929
+ cancel ()
907
930
if err != nil {
908
- _ = ws .Close (websocket . StatusInternalError , err . Error () )
931
+ _ = nconn .Close ()
909
932
return
910
933
}
911
934
}
@@ -920,7 +943,7 @@ func (api *API) derpMapUpdates(rw http.ResponseWriter, r *http.Request) {
920
943
if lastDERPMap == nil || ! tailnet .CompareDERPMaps (lastDERPMap , derpMap ) {
921
944
err := json .NewEncoder (nconn ).Encode (derpMap )
922
945
if err != nil {
923
- _ = ws .Close (websocket . StatusInternalError , err . Error () )
946
+ _ = nconn .Close ()
924
947
return
925
948
}
926
949
lastDERPMap = derpMap
0 commit comments