Skip to content

Commit 7205f3c

Browse files
committed
Refactor network telemetry batching
1 parent fb9d5aa commit 7205f3c

File tree

12 files changed, with 228 additions and 310 deletions.

coderd/agentapi/api.go

Lines changed: 15 additions & 26 deletions
Original file line number | Diff line number | Diff line change
@@ -46,7 +46,6 @@ type API struct {
4646

4747
mu sync.Mutex
4848
cachedWorkspaceID uuid.UUID
49-
drpcServiceClose func()
5049
}
5150

5251
var _ agentproto.DRPCAgentServer = &API{}
@@ -64,18 +63,16 @@ type Options struct {
6463
AppearanceFetcher *atomic.Pointer[appearance.Fetcher]
6564
PublishWorkspaceUpdateFn func(ctx context.Context, workspaceID uuid.UUID)
6665
PublishWorkspaceAgentLogsUpdateFn func(ctx context.Context, workspaceAgentID uuid.UUID, msg agentsdk.LogsNotifyMessage)
67-
NetworkTelemetryBatchFn func(batch []*tailnetproto.TelemetryEvent)
68-
69-
AccessURL *url.URL
70-
AppHostname string
71-
AgentStatsRefreshInterval time.Duration
72-
DisableDirectConnections bool
73-
DerpForceWebSockets bool
74-
DerpMapUpdateFrequency time.Duration
75-
NetworkTelemetryBatchFrequency time.Duration
76-
NetworkTelemetryBatchMaxSize int
77-
ExternalAuthConfigs []*externalauth.Config
78-
Experiments codersdk.Experiments
66+
NetworkTelemetryHandler func(batch []*tailnetproto.TelemetryEvent)
67+
68+
AccessURL *url.URL
69+
AppHostname string
70+
AgentStatsRefreshInterval time.Duration
71+
DisableDirectConnections bool
72+
DerpForceWebSockets bool
73+
DerpMapUpdateFrequency time.Duration
74+
ExternalAuthConfigs []*externalauth.Config
75+
Experiments codersdk.Experiments
7976

8077
// Optional:
8178
// WorkspaceID avoids a future lookup to find the workspace ID by setting
@@ -156,24 +153,16 @@ func New(opts Options) *API {
156153
}
157154

158155
api.DRPCService = &tailnet.DRPCService{
159-
CoordPtr: opts.TailnetCoordinator,
160-
Logger: opts.Log,
161-
DerpMapUpdateFrequency: opts.DerpMapUpdateFrequency,
162-
DerpMapFn: opts.DerpMapFn,
163-
NetworkTelemetryBatchFrequency: opts.NetworkTelemetryBatchFrequency,
164-
NetworkTelemetryBatchMaxSize: opts.NetworkTelemetryBatchMaxSize,
165-
NetworkTelemetryBatchFn: opts.NetworkTelemetryBatchFn,
156+
CoordPtr: opts.TailnetCoordinator,
157+
Logger: opts.Log,
158+
DerpMapUpdateFrequency: opts.DerpMapUpdateFrequency,
159+
DerpMapFn: opts.DerpMapFn,
160+
NetworkTelemetryHandler: opts.NetworkTelemetryHandler,
166161
}
167-
api.drpcServiceClose = api.DRPCService.PeriodicTelemetryBatcher()
168162

169163
return api
170164
}
171165

172-
func (a *API) Close() error {
173-
a.drpcServiceClose()
174-
return nil
175-
}
176-
177166
func (a *API) Server(ctx context.Context) (*drpcserver.Server, error) {
178167
mux := drpcmux.New()
179168
err := agentproto.DRPCRegisterAgent(mux, a)

coderd/agentapi/api_test.go

Lines changed: 0 additions & 106 deletions
This file was deleted.

coderd/coderd.go

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -547,12 +547,18 @@ func New(options *Options) *API {
547547
if options.DeploymentValues.Prometheus.Enable {
548548
options.PrometheusRegistry.MustRegister(stn)
549549
}
550-
api.TailnetClientService, err = tailnet.NewClientService(
551-
api.Logger.Named("tailnetclient"),
552-
&api.TailnetCoordinator,
553-
api.Options.DERPMapUpdateFrequency,
554-
api.DERPMap,
550+
api.NetworkTelemetryBatcher = tailnet.NewNetworkTelemetryBatcher(
551+
api.Options.NetworkTelemetryBatchFrequency,
552+
api.Options.NetworkTelemetryBatchMaxSize,
553+
api.handleNetworkTelemetry,
555554
)
555+
api.TailnetClientService, err = tailnet.NewClientService(tailnet.ClientServiceOptions{
556+
Logger: api.Logger.Named("tailnetclient"),
557+
CoordPtr: &api.TailnetCoordinator,
558+
DERPMapUpdateFrequency: api.Options.DERPMapUpdateFrequency,
559+
DERPMapFn: api.DERPMap,
560+
NetworkTelemetryHandler: api.NetworkTelemetryBatcher.Handler,
561+
})
556562
if err != nil {
557563
api.Logger.Fatal(api.ctx, "failed to initialize tailnet client service", slog.Error(err))
558564
}
@@ -1263,6 +1269,7 @@ type API struct {
12631269
Auditor atomic.Pointer[audit.Auditor]
12641270
WorkspaceClientCoordinateOverride atomic.Pointer[func(rw http.ResponseWriter) bool]
12651271
TailnetCoordinator atomic.Pointer[tailnet.Coordinator]
1272+
NetworkTelemetryBatcher *tailnet.NetworkTelemetryBatcher
12661273
TailnetClientService *tailnet.ClientService
12671274
QuotaCommitter atomic.Pointer[proto.QuotaCommitter]
12681275
AppearanceFetcher atomic.Pointer[appearance.Fetcher]
@@ -1356,6 +1363,7 @@ func (api *API) Close() error {
13561363
}
13571364
_ = api.agentProvider.Close()
13581365
_ = api.statsReporter.Close()
1366+
_ = api.NetworkTelemetryBatcher.Close()
13591367
return nil
13601368
}
13611369

coderd/workspaceagentsrpc.go

Lines changed: 25 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -136,38 +136,21 @@ func (api *API) workspaceAgentRPC(rw http.ResponseWriter, r *http.Request) {
136136
StatsReporter: api.statsReporter,
137137
PublishWorkspaceUpdateFn: api.publishWorkspaceUpdate,
138138
PublishWorkspaceAgentLogsUpdateFn: api.publishWorkspaceAgentLogsUpdate,
139-
NetworkTelemetryBatchFn: func(batch []*tailnetproto.TelemetryEvent) {
140-
telemetryEvents := make([]telemetry.NetworkEvent, 0, len(batch))
141-
for _, pEvent := range batch {
142-
tEvent, err := telemetry.NetworkEventFromProto(pEvent)
143-
if err != nil {
144-
// Events that fail to be converted get discarded for now.
145-
continue
146-
}
147-
telemetryEvents = append(telemetryEvents, tEvent)
148-
}
149-
150-
api.Telemetry.Report(&telemetry.Snapshot{
151-
NetworkEvents: telemetryEvents,
152-
})
153-
},
139+
NetworkTelemetryHandler: api.NetworkTelemetryBatcher.Handler,
154140

155-
AccessURL: api.AccessURL,
156-
AppHostname: api.AppHostname,
157-
AgentStatsRefreshInterval: api.AgentStatsRefreshInterval,
158-
DisableDirectConnections: api.DeploymentValues.DERP.Config.BlockDirect.Value(),
159-
DerpForceWebSockets: api.DeploymentValues.DERP.Config.ForceWebSockets.Value(),
160-
DerpMapUpdateFrequency: api.Options.DERPMapUpdateFrequency,
161-
NetworkTelemetryBatchFrequency: api.Options.NetworkTelemetryBatchFrequency,
162-
NetworkTelemetryBatchMaxSize: api.Options.NetworkTelemetryBatchMaxSize,
163-
ExternalAuthConfigs: api.ExternalAuthConfigs,
164-
Experiments: api.Experiments,
141+
AccessURL: api.AccessURL,
142+
AppHostname: api.AppHostname,
143+
AgentStatsRefreshInterval: api.AgentStatsRefreshInterval,
144+
DisableDirectConnections: api.DeploymentValues.DERP.Config.BlockDirect.Value(),
145+
DerpForceWebSockets: api.DeploymentValues.DERP.Config.ForceWebSockets.Value(),
146+
DerpMapUpdateFrequency: api.Options.DERPMapUpdateFrequency,
147+
ExternalAuthConfigs: api.ExternalAuthConfigs,
148+
Experiments: api.Experiments,
165149

166150
// Optional:
167151
WorkspaceID: build.WorkspaceID, // saves the extra lookup later
168152
UpdateAgentMetricsFn: api.UpdateAgentMetrics,
169153
})
170-
defer agentAPI.Close()
171154

172155
streamID := tailnet.StreamID{
173156
Name: fmt.Sprintf("%s-%s-%s", owner.Username, workspace.Name, workspaceAgent.Name),
@@ -184,6 +167,22 @@ func (api *API) workspaceAgentRPC(rw http.ResponseWriter, r *http.Request) {
184167
}
185168
}
186169

170+
func (api *API) handleNetworkTelemetry(batch []*tailnetproto.TelemetryEvent) {
171+
telemetryEvents := make([]telemetry.NetworkEvent, 0, len(batch))
172+
for _, pEvent := range batch {
173+
tEvent, err := telemetry.NetworkEventFromProto(pEvent)
174+
if err != nil {
175+
// Events that fail to be converted get discarded for now.
176+
continue
177+
}
178+
telemetryEvents = append(telemetryEvents, tEvent)
179+
}
180+
181+
api.Telemetry.Report(&telemetry.Snapshot{
182+
NetworkEvents: telemetryEvents,
183+
})
184+
}
185+
187186
type yamuxPingerCloser struct {
188187
mux *yamux.Session
189188
}

codersdk/workspacesdk/connector_internal_test.go

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -50,10 +50,13 @@ func TestTailnetAPIConnector_Disconnects(t *testing.T) {
5050
coordPtr.Store(&coord)
5151
derpMapCh := make(chan *tailcfg.DERPMap)
5252
defer close(derpMapCh)
53-
svc, err := tailnet.NewClientService(
54-
logger, &coordPtr,
55-
time.Millisecond, func() *tailcfg.DERPMap { return <-derpMapCh },
56-
)
53+
svc, err := tailnet.NewClientService(tailnet.ClientServiceOptions{
54+
Logger: logger,
55+
CoordPtr: &coordPtr,
56+
DERPMapUpdateFrequency: time.Millisecond,
57+
DERPMapFn: func() *tailcfg.DERPMap { return <-derpMapCh },
58+
NetworkTelemetryHandler: func(batch []*proto.TelemetryEvent) {},
59+
})
5760
require.NoError(t, err)
5861

5962
svr := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {

enterprise/coderd/coderd.go

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -138,12 +138,13 @@ func New(ctx context.Context, options *Options) (_ *API, err error) {
138138
}
139139
return api.fetchRegions(ctx)
140140
}
141-
api.tailnetService, err = tailnet.NewClientService(
142-
api.Logger.Named("tailnetclient"),
143-
&api.AGPL.TailnetCoordinator,
144-
api.Options.DERPMapUpdateFrequency,
145-
api.AGPL.DERPMap,
146-
)
141+
api.tailnetService, err = tailnet.NewClientService(agpltailnet.ClientServiceOptions{
142+
Logger: api.Logger.Named("tailnetclient"),
143+
CoordPtr: &api.AGPL.TailnetCoordinator,
144+
DERPMapUpdateFrequency: api.Options.DERPMapUpdateFrequency,
145+
DERPMapFn: api.AGPL.DERPMap,
146+
NetworkTelemetryHandler: api.AGPL.NetworkTelemetryBatcher.Handler,
147+
})
147148
if err != nil {
148149
api.Logger.Fatal(api.ctx, "failed to initialize tailnet client service", slog.Error(err))
149150
}

enterprise/tailnet/workspaceproxy.go

Lines changed: 8 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -6,12 +6,10 @@ import (
66
"encoding/json"
77
"errors"
88
"net"
9-
"sync/atomic"
109
"time"
1110

1211
"github.com/google/uuid"
1312
"golang.org/x/xerrors"
14-
"tailscale.com/tailcfg"
1513

1614
"cdr.dev/slog"
1715
"github.com/coder/coder/v2/apiversion"
@@ -25,15 +23,14 @@ type ClientService struct {
2523

2624
// NewClientService returns a ClientService based on the given Coordinator pointer. The pointer is
2725
// loaded on each processed connection.
28-
func NewClientService(
29-
logger slog.Logger,
30-
coordPtr *atomic.Pointer[agpl.Coordinator],
31-
derpMapUpdateFrequency time.Duration,
32-
derpMapFn func() *tailcfg.DERPMap,
33-
) (
34-
*ClientService, error,
35-
) {
36-
s, err := agpl.NewClientService(logger, coordPtr, derpMapUpdateFrequency, derpMapFn)
26+
func NewClientService(options agpl.ClientServiceOptions) (*ClientService, error) {
27+
s, err := agpl.NewClientService(agpl.ClientServiceOptions{
28+
Logger: options.Logger,
29+
CoordPtr: options.CoordPtr,
30+
DERPMapUpdateFrequency: options.DERPMapUpdateFrequency,
31+
DERPMapFn: options.DERPMapFn,
32+
NetworkTelemetryHandler: options.NetworkTelemetryHandler,
33+
})
3734
if err != nil {
3835
return nil, err
3936
}

enterprise/wsproxy/wsproxysdk/wsproxysdk_test.go

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -171,11 +171,13 @@ func TestDialCoordinator(t *testing.T) {
171171

172172
coordPtr := atomic.Pointer[agpl.Coordinator]{}
173173
coordPtr.Store(&coord)
174-
cSrv, err := tailnet.NewClientService(
175-
logger, &coordPtr,
176-
time.Hour,
177-
func() *tailcfg.DERPMap { panic("not implemented") },
178-
)
174+
cSrv, err := tailnet.NewClientService(agpl.ClientServiceOptions{
175+
Logger: logger,
176+
CoordPtr: &coordPtr,
177+
DERPMapUpdateFrequency: time.Hour,
178+
DERPMapFn: func() *tailcfg.DERPMap { panic("not implemented") },
179+
NetworkTelemetryHandler: func(batch []*proto.TelemetryEvent) { panic("not implemented") },
180+
})
179181
require.NoError(t, err)
180182

181183
// buffer the channels here, so we don't need to read and write in goroutines to

0 commit comments

Comments (0)