@@ -29,6 +29,9 @@ import (
29
29
30
30
"cdr.dev/slog"
31
31
"cdr.dev/slog/sloggers/slogtest"
32
+ "github.com/coder/quartz"
33
+ "github.com/coder/websocket"
34
+
32
35
"github.com/coder/coder/v2/agent"
33
36
"github.com/coder/coder/v2/agent/agentcontainers"
34
37
"github.com/coder/coder/v2/agent/agentcontainers/acmock"
@@ -47,6 +50,7 @@ import (
47
50
"github.com/coder/coder/v2/coderd/externalauth"
48
51
"github.com/coder/coder/v2/coderd/jwtutils"
49
52
"github.com/coder/coder/v2/coderd/rbac"
53
+ "github.com/coder/coder/v2/coderd/telemetry"
50
54
"github.com/coder/coder/v2/codersdk"
51
55
"github.com/coder/coder/v2/codersdk/agentsdk"
52
56
"github.com/coder/coder/v2/codersdk/workspacesdk"
@@ -56,8 +60,6 @@ import (
56
60
tailnetproto "github.com/coder/coder/v2/tailnet/proto"
57
61
"github.com/coder/coder/v2/tailnet/tailnettest"
58
62
"github.com/coder/coder/v2/testutil"
59
- "github.com/coder/quartz"
60
- "github.com/coder/websocket"
61
63
)
62
64
63
65
func TestWorkspaceAgent (t * testing.T ) {
@@ -2133,35 +2135,54 @@ func TestOwnedWorkspacesCoordinate(t *testing.T) {
2133
2135
2134
2136
ctx := testutil .Context (t , testutil .WaitLong )
2135
2137
logger := testutil .Logger (t )
2138
+
2139
+ fTelemetry := newFakeTelemetryReporter (ctx , t , 200 )
2136
2140
firstClient , _ , api := coderdtest .NewWithAPI (t , & coderdtest.Options {
2137
- Coordinator : tailnet .NewCoordinator (logger ),
2141
+ Coordinator : tailnet .NewCoordinator (logger ),
2142
+ TelemetryReporter : fTelemetry ,
2138
2143
})
2139
2144
firstUser := coderdtest .CreateFirstUser (t , firstClient )
2140
2145
member , memberUser := coderdtest .CreateAnotherUser (t , firstClient , firstUser .OrganizationID , rbac .RoleTemplateAdmin ())
2141
2146
2142
2147
// Create a workspace with an agent
2143
2148
firstWorkspace := buildWorkspaceWithAgent (t , member , firstUser .OrganizationID , memberUser .ID , api .Database , api .Pubsub )
2144
2149
2150
+ // clean out telemetry snapshots from the setup; we don't care about them for this test
2151
+ for len (fTelemetry .snapshots ) != 0 {
2152
+ <- fTelemetry .snapshots
2153
+ }
2154
+
2145
2155
u , err := member .URL .Parse ("/api/v2/tailnet" )
2146
2156
require .NoError (t , err )
2147
2157
q := u .Query ()
2148
2158
q .Set ("version" , "2.0" )
2149
2159
u .RawQuery = q .Encode ()
2150
2160
2161
+ predialTime := time .Now ()
2162
+
2151
2163
//nolint:bodyclose // websocket package closes this for you
2152
2164
wsConn , resp , err := websocket .Dial (ctx , u .String (), & websocket.DialOptions {
2153
2165
HTTPHeader : http.Header {
2154
2166
"Coder-Session-Token" : []string {member .SessionToken ()},
2155
2167
},
2156
2168
})
2157
2169
if err != nil {
2158
- if resp .StatusCode != http .StatusSwitchingProtocols {
2170
+ if resp != nil && resp .StatusCode != http .StatusSwitchingProtocols {
2159
2171
err = codersdk .ReadBodyAsError (resp )
2160
2172
}
2161
2173
require .NoError (t , err )
2162
2174
}
2163
2175
defer wsConn .Close (websocket .StatusNormalClosure , "done" )
2164
2176
2177
+ // Check telemetry
2178
+ snapshot := testutil .RequireRecvCtx (ctx , t , fTelemetry .snapshots )
2179
+ require .Len (t , snapshot .UserTailnetConnections , 1 )
2180
+ telemetryConnection := snapshot .UserTailnetConnections [0 ]
2181
+ require .Equal (t , memberUser .ID .String (), telemetryConnection .UserID )
2182
+ require .GreaterOrEqual (t , telemetryConnection .ConnectedAt , predialTime )
2183
+ require .LessOrEqual (t , telemetryConnection .ConnectedAt , time .Now ())
2184
+ require .NotEmpty (t , telemetryConnection .PeerID )
2185
+
2165
2186
rpcClient , err := tailnet .NewDRPCClient (
2166
2187
websocket .NetConn (ctx , wsConn , websocket .MessageBinary ),
2167
2188
logger ,
@@ -2209,6 +2230,23 @@ func TestOwnedWorkspacesCoordinate(t *testing.T) {
2209
2230
NumAgents : 0 ,
2210
2231
},
2211
2232
})
2233
+ err = stream .Close ()
2234
+ require .NoError (t , err )
2235
+
2236
+ beforeDisconnectTime := time .Now ()
2237
+ err = wsConn .Close (websocket .StatusNormalClosure , "done" )
2238
+ require .NoError (t , err )
2239
+
2240
+ snapshot = testutil .RequireRecvCtx (ctx , t , fTelemetry .snapshots )
2241
+ require .Len (t , snapshot .UserTailnetConnections , 1 )
2242
+ telemetryDisconnection := snapshot .UserTailnetConnections [0 ]
2243
+ require .Equal (t , memberUser .ID .String (), telemetryDisconnection .UserID )
2244
+ require .Equal (t , telemetryConnection .ConnectedAt , telemetryDisconnection .ConnectedAt )
2245
+ require .Equal (t , telemetryConnection .UserID , telemetryDisconnection .UserID )
2246
+ require .Equal (t , telemetryConnection .PeerID , telemetryDisconnection .PeerID )
2247
+ require .NotNil (t , telemetryDisconnection .DisconnectedAt )
2248
+ require .GreaterOrEqual (t , * telemetryDisconnection .DisconnectedAt , beforeDisconnectTime )
2249
+ require .LessOrEqual (t , * telemetryDisconnection .DisconnectedAt , time .Now ())
2212
2250
}
2213
2251
2214
2252
func buildWorkspaceWithAgent (
@@ -2334,3 +2372,51 @@ func waitForUpdates(
2334
2372
t .Fatal ("Timeout waiting for desired state" , currentState )
2335
2373
}
2336
2374
}
2375
+
2376
+ // fakeTelemetryReporter is a fake implementation of telemetry.Reporter
2377
+ // that sends snapshots on a buffered channel, useful for testing.
2378
+ type fakeTelemetryReporter struct {
2379
+ enabled bool
2380
+ snapshots chan * telemetry.Snapshot
2381
+ t testing.TB
2382
+ ctx context.Context
2383
+ }
2384
+
2385
+ // newFakeTelemetryReporter creates a new fakeTelemetryReporter with a buffered channel.
2386
+ // The buffer size determines how many snapshots can be reported before blocking.
2387
+ func newFakeTelemetryReporter (ctx context.Context , t testing.TB , bufferSize int ) * fakeTelemetryReporter {
2388
+ return & fakeTelemetryReporter {
2389
+ enabled : true ,
2390
+ snapshots : make (chan * telemetry.Snapshot , bufferSize ),
2391
+ ctx : ctx ,
2392
+ t : t ,
2393
+ }
2394
+ }
2395
+
2396
+ // Report implements the telemetry.Reporter interface by sending the snapshot
2397
+ // to the snapshots channel.
2398
+ func (f * fakeTelemetryReporter ) Report (snapshot * telemetry.Snapshot ) {
2399
+ if ! f .enabled {
2400
+ return
2401
+ }
2402
+
2403
+ select {
2404
+ case f .snapshots <- snapshot :
2405
+ // Successfully sent
2406
+ case <- f .ctx .Done ():
2407
+ f .t .Error ("context closed while writing snapshot" )
2408
+ }
2409
+ }
2410
+
2411
+ // Enabled implements the telemetry.Reporter interface.
2412
+ func (f * fakeTelemetryReporter ) Enabled () bool {
2413
+ return f .enabled
2414
+ }
2415
+
2416
+ // SetEnabled allows controlling whether the reporter is enabled.
2417
+ func (f * fakeTelemetryReporter ) SetEnabled (enabled bool ) {
2418
+ f .enabled = enabled
2419
+ }
2420
+
2421
+ // Close implements the telemetry.Reporter interface.
2422
+ func (f * fakeTelemetryReporter ) Close () {}
0 commit comments