@@ -30,6 +30,7 @@ import (
30
30
"github.com/spf13/afero"
31
31
"go.uber.org/atomic"
32
32
gossh "golang.org/x/crypto/ssh"
33
+ "golang.org/x/exp/slices"
33
34
"golang.org/x/xerrors"
34
35
"tailscale.com/net/speedtest"
35
36
"tailscale.com/tailcfg"
@@ -90,7 +91,7 @@ func New(options Options) io.Closer {
90
91
}
91
92
}
92
93
ctx , cancelFunc := context .WithCancel (context .Background ())
93
- server := & agent {
94
+ a := & agent {
94
95
reconnectingPTYTimeout : options .ReconnectingPTYTimeout ,
95
96
logger : options .Logger ,
96
97
closeCancel : cancelFunc ,
@@ -101,8 +102,8 @@ func New(options Options) io.Closer {
101
102
filesystem : options .Filesystem ,
102
103
tempDir : options .TempDir ,
103
104
}
104
- server .init (ctx )
105
- return server
105
+ a .init (ctx )
106
+ return a
106
107
}
107
108
108
109
type agent struct {
@@ -300,10 +301,12 @@ func (a *agent) createTailnet(ctx context.Context, derpMap *tailcfg.DERPMap) (_
300
301
}
301
302
}()
302
303
if err = a .trackConnGoroutine (func () {
304
+ logger := a .logger .Named ("reconnecting-pty" )
305
+
303
306
for {
304
307
conn , err := reconnectingPTYListener .Accept ()
305
308
if err != nil {
306
- a . logger .Debug (ctx , "accept pty failed" , slog .Error (err ))
309
+ logger .Debug (ctx , "accept pty failed" , slog .Error (err ))
307
310
return
308
311
}
309
312
// This cannot use a JSON decoder, since that can
@@ -324,7 +327,9 @@ func (a *agent) createTailnet(ctx context.Context, derpMap *tailcfg.DERPMap) (_
324
327
if err != nil {
325
328
continue
326
329
}
327
- go a .handleReconnectingPTY (ctx , msg , conn )
330
+ go func () {
331
+ _ = a .handleReconnectingPTY (ctx , logger , msg , conn )
332
+ }()
328
333
}
329
334
}); err != nil {
330
335
return nil , err
@@ -798,38 +803,56 @@ func (a *agent) handleSSHSession(session ssh.Session) (retErr error) {
798
803
return cmd .Wait ()
799
804
}
800
805
801
- func (a * agent ) handleReconnectingPTY (ctx context.Context , msg codersdk.ReconnectingPTYInit , conn net.Conn ) {
806
+ func (a * agent ) handleReconnectingPTY (ctx context.Context , logger slog. Logger , msg codersdk.ReconnectingPTYInit , conn net.Conn ) ( retErr error ) {
802
807
defer conn .Close ()
803
808
804
809
connectionID := uuid .NewString ()
810
+ logger = logger .With (slog .F ("id" , msg .ID ), slog .F ("connection_id" , connectionID ))
811
+
812
+ defer func () {
813
+ if err := retErr ; err != nil {
814
+ a .closeMutex .Lock ()
815
+ closed := a .isClosed ()
816
+ a .closeMutex .Unlock ()
817
+
818
+ // If the agent is closed, we don't want to
819
+ // log this as an error since it's expected.
820
+ if closed {
821
+ logger .Debug (ctx , "session error after agent close" , slog .Error (err ))
822
+ } else {
823
+ logger .Error (ctx , "session error" , slog .Error (err ))
824
+ }
825
+ }
826
+ logger .Debug (ctx , "session closed" )
827
+ }()
828
+
805
829
var rpty * reconnectingPTY
806
830
rawRPTY , ok := a .reconnectingPTYs .Load (msg .ID )
807
831
if ok {
832
+ logger .Debug (ctx , "connecting to existing session" )
808
833
rpty , ok = rawRPTY .(* reconnectingPTY )
809
834
if ! ok {
810
- a .logger .Error (ctx , "found invalid type in reconnecting pty map" , slog .F ("id" , msg .ID ))
811
- return
835
+ return xerrors .Errorf ("found invalid type in reconnecting pty map: %T" , rawRPTY )
812
836
}
813
837
} else {
838
+ logger .Debug (ctx , "creating new session" )
839
+
814
840
// Empty command will default to the users shell!
815
841
cmd , err := a .createCommand (ctx , msg .Command , nil )
816
842
if err != nil {
817
- a .logger .Error (ctx , "create reconnecting pty command" , slog .Error (err ))
818
- return
843
+ return xerrors .Errorf ("create command: %w" , err )
819
844
}
820
845
cmd .Env = append (cmd .Env , "TERM=xterm-256color" )
821
846
822
847
// Default to buffer 64KiB.
823
848
circularBuffer , err := circbuf .NewBuffer (64 << 10 )
824
849
if err != nil {
825
- a .logger .Error (ctx , "create circular buffer" , slog .Error (err ))
826
- return
850
+ return xerrors .Errorf ("create circular buffer: %w" , err )
827
851
}
828
852
829
853
ptty , process , err := pty .Start (cmd )
830
854
if err != nil {
831
- a .logger .Error (ctx , "start reconnecting pty command" , slog .F ("id" , msg .ID ), slog .Error (err ))
832
- return
855
+ return xerrors .Errorf ("start command: %w" , err )
833
856
}
834
857
835
858
ctx , cancelFunc := context .WithCancel (ctx )
@@ -873,7 +896,7 @@ func (a *agent) handleReconnectingPTY(ctx context.Context, msg codersdk.Reconnec
873
896
_ , err = rpty .circularBuffer .Write (part )
874
897
rpty .circularBufferMutex .Unlock ()
875
898
if err != nil {
876
- a . logger .Error (ctx , "reconnecting pty write buffer" , slog .Error (err ), slog . F ( "id" , msg . ID ))
899
+ logger .Error (ctx , "write to circular buffer" , slog .Error (err ))
877
900
break
878
901
}
879
902
rpty .activeConnsMutex .Lock ()
@@ -889,23 +912,22 @@ func (a *agent) handleReconnectingPTY(ctx context.Context, msg codersdk.Reconnec
889
912
rpty .Close ()
890
913
a .reconnectingPTYs .Delete (msg .ID )
891
914
}); err != nil {
892
- a .logger .Error (ctx , "start reconnecting pty routine" , slog .F ("id" , msg .ID ), slog .Error (err ))
893
- return
915
+ return xerrors .Errorf ("start routine: %w" , err )
894
916
}
895
917
}
896
918
// Resize the PTY to initial height + width.
897
919
err := rpty .ptty .Resize (msg .Height , msg .Width )
898
920
if err != nil {
899
921
// We can continue after this, it's not fatal!
900
- a . logger .Error (ctx , "resize reconnecting pty" , slog . F ( "id" , msg . ID ) , slog .Error (err ))
922
+ logger .Error (ctx , "resize" , slog .Error (err ))
901
923
}
902
924
// Write any previously stored data for the TTY.
903
925
rpty .circularBufferMutex .RLock ()
904
- _ , err = conn . Write (rpty .circularBuffer .Bytes ())
926
+ prevBuf := slices . Clone (rpty .circularBuffer .Bytes ())
905
927
rpty .circularBufferMutex .RUnlock ()
928
+ _ , err = conn .Write (prevBuf )
906
929
if err != nil {
907
- a .logger .Warn (ctx , "write reconnecting pty buffer" , slog .F ("id" , msg .ID ), slog .Error (err ))
908
- return
930
+ return xerrors .Errorf ("write buffer to conn: %w" , err )
909
931
}
910
932
// Multiple connections to the same TTY are permitted.
911
933
// This could easily be used for terminal sharing, but
@@ -946,16 +968,16 @@ func (a *agent) handleReconnectingPTY(ctx context.Context, msg codersdk.Reconnec
946
968
for {
947
969
err = decoder .Decode (& req )
948
970
if xerrors .Is (err , io .EOF ) {
949
- return
971
+ return nil
950
972
}
951
973
if err != nil {
952
- a . logger .Warn (ctx , "reconnecting pty buffer read error" , slog . F ( "id" , msg . ID ) , slog .Error (err ))
953
- return
974
+ logger .Warn (ctx , "read conn" , slog .Error (err ))
975
+ return nil
954
976
}
955
977
_ , err = rpty .ptty .Input ().Write ([]byte (req .Data ))
956
978
if err != nil {
957
- a . logger .Warn (ctx , "write to reconnecting pty" , slog . F ( "id" , msg . ID ) , slog .Error (err ))
958
- return
979
+ logger .Warn (ctx , "write to pty" , slog .Error (err ))
980
+ return nil
959
981
}
960
982
// Check if a resize needs to happen!
961
983
if req .Height == 0 || req .Width == 0 {
@@ -964,7 +986,7 @@ func (a *agent) handleReconnectingPTY(ctx context.Context, msg codersdk.Reconnec
964
986
err = rpty .ptty .Resize (req .Height , req .Width )
965
987
if err != nil {
966
988
// We can continue after this, it's not fatal!
967
- a . logger .Error (ctx , "resize reconnecting pty" , slog . F ( "id" , msg . ID ) , slog .Error (err ))
989
+ logger .Error (ctx , "resize" , slog .Error (err ))
968
990
}
969
991
}
970
992
}
0 commit comments