Skip to content

Commit 6a40a02

Browse files
committed
Comments
1 parent 479df1f commit 6a40a02

File tree

3 files changed

+58
-24
lines changed

3 files changed

+58
-24
lines changed

coderd/coderd.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1330,7 +1330,12 @@ type API struct {
13301330

13311331
// Close waits for all WebSocket connections to drain before returning.
13321332
func (api *API) Close() error {
1333-
api.cancel()
1333+
select {
1334+
case <-api.ctx.Done():
1335+
return xerrors.New("API already closed")
1336+
default:
1337+
api.cancel()
1338+
}
13341339
if api.derpCloseFunc != nil {
13351340
api.derpCloseFunc()
13361341
}

coderd/telemetry/telemetry.go

Lines changed: 19 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1249,12 +1249,12 @@ type NetworkEvent struct {
12491249
DERPMap DERPMap `json:"derp_map"`
12501250
LatestNetcheck Netcheck `json:"latest_netcheck"`
12511251

1252-
ConnectionAge time.Duration `json:"connection_age"`
1253-
ConnectionSetup time.Duration `json:"connection_setup"`
1254-
P2PSetup time.Duration `json:"p2p_setup"`
1255-
DERPLatency time.Duration `json:"derp_latency"`
1256-
P2PLatency time.Duration `json:"p2p_latency"`
1257-
ThroughputMbits *float32 `json:"throughput_mbits"`
1252+
ConnectionAge *time.Duration `json:"connection_age"`
1253+
ConnectionSetup *time.Duration `json:"connection_setup"`
1254+
P2PSetup *time.Duration `json:"p2p_setup"`
1255+
DERPLatency *time.Duration `json:"derp_latency"`
1256+
P2PLatency *time.Duration `json:"p2p_latency"`
1257+
ThroughputMbits *float32 `json:"throughput_mbits"`
12581258
}
12591259

12601260
func protoFloat(f *wrapperspb.FloatValue) *float32 {
@@ -1264,6 +1264,14 @@ func protoFloat(f *wrapperspb.FloatValue) *float32 {
12641264
return &f.Value
12651265
}
12661266

1267+
// protoDurationNil converts a protobuf duration into a *time.Duration.
// A nil input yields nil, so callers can tell "not reported" apart from a
// zero-length duration; any other input yields a pointer to the value
// returned by d.AsDuration().
func protoDurationNil(d *durationpb.Duration) *time.Duration {
1268+
if d == nil {
1269+
return nil
1270+
}
1271+
// Store the converted value in a local so its address can be returned.
dur := d.AsDuration()
1272+
return &dur
1273+
}
1274+
12671275
func NetworkEventFromProto(proto *tailnetproto.TelemetryEvent) (NetworkEvent, error) {
12681276
if proto == nil {
12691277
return NetworkEvent{}, xerrors.New("nil event")
@@ -1294,11 +1302,11 @@ func NetworkEventFromProto(proto *tailnetproto.TelemetryEvent) (NetworkEvent, er
12941302
DERPMap: derpMapFromProto(proto.DerpMap),
12951303
LatestNetcheck: netcheckFromProto(proto.LatestNetcheck),
12961304

1297-
ConnectionAge: proto.ConnectionAge.AsDuration(),
1298-
ConnectionSetup: proto.ConnectionSetup.AsDuration(),
1299-
P2PSetup: proto.P2PSetup.AsDuration(),
1300-
DERPLatency: proto.DerpLatency.AsDuration(),
1301-
P2PLatency: proto.P2PLatency.AsDuration(),
1305+
ConnectionAge: protoDurationNil(proto.ConnectionAge),
1306+
ConnectionSetup: protoDurationNil(proto.ConnectionSetup),
1307+
P2PSetup: protoDurationNil(proto.P2PSetup),
1308+
DERPLatency: protoDurationNil(proto.DerpLatency),
1309+
P2PLatency: protoDurationNil(proto.P2PLatency),
13021310
ThroughputMbits: protoFloat(proto.ThroughputMbits),
13031311
}, nil
13041312
}

tailnet/service.go

Lines changed: 33 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -267,32 +267,44 @@ func NewNetworkTelemetryBatcher(clk clock.Clock, frequency time.Duration, maxSiz
267267

268268
// Close closes b.closed to signal shutdown and then waits for b.done, which
// the goroutine launched by start closes when it exits. If b.done is not
// observed within 5 seconds, Close returns the error
// "timed out waiting for batcher to close"; otherwise it returns nil.
//
// Diff note: the statement under the "270-" gutter marker is the pre-commit
// unbounded wait; the statements under the "+" markers are its replacement.
//
// NOTE(review): close(b.closed) is unguarded here, so a second call to Close
// would panic on the already-closed channel — confirm callers only ever call
// Close once.
// NOTE(review): the timeout is measured through context (real time) rather
// than b.clock — confirm that is intended.
func (b *NetworkTelemetryBatcher) Close() error {
269269
close(b.closed)
270-
<-b.done
270+
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
271+
defer cancel()
272+
select {
273+
case <-ctx.Done():
274+
return xerrors.New("timed out waiting for batcher to close")
275+
case <-b.done:
276+
}
271277
return nil
272278
}
273279

274280
// sendTelemetryBatch takes b.mu and, if b.pending is non-empty, replaces it
// with a fresh empty slice and hands the previous contents to b.batchFn.
// When nothing is pending it returns without calling b.batchFn.
//
// Diff note: the statement under the "279-" gutter marker (go b.batchFn) is
// the pre-commit form; the statement under "288+" is its replacement.
//
// NOTE(review): with the replacement, b.batchFn runs synchronously while
// b.mu is still held (the unlock is deferred), so Handler, which also locks
// b.mu, waits until the callback returns — confirm this is acceptable.
func (b *NetworkTelemetryBatcher) sendTelemetryBatch() {
275281
b.mu.Lock()
276282
defer b.mu.Unlock()
277283
events := b.pending
284+
// Skip the callback entirely when there is nothing to send.
if len(events) == 0 {
285+
return
286+
}
278287
b.pending = []*proto.TelemetryEvent{}
279-
go b.batchFn(events)
288+
b.batchFn(events)
280289
}
281290

282291
func (b *NetworkTelemetryBatcher) start() {
283-
b.mu.Lock()
284-
defer b.mu.Unlock()
285-
ticker := b.clock.NewTicker(b.frequency)
286-
b.ticker = ticker
292+
b.ticker = b.clock.NewTicker(b.frequency)
287293

288294
go func() {
289-
defer close(b.done)
290-
defer ticker.Stop()
295+
defer func() {
296+
// The lock prevents Handler from racing with Close.
297+
b.mu.Lock()
298+
defer b.mu.Unlock()
299+
close(b.done)
300+
b.ticker.Stop()
301+
}()
291302

292303
for {
293304
select {
294-
case <-ticker.C:
305+
case <-b.ticker.C:
295306
b.sendTelemetryBatch()
307+
b.ticker.Reset(b.frequency)
296308
case <-b.closed:
297309
// Send any remaining telemetry events before exiting.
298310
b.sendTelemetryBatch()
@@ -305,17 +317,26 @@ func (b *NetworkTelemetryBatcher) start() {
305317
func (b *NetworkTelemetryBatcher) Handler(events []*proto.TelemetryEvent) {
306318
b.mu.Lock()
307319
defer b.mu.Unlock()
320+
select {
321+
case <-b.closed:
322+
return
323+
default:
324+
}
308325

309326
for _, event := range events {
310327
b.pending = append(b.pending, event)
311328

312329
if len(b.pending) >= b.maxSize {
330+
// This can't call sendTelemetryBatch directly because we already
331+
// hold the lock.
313332
events := b.pending
314333
b.pending = []*proto.TelemetryEvent{}
334+
// Resetting the ticker is best effort. We don't care if the ticker
335+
// has already fired or has a pending message, because the only risk
336+
// is that we send two telemetry events in short succession (which
337+
// is totally fine).
338+
b.ticker.Reset(b.frequency)
315339
// Perform the send in a goroutine to avoid blocking the DRPC call.
316-
if b.ticker != nil {
317-
b.ticker.Reset(b.frequency)
318-
}
319340
go b.batchFn(events)
320341
}
321342
}

0 commit comments

Comments
 (0)