Skip to content

chore: add prometheus monitoring of workspace traffic generation #7583

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 21 commits into from
May 26, 2023
Merged
Changes from 1 commit
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
ignore certain errors such as websocket closing or context cancellation
  • Loading branch information
johnstcn committed May 26, 2023
commit ec96a0016b9af15ff8a03092cf63b78fea7cfca7
67 changes: 45 additions & 22 deletions scaletest/workspacetraffic/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ package workspacetraffic
import (
"context"
"encoding/json"
"errors"
"io"
"sync/atomic"
"time"

"github.com/google/uuid"
Expand All @@ -19,6 +19,8 @@ import (
"github.com/coder/coder/cryptorand"
"github.com/coder/coder/scaletest/harness"
"github.com/coder/coder/scaletest/loadtestutil"

promtest "github.com/prometheus/client_golang/prometheus/testutil"
)

type Runner struct {
Expand Down Expand Up @@ -49,6 +51,16 @@ func (r *Runner) Run(ctx context.Context, _ string, logs io.Writer) error {
r.client.Logger = logger
r.client.LogBodies = true

// Initialize our metrics eagerly. This is mainly so that we can test for the
// presence of a zero-valued metric as opposed to the absence of a metric.
lvs := []string{r.cfg.WorkspaceOwner, r.cfg.WorkspaceName, r.cfg.AgentName}
r.metrics.BytesReadTotal.WithLabelValues(lvs...).Add(0)
r.metrics.BytesWrittenTotal.WithLabelValues(lvs...).Add(0)
r.metrics.ReadErrorsTotal.WithLabelValues(lvs...).Add(0)
r.metrics.WriteErrorsTotal.WithLabelValues(lvs...).Add(0)
r.metrics.ReadLatencySeconds.WithLabelValues(lvs...).Observe(0)
r.metrics.WriteLatencySeconds.WithLabelValues(lvs...).Observe(0)

var (
agentID = r.cfg.AgentID
reconnect = uuid.New()
Expand Down Expand Up @@ -92,7 +104,7 @@ func (r *Runner) Run(ctx context.Context, _ string, logs io.Writer) error {
}()

// Wrap the conn in a countReadWriter so we can monitor bytes sent/rcvd.
crw := countReadWriter{ReadWriter: conn, metrics: r.metrics, labels: []string{r.cfg.WorkspaceOwner, r.cfg.WorkspaceName, r.cfg.AgentName}}
crw := countReadWriter{ReadWriter: conn, metrics: r.metrics, labels: lvs}

// Create a ticker for sending data to the PTY.
tick := time.NewTicker(tickInterval)
Expand Down Expand Up @@ -133,11 +145,12 @@ func (r *Runner) Run(ctx context.Context, _ string, logs io.Writer) error {
}

duration := time.Since(start)

logger.Info(ctx, "results",
logger.Info(ctx, "Test Results",
slog.F("duration", duration),
slog.F("sent", crw.BytesWritten()),
slog.F("rcvd", crw.BytesRead()),
slog.F("bytes_read_total", promtest.ToFloat64(r.metrics.BytesReadTotal)),
slog.F("bytes_written_total", promtest.ToFloat64(r.metrics.BytesWrittenTotal)),
slog.F("read_errors_total", promtest.ToFloat64(r.metrics.ReadErrorsTotal)),
slog.F("write_errors_total", promtest.ToFloat64(r.metrics.WriteErrorsTotal)),
)

return nil
Expand Down Expand Up @@ -186,42 +199,36 @@ func writeRandomData(dst io.Writer, size int64, tick <-chan time.Time) error {
// countReadWriter wraps an io.ReadWriter and counts the number of bytes read and written.
type countReadWriter struct {
io.ReadWriter
bytesRead atomic.Int64
bytesWritten atomic.Int64
metrics *Metrics
labels []string
metrics *Metrics
labels []string
}

func (w *countReadWriter) Read(p []byte) (int, error) {
start := time.Now()
n, err := w.ReadWriter.Read(p)
if reportableErr(err) {
w.metrics.ReadErrorsTotal.WithLabelValues(w.labels...).Inc()
}
w.metrics.ReadLatencySeconds.WithLabelValues(w.labels...).Observe(time.Since(start).Seconds())
if n > 0 {
w.bytesRead.Add(int64(n))
w.metrics.BytesRead.WithLabelValues(w.labels...).Add(float64(n))
w.metrics.BytesReadTotal.WithLabelValues(w.labels...).Add(float64(n))
}
return n, err
}

func (w *countReadWriter) Write(p []byte) (int, error) {
start := time.Now()
n, err := w.ReadWriter.Write(p)
if reportableErr(err) {
w.metrics.WriteErrorsTotal.WithLabelValues(w.labels...).Inc()
}
w.metrics.WriteLatencySeconds.WithLabelValues(w.labels...).Observe(time.Since(start).Seconds())
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm curious if we need another metric (bytes?) to calculate the bandwidth. As far as I see, there are only WriteLatencySeconds and total BytesWritten. I'm curious if we need a metric to calculate "bytes per write in time"

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Given that bytes per tick and ticks per second are inputs to this, I think we should probably be OK. But good food for thought!

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You calc throughput by the (count at time X) - (count at time Y). We should not be doing that math in coderd imo

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch, will address in a follow-up

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Prometheus itself will happily compute the time derivative of any metric, so a counter of bytes written is all we need.

if n > 0 {
w.bytesWritten.Add(int64(n))
w.metrics.BytesWritten.WithLabelValues(w.labels...).Add(float64(n))
w.metrics.BytesWrittenTotal.WithLabelValues(w.labels...).Add(float64(n))
}
return n, err
}

func (w *countReadWriter) BytesRead() int64 {
return w.bytesRead.Load()
}

func (w *countReadWriter) BytesWritten() int64 {
return w.bytesWritten.Load()
}

func mustRandStr(l int64) string {
if l < 1 {
l = 1
Expand All @@ -232,3 +239,19 @@ func mustRandStr(l int64) string {
}
return randStr
}

// some errors we want to report in metrics; others we want to ignore
// such as websocket.StatusNormalClosure or context.Canceled
func reportableErr(err error) bool {
if err == nil {
return false
}
if xerrors.Is(err, context.Canceled) {
return false
}
var wsErr websocket.CloseError
if errors.As(err, &wsErr) {
return wsErr.Code != websocket.StatusNormalClosure
}
return false
}