
Commit 772dff1

fix: Enable reconnectingpty loadtest and fix/improve logging
This commit re-enables the reconnectingpty loadtests after a logging refactor of `(*agent).handleReconnectingPTY`. The reason the tests were flaking was that `logger.Error` was being called and `slogtest` was failing the test. We could have set the `slogtest` option that disables failing on error logs, but that could hide real issues. The current approach improves reconnectingpty logging overall and provides more insight into what's happening. Reconnectingpty sessions are expected to fail after the agent is closed, so calling `logger.Error` at that point is not wanted.

Ref: #5322
Parent: 50d1c71
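
For reference, a minimal, self-contained sketch of the logging pattern this commit adds (not the agent code itself): the handler returns its error through a named return value, and a defer inspects it, downgrading from Error to Debug when the agent is already closed, since failures are expected at that point. The `fakeAgent` type, its `closed` field, and `handleSession` are hypothetical stand-ins for the real `agent` struct and `handleReconnectingPTY`.

```go
package main

import (
	"context"
	"os"
	"sync"

	"cdr.dev/slog"
	"cdr.dev/slog/sloggers/sloghuman"
	"golang.org/x/xerrors"
)

// fakeAgent is a hypothetical stand-in for the real agent struct.
type fakeAgent struct {
	closeMutex sync.Mutex
	closed     bool
}

func (a *fakeAgent) isClosed() bool { return a.closed }

// handleSession mimics the shape of handleReconnectingPTY after the refactor:
// it returns errors instead of logging them inline, and a defer decides how
// loudly to log based on whether the agent is shutting down.
func (a *fakeAgent) handleSession(ctx context.Context, logger slog.Logger) (retErr error) {
	defer func() {
		if retErr != nil {
			a.closeMutex.Lock()
			closed := a.isClosed()
			a.closeMutex.Unlock()

			if closed {
				// Expected during shutdown; Debug keeps slogtest from
				// failing tests on Error-level logs.
				logger.Debug(ctx, "session error after agent close", slog.Error(retErr))
			} else {
				logger.Error(ctx, "session error", slog.Error(retErr))
			}
		}
		logger.Debug(ctx, "session closed")
	}()

	// Pretend the session failed after the agent was closed.
	return xerrors.New("session terminated")
}

func main() {
	logger := slog.Make(sloghuman.Sink(os.Stderr)).Named("reconnecting-pty")
	a := &fakeAgent{closed: true}
	_ = a.handleSession(context.Background(), logger)
}
```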

File tree: 3 files changed, +64 -31 lines

agent/agent.go (+49, -27)
@@ -30,6 +30,7 @@ import (
 	"github.com/spf13/afero"
 	"go.uber.org/atomic"
 	gossh "golang.org/x/crypto/ssh"
+	"golang.org/x/exp/slices"
 	"golang.org/x/xerrors"
 	"tailscale.com/net/speedtest"
 	"tailscale.com/tailcfg"
@@ -90,7 +91,7 @@ func New(options Options) io.Closer {
 		}
 	}
 	ctx, cancelFunc := context.WithCancel(context.Background())
-	server := &agent{
+	a := &agent{
 		reconnectingPTYTimeout: options.ReconnectingPTYTimeout,
 		logger:                 options.Logger,
 		closeCancel:            cancelFunc,
@@ -101,8 +102,8 @@ func New(options Options) io.Closer {
 		filesystem:             options.Filesystem,
 		tempDir:                options.TempDir,
 	}
-	server.init(ctx)
-	return server
+	a.init(ctx)
+	return a
 }

 type agent struct {
@@ -300,10 +301,12 @@ func (a *agent) createTailnet(ctx context.Context, derpMap *tailcfg.DERPMap) (_
 		}
 	}()
 	if err = a.trackConnGoroutine(func() {
+		logger := a.logger.Named("reconnecting-pty")
+
 		for {
 			conn, err := reconnectingPTYListener.Accept()
 			if err != nil {
-				a.logger.Debug(ctx, "accept pty failed", slog.Error(err))
+				logger.Debug(ctx, "accept pty failed", slog.Error(err))
 				return
 			}
 			// This cannot use a JSON decoder, since that can
@@ -324,7 +327,9 @@ func (a *agent) createTailnet(ctx context.Context, derpMap *tailcfg.DERPMap) (_
 			if err != nil {
 				continue
 			}
-			go a.handleReconnectingPTY(ctx, msg, conn)
+			go func() {
+				_ = a.handleReconnectingPTY(ctx, logger, msg, conn)
+			}()
 		}
 	}); err != nil {
 		return nil, err
@@ -798,38 +803,56 @@ func (a *agent) handleSSHSession(session ssh.Session) (retErr error) {
 	return cmd.Wait()
 }

-func (a *agent) handleReconnectingPTY(ctx context.Context, msg codersdk.ReconnectingPTYInit, conn net.Conn) {
+func (a *agent) handleReconnectingPTY(ctx context.Context, logger slog.Logger, msg codersdk.ReconnectingPTYInit, conn net.Conn) (retErr error) {
 	defer conn.Close()

 	connectionID := uuid.NewString()
+	logger = logger.With(slog.F("id", msg.ID), slog.F("connection_id", connectionID))
+
+	defer func() {
+		if err := retErr; err != nil {
+			a.closeMutex.Lock()
+			closed := a.isClosed()
+			a.closeMutex.Unlock()
+
+			// If the agent is closed, we don't want to
+			// log this as an error since it's expected.
+			if closed {
+				logger.Debug(ctx, "session error after agent close", slog.Error(err))
+			} else {
+				logger.Error(ctx, "session error", slog.Error(err))
+			}
+		}
+		logger.Debug(ctx, "session closed")
+	}()

 	var rpty *reconnectingPTY
 	rawRPTY, ok := a.reconnectingPTYs.Load(msg.ID)
 	if ok {
+		logger.Debug(ctx, "connecting to existing session")
 		rpty, ok = rawRPTY.(*reconnectingPTY)
 		if !ok {
-			a.logger.Error(ctx, "found invalid type in reconnecting pty map", slog.F("id", msg.ID))
-			return
+			return xerrors.Errorf("found invalid type in reconnecting pty map: %T", rawRPTY)
 		}
 	} else {
+		logger.Debug(ctx, "creating new session")
+
 		// Empty command will default to the users shell!
 		cmd, err := a.createCommand(ctx, msg.Command, nil)
 		if err != nil {
-			a.logger.Error(ctx, "create reconnecting pty command", slog.Error(err))
-			return
+			return xerrors.Errorf("create command: %w", err)
 		}
 		cmd.Env = append(cmd.Env, "TERM=xterm-256color")

 		// Default to buffer 64KiB.
 		circularBuffer, err := circbuf.NewBuffer(64 << 10)
 		if err != nil {
-			a.logger.Error(ctx, "create circular buffer", slog.Error(err))
-			return
+			return xerrors.Errorf("create circular buffer: %w", err)
 		}

 		ptty, process, err := pty.Start(cmd)
 		if err != nil {
-			a.logger.Error(ctx, "start reconnecting pty command", slog.F("id", msg.ID), slog.Error(err))
-			return
+			return xerrors.Errorf("start command: %w", err)
 		}

 		ctx, cancelFunc := context.WithCancel(ctx)
@@ -873,7 +896,7 @@ func (a *agent) handleReconnectingPTY(ctx context.Context, msg codersdk.Reconnec
 			_, err = rpty.circularBuffer.Write(part)
 			rpty.circularBufferMutex.Unlock()
 			if err != nil {
-				a.logger.Error(ctx, "reconnecting pty write buffer", slog.Error(err), slog.F("id", msg.ID))
+				logger.Error(ctx, "write to circular buffer", slog.Error(err))
 				break
 			}
 			rpty.activeConnsMutex.Lock()
@@ -889,23 +912,22 @@ func (a *agent) handleReconnectingPTY(ctx context.Context, msg codersdk.Reconnec
 			rpty.Close()
 			a.reconnectingPTYs.Delete(msg.ID)
 		}); err != nil {
-			a.logger.Error(ctx, "start reconnecting pty routine", slog.F("id", msg.ID), slog.Error(err))
-			return
+			return xerrors.Errorf("start routine: %w", err)
 		}
 	}
 	// Resize the PTY to initial height + width.
 	err := rpty.ptty.Resize(msg.Height, msg.Width)
 	if err != nil {
 		// We can continue after this, it's not fatal!
-		a.logger.Error(ctx, "resize reconnecting pty", slog.F("id", msg.ID), slog.Error(err))
+		logger.Error(ctx, "resize", slog.Error(err))
 	}
 	// Write any previously stored data for the TTY.
 	rpty.circularBufferMutex.RLock()
-	_, err = conn.Write(rpty.circularBuffer.Bytes())
+	prevBuf := slices.Clone(rpty.circularBuffer.Bytes())
 	rpty.circularBufferMutex.RUnlock()
+	_, err = conn.Write(prevBuf)
 	if err != nil {
-		a.logger.Warn(ctx, "write reconnecting pty buffer", slog.F("id", msg.ID), slog.Error(err))
-		return
+		return xerrors.Errorf("write buffer to conn: %w", err)
 	}
 	// Multiple connections to the same TTY are permitted.
 	// This could easily be used for terminal sharing, but
@@ -946,16 +968,16 @@ func (a *agent) handleReconnectingPTY(ctx context.Context, msg codersdk.Reconnec
 	for {
 		err = decoder.Decode(&req)
 		if xerrors.Is(err, io.EOF) {
-			return
+			return nil
 		}
 		if err != nil {
-			a.logger.Warn(ctx, "reconnecting pty buffer read error", slog.F("id", msg.ID), slog.Error(err))
-			return
+			logger.Warn(ctx, "read conn", slog.Error(err))
+			return nil
 		}
 		_, err = rpty.ptty.Input().Write([]byte(req.Data))
 		if err != nil {
-			a.logger.Warn(ctx, "write to reconnecting pty", slog.F("id", msg.ID), slog.Error(err))
-			return
+			logger.Warn(ctx, "write to pty", slog.Error(err))
+			return nil
 		}
 		// Check if a resize needs to happen!
 		if req.Height == 0 || req.Width == 0 {
@@ -964,7 +986,7 @@ func (a *agent) handleReconnectingPTY(ctx context.Context, msg codersdk.Reconnec
 		err = rpty.ptty.Resize(req.Height, req.Width)
 		if err != nil {
 			// We can continue after this, it's not fatal!
-			a.logger.Error(ctx, "resize reconnecting pty", slog.F("id", msg.ID), slog.Error(err))
+			logger.Error(ctx, "resize", slog.Error(err))
 		}
 	}
 }
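
One detail in the hunk above worth calling out: the previously buffered output is now snapshotted with `slices.Clone` while the read lock is held, and the write to the connection happens only after the lock is released, so a slow client can no longer stall writers to the circular buffer. A minimal, self-contained sketch of that pattern, with `bytes.Buffer` and `os.Stdout` as stand-ins for the circular buffer and the reconnecting PTY's `net.Conn`:

```go
package main

import (
	"bytes"
	"os"
	"sync"

	"golang.org/x/exp/slices"
)

func main() {
	var (
		mu  sync.RWMutex
		buf bytes.Buffer // stand-in for the circular buffer of terminal output
	)
	buf.WriteString("previously buffered terminal output\n")

	// Copy under the read lock...
	mu.RLock()
	snapshot := slices.Clone(buf.Bytes())
	mu.RUnlock()

	// ...then do the (potentially slow) write without holding the lock.
	if _, err := os.Stdout.Write(snapshot); err != nil {
		// In the real handler this error is returned and logged by the defer.
		os.Exit(1)
	}
}
```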

coderd/coderdtest/coderdtest.go (+15, -3)
@@ -552,39 +552,50 @@ func UpdateTemplateVersion(t *testing.T, client *codersdk.Client, organizationID
 func AwaitTemplateVersionJob(t *testing.T, client *codersdk.Client, version uuid.UUID) codersdk.TemplateVersion {
 	t.Helper()

+	ctx, cancel := context.WithTimeout(context.Background(), testutil.WaitMedium)
+	defer cancel()
+
 	t.Logf("waiting for template version job %s", version)
 	var templateVersion codersdk.TemplateVersion
 	require.Eventually(t, func() bool {
 		var err error
-		templateVersion, err = client.TemplateVersion(context.Background(), version)
+		templateVersion, err = client.TemplateVersion(ctx, version)
 		return assert.NoError(t, err) && templateVersion.Job.CompletedAt != nil
 	}, testutil.WaitMedium, testutil.IntervalFast)
+	t.Logf("got template version job %s", version)
 	return templateVersion
 }

 // AwaitWorkspaceBuildJob waits for a workspace provision job to reach completed status.
 func AwaitWorkspaceBuildJob(t *testing.T, client *codersdk.Client, build uuid.UUID) codersdk.WorkspaceBuild {
 	t.Helper()

+	ctx, cancel := context.WithTimeout(context.Background(), testutil.WaitShort)
+	defer cancel()
+
 	t.Logf("waiting for workspace build job %s", build)
 	var workspaceBuild codersdk.WorkspaceBuild
 	require.Eventually(t, func() bool {
 		var err error
-		workspaceBuild, err = client.WorkspaceBuild(context.Background(), build)
+		workspaceBuild, err = client.WorkspaceBuild(ctx, build)
 		return assert.NoError(t, err) && workspaceBuild.Job.CompletedAt != nil
 	}, testutil.WaitShort, testutil.IntervalFast)
+	t.Logf("got workspace build job %s", build)
 	return workspaceBuild
 }

 // AwaitWorkspaceAgents waits for all resources with agents to be connected.
 func AwaitWorkspaceAgents(t *testing.T, client *codersdk.Client, workspaceID uuid.UUID) []codersdk.WorkspaceResource {
 	t.Helper()

+	ctx, cancel := context.WithTimeout(context.Background(), testutil.WaitLong)
+	defer cancel()
+
 	t.Logf("waiting for workspace agents (workspace %s)", workspaceID)
 	var resources []codersdk.WorkspaceResource
 	require.Eventually(t, func() bool {
 		var err error
-		workspace, err := client.Workspace(context.Background(), workspaceID)
+		workspace, err := client.Workspace(ctx, workspaceID)
 		if !assert.NoError(t, err) {
 			return false
 		}
@@ -604,6 +615,7 @@ func AwaitWorkspaceAgents(t *testing.T, client *codersdk.Client, workspaceID uui

 		return true
 	}, testutil.WaitLong, testutil.IntervalFast)
+	t.Logf("got workspace agents (workspace %s)", workspaceID)
 	return resources
 }
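
The coderdtest changes above apply the same shape to each helper: derive a context bounded by the same timeout passed to `require.Eventually`, use it for the API calls inside the poll loop, and log again once the wait succeeds. A rough sketch of that shape under assumed names (`waitMedium`, `awaitCondition`, and the `check` callback are illustrative, not part of coderdtest):

```go
package example

import (
	"context"
	"testing"
	"time"

	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/require"
)

// waitMedium is a stand-in for testutil.WaitMedium.
const waitMedium = 15 * time.Second

// awaitCondition polls check until it reports done, bounding both the poll
// loop and the calls it makes with the same deadline.
func awaitCondition(t *testing.T, check func(ctx context.Context) (bool, error)) {
	t.Helper()

	ctx, cancel := context.WithTimeout(context.Background(), waitMedium)
	defer cancel()

	t.Log("waiting for condition")
	require.Eventually(t, func() bool {
		done, err := check(ctx)
		return assert.NoError(t, err) && done
	}, waitMedium, 25*time.Millisecond)
	t.Log("condition met")
}
```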

loadtest/reconnectingpty/run_test.go (-1)
@@ -22,7 +22,6 @@ import (

 func Test_Runner(t *testing.T) {
 	t.Parallel()
-	t.Skip("See: https://github.com/coder/coder/issues/5247")

 	t.Run("OK", func(t *testing.T) {
 		t.Parallel()
