Skip to content

feat(cli): add trafficgen command for load testing #7307

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 26 commits into from
May 5, 2023
Merged
Changes from 1 commit
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
220edbf
feat(cli): add trafficgen command for load testing
johnstcn Apr 18, 2023
737b475
skip test for now
johnstcn Apr 27, 2023
9b26587
make fmt
johnstcn Apr 27, 2023
c56d84e
lint
johnstcn Apr 27, 2023
e548892
swap order of waiting for read and write
johnstcn Apr 27, 2023
31ef743
close connection, add output formatting
johnstcn May 2, 2023
fafca95
do what the comment says
johnstcn May 2, 2023
65c6d88
move back under scaletest cmd
johnstcn May 2, 2023
0bfa9f6
integrate with scaletest harness
johnstcn May 2, 2023
da935a2
drain connection async
johnstcn May 3, 2023
5daa526
fix cancellation
johnstcn May 3, 2023
4f165be
handle deadline exceeded in drain
johnstcn May 3, 2023
31fa8be
address PR comments
johnstcn May 3, 2023
0817204
fixup! address PR comments
johnstcn May 3, 2023
a6d7870
ACTUALLY limit traffic instead of just blasting the firehose
johnstcn May 3, 2023
935dcbd
log config
johnstcn May 3, 2023
e2efeff
lint
johnstcn May 3, 2023
b105e67
chore(cli): scaletest: move logic for flushing traces into tracing pr…
johnstcn May 4, 2023
731b4db
remove unnecessary context-based I/O
johnstcn May 4, 2023
9dc28a2
refactor bytes per second to bytes per tick and tick interval
johnstcn May 4, 2023
7b98b35
rename trafficgen -> workspace-traffic
johnstcn May 4, 2023
b9c845f
make gen
johnstcn May 4, 2023
2574a00
use strategy.timeout instead of duration
johnstcn May 4, 2023
516ffa1
rm ctx from countReadWriter
johnstcn May 4, 2023
655d95a
fixup
johnstcn May 5, 2023
ca8b212
Merge remote-tracking branch 'origin/main' into cj/scaletest-trafficgen
johnstcn May 5, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
remove unnecessary context-based I/O
  • Loading branch information
johnstcn committed May 4, 2023
commit 731b4db4bc7bbbdc91acecb83c6867243f85d847
107 changes: 24 additions & 83 deletions scaletest/trafficgen/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

"github.com/google/uuid"
"golang.org/x/xerrors"
"nhooyr.io/websocket"

"cdr.dev/slog"
"cdr.dev/slog/sloggers/sloghuman"
Expand Down Expand Up @@ -101,22 +102,22 @@ func (r *Runner) Run(ctx context.Context, _ string, logs io.Writer) error {

go func() {
<-deadlineCtx.Done()
logger.Debug(ctx, "context deadline reached", slog.F("duration", time.Since(start)))
logger.Debug(ctx, "closing agent connection")
conn.Close()
}()

// Read forever in the background.
go func() {
logger.Debug(ctx, "reading from agent", slog.F("agent_id", agentID))
rch <- drainContext(deadlineCtx, &crw)
rch <- drain(&crw)
logger.Debug(ctx, "done reading from agent", slog.F("agent_id", agentID))
conn.Close()
close(rch)
}()

// Write random data to the PTY every tick.
go func() {
logger.Debug(ctx, "writing to agent", slog.F("agent_id", agentID))
wch <- writeRandomData(deadlineCtx, &crw, bytesPerTick, tick.C)
wch <- writeRandomData(&crw, bytesPerTick, tick.C)
logger.Debug(ctx, "done writing to agent", slog.F("agent_id", agentID))
close(wch)
}()
Expand Down Expand Up @@ -145,93 +146,33 @@ func (*Runner) Cleanup(context.Context, string) error {
return nil
}

// drainContext drains from src until it returns io.EOF or ctx times out.
func drainContext(ctx context.Context, src io.Reader) error {
errCh := make(chan error, 1)
done := make(chan struct{})
go func() {
for {
select {
case <-done:
return
default:
_, err := io.CopyN(io.Discard, src, 1)
if ctx.Err() != nil {
return // context canceled while we were copying.
}
if err != nil {
errCh <- err
close(errCh)
return
}
}
}
}()
for {
select {
case <-ctx.Done():
close(done)
return nil
case err := <-errCh:
if err != nil {
if xerrors.Is(err, io.EOF) {
return nil
}
// It's OK if the context is canceled.
if xerrors.Is(err, context.DeadlineExceeded) {
return nil
}
return err
}
}
}
}

func writeRandomData(ctx context.Context, dst io.Writer, size int64, tick <-chan time.Time) error {
for {
select {
case <-ctx.Done():
// drain drains from src until it returns io.EOF or ctx times out.
func drain(src io.Reader) error {
if _, err := io.Copy(io.Discard, src); err != nil {
if xerrors.Is(err, context.DeadlineExceeded) || xerrors.Is(err, websocket.CloseError{}) {
return nil
case <-tick:
payload := "#" + mustRandStr(size-1)
data, err := json.Marshal(codersdk.ReconnectingPTYRequest{
Data: payload,
})
if err != nil {
return err
}
if _, err := copyContext(ctx, dst, data); err != nil {
return err
}
}
return err
}
return nil
}

// copyContext copies from src to dst until ctx is canceled.
func copyContext(ctx context.Context, dst io.Writer, src []byte) (int, error) {
var count int
for {
select {
case <-ctx.Done():
return count, nil
default:
for idx := range src {
n, err := dst.Write(src[idx : idx+1])
if err != nil {
if xerrors.Is(err, io.EOF) {
return count, nil
}
if xerrors.Is(err, context.DeadlineExceeded) {
// It's OK if we reach the deadline before writing the full payload.
return count, nil
}
return count, err
}
count += n
func writeRandomData(dst io.Writer, size int64, tick <-chan time.Time) error {
var (
enc = json.NewEncoder(dst)
ptyReq = codersdk.ReconnectingPTYRequest{}
)
for range tick {
payload := "#" + mustRandStr(size-1)
ptyReq.Data = payload
if err := enc.Encode(ptyReq); err != nil {
if xerrors.Is(err, context.DeadlineExceeded) || xerrors.Is(err, websocket.CloseError{}) {
return nil
}
return count, nil
return err
}
}
return nil
}

// countReadWriter wraps an io.ReadWriter and counts the number of bytes read and written.
Expand Down