@@ -39,6 +39,7 @@ import (
39
39
"cdr.dev/slog"
40
40
agentproto "github.com/coder/coder/v2/agent/proto"
41
41
"github.com/coder/coder/v2/buildinfo"
42
+ "github.com/coder/coder/v2/clock"
42
43
_ "github.com/coder/coder/v2/coderd/apidoc" // Used for swagger docs.
43
44
"github.com/coder/coder/v2/coderd/appearance"
44
45
"github.com/coder/coder/v2/coderd/audit"
@@ -142,14 +143,16 @@ type Options struct {
142
143
DERPServer * derp.Server
143
144
// BaseDERPMap is used as the base DERP map for all clients and agents.
144
145
// Proxies are added to this list.
145
- BaseDERPMap * tailcfg.DERPMap
146
- DERPMapUpdateFrequency time.Duration
147
- SwaggerEndpoint bool
148
- SetUserGroups func (ctx context.Context , logger slog.Logger , tx database.Store , userID uuid.UUID , orgGroupNames map [uuid.UUID ][]string , createMissingGroups bool ) error
149
- SetUserSiteRoles func (ctx context.Context , logger slog.Logger , tx database.Store , userID uuid.UUID , roles []string ) error
150
- TemplateScheduleStore * atomic.Pointer [schedule.TemplateScheduleStore ]
151
- UserQuietHoursScheduleStore * atomic.Pointer [schedule.UserQuietHoursScheduleStore ]
152
- AccessControlStore * atomic.Pointer [dbauthz.AccessControlStore ]
146
+ BaseDERPMap * tailcfg.DERPMap
147
+ DERPMapUpdateFrequency time.Duration
148
+ NetworkTelemetryBatchFrequency time.Duration
149
+ NetworkTelemetryBatchMaxSize int
150
+ SwaggerEndpoint bool
151
+ SetUserGroups func (ctx context.Context , logger slog.Logger , tx database.Store , userID uuid.UUID , orgGroupNames map [uuid.UUID ][]string , createMissingGroups bool ) error
152
+ SetUserSiteRoles func (ctx context.Context , logger slog.Logger , tx database.Store , userID uuid.UUID , roles []string ) error
153
+ TemplateScheduleStore * atomic.Pointer [schedule.TemplateScheduleStore ]
154
+ UserQuietHoursScheduleStore * atomic.Pointer [schedule.UserQuietHoursScheduleStore ]
155
+ AccessControlStore * atomic.Pointer [dbauthz.AccessControlStore ]
153
156
// AppSecurityKey is the crypto key used to sign and encrypt tokens related to
154
157
// workspace applications. It consists of both a signing and encryption key.
155
158
AppSecurityKey workspaceapps.SecurityKey
@@ -305,6 +308,12 @@ func New(options *Options) *API {
305
308
if options .DERPMapUpdateFrequency == 0 {
306
309
options .DERPMapUpdateFrequency = 5 * time .Second
307
310
}
311
+ if options .NetworkTelemetryBatchFrequency == 0 {
312
+ options .NetworkTelemetryBatchFrequency = 1 * time .Minute
313
+ }
314
+ if options .NetworkTelemetryBatchMaxSize == 0 {
315
+ options .NetworkTelemetryBatchMaxSize = 1_000
316
+ }
308
317
if options .TailnetCoordinator == nil {
309
318
options .TailnetCoordinator = tailnet .NewCoordinator (options .Logger )
310
319
}
@@ -539,12 +548,19 @@ func New(options *Options) *API {
539
548
if options .DeploymentValues .Prometheus .Enable {
540
549
options .PrometheusRegistry .MustRegister (stn )
541
550
}
542
- api .TailnetClientService , err = tailnet .NewClientService (
543
- api . Logger . Named ( "tailnetclient" ),
544
- & api .TailnetCoordinator ,
545
- api .Options .DERPMapUpdateFrequency ,
546
- api .DERPMap ,
551
+ api .NetworkTelemetryBatcher = tailnet .NewNetworkTelemetryBatcher (
552
+ clock . NewReal ( ),
553
+ api .Options . NetworkTelemetryBatchFrequency ,
554
+ api .Options .NetworkTelemetryBatchMaxSize ,
555
+ api .handleNetworkTelemetry ,
547
556
)
557
+ api .TailnetClientService , err = tailnet .NewClientService (tailnet.ClientServiceOptions {
558
+ Logger : api .Logger .Named ("tailnetclient" ),
559
+ CoordPtr : & api .TailnetCoordinator ,
560
+ DERPMapUpdateFrequency : api .Options .DERPMapUpdateFrequency ,
561
+ DERPMapFn : api .DERPMap ,
562
+ NetworkTelemetryHandler : api .NetworkTelemetryBatcher .Handler ,
563
+ })
548
564
if err != nil {
549
565
api .Logger .Fatal (api .ctx , "failed to initialize tailnet client service" , slog .Error (err ))
550
566
}
@@ -1255,6 +1271,7 @@ type API struct {
1255
1271
Auditor atomic.Pointer [audit.Auditor ]
1256
1272
WorkspaceClientCoordinateOverride atomic.Pointer [func (rw http.ResponseWriter ) bool ]
1257
1273
TailnetCoordinator atomic.Pointer [tailnet.Coordinator ]
1274
+ NetworkTelemetryBatcher * tailnet.NetworkTelemetryBatcher
1258
1275
TailnetClientService * tailnet.ClientService
1259
1276
QuotaCommitter atomic.Pointer [proto.QuotaCommitter ]
1260
1277
AppearanceFetcher atomic.Pointer [appearance.Fetcher ]
@@ -1313,7 +1330,12 @@ type API struct {
1313
1330
1314
1331
// Close waits for all WebSocket connections to drain before returning.
1315
1332
func (api * API ) Close () error {
1316
- api .cancel ()
1333
+ select {
1334
+ case <- api .ctx .Done ():
1335
+ return xerrors .New ("API already closed" )
1336
+ default :
1337
+ api .cancel ()
1338
+ }
1317
1339
if api .derpCloseFunc != nil {
1318
1340
api .derpCloseFunc ()
1319
1341
}
@@ -1348,6 +1370,7 @@ func (api *API) Close() error {
1348
1370
}
1349
1371
_ = api .agentProvider .Close ()
1350
1372
_ = api .statsReporter .Close ()
1373
+ _ = api .NetworkTelemetryBatcher .Close ()
1351
1374
return nil
1352
1375
}
1353
1376
0 commit comments