From 15c1560e3ee113dc1d1dc9d429f07bff1e88e280 Mon Sep 17 00:00:00 2001 From: Spike Curtis Date: Mon, 24 Mar 2025 10:18:52 +0000 Subject: [PATCH 1/2] feat: add telemetry to user-scoped tailnet API call --- coderd/coderdtest/coderdtest.go | 10 +++- coderd/workspaceagents.go | 31 ++++++++++- coderd/workspaceagents_test.go | 94 +++++++++++++++++++++++++++++++-- 3 files changed, 128 insertions(+), 7 deletions(-) diff --git a/coderd/coderdtest/coderdtest.go b/coderd/coderdtest/coderdtest.go index aa096707b8fb7..f2297d07ec2c2 100644 --- a/coderd/coderdtest/coderdtest.go +++ b/coderd/coderdtest/coderdtest.go @@ -52,6 +52,8 @@ import ( "cdr.dev/slog" "cdr.dev/slog/sloggers/sloghuman" "cdr.dev/slog/sloggers/slogtest" + "github.com/coder/quartz" + "github.com/coder/coder/v2/coderd" "github.com/coder/coder/v2/coderd/audit" "github.com/coder/coder/v2/coderd/autobuild" @@ -91,7 +93,6 @@ import ( sdkproto "github.com/coder/coder/v2/provisionersdk/proto" "github.com/coder/coder/v2/tailnet" "github.com/coder/coder/v2/testutil" - "github.com/coder/quartz" ) type Options struct { @@ -170,6 +171,7 @@ type Options struct { APIKeyEncryptionCache cryptokeys.EncryptionKeycache OIDCConvertKeyCache cryptokeys.SigningKeycache Clock quartz.Clock + TelemetryReporter telemetry.Reporter } // New constructs a codersdk client connected to an in-memory API instance. @@ -358,6 +360,10 @@ func NewOptions(t testing.TB, options *Options) (func(http.Handler), context.Can hangDetector.Start() t.Cleanup(hangDetector.Close) + if options.TelemetryReporter == nil { + options.TelemetryReporter = telemetry.NewNoop() + } + // Did last_used_at not update? Scratching your noggin? Here's why. // Workspace usage tracking must be triggered manually in tests. // The vast majority of existing tests do not depend on last_used_at @@ -517,7 +523,7 @@ func NewOptions(t testing.TB, options *Options) (func(http.Handler), context.Can LoginRateLimit: options.LoginRateLimit, FilesRateLimit: options.FilesRateLimit, Authorizer: options.Authorizer, - Telemetry: telemetry.NewNoop(), + Telemetry: options.TelemetryReporter, TemplateScheduleStore: &templateScheduleStore, AccessControlStore: accessControlStore, TLSCertificates: options.TLSCertificates, diff --git a/coderd/workspaceagents.go b/coderd/workspaceagents.go index cf3c5ab1e8b03..a06cf96ea8616 100644 --- a/coderd/workspaceagents.go +++ b/coderd/workspaceagents.go @@ -23,6 +23,8 @@ import ( "tailscale.com/tailcfg" "cdr.dev/slog" + "github.com/coder/websocket" + "github.com/coder/coder/v2/coderd/agentapi" "github.com/coder/coder/v2/coderd/database" "github.com/coder/coder/v2/coderd/database/db2sdk" @@ -34,6 +36,7 @@ import ( "github.com/coder/coder/v2/coderd/jwtutils" "github.com/coder/coder/v2/coderd/rbac" "github.com/coder/coder/v2/coderd/rbac/policy" + "github.com/coder/coder/v2/coderd/telemetry" maputil "github.com/coder/coder/v2/coderd/util/maps" "github.com/coder/coder/v2/coderd/wspubsub" "github.com/coder/coder/v2/codersdk" @@ -42,7 +45,6 @@ import ( "github.com/coder/coder/v2/codersdk/wsjson" "github.com/coder/coder/v2/tailnet" "github.com/coder/coder/v2/tailnet/proto" - "github.com/coder/websocket" ) // @Summary Get workspace agent by ID @@ -1635,6 +1637,33 @@ func (api *API) tailnetRPCConn(rw http.ResponseWriter, r *http.Request) { defer wsNetConn.Close() defer conn.Close(websocket.StatusNormalClosure, "") + // Get user ID for telemetry + apiKey := httpmw.APIKey(r) + userID := apiKey.UserID.String() + + // Store connection telemetry event + now := time.Now() + connectionTelemetryEvent := telemetry.UserTailnetConnection{ + ConnectedAt: now, + DisconnectedAt: nil, + UserID: userID, + PeerID: peerID.String(), + DeviceID: nil, + DeviceOS: nil, + CoderDesktopVersion: nil, + } + api.Telemetry.Report(&telemetry.Snapshot{ + UserTailnetConnections: []telemetry.UserTailnetConnection{connectionTelemetryEvent}, + }) + defer func() { + // Update telemetry event with disconnection time + disconnectTime := time.Now() + connectionTelemetryEvent.DisconnectedAt = &disconnectTime + api.Telemetry.Report(&telemetry.Snapshot{ + UserTailnetConnections: []telemetry.UserTailnetConnection{connectionTelemetryEvent}, + }) + }() + go httpapi.Heartbeat(ctx, conn) err = api.TailnetClientService.ServeClient(ctx, version, wsNetConn, tailnet.StreamID{ Name: "client", diff --git a/coderd/workspaceagents_test.go b/coderd/workspaceagents_test.go index 6764deede15b7..bad5c8130ee1b 100644 --- a/coderd/workspaceagents_test.go +++ b/coderd/workspaceagents_test.go @@ -29,6 +29,9 @@ import ( "cdr.dev/slog" "cdr.dev/slog/sloggers/slogtest" + "github.com/coder/quartz" + "github.com/coder/websocket" + "github.com/coder/coder/v2/agent" "github.com/coder/coder/v2/agent/agentcontainers" "github.com/coder/coder/v2/agent/agentcontainers/acmock" @@ -47,6 +50,7 @@ import ( "github.com/coder/coder/v2/coderd/externalauth" "github.com/coder/coder/v2/coderd/jwtutils" "github.com/coder/coder/v2/coderd/rbac" + "github.com/coder/coder/v2/coderd/telemetry" "github.com/coder/coder/v2/codersdk" "github.com/coder/coder/v2/codersdk/agentsdk" "github.com/coder/coder/v2/codersdk/workspacesdk" @@ -56,8 +60,6 @@ import ( tailnetproto "github.com/coder/coder/v2/tailnet/proto" "github.com/coder/coder/v2/tailnet/tailnettest" "github.com/coder/coder/v2/testutil" - "github.com/coder/quartz" - "github.com/coder/websocket" ) func TestWorkspaceAgent(t *testing.T) { @@ -2133,8 +2135,11 @@ func TestOwnedWorkspacesCoordinate(t *testing.T) { ctx := testutil.Context(t, testutil.WaitLong) logger := testutil.Logger(t) + + fTelemetry := newFakeTelemetryReporter(ctx, t, 200) firstClient, _, api := coderdtest.NewWithAPI(t, &coderdtest.Options{ - Coordinator: tailnet.NewCoordinator(logger), + Coordinator: tailnet.NewCoordinator(logger), + TelemetryReporter: fTelemetry, }) firstUser := coderdtest.CreateFirstUser(t, firstClient) member, memberUser := coderdtest.CreateAnotherUser(t, firstClient, firstUser.OrganizationID, rbac.RoleTemplateAdmin()) @@ -2142,12 +2147,19 @@ func TestOwnedWorkspacesCoordinate(t *testing.T) { // Create a workspace with an agent firstWorkspace := buildWorkspaceWithAgent(t, member, firstUser.OrganizationID, memberUser.ID, api.Database, api.Pubsub) + // clean out telemetry snapshots from the setup; we don't care about them for this test + for len(fTelemetry.snapshots) != 0 { + <-fTelemetry.snapshots + } + u, err := member.URL.Parse("/api/v2/tailnet") require.NoError(t, err) q := u.Query() q.Set("version", "2.0") u.RawQuery = q.Encode() + predialTime := time.Now() + //nolint:bodyclose // websocket package closes this for you wsConn, resp, err := websocket.Dial(ctx, u.String(), &websocket.DialOptions{ HTTPHeader: http.Header{ @@ -2155,13 +2167,22 @@ func TestOwnedWorkspacesCoordinate(t *testing.T) { }, }) if err != nil { - if resp.StatusCode != http.StatusSwitchingProtocols { + if resp != nil && resp.StatusCode != http.StatusSwitchingProtocols { err = codersdk.ReadBodyAsError(resp) } require.NoError(t, err) } defer wsConn.Close(websocket.StatusNormalClosure, "done") + // Check telemetry + snapshot := testutil.RequireRecvCtx(ctx, t, fTelemetry.snapshots) + require.Len(t, snapshot.UserTailnetConnections, 1) + telemetryConnection := snapshot.UserTailnetConnections[0] + require.Equal(t, memberUser.ID.String(), telemetryConnection.UserID) + require.GreaterOrEqual(t, telemetryConnection.ConnectedAt, predialTime) + require.LessOrEqual(t, telemetryConnection.ConnectedAt, time.Now()) + require.NotEmpty(t, telemetryConnection.PeerID) + rpcClient, err := tailnet.NewDRPCClient( websocket.NetConn(ctx, wsConn, websocket.MessageBinary), logger, @@ -2209,6 +2230,23 @@ func TestOwnedWorkspacesCoordinate(t *testing.T) { NumAgents: 0, }, }) + err = stream.Close() + require.NoError(t, err) + + beforeDisconnectTime := time.Now() + err = wsConn.Close(websocket.StatusNormalClosure, "done") + require.NoError(t, err) + + snapshot = testutil.RequireRecvCtx(ctx, t, fTelemetry.snapshots) + require.Len(t, snapshot.UserTailnetConnections, 1) + telemetryDisconnection := snapshot.UserTailnetConnections[0] + require.Equal(t, memberUser.ID.String(), telemetryDisconnection.UserID) + require.Equal(t, telemetryConnection.ConnectedAt, telemetryDisconnection.ConnectedAt) + require.Equal(t, telemetryConnection.UserID, telemetryDisconnection.UserID) + require.Equal(t, telemetryConnection.PeerID, telemetryDisconnection.PeerID) + require.NotNil(t, telemetryDisconnection.DisconnectedAt) + require.GreaterOrEqual(t, *telemetryDisconnection.DisconnectedAt, beforeDisconnectTime) + require.LessOrEqual(t, *telemetryDisconnection.DisconnectedAt, time.Now()) } func buildWorkspaceWithAgent( @@ -2334,3 +2372,51 @@ func waitForUpdates( t.Fatal("Timeout waiting for desired state", currentState) } } + +// fakeTelemetryReporter is a fake implementation of telemetry.Reporter +// that sends snapshots on a buffered channel, useful for testing. +type fakeTelemetryReporter struct { + enabled bool + snapshots chan *telemetry.Snapshot + t testing.TB + ctx context.Context +} + +// newFakeTelemetryReporter creates a new fakeTelemetryReporter with a buffered channel. +// The buffer size determines how many snapshots can be reported before blocking. +func newFakeTelemetryReporter(ctx context.Context, t testing.TB, bufferSize int) *fakeTelemetryReporter { + return &fakeTelemetryReporter{ + enabled: true, + snapshots: make(chan *telemetry.Snapshot, bufferSize), + ctx: ctx, + t: t, + } +} + +// Report implements the telemetry.Reporter interface by sending the snapshot +// to the snapshots channel. +func (f *fakeTelemetryReporter) Report(snapshot *telemetry.Snapshot) { + if !f.enabled { + return + } + + select { + case f.snapshots <- snapshot: + // Successfully sent + case <-f.ctx.Done(): + f.t.Error("context closed while writing snapshot") + } +} + +// Enabled implements the telemetry.Reporter interface. +func (f *fakeTelemetryReporter) Enabled() bool { + return f.enabled +} + +// SetEnabled allows controlling whether the reporter is enabled. +func (f *fakeTelemetryReporter) SetEnabled(enabled bool) { + f.enabled = enabled +} + +// Close implements the telemetry.Reporter interface. +func (f *fakeTelemetryReporter) Close() {} From ade1a654f44fb32c0d0a819ba9be5dd23c04d470 Mon Sep 17 00:00:00 2001 From: Spike Curtis Date: Mon, 24 Mar 2025 11:09:46 +0000 Subject: [PATCH 2/2] fix lint, simplify fakeTelemetry --- coderd/workspaceagents_test.go | 14 ++++---------- 1 file changed, 4 insertions(+), 10 deletions(-) diff --git a/coderd/workspaceagents_test.go b/coderd/workspaceagents_test.go index bad5c8130ee1b..899708ce1fb06 100644 --- a/coderd/workspaceagents_test.go +++ b/coderd/workspaceagents_test.go @@ -2137,6 +2137,7 @@ func TestOwnedWorkspacesCoordinate(t *testing.T) { logger := testutil.Logger(t) fTelemetry := newFakeTelemetryReporter(ctx, t, 200) + fTelemetry.enabled = false firstClient, _, api := coderdtest.NewWithAPI(t, &coderdtest.Options{ Coordinator: tailnet.NewCoordinator(logger), TelemetryReporter: fTelemetry, @@ -2147,10 +2148,8 @@ func TestOwnedWorkspacesCoordinate(t *testing.T) { // Create a workspace with an agent firstWorkspace := buildWorkspaceWithAgent(t, member, firstUser.OrganizationID, memberUser.ID, api.Database, api.Pubsub) - // clean out telemetry snapshots from the setup; we don't care about them for this test - for len(fTelemetry.snapshots) != 0 { - <-fTelemetry.snapshots - } + // enable telemetry now that workspace is built; we don't care about snapshots before this. + fTelemetry.enabled = true u, err := member.URL.Parse("/api/v2/tailnet") require.NoError(t, err) @@ -2413,10 +2412,5 @@ func (f *fakeTelemetryReporter) Enabled() bool { return f.enabled } -// SetEnabled allows controlling whether the reporter is enabled. -func (f *fakeTelemetryReporter) SetEnabled(enabled bool) { - f.enabled = enabled -} - // Close implements the telemetry.Reporter interface. -func (f *fakeTelemetryReporter) Close() {} +func (*fakeTelemetryReporter) Close() {}