Skip to content

Commit 9915118

Browse files
authored
feat(scaletest): replace bash with dd in ssh/rpty traffic and use pseudorandomness (#10821)
Fixes #10795 Refs #8556
1 parent 433be7b commit 9915118

File tree

6 files changed

+325
-145
lines changed

6 files changed

+325
-145
lines changed

cli/exp_scaletest.go

+19-5
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
"math/rand"
1111
"net/http"
1212
"os"
13+
"os/signal"
1314
"strconv"
1415
"strings"
1516
"sync"
@@ -173,11 +174,12 @@ func (s *scaletestStrategyFlags) attach(opts *clibase.OptionSet) {
173174

174175
func (s *scaletestStrategyFlags) toStrategy() harness.ExecutionStrategy {
175176
var strategy harness.ExecutionStrategy
176-
if s.concurrency == 1 {
177+
switch s.concurrency {
178+
case 1:
177179
strategy = harness.LinearExecutionStrategy{}
178-
} else if s.concurrency == 0 {
180+
case 0:
179181
strategy = harness.ConcurrentExecutionStrategy{}
180-
} else {
182+
default:
181183
strategy = harness.ParallelExecutionStrategy{
182184
Limit: int(s.concurrency),
183185
}
@@ -244,7 +246,9 @@ func (o *scaleTestOutput) write(res harness.Results, stdout io.Writer) error {
244246
err := s.Sync()
245247
// On Linux, EINVAL is returned when calling fsync on /dev/stdout. We
246248
// can safely ignore this error.
247-
if err != nil && !xerrors.Is(err, syscall.EINVAL) {
249+
// On macOS, ENOTTY is returned when calling sync on /dev/stdout. We
250+
// can safely ignore this error.
251+
if err != nil && !xerrors.Is(err, syscall.EINVAL) && !xerrors.Is(err, syscall.ENOTTY) {
248252
return xerrors.Errorf("flush output file: %w", err)
249253
}
250254
}
@@ -871,9 +875,13 @@ func (r *RootCmd) scaletestWorkspaceTraffic() *clibase.Cmd {
871875
Middleware: clibase.Chain(
872876
r.InitClient(client),
873877
),
874-
Handler: func(inv *clibase.Invocation) error {
878+
Handler: func(inv *clibase.Invocation) (err error) {
875879
ctx := inv.Context()
876880

881+
notifyCtx, stop := signal.NotifyContext(ctx, InterruptSignals...) // Checked later.
882+
defer stop()
883+
ctx = notifyCtx
884+
877885
me, err := requireAdmin(ctx, client)
878886
if err != nil {
879887
return err
@@ -965,6 +973,7 @@ func (r *RootCmd) scaletestWorkspaceTraffic() *clibase.Cmd {
965973
ReadMetrics: metrics.ReadMetrics(ws.OwnerName, ws.Name, agentName),
966974
WriteMetrics: metrics.WriteMetrics(ws.OwnerName, ws.Name, agentName),
967975
SSH: ssh,
976+
Echo: ssh,
968977
}
969978

970979
if err := config.Validate(); err != nil {
@@ -990,6 +999,11 @@ func (r *RootCmd) scaletestWorkspaceTraffic() *clibase.Cmd {
990999
return xerrors.Errorf("run test harness (harness failure, not a test failure): %w", err)
9911000
}
9921001

1002+
// If the command was interrupted, skip stats.
1003+
if notifyCtx.Err() != nil {
1004+
return notifyCtx.Err()
1005+
}
1006+
9931007
res := th.Results()
9941008
for _, o := range outputs {
9951009
err = o.write(res, inv.Stdout)

scaletest/workspacetraffic/config.go

+6
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,12 @@ type Config struct {
2525
WriteMetrics ConnMetrics `json:"-"`
2626

2727
SSH bool `json:"ssh"`
28+
29+
// Echo controls whether the agent should echo the data it receives.
30+
// If false, the agent will discard the data. Note that setting this
31+
// to true will double the amount of data read from the agent for
32+
// PTYs (e.g. reconnecting pty or SSH connections that request PTY).
33+
Echo bool `json:"echo"`
2834
}
2935

3036
func (c Config) Validate() error {

scaletest/workspacetraffic/conn.go

+182-43
Original file line numberDiff line numberDiff line change
@@ -2,122 +2,261 @@ package workspacetraffic
22

33
import (
44
"context"
5+
"encoding/json"
6+
"errors"
57
"io"
68
"sync"
9+
"time"
710

811
"github.com/coder/coder/v2/codersdk"
912

1013
"github.com/google/uuid"
11-
"github.com/hashicorp/go-multierror"
1214
gossh "golang.org/x/crypto/ssh"
1315
"golang.org/x/xerrors"
1416
)
1517

16-
func connectPTY(ctx context.Context, client *codersdk.Client, agentID, reconnect uuid.UUID) (*countReadWriteCloser, error) {
18+
const (
19+
// Set a timeout for graceful close of the connection.
20+
connCloseTimeout = 30 * time.Second
21+
// Set a timeout for waiting for the connection to close.
22+
waitCloseTimeout = connCloseTimeout + 5*time.Second
23+
24+
// In theory, we can send larger payloads to push bandwidth, but we need to
25+
// be careful not to send too much data at once or the server will close the
26+
// connection. We see this more readily as our JSON payloads approach 28KB.
27+
//
28+
// failed to write frame: WebSocket closed: received close frame: status = StatusMessageTooBig and reason = "read limited at 32769 bytes"
29+
//
30+
// Since we can't control fragmentation/buffer sizes, we keep it simple and
31+
// match the conservative payload size used by agent/reconnectingpty (1024).
32+
rptyJSONMaxDataSize = 1024
33+
)
34+
35+
func connectRPTY(ctx context.Context, client *codersdk.Client, agentID, reconnect uuid.UUID, cmd string) (*countReadWriteCloser, error) {
36+
width, height := 80, 25
1737
conn, err := client.WorkspaceAgentReconnectingPTY(ctx, codersdk.WorkspaceAgentReconnectingPTYOpts{
1838
AgentID: agentID,
1939
Reconnect: reconnect,
20-
Height: 25,
21-
Width: 80,
22-
Command: "sh",
40+
Width: uint16(width),
41+
Height: uint16(height),
42+
Command: cmd,
2343
})
2444
if err != nil {
2545
return nil, xerrors.Errorf("connect pty: %w", err)
2646
}
2747

2848
// Wrap the conn in a countReadWriteCloser so we can monitor bytes sent/rcvd.
29-
crw := countReadWriteCloser{ctx: ctx, rwc: conn}
49+
crw := countReadWriteCloser{rwc: newPTYConn(conn)}
3050
return &crw, nil
3151
}
3252

33-
func connectSSH(ctx context.Context, client *codersdk.Client, agentID uuid.UUID) (*countReadWriteCloser, error) {
53+
type rptyConn struct {
54+
conn io.ReadWriteCloser
55+
wenc *json.Encoder
56+
57+
readOnce sync.Once
58+
readErr chan error
59+
60+
mu sync.Mutex // Protects following.
61+
closed bool
62+
}
63+
64+
func newPTYConn(conn io.ReadWriteCloser) *rptyConn {
65+
rc := &rptyConn{
66+
conn: conn,
67+
wenc: json.NewEncoder(conn),
68+
readErr: make(chan error, 1),
69+
}
70+
return rc
71+
}
72+
73+
func (c *rptyConn) Read(p []byte) (int, error) {
74+
n, err := c.conn.Read(p)
75+
if err != nil {
76+
c.readOnce.Do(func() {
77+
c.readErr <- err
78+
close(c.readErr)
79+
})
80+
return n, err
81+
}
82+
return n, nil
83+
}
84+
85+
func (c *rptyConn) Write(p []byte) (int, error) {
86+
c.mu.Lock()
87+
defer c.mu.Unlock()
88+
89+
// Early exit in case we're closing; this lets Close write Ctrl+C
90+
// without a flood of other writes.
91+
if c.closed {
92+
return 0, io.EOF
93+
}
94+
95+
return c.writeNoLock(p)
96+
}
97+
98+
func (c *rptyConn) writeNoLock(p []byte) (n int, err error) {
99+
// If we try to send more than the max payload size, the server will close the connection.
100+
for len(p) > 0 {
101+
pp := p
102+
if len(pp) > rptyJSONMaxDataSize {
103+
pp = p[:rptyJSONMaxDataSize]
104+
}
105+
p = p[len(pp):]
106+
req := codersdk.ReconnectingPTYRequest{Data: string(pp)}
107+
if err := c.wenc.Encode(req); err != nil {
108+
return n, xerrors.Errorf("encode pty request: %w", err)
109+
}
110+
n += len(pp)
111+
}
112+
return n, nil
113+
}
114+
115+
func (c *rptyConn) Close() (err error) {
116+
c.mu.Lock()
117+
if c.closed {
118+
c.mu.Unlock()
119+
return nil
120+
}
121+
c.closed = true
122+
c.mu.Unlock()
123+
124+
defer c.conn.Close()
125+
126+
// Send Ctrl+C to interrupt the command.
127+
_, err = c.writeNoLock([]byte("\u0003"))
128+
if err != nil {
129+
return xerrors.Errorf("write ctrl+c: %w", err)
130+
}
131+
select {
132+
case <-time.After(connCloseTimeout):
133+
return xerrors.Errorf("timeout waiting for read to finish")
134+
case err = <-c.readErr:
135+
if errors.Is(err, io.EOF) {
136+
return nil
137+
}
138+
return err
139+
}
140+
}
141+
142+
//nolint:revive // Ignore requestPTY control flag.
143+
func connectSSH(ctx context.Context, client *codersdk.Client, agentID uuid.UUID, cmd string, requestPTY bool) (rwc *countReadWriteCloser, err error) {
144+
var closers []func() error
145+
defer func() {
146+
if err != nil {
147+
for _, c := range closers {
148+
if err2 := c(); err2 != nil {
149+
err = errors.Join(err, err2)
150+
}
151+
}
152+
}
153+
}()
154+
34155
agentConn, err := client.DialWorkspaceAgent(ctx, agentID, &codersdk.DialWorkspaceAgentOptions{})
35156
if err != nil {
36157
return nil, xerrors.Errorf("dial workspace agent: %w", err)
37158
}
38-
agentConn.AwaitReachable(ctx)
159+
closers = append(closers, agentConn.Close)
160+
39161
sshClient, err := agentConn.SSHClient(ctx)
40162
if err != nil {
41163
return nil, xerrors.Errorf("get ssh client: %w", err)
42164
}
165+
closers = append(closers, sshClient.Close)
166+
43167
sshSession, err := sshClient.NewSession()
44168
if err != nil {
45-
_ = agentConn.Close()
46169
return nil, xerrors.Errorf("new ssh session: %w", err)
47170
}
48-
wrappedConn := &wrappedSSHConn{ctx: ctx}
171+
closers = append(closers, sshSession.Close)
172+
173+
wrappedConn := &wrappedSSHConn{}
174+
49175
// Do some plumbing to hook up the wrappedConn
50176
pr1, pw1 := io.Pipe()
177+
closers = append(closers, pr1.Close, pw1.Close)
51178
wrappedConn.stdout = pr1
52179
sshSession.Stdout = pw1
180+
53181
pr2, pw2 := io.Pipe()
182+
closers = append(closers, pr2.Close, pw2.Close)
54183
sshSession.Stdin = pr2
55184
wrappedConn.stdin = pw2
56-
err = sshSession.RequestPty("xterm", 25, 80, gossh.TerminalModes{})
57-
if err != nil {
58-
_ = pr1.Close()
59-
_ = pr2.Close()
60-
_ = pw1.Close()
61-
_ = pw2.Close()
62-
_ = sshSession.Close()
63-
_ = agentConn.Close()
64-
return nil, xerrors.Errorf("request pty: %w", err)
185+
186+
if requestPTY {
187+
err = sshSession.RequestPty("xterm", 25, 80, gossh.TerminalModes{})
188+
if err != nil {
189+
return nil, xerrors.Errorf("request pty: %w", err)
190+
}
65191
}
66-
err = sshSession.Shell()
192+
err = sshSession.Start(cmd)
67193
if err != nil {
68-
_ = sshSession.Close()
69-
_ = agentConn.Close()
70194
return nil, xerrors.Errorf("shell: %w", err)
71195
}
196+
waitErr := make(chan error, 1)
197+
go func() {
198+
waitErr <- sshSession.Wait()
199+
}()
72200

73201
closeFn := func() error {
74-
var merr error
75-
if err := sshSession.Close(); err != nil {
76-
merr = multierror.Append(merr, err)
202+
// Start by closing stdin so we stop writing to the ssh session.
203+
merr := pw2.Close()
204+
if err := sshSession.Signal(gossh.SIGHUP); err != nil {
205+
merr = errors.Join(merr, err)
77206
}
78-
if err := agentConn.Close(); err != nil {
79-
merr = multierror.Append(merr, err)
207+
select {
208+
case <-time.After(connCloseTimeout):
209+
merr = errors.Join(merr, xerrors.Errorf("timeout waiting for ssh session to close"))
210+
case err := <-waitErr:
211+
if err != nil {
212+
var exitErr *gossh.ExitError
213+
if xerrors.As(err, &exitErr) {
214+
// The exit status is 255 when the command is
215+
// interrupted by a signal. This is expected.
216+
if exitErr.ExitStatus() != 255 {
217+
merr = errors.Join(merr, xerrors.Errorf("ssh session exited with unexpected status: %d", int32(exitErr.ExitStatus())))
218+
}
219+
} else {
220+
merr = errors.Join(merr, err)
221+
}
222+
}
223+
}
224+
for _, c := range closers {
225+
if err := c(); err != nil {
226+
if !errors.Is(err, io.EOF) {
227+
merr = errors.Join(merr, err)
228+
}
229+
}
80230
}
81231
return merr
82232
}
83233
wrappedConn.close = closeFn
84234

85-
crw := &countReadWriteCloser{ctx: ctx, rwc: wrappedConn}
235+
crw := &countReadWriteCloser{rwc: wrappedConn}
236+
86237
return crw, nil
87238
}
88239

89240
// wrappedSSHConn wraps an ssh.Session to implement io.ReadWriteCloser.
90241
type wrappedSSHConn struct {
91-
ctx context.Context
92242
stdout io.Reader
93-
stdin io.Writer
243+
stdin io.WriteCloser
94244
closeOnce sync.Once
95245
closeErr error
96246
close func() error
97247
}
98248

99249
func (w *wrappedSSHConn) Close() error {
100250
w.closeOnce.Do(func() {
101-
_, _ = w.stdin.Write([]byte("exit\n"))
102251
w.closeErr = w.close()
103252
})
104253
return w.closeErr
105254
}
106255

107256
func (w *wrappedSSHConn) Read(p []byte) (n int, err error) {
108-
select {
109-
case <-w.ctx.Done():
110-
return 0, xerrors.Errorf("read: %w", w.ctx.Err())
111-
default:
112-
return w.stdout.Read(p)
113-
}
257+
return w.stdout.Read(p)
114258
}
115259

116260
func (w *wrappedSSHConn) Write(p []byte) (n int, err error) {
117-
select {
118-
case <-w.ctx.Done():
119-
return 0, xerrors.Errorf("write: %w", w.ctx.Err())
120-
default:
121-
return w.stdin.Write(p)
122-
}
261+
return w.stdin.Write(p)
123262
}

scaletest/workspacetraffic/countreadwriter.go

-1
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@ import (
1313

1414
// countReadWriteCloser wraps an io.ReadWriteCloser and counts the number of bytes read and written.
1515
type countReadWriteCloser struct {
16-
ctx context.Context
1716
rwc io.ReadWriteCloser
1817
readMetrics ConnMetrics
1918
writeMetrics ConnMetrics

0 commit comments

Comments
 (0)