@@ -3,8 +3,8 @@ package workspacetraffic
 import (
 	"context"
 	"encoding/json"
+	"errors"
 	"io"
-	"sync/atomic"
 	"time"

 	"github.com/google/uuid"
@@ -19,6 +19,8 @@ import (
 	"github.com/coder/coder/cryptorand"
 	"github.com/coder/coder/scaletest/harness"
 	"github.com/coder/coder/scaletest/loadtestutil"
+
+	promtest "github.com/prometheus/client_golang/prometheus/testutil"
 )

 type Runner struct {
@@ -49,6 +51,16 @@ func (r *Runner) Run(ctx context.Context, _ string, logs io.Writer) error {
 	r.client.Logger = logger
 	r.client.LogBodies = true

+	// Initialize our metrics eagerly. This is mainly so that we can test for the
+	// presence of a zero-valued metric as opposed to the absence of a metric.
+	lvs := []string{r.cfg.WorkspaceOwner, r.cfg.WorkspaceName, r.cfg.AgentName}
+	r.metrics.BytesReadTotal.WithLabelValues(lvs...).Add(0)
+	r.metrics.BytesWrittenTotal.WithLabelValues(lvs...).Add(0)
+	r.metrics.ReadErrorsTotal.WithLabelValues(lvs...).Add(0)
+	r.metrics.WriteErrorsTotal.WithLabelValues(lvs...).Add(0)
+	r.metrics.ReadLatencySeconds.WithLabelValues(lvs...).Observe(0)
+	r.metrics.WriteLatencySeconds.WithLabelValues(lvs...).Observe(0)
+
 	var (
 		agentID   = r.cfg.AgentID
 		reconnect = uuid.New()
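A note on the eager `Add(0)` / `Observe(0)` calls above: a Prometheus vector with no observed label combinations exposes no series at all, so neither a scrape nor `promtest.ToFloat64` can distinguish "zero traffic" from "metric never wired up". A minimal, self-contained sketch of that behavior (metric and label names here are illustrative, not necessarily the package's actual definitions):

```go
package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
	promtest "github.com/prometheus/client_golang/prometheus/testutil"
)

func main() {
	// Hypothetical counter mirroring the shape used in the diff.
	bytesRead := prometheus.NewCounterVec(prometheus.CounterOpts{
		Name: "bytes_read_total", // illustrative name only
	}, []string{"username", "workspace_name", "agent_name"})

	// No label combination has been touched yet: the vector collects zero series.
	fmt.Println(promtest.CollectAndCount(bytesRead)) // 0

	// The Add(0) in Run() creates the series eagerly, so a test can assert on
	// a present-but-zero value instead of an absent metric.
	bytesRead.WithLabelValues("owner", "workspace", "agent").Add(0)
	fmt.Println(promtest.CollectAndCount(bytesRead)) // 1
	fmt.Println(promtest.ToFloat64(bytesRead))       // 0
}
```

`promtest.ToFloat64` panics unless the collector yields exactly one series; that holds in `Run` because each runner only ever touches the single `lvs` label combination.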
@@ -92,7 +104,7 @@ func (r *Runner) Run(ctx context.Context, _ string, logs io.Writer) error {
 	}()

 	// Wrap the conn in a countReadWriter so we can monitor bytes sent/rcvd.
-	crw := countReadWriter{ReadWriter: conn, metrics: r.metrics, labels: []string{r.cfg.WorkspaceOwner, r.cfg.WorkspaceName, r.cfg.AgentName}}
+	crw := countReadWriter{ReadWriter: conn, metrics: r.metrics, labels: lvs}

 	// Create a ticker for sending data to the PTY.
 	tick := time.NewTicker(tickInterval)
@@ -133,11 +145,12 @@ func (r *Runner) Run(ctx context.Context, _ string, logs io.Writer) error {
 	}

 	duration := time.Since(start)
-
-	logger.Info(ctx, "results",
+	logger.Info(ctx, "Test Results",
 		slog.F("duration", duration),
-		slog.F("sent", crw.BytesWritten()),
-		slog.F("rcvd", crw.BytesRead()),
+		slog.F("bytes_read_total", promtest.ToFloat64(r.metrics.BytesReadTotal)),
+		slog.F("bytes_written_total", promtest.ToFloat64(r.metrics.BytesWrittenTotal)),
+		slog.F("read_errors_total", promtest.ToFloat64(r.metrics.ReadErrorsTotal)),
+		slog.F("write_errors_total", promtest.ToFloat64(r.metrics.WriteErrorsTotal)),
 	)

 	return nil
@@ -186,42 +199,36 @@ func writeRandomData(dst io.Writer, size int64, tick <-chan time.Time) error {
 // countReadWriter wraps an io.ReadWriter and counts the number of bytes read and written.
 type countReadWriter struct {
 	io.ReadWriter
-	bytesRead    atomic.Int64
-	bytesWritten atomic.Int64
-	metrics      *Metrics
-	labels       []string
+	metrics *Metrics
+	labels  []string
 }

 func (w *countReadWriter) Read(p []byte) (int, error) {
 	start := time.Now()
 	n, err := w.ReadWriter.Read(p)
+	if reportableErr(err) {
+		w.metrics.ReadErrorsTotal.WithLabelValues(w.labels...).Inc()
+	}
 	w.metrics.ReadLatencySeconds.WithLabelValues(w.labels...).Observe(time.Since(start).Seconds())
 	if n > 0 {
-		w.bytesRead.Add(int64(n))
-		w.metrics.BytesRead.WithLabelValues(w.labels...).Add(float64(n))
+		w.metrics.BytesReadTotal.WithLabelValues(w.labels...).Add(float64(n))
 	}
 	return n, err
 }

 func (w *countReadWriter) Write(p []byte) (int, error) {
 	start := time.Now()
 	n, err := w.ReadWriter.Write(p)
+	if reportableErr(err) {
+		w.metrics.WriteErrorsTotal.WithLabelValues(w.labels...).Inc()
+	}
 	w.metrics.WriteLatencySeconds.WithLabelValues(w.labels...).Observe(time.Since(start).Seconds())
 	if n > 0 {
-		w.bytesWritten.Add(int64(n))
-		w.metrics.BytesWritten.WithLabelValues(w.labels...).Add(float64(n))
+		w.metrics.BytesWrittenTotal.WithLabelValues(w.labels...).Add(float64(n))
 	}
 	return n, err
 }

-func (w *countReadWriter) BytesRead() int64 {
-	return w.bytesRead.Load()
-}
-
-func (w *countReadWriter) BytesWritten() int64 {
-	return w.bytesWritten.Load()
-}
-
 func mustRandStr(l int64) string {
 	if l < 1 {
 		l = 1
@@ -232,3 +239,19 @@ func mustRandStr(l int64) string {
 	}
 	return randStr
 }
+
+// some errors we want to report in metrics; others we want to ignore
+// such as websocket.StatusNormalClosure or context.Canceled
+func reportableErr(err error) bool {
+	if err == nil {
+		return false
+	}
+	if xerrors.Is(err, context.Canceled) {
+		return false
+	}
+	var wsErr websocket.CloseError
+	if errors.As(err, &wsErr) {
+		return wsErr.Code != websocket.StatusNormalClosure
+	}
+	return false
+}
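The `Metrics` struct referenced throughout (`BytesReadTotal`, `BytesWrittenTotal`, `ReadErrorsTotal`, `WriteErrorsTotal`, `ReadLatencySeconds`, `WriteLatencySeconds`) is defined outside this diff. A rough sketch of the shape the code above assumes, with guessed namespace, metric, and label names (the real definitions live elsewhere in the package):

```go
package workspacetraffic

import "github.com/prometheus/client_golang/prometheus"

// Metrics is a sketch of the collectors used by Runner and countReadWriter.
type Metrics struct {
	BytesReadTotal      *prometheus.CounterVec
	BytesWrittenTotal   *prometheus.CounterVec
	ReadErrorsTotal     *prometheus.CounterVec
	WriteErrorsTotal    *prometheus.CounterVec
	ReadLatencySeconds  *prometheus.HistogramVec
	WriteLatencySeconds *prometheus.HistogramVec
}

// NewMetrics is a hypothetical constructor; namespaces, metric names, and
// label names here are assumptions, not the package's actual values.
func NewMetrics(reg prometheus.Registerer) *Metrics {
	// Label order must match the lvs slice built in Run():
	// workspace owner, workspace name, agent name.
	labels := []string{"username", "workspace_name", "agent_name"}
	counter := func(name string) *prometheus.CounterVec {
		return prometheus.NewCounterVec(prometheus.CounterOpts{
			Namespace: "coderd", Subsystem: "scaletest", Name: name,
		}, labels)
	}
	histogram := func(name string) *prometheus.HistogramVec {
		return prometheus.NewHistogramVec(prometheus.HistogramOpts{
			Namespace: "coderd", Subsystem: "scaletest", Name: name,
		}, labels)
	}
	m := &Metrics{
		BytesReadTotal:      counter("bytes_read_total"),
		BytesWrittenTotal:   counter("bytes_written_total"),
		ReadErrorsTotal:     counter("read_errors_total"),
		WriteErrorsTotal:    counter("write_errors_total"),
		ReadLatencySeconds:  histogram("read_latency_seconds"),
		WriteLatencySeconds: histogram("write_latency_seconds"),
	}
	reg.MustRegister(
		m.BytesReadTotal, m.BytesWrittenTotal,
		m.ReadErrorsTotal, m.WriteErrorsTotal,
		m.ReadLatencySeconds, m.WriteLatencySeconds,
	)
	return m
}
```

Registering the collectors once in a constructor keeps them shared across runners, which is what the per-runner label values (`lvs`) rely on.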