Skip to content

Commit 5dfef33

Browse files
deansheatherethanndickson
authored andcommitted
chore: add DRPC server implementation for network telemetry
1 parent 089f068 commit 5dfef33

File tree

7 files changed

+1001
-503
lines changed

7 files changed

+1001
-503
lines changed

coderd/agentapi/api.go

Lines changed: 27 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import (
2323
"github.com/coder/coder/v2/coderd/externalauth"
2424
"github.com/coder/coder/v2/coderd/prometheusmetrics"
2525
"github.com/coder/coder/v2/coderd/schedule"
26+
"github.com/coder/coder/v2/coderd/telemetry"
2627
"github.com/coder/coder/v2/coderd/tracing"
2728
"github.com/coder/coder/v2/coderd/workspacestats"
2829
"github.com/coder/coder/v2/codersdk"
@@ -47,6 +48,7 @@ type API struct {
4748

4849
mu sync.Mutex
4950
cachedWorkspaceID uuid.UUID
51+
drpcServiceClose func()
5052
}
5153

5254
var _ agentproto.DRPCAgentServer = &API{}
@@ -65,15 +67,18 @@ type Options struct {
6567
AppearanceFetcher *atomic.Pointer[appearance.Fetcher]
6668
PublishWorkspaceUpdateFn func(ctx context.Context, workspaceID uuid.UUID)
6769
PublishWorkspaceAgentLogsUpdateFn func(ctx context.Context, workspaceAgentID uuid.UUID, msg agentsdk.LogsNotifyMessage)
68-
69-
AccessURL *url.URL
70-
AppHostname string
71-
AgentStatsRefreshInterval time.Duration
72-
DisableDirectConnections bool
73-
DerpForceWebSockets bool
74-
DerpMapUpdateFrequency time.Duration
75-
ExternalAuthConfigs []*externalauth.Config
76-
Experiments codersdk.Experiments
70+
NetworkTelemetryBatchFn func(batch []telemetry.NetworkEvent)
71+
72+
AccessURL *url.URL
73+
AppHostname string
74+
AgentStatsRefreshInterval time.Duration
75+
DisableDirectConnections bool
76+
DerpForceWebSockets bool
77+
DerpMapUpdateFrequency time.Duration
78+
NetworkTelemetryBatchFrequency time.Duration
79+
NetworkTelemetryBatchMaxSize int
80+
ExternalAuthConfigs []*externalauth.Config
81+
Experiments codersdk.Experiments
7782

7883
// Optional:
7984
// WorkspaceID avoids a future lookup to find the workspace ID by setting
@@ -154,15 +159,24 @@ func New(opts Options) *API {
154159
}
155160

156161
api.DRPCService = &tailnet.DRPCService{
157-
CoordPtr: opts.TailnetCoordinator,
158-
Logger: opts.Log,
159-
DerpMapUpdateFrequency: opts.DerpMapUpdateFrequency,
160-
DerpMapFn: opts.DerpMapFn,
162+
CoordPtr: opts.TailnetCoordinator,
163+
Logger: opts.Log,
164+
DerpMapUpdateFrequency: opts.DerpMapUpdateFrequency,
165+
DerpMapFn: opts.DerpMapFn,
166+
NetworkTelemetryBatchFrequency: opts.NetworkTelemetryBatchFrequency,
167+
NetworkTelemetryBatchMaxSize: opts.NetworkTelemetryBatchMaxSize,
168+
NetworkTelemetryBatchFn: opts.NetworkTelemetryBatchFn,
161169
}
170+
api.drpcServiceClose = api.DRPCService.PeriodicTelemetryBatcher()
162171

163172
return api
164173
}
165174

175+
func (a *API) Close() error {
176+
a.drpcServiceClose()
177+
return nil
178+
}
179+
166180
func (a *API) Server(ctx context.Context) (*drpcserver.Server, error) {
167181
mux := drpcmux.New()
168182
err := agentproto.DRPCRegisterAgent(mux, a)

coderd/coderd.go

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -142,14 +142,16 @@ type Options struct {
142142
DERPServer *derp.Server
143143
// BaseDERPMap is used as the base DERP map for all clients and agents.
144144
// Proxies are added to this list.
145-
BaseDERPMap *tailcfg.DERPMap
146-
DERPMapUpdateFrequency time.Duration
147-
SwaggerEndpoint bool
148-
SetUserGroups func(ctx context.Context, logger slog.Logger, tx database.Store, userID uuid.UUID, orgGroupNames map[uuid.UUID][]string, createMissingGroups bool) error
149-
SetUserSiteRoles func(ctx context.Context, logger slog.Logger, tx database.Store, userID uuid.UUID, roles []string) error
150-
TemplateScheduleStore *atomic.Pointer[schedule.TemplateScheduleStore]
151-
UserQuietHoursScheduleStore *atomic.Pointer[schedule.UserQuietHoursScheduleStore]
152-
AccessControlStore *atomic.Pointer[dbauthz.AccessControlStore]
145+
BaseDERPMap *tailcfg.DERPMap
146+
DERPMapUpdateFrequency time.Duration
147+
NetworkTelemetryBatchFrequency time.Duration
148+
NetworkTelemetryBatchMaxSize int
149+
SwaggerEndpoint bool
150+
SetUserGroups func(ctx context.Context, logger slog.Logger, tx database.Store, userID uuid.UUID, orgGroupNames map[uuid.UUID][]string, createMissingGroups bool) error
151+
SetUserSiteRoles func(ctx context.Context, logger slog.Logger, tx database.Store, userID uuid.UUID, roles []string) error
152+
TemplateScheduleStore *atomic.Pointer[schedule.TemplateScheduleStore]
153+
UserQuietHoursScheduleStore *atomic.Pointer[schedule.UserQuietHoursScheduleStore]
154+
AccessControlStore *atomic.Pointer[dbauthz.AccessControlStore]
153155
// AppSecurityKey is the crypto key used to sign and encrypt tokens related to
154156
// workspace applications. It consists of both a signing and encryption key.
155157
AppSecurityKey workspaceapps.SecurityKey
@@ -305,6 +307,12 @@ func New(options *Options) *API {
305307
if options.DERPMapUpdateFrequency == 0 {
306308
options.DERPMapUpdateFrequency = 5 * time.Second
307309
}
310+
if options.NetworkTelemetryBatchFrequency == 0 {
311+
options.NetworkTelemetryBatchFrequency = 1 * time.Minute
312+
}
313+
if options.NetworkTelemetryBatchMaxSize == 0 {
314+
options.NetworkTelemetryBatchMaxSize = 1_000
315+
}
308316
if options.TailnetCoordinator == nil {
309317
options.TailnetCoordinator = tailnet.NewCoordinator(options.Logger)
310318
}

0 commit comments

Comments
 (0)