Skip to content

feat: add telemetry to user-scoped tailnet API call #17065

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions coderd/coderdtest/coderdtest.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ import (
"cdr.dev/slog"
"cdr.dev/slog/sloggers/sloghuman"
"cdr.dev/slog/sloggers/slogtest"
"github.com/coder/quartz"

"github.com/coder/coder/v2/coderd"
"github.com/coder/coder/v2/coderd/audit"
"github.com/coder/coder/v2/coderd/autobuild"
Expand Down Expand Up @@ -91,7 +93,6 @@ import (
sdkproto "github.com/coder/coder/v2/provisionersdk/proto"
"github.com/coder/coder/v2/tailnet"
"github.com/coder/coder/v2/testutil"
"github.com/coder/quartz"
)

type Options struct {
Expand Down Expand Up @@ -170,6 +171,7 @@ type Options struct {
APIKeyEncryptionCache cryptokeys.EncryptionKeycache
OIDCConvertKeyCache cryptokeys.SigningKeycache
Clock quartz.Clock
TelemetryReporter telemetry.Reporter
}

// New constructs a codersdk client connected to an in-memory API instance.
Expand Down Expand Up @@ -358,6 +360,10 @@ func NewOptions(t testing.TB, options *Options) (func(http.Handler), context.Can
hangDetector.Start()
t.Cleanup(hangDetector.Close)

if options.TelemetryReporter == nil {
options.TelemetryReporter = telemetry.NewNoop()
}

// Did last_used_at not update? Scratching your noggin? Here's why.
// Workspace usage tracking must be triggered manually in tests.
// The vast majority of existing tests do not depend on last_used_at
Expand Down Expand Up @@ -517,7 +523,7 @@ func NewOptions(t testing.TB, options *Options) (func(http.Handler), context.Can
LoginRateLimit: options.LoginRateLimit,
FilesRateLimit: options.FilesRateLimit,
Authorizer: options.Authorizer,
Telemetry: telemetry.NewNoop(),
Telemetry: options.TelemetryReporter,
TemplateScheduleStore: &templateScheduleStore,
AccessControlStore: accessControlStore,
TLSCertificates: options.TLSCertificates,
Expand Down
31 changes: 30 additions & 1 deletion coderd/workspaceagents.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import (
"tailscale.com/tailcfg"

"cdr.dev/slog"
"github.com/coder/websocket"

"github.com/coder/coder/v2/coderd/agentapi"
"github.com/coder/coder/v2/coderd/database"
"github.com/coder/coder/v2/coderd/database/db2sdk"
Expand All @@ -34,6 +36,7 @@ import (
"github.com/coder/coder/v2/coderd/jwtutils"
"github.com/coder/coder/v2/coderd/rbac"
"github.com/coder/coder/v2/coderd/rbac/policy"
"github.com/coder/coder/v2/coderd/telemetry"
maputil "github.com/coder/coder/v2/coderd/util/maps"
"github.com/coder/coder/v2/coderd/wspubsub"
"github.com/coder/coder/v2/codersdk"
Expand All @@ -42,7 +45,6 @@ import (
"github.com/coder/coder/v2/codersdk/wsjson"
"github.com/coder/coder/v2/tailnet"
"github.com/coder/coder/v2/tailnet/proto"
"github.com/coder/websocket"
)

// @Summary Get workspace agent by ID
Expand Down Expand Up @@ -1635,6 +1637,33 @@ func (api *API) tailnetRPCConn(rw http.ResponseWriter, r *http.Request) {
defer wsNetConn.Close()
defer conn.Close(websocket.StatusNormalClosure, "")

// Get user ID for telemetry
apiKey := httpmw.APIKey(r)
userID := apiKey.UserID.String()

// Store connection telemetry event
now := time.Now()
connectionTelemetryEvent := telemetry.UserTailnetConnection{
ConnectedAt: now,
DisconnectedAt: nil,
UserID: userID,
PeerID: peerID.String(),
DeviceID: nil,
DeviceOS: nil,
CoderDesktopVersion: nil,
}
api.Telemetry.Report(&telemetry.Snapshot{
UserTailnetConnections: []telemetry.UserTailnetConnection{connectionTelemetryEvent},
})
defer func() {
// Update telemetry event with disconnection time
disconnectTime := time.Now()
connectionTelemetryEvent.DisconnectedAt = &disconnectTime
api.Telemetry.Report(&telemetry.Snapshot{
UserTailnetConnections: []telemetry.UserTailnetConnection{connectionTelemetryEvent},
})
}()

go httpapi.Heartbeat(ctx, conn)
err = api.TailnetClientService.ServeClient(ctx, version, wsNetConn, tailnet.StreamID{
Name: "client",
Expand Down
88 changes: 84 additions & 4 deletions coderd/workspaceagents_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ import (

"cdr.dev/slog"
"cdr.dev/slog/sloggers/slogtest"
"github.com/coder/quartz"
"github.com/coder/websocket"

"github.com/coder/coder/v2/agent"
"github.com/coder/coder/v2/agent/agentcontainers"
"github.com/coder/coder/v2/agent/agentcontainers/acmock"
Expand All @@ -47,6 +50,7 @@ import (
"github.com/coder/coder/v2/coderd/externalauth"
"github.com/coder/coder/v2/coderd/jwtutils"
"github.com/coder/coder/v2/coderd/rbac"
"github.com/coder/coder/v2/coderd/telemetry"
"github.com/coder/coder/v2/codersdk"
"github.com/coder/coder/v2/codersdk/agentsdk"
"github.com/coder/coder/v2/codersdk/workspacesdk"
Expand All @@ -56,8 +60,6 @@ import (
tailnetproto "github.com/coder/coder/v2/tailnet/proto"
"github.com/coder/coder/v2/tailnet/tailnettest"
"github.com/coder/coder/v2/testutil"
"github.com/coder/quartz"
"github.com/coder/websocket"
)

func TestWorkspaceAgent(t *testing.T) {
Expand Down Expand Up @@ -2133,35 +2135,53 @@ func TestOwnedWorkspacesCoordinate(t *testing.T) {

ctx := testutil.Context(t, testutil.WaitLong)
logger := testutil.Logger(t)

fTelemetry := newFakeTelemetryReporter(ctx, t, 200)
fTelemetry.enabled = false
firstClient, _, api := coderdtest.NewWithAPI(t, &coderdtest.Options{
Coordinator: tailnet.NewCoordinator(logger),
Coordinator: tailnet.NewCoordinator(logger),
TelemetryReporter: fTelemetry,
})
firstUser := coderdtest.CreateFirstUser(t, firstClient)
member, memberUser := coderdtest.CreateAnotherUser(t, firstClient, firstUser.OrganizationID, rbac.RoleTemplateAdmin())

// Create a workspace with an agent
firstWorkspace := buildWorkspaceWithAgent(t, member, firstUser.OrganizationID, memberUser.ID, api.Database, api.Pubsub)

// enable telemetry now that workspace is built; we don't care about snapshots before this.
fTelemetry.enabled = true

u, err := member.URL.Parse("/api/v2/tailnet")
require.NoError(t, err)
q := u.Query()
q.Set("version", "2.0")
u.RawQuery = q.Encode()

predialTime := time.Now()

//nolint:bodyclose // websocket package closes this for you
wsConn, resp, err := websocket.Dial(ctx, u.String(), &websocket.DialOptions{
HTTPHeader: http.Header{
"Coder-Session-Token": []string{member.SessionToken()},
},
})
if err != nil {
if resp.StatusCode != http.StatusSwitchingProtocols {
if resp != nil && resp.StatusCode != http.StatusSwitchingProtocols {
err = codersdk.ReadBodyAsError(resp)
}
require.NoError(t, err)
}
defer wsConn.Close(websocket.StatusNormalClosure, "done")

// Check telemetry
snapshot := testutil.RequireRecvCtx(ctx, t, fTelemetry.snapshots)
require.Len(t, snapshot.UserTailnetConnections, 1)
telemetryConnection := snapshot.UserTailnetConnections[0]
require.Equal(t, memberUser.ID.String(), telemetryConnection.UserID)
require.GreaterOrEqual(t, telemetryConnection.ConnectedAt, predialTime)
require.LessOrEqual(t, telemetryConnection.ConnectedAt, time.Now())
require.NotEmpty(t, telemetryConnection.PeerID)

rpcClient, err := tailnet.NewDRPCClient(
websocket.NetConn(ctx, wsConn, websocket.MessageBinary),
logger,
Expand Down Expand Up @@ -2209,6 +2229,23 @@ func TestOwnedWorkspacesCoordinate(t *testing.T) {
NumAgents: 0,
},
})
err = stream.Close()
require.NoError(t, err)

beforeDisconnectTime := time.Now()
err = wsConn.Close(websocket.StatusNormalClosure, "done")
require.NoError(t, err)

snapshot = testutil.RequireRecvCtx(ctx, t, fTelemetry.snapshots)
require.Len(t, snapshot.UserTailnetConnections, 1)
telemetryDisconnection := snapshot.UserTailnetConnections[0]
require.Equal(t, memberUser.ID.String(), telemetryDisconnection.UserID)
require.Equal(t, telemetryConnection.ConnectedAt, telemetryDisconnection.ConnectedAt)
require.Equal(t, telemetryConnection.UserID, telemetryDisconnection.UserID)
require.Equal(t, telemetryConnection.PeerID, telemetryDisconnection.PeerID)
require.NotNil(t, telemetryDisconnection.DisconnectedAt)
require.GreaterOrEqual(t, *telemetryDisconnection.DisconnectedAt, beforeDisconnectTime)
require.LessOrEqual(t, *telemetryDisconnection.DisconnectedAt, time.Now())
}

func buildWorkspaceWithAgent(
Expand Down Expand Up @@ -2334,3 +2371,46 @@ func waitForUpdates(
t.Fatal("Timeout waiting for desired state", currentState)
}
}

// fakeTelemetryReporter is a fake implementation of telemetry.Reporter
// that sends snapshots on a buffered channel, useful for testing.
type fakeTelemetryReporter struct {
enabled bool
snapshots chan *telemetry.Snapshot
t testing.TB
ctx context.Context
}

// newFakeTelemetryReporter creates a new fakeTelemetryReporter with a buffered channel.
// The buffer size determines how many snapshots can be reported before blocking.
func newFakeTelemetryReporter(ctx context.Context, t testing.TB, bufferSize int) *fakeTelemetryReporter {
return &fakeTelemetryReporter{
enabled: true,
snapshots: make(chan *telemetry.Snapshot, bufferSize),
ctx: ctx,
t: t,
}
}

// Report implements the telemetry.Reporter interface by sending the snapshot
// to the snapshots channel.
func (f *fakeTelemetryReporter) Report(snapshot *telemetry.Snapshot) {
if !f.enabled {
return
}

select {
case f.snapshots <- snapshot:
// Successfully sent
case <-f.ctx.Done():
f.t.Error("context closed while writing snapshot")
}
}

// Enabled implements the telemetry.Reporter interface.
func (f *fakeTelemetryReporter) Enabled() bool {
return f.enabled
}

// Close implements the telemetry.Reporter interface.
func (*fakeTelemetryReporter) Close() {}
Loading