Skip to content

fix: Enable reconnectingpty loadtest and fix/improve logging #5403

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Dec 13, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 54 additions & 27 deletions agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/spf13/afero"
"go.uber.org/atomic"
gossh "golang.org/x/crypto/ssh"
"golang.org/x/exp/slices"
"golang.org/x/xerrors"
"tailscale.com/net/speedtest"
"tailscale.com/tailcfg"
Expand Down Expand Up @@ -90,7 +91,7 @@ func New(options Options) io.Closer {
}
}
ctx, cancelFunc := context.WithCancel(context.Background())
server := &agent{
a := &agent{
reconnectingPTYTimeout: options.ReconnectingPTYTimeout,
logger: options.Logger,
closeCancel: cancelFunc,
Expand All @@ -101,8 +102,8 @@ func New(options Options) io.Closer {
filesystem: options.Filesystem,
tempDir: options.TempDir,
}
server.init(ctx)
return server
a.init(ctx)
return a
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Review: This just bothered me, lol.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

;p I agree

}

type agent struct {
Expand Down Expand Up @@ -300,10 +301,12 @@ func (a *agent) createTailnet(ctx context.Context, derpMap *tailcfg.DERPMap) (_
}
}()
if err = a.trackConnGoroutine(func() {
logger := a.logger.Named("reconnecting-pty")
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Review: Named logging seemed like it would be helpful here, even though it's not fully a separate service.


for {
conn, err := reconnectingPTYListener.Accept()
if err != nil {
a.logger.Debug(ctx, "accept pty failed", slog.Error(err))
logger.Debug(ctx, "accept pty failed", slog.Error(err))
return
}
// This cannot use a JSON decoder, since that can
Expand All @@ -324,7 +327,9 @@ func (a *agent) createTailnet(ctx context.Context, derpMap *tailcfg.DERPMap) (_
if err != nil {
continue
}
go a.handleReconnectingPTY(ctx, msg, conn)
go func() {
_ = a.handleReconnectingPTY(ctx, logger, msg, conn)
}()
}
}); err != nil {
return nil, err
Expand Down Expand Up @@ -798,38 +803,56 @@ func (a *agent) handleSSHSession(session ssh.Session) (retErr error) {
return cmd.Wait()
}

func (a *agent) handleReconnectingPTY(ctx context.Context, msg codersdk.ReconnectingPTYInit, conn net.Conn) {
func (a *agent) handleReconnectingPTY(ctx context.Context, logger slog.Logger, msg codersdk.ReconnectingPTYInit, conn net.Conn) (retErr error) {
defer conn.Close()

connectionID := uuid.NewString()
logger = logger.With(slog.F("id", msg.ID), slog.F("connection_id", connectionID))
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Review: Addition of connection_id will help debug if logs are coming from one or multiple connections.


defer func() {
if err := retErr; err != nil {
a.closeMutex.Lock()
closed := a.isClosed()
a.closeMutex.Unlock()

// If the agent is closed, we don't want to
// log this as an error since it's expected.
if closed {
logger.Debug(ctx, "session error after agent close", slog.Error(err))
} else {
logger.Error(ctx, "session error", slog.Error(err))
}
}
logger.Debug(ctx, "session closed")
}()

var rpty *reconnectingPTY
rawRPTY, ok := a.reconnectingPTYs.Load(msg.ID)
if ok {
logger.Debug(ctx, "connecting to existing session")
rpty, ok = rawRPTY.(*reconnectingPTY)
if !ok {
a.logger.Error(ctx, "found invalid type in reconnecting pty map", slog.F("id", msg.ID))
return
return xerrors.Errorf("found invalid type in reconnecting pty map: %T", rawRPTY)
}
} else {
logger.Debug(ctx, "creating new session")

// Empty command will default to the users shell!
cmd, err := a.createCommand(ctx, msg.Command, nil)
if err != nil {
a.logger.Error(ctx, "create reconnecting pty command", slog.Error(err))
return
return xerrors.Errorf("create command: %w", err)
}
cmd.Env = append(cmd.Env, "TERM=xterm-256color")

// Default to buffer 64KiB.
circularBuffer, err := circbuf.NewBuffer(64 << 10)
if err != nil {
a.logger.Error(ctx, "create circular buffer", slog.Error(err))
return
return xerrors.Errorf("create circular buffer: %w", err)
}

ptty, process, err := pty.Start(cmd)
if err != nil {
a.logger.Error(ctx, "start reconnecting pty command", slog.F("id", msg.ID), slog.Error(err))
return
return xerrors.Errorf("start command: %w", err)
}

ctx, cancelFunc := context.WithCancel(ctx)
Expand Down Expand Up @@ -873,7 +896,7 @@ func (a *agent) handleReconnectingPTY(ctx context.Context, msg codersdk.Reconnec
_, err = rpty.circularBuffer.Write(part)
rpty.circularBufferMutex.Unlock()
if err != nil {
a.logger.Error(ctx, "reconnecting pty write buffer", slog.Error(err), slog.F("id", msg.ID))
logger.Error(ctx, "write to circular buffer", slog.Error(err))
break
}
rpty.activeConnsMutex.Lock()
Expand All @@ -889,23 +912,27 @@ func (a *agent) handleReconnectingPTY(ctx context.Context, msg codersdk.Reconnec
rpty.Close()
a.reconnectingPTYs.Delete(msg.ID)
}); err != nil {
a.logger.Error(ctx, "start reconnecting pty routine", slog.F("id", msg.ID), slog.Error(err))
return
return xerrors.Errorf("start routine: %w", err)
}
}
// Resize the PTY to initial height + width.
err := rpty.ptty.Resize(msg.Height, msg.Width)
if err != nil {
// We can continue after this, it's not fatal!
a.logger.Error(ctx, "resize reconnecting pty", slog.F("id", msg.ID), slog.Error(err))
logger.Error(ctx, "resize", slog.Error(err))
}
// Write any previously stored data for the TTY.
rpty.circularBufferMutex.RLock()
_, err = conn.Write(rpty.circularBuffer.Bytes())
prevBuf := slices.Clone(rpty.circularBuffer.Bytes())
rpty.circularBufferMutex.RUnlock()
// Note that there is a small race here between writing buffered
// data and storing conn in activeConns. This is likely a very minor
// edge case, but we should look into ways to avoid it. Holding
// activeConnsMutex would be one option, but holding this mutex
// while also holding circularBufferMutex seems dangerous.
_, err = conn.Write(prevBuf)
if err != nil {
a.logger.Warn(ctx, "write reconnecting pty buffer", slog.F("id", msg.ID), slog.Error(err))
return
return xerrors.Errorf("write buffer to conn: %w", err)
}
// Multiple connections to the same TTY are permitted.
// This could easily be used for terminal sharing, but
Expand Down Expand Up @@ -946,16 +973,16 @@ func (a *agent) handleReconnectingPTY(ctx context.Context, msg codersdk.Reconnec
for {
err = decoder.Decode(&req)
if xerrors.Is(err, io.EOF) {
return
return nil
}
if err != nil {
a.logger.Warn(ctx, "reconnecting pty buffer read error", slog.F("id", msg.ID), slog.Error(err))
return
logger.Warn(ctx, "read conn", slog.Error(err))
return nil
}
_, err = rpty.ptty.Input().Write([]byte(req.Data))
if err != nil {
a.logger.Warn(ctx, "write to reconnecting pty", slog.F("id", msg.ID), slog.Error(err))
return
logger.Warn(ctx, "write to pty", slog.Error(err))
return nil
}
// Check if a resize needs to happen!
if req.Height == 0 || req.Width == 0 {
Expand All @@ -964,7 +991,7 @@ func (a *agent) handleReconnectingPTY(ctx context.Context, msg codersdk.Reconnec
err = rpty.ptty.Resize(req.Height, req.Width)
if err != nil {
// We can continue after this, it's not fatal!
a.logger.Error(ctx, "resize reconnecting pty", slog.F("id", msg.ID), slog.Error(err))
logger.Error(ctx, "resize", slog.Error(err))
}
}
}
Expand Down
18 changes: 15 additions & 3 deletions coderd/coderdtest/coderdtest.go
Original file line number Diff line number Diff line change
Expand Up @@ -552,39 +552,50 @@ func UpdateTemplateVersion(t *testing.T, client *codersdk.Client, organizationID
func AwaitTemplateVersionJob(t *testing.T, client *codersdk.Client, version uuid.UUID) codersdk.TemplateVersion {
t.Helper()

ctx, cancel := context.WithTimeout(context.Background(), testutil.WaitMedium)
defer cancel()

t.Logf("waiting for template version job %s", version)
var templateVersion codersdk.TemplateVersion
require.Eventually(t, func() bool {
var err error
templateVersion, err = client.TemplateVersion(context.Background(), version)
templateVersion, err = client.TemplateVersion(ctx, version)
return assert.NoError(t, err) && templateVersion.Job.CompletedAt != nil
}, testutil.WaitMedium, testutil.IntervalFast)
t.Logf("got template version job %s", version)
return templateVersion
}

// AwaitWorkspaceBuildJob waits for a workspace provision job to reach completed status.
func AwaitWorkspaceBuildJob(t *testing.T, client *codersdk.Client, build uuid.UUID) codersdk.WorkspaceBuild {
t.Helper()

ctx, cancel := context.WithTimeout(context.Background(), testutil.WaitShort)
defer cancel()

t.Logf("waiting for workspace build job %s", build)
var workspaceBuild codersdk.WorkspaceBuild
require.Eventually(t, func() bool {
var err error
workspaceBuild, err = client.WorkspaceBuild(context.Background(), build)
workspaceBuild, err = client.WorkspaceBuild(ctx, build)
return assert.NoError(t, err) && workspaceBuild.Job.CompletedAt != nil
}, testutil.WaitShort, testutil.IntervalFast)
t.Logf("got workspace build job %s", build)
return workspaceBuild
}

// AwaitWorkspaceAgents waits for all resources with agents to be connected.
func AwaitWorkspaceAgents(t *testing.T, client *codersdk.Client, workspaceID uuid.UUID) []codersdk.WorkspaceResource {
t.Helper()

ctx, cancel := context.WithTimeout(context.Background(), testutil.WaitLong)
defer cancel()

t.Logf("waiting for workspace agents (workspace %s)", workspaceID)
var resources []codersdk.WorkspaceResource
require.Eventually(t, func() bool {
var err error
workspace, err := client.Workspace(context.Background(), workspaceID)
workspace, err := client.Workspace(ctx, workspaceID)
if !assert.NoError(t, err) {
return false
}
Expand All @@ -604,6 +615,7 @@ func AwaitWorkspaceAgents(t *testing.T, client *codersdk.Client, workspaceID uui

return true
}, testutil.WaitLong, testutil.IntervalFast)
t.Logf("got workspace agents (workspace %s)", workspaceID)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Review: These log messages help discern the order of events.

return resources
}

Expand Down
1 change: 0 additions & 1 deletion loadtest/reconnectingpty/run_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (

func Test_Runner(t *testing.T) {
t.Parallel()
t.Skip("See: https://github.com/coder/coder/issues/5247")

t.Run("OK", func(t *testing.T) {
t.Parallel()
Expand Down