Skip to content

Commit 6c94dd4

Browse files
authored
chore: add DRPC server implementation for network telemetry (#13675)
1 parent 2fde054 commit 6c94dd4

File tree

14 files changed

+1190
-555
lines changed

14 files changed

+1190
-555
lines changed

coderd/agentapi/api.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@ import (
2222
"github.com/coder/coder/v2/coderd/database/pubsub"
2323
"github.com/coder/coder/v2/coderd/externalauth"
2424
"github.com/coder/coder/v2/coderd/prometheusmetrics"
25-
"github.com/coder/coder/v2/coderd/schedule"
2625
"github.com/coder/coder/v2/coderd/tracing"
2726
"github.com/coder/coder/v2/coderd/workspacestats"
2827
"github.com/coder/coder/v2/codersdk"
@@ -60,11 +59,11 @@ type Options struct {
6059
Pubsub pubsub.Pubsub
6160
DerpMapFn func() *tailcfg.DERPMap
6261
TailnetCoordinator *atomic.Pointer[tailnet.Coordinator]
63-
TemplateScheduleStore *atomic.Pointer[schedule.TemplateScheduleStore]
6462
StatsReporter *workspacestats.Reporter
6563
AppearanceFetcher *atomic.Pointer[appearance.Fetcher]
6664
PublishWorkspaceUpdateFn func(ctx context.Context, workspaceID uuid.UUID)
6765
PublishWorkspaceAgentLogsUpdateFn func(ctx context.Context, workspaceAgentID uuid.UUID, msg agentsdk.LogsNotifyMessage)
66+
NetworkTelemetryHandler func(batch []*tailnetproto.TelemetryEvent)
6867

6968
AccessURL *url.URL
7069
AppHostname string
@@ -154,10 +153,11 @@ func New(opts Options) *API {
154153
}
155154

156155
api.DRPCService = &tailnet.DRPCService{
157-
CoordPtr: opts.TailnetCoordinator,
158-
Logger: opts.Log,
159-
DerpMapUpdateFrequency: opts.DerpMapUpdateFrequency,
160-
DerpMapFn: opts.DerpMapFn,
156+
CoordPtr: opts.TailnetCoordinator,
157+
Logger: opts.Log,
158+
DerpMapUpdateFrequency: opts.DerpMapUpdateFrequency,
159+
DerpMapFn: opts.DerpMapFn,
160+
NetworkTelemetryHandler: opts.NetworkTelemetryHandler,
161161
}
162162

163163
return api

coderd/coderd.go

Lines changed: 37 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ import (
3939
"cdr.dev/slog"
4040
agentproto "github.com/coder/coder/v2/agent/proto"
4141
"github.com/coder/coder/v2/buildinfo"
42+
"github.com/coder/coder/v2/clock"
4243
_ "github.com/coder/coder/v2/coderd/apidoc" // Used for swagger docs.
4344
"github.com/coder/coder/v2/coderd/appearance"
4445
"github.com/coder/coder/v2/coderd/audit"
@@ -142,14 +143,16 @@ type Options struct {
142143
DERPServer *derp.Server
143144
// BaseDERPMap is used as the base DERP map for all clients and agents.
144145
// Proxies are added to this list.
145-
BaseDERPMap *tailcfg.DERPMap
146-
DERPMapUpdateFrequency time.Duration
147-
SwaggerEndpoint bool
148-
SetUserGroups func(ctx context.Context, logger slog.Logger, tx database.Store, userID uuid.UUID, orgGroupNames map[uuid.UUID][]string, createMissingGroups bool) error
149-
SetUserSiteRoles func(ctx context.Context, logger slog.Logger, tx database.Store, userID uuid.UUID, roles []string) error
150-
TemplateScheduleStore *atomic.Pointer[schedule.TemplateScheduleStore]
151-
UserQuietHoursScheduleStore *atomic.Pointer[schedule.UserQuietHoursScheduleStore]
152-
AccessControlStore *atomic.Pointer[dbauthz.AccessControlStore]
146+
BaseDERPMap *tailcfg.DERPMap
147+
DERPMapUpdateFrequency time.Duration
148+
NetworkTelemetryBatchFrequency time.Duration
149+
NetworkTelemetryBatchMaxSize int
150+
SwaggerEndpoint bool
151+
SetUserGroups func(ctx context.Context, logger slog.Logger, tx database.Store, userID uuid.UUID, orgGroupNames map[uuid.UUID][]string, createMissingGroups bool) error
152+
SetUserSiteRoles func(ctx context.Context, logger slog.Logger, tx database.Store, userID uuid.UUID, roles []string) error
153+
TemplateScheduleStore *atomic.Pointer[schedule.TemplateScheduleStore]
154+
UserQuietHoursScheduleStore *atomic.Pointer[schedule.UserQuietHoursScheduleStore]
155+
AccessControlStore *atomic.Pointer[dbauthz.AccessControlStore]
153156
// AppSecurityKey is the crypto key used to sign and encrypt tokens related to
154157
// workspace applications. It consists of both a signing and encryption key.
155158
AppSecurityKey workspaceapps.SecurityKey
@@ -305,6 +308,12 @@ func New(options *Options) *API {
305308
if options.DERPMapUpdateFrequency == 0 {
306309
options.DERPMapUpdateFrequency = 5 * time.Second
307310
}
311+
if options.NetworkTelemetryBatchFrequency == 0 {
312+
options.NetworkTelemetryBatchFrequency = 1 * time.Minute
313+
}
314+
if options.NetworkTelemetryBatchMaxSize == 0 {
315+
options.NetworkTelemetryBatchMaxSize = 1_000
316+
}
308317
if options.TailnetCoordinator == nil {
309318
options.TailnetCoordinator = tailnet.NewCoordinator(options.Logger)
310319
}
@@ -539,12 +548,19 @@ func New(options *Options) *API {
539548
if options.DeploymentValues.Prometheus.Enable {
540549
options.PrometheusRegistry.MustRegister(stn)
541550
}
542-
api.TailnetClientService, err = tailnet.NewClientService(
543-
api.Logger.Named("tailnetclient"),
544-
&api.TailnetCoordinator,
545-
api.Options.DERPMapUpdateFrequency,
546-
api.DERPMap,
551+
api.NetworkTelemetryBatcher = tailnet.NewNetworkTelemetryBatcher(
552+
clock.NewReal(),
553+
api.Options.NetworkTelemetryBatchFrequency,
554+
api.Options.NetworkTelemetryBatchMaxSize,
555+
api.handleNetworkTelemetry,
547556
)
557+
api.TailnetClientService, err = tailnet.NewClientService(tailnet.ClientServiceOptions{
558+
Logger: api.Logger.Named("tailnetclient"),
559+
CoordPtr: &api.TailnetCoordinator,
560+
DERPMapUpdateFrequency: api.Options.DERPMapUpdateFrequency,
561+
DERPMapFn: api.DERPMap,
562+
NetworkTelemetryHandler: api.NetworkTelemetryBatcher.Handler,
563+
})
548564
if err != nil {
549565
api.Logger.Fatal(api.ctx, "failed to initialize tailnet client service", slog.Error(err))
550566
}
@@ -1255,6 +1271,7 @@ type API struct {
12551271
Auditor atomic.Pointer[audit.Auditor]
12561272
WorkspaceClientCoordinateOverride atomic.Pointer[func(rw http.ResponseWriter) bool]
12571273
TailnetCoordinator atomic.Pointer[tailnet.Coordinator]
1274+
NetworkTelemetryBatcher *tailnet.NetworkTelemetryBatcher
12581275
TailnetClientService *tailnet.ClientService
12591276
QuotaCommitter atomic.Pointer[proto.QuotaCommitter]
12601277
AppearanceFetcher atomic.Pointer[appearance.Fetcher]
@@ -1313,7 +1330,12 @@ type API struct {
13131330

13141331
// Close waits for all WebSocket connections to drain before returning.
13151332
func (api *API) Close() error {
1316-
api.cancel()
1333+
select {
1334+
case <-api.ctx.Done():
1335+
return xerrors.New("API already closed")
1336+
default:
1337+
api.cancel()
1338+
}
13171339
if api.derpCloseFunc != nil {
13181340
api.derpCloseFunc()
13191341
}
@@ -1348,6 +1370,7 @@ func (api *API) Close() error {
13481370
}
13491371
_ = api.agentProvider.Close()
13501372
_ = api.statsReporter.Close()
1373+
_ = api.NetworkTelemetryBatcher.Close()
13511374
return nil
13521375
}
13531376

0 commit comments

Comments
 (0)