Merged
Changes from 1 commit
deansheather committed Jul 1, 2024
commit 6a40a02280e8bda89b8d4ec06f6cf5e36422ef1d
7 changes: 6 additions & 1 deletion coderd/coderd.go
@@ -1330,7 +1330,12 @@ type API struct {
 
 // Close waits for all WebSocket connections to drain before returning.
 func (api *API) Close() error {
-    api.cancel()
+    select {
+    case <-api.ctx.Done():
+        return xerrors.New("API already closed")
+    default:
+        api.cancel()
+    }
     if api.derpCloseFunc != nil {
         api.derpCloseFunc()
     }
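For reference outside the diff, here is a minimal, self-contained sketch of the same already-closed guard; the api type and its fields are stand-ins for illustration, not the real coderd struct:

package main

import (
	"context"
	"errors"
	"fmt"
)

// api is a stand-in for the real coderd API struct, reduced to the two
// fields the guard needs.
type api struct {
	ctx    context.Context
	cancel context.CancelFunc
}

func (a *api) Close() error {
	select {
	case <-a.ctx.Done():
		// A previous Close already canceled the context.
		return errors.New("API already closed")
	default:
		a.cancel()
	}
	return nil
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	a := &api{ctx: ctx, cancel: cancel}
	fmt.Println(a.Close()) // <nil>
	fmt.Println(a.Close()) // API already closed
}

Note that this guards repeated Close calls via the canceled context; two perfectly concurrent calls could still both reach the default branch, which is harmless because a context.CancelFunc is safe to call multiple times.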
30 changes: 19 additions & 11 deletions coderd/telemetry/telemetry.go
@@ -1249,12 +1249,12 @@ type NetworkEvent struct {
     DERPMap        DERPMap  `json:"derp_map"`
     LatestNetcheck Netcheck `json:"latest_netcheck"`
 
-    ConnectionAge   time.Duration `json:"connection_age"`
-    ConnectionSetup time.Duration `json:"connection_setup"`
-    P2PSetup        time.Duration `json:"p2p_setup"`
-    DERPLatency     time.Duration `json:"derp_latency"`
-    P2PLatency      time.Duration `json:"p2p_latency"`
-    ThroughputMbits *float32      `json:"throughput_mbits"`
+    ConnectionAge   *time.Duration `json:"connection_age"`
+    ConnectionSetup *time.Duration `json:"connection_setup"`
+    P2PSetup        *time.Duration `json:"p2p_setup"`
+    DERPLatency     *time.Duration `json:"derp_latency"`
+    P2PLatency      *time.Duration `json:"p2p_latency"`
+    ThroughputMbits *float32       `json:"throughput_mbits"`
 }
 
@@ -1264,6 +1264,14 @@ func protoFloat(f *wrapperspb.FloatValue) *float32 {
     return &f.Value
 }
 
+func protoDurationNil(d *durationpb.Duration) *time.Duration {
+    if d == nil {
+        return nil
+    }
+    dur := d.AsDuration()
+    return &dur
+}
+
 func NetworkEventFromProto(proto *tailnetproto.TelemetryEvent) (NetworkEvent, error) {
     if proto == nil {
         return NetworkEvent{}, xerrors.New("nil event")
@@ -1294,11 +1302,11 @@ func NetworkEventFromProto(proto *tailnetproto.TelemetryEvent) (NetworkEvent, error) {
         DERPMap:        derpMapFromProto(proto.DerpMap),
         LatestNetcheck: netcheckFromProto(proto.LatestNetcheck),
 
-        ConnectionAge:   proto.ConnectionAge.AsDuration(),
-        ConnectionSetup: proto.ConnectionSetup.AsDuration(),
-        P2PSetup:        proto.P2PSetup.AsDuration(),
-        DERPLatency:     proto.DerpLatency.AsDuration(),
-        P2PLatency:      proto.P2PLatency.AsDuration(),
+        ConnectionAge:   protoDurationNil(proto.ConnectionAge),
+        ConnectionSetup: protoDurationNil(proto.ConnectionSetup),
+        P2PSetup:        protoDurationNil(proto.P2PSetup),
+        DERPLatency:     protoDurationNil(proto.DerpLatency),
+        P2PLatency:      protoDurationNil(proto.P2PLatency),
         ThroughputMbits: protoFloat(proto.ThroughputMbits),
     }, nil
 }
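The switch to pointer durations means an unset protobuf field now survives as nil all the way to the JSON encoder instead of flattening to a zero duration. A small runnable sketch of that behavior, assuming only the standard protobuf-go durationpb package; the event struct is an illustrative stand-in for telemetry.NetworkEvent:

package main

import (
	"encoding/json"
	"fmt"
	"time"

	"google.golang.org/protobuf/types/known/durationpb"
)

// protoDurationNil mirrors the helper added above: a nil proto field stays
// nil instead of collapsing to a zero duration.
func protoDurationNil(d *durationpb.Duration) *time.Duration {
	if d == nil {
		return nil
	}
	dur := d.AsDuration()
	return &dur
}

// event is an illustrative stand-in for telemetry.NetworkEvent.
type event struct {
	ConnectionAge *time.Duration `json:"connection_age"`
}

func main() {
	var unset *durationpb.Duration // field never set on the wire
	set := durationpb.New(1500 * time.Millisecond)

	a, _ := json.Marshal(event{ConnectionAge: protoDurationNil(unset)})
	b, _ := json.Marshal(event{ConnectionAge: protoDurationNil(set)})
	fmt.Println(string(a)) // {"connection_age":null}
	fmt.Println(string(b)) // {"connection_age":1500000000}
}

Downstream consumers can now distinguish "not reported" (null) from a measured zero.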
45 changes: 33 additions & 12 deletions tailnet/service.go
@@ -267,32 +267,44 @@ func NewNetworkTelemetryBatcher(clk clock.Clock, frequency time.Duration, maxSize
 
 func (b *NetworkTelemetryBatcher) Close() error {
     close(b.closed)
-    <-b.done
+    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+    defer cancel()
+    select {
+    case <-ctx.Done():
+        return xerrors.New("timed out waiting for batcher to close")
+    case <-b.done:
+    }
     return nil
 }
 
 func (b *NetworkTelemetryBatcher) sendTelemetryBatch() {
     b.mu.Lock()
     defer b.mu.Unlock()
     events := b.pending
     if len(events) == 0 {
         return
     }
     b.pending = []*proto.TelemetryEvent{}
-    go b.batchFn(events)
+    b.batchFn(events)
 }
 
 func (b *NetworkTelemetryBatcher) start() {
     b.mu.Lock()
     defer b.mu.Unlock()
-    ticker := b.clock.NewTicker(b.frequency)
-    b.ticker = ticker
+    b.ticker = b.clock.NewTicker(b.frequency)
 
     go func() {
-        defer close(b.done)
-        defer ticker.Stop()
+        defer func() {
+            // The lock prevents Handler from racing with Close.
+            b.mu.Lock()
+            defer b.mu.Unlock()
+            close(b.done)
+            b.ticker.Stop()
+        }()
 
         for {
             select {
-            case <-ticker.C:
+            case <-b.ticker.C:
                 b.sendTelemetryBatch()
+                b.ticker.Reset(b.frequency)
             case <-b.closed:
                 // Send any remaining telemetry events before exiting.
                 b.sendTelemetryBatch()
@@ -305,17 +317,26 @@ func (b *NetworkTelemetryBatcher) start() {
 func (b *NetworkTelemetryBatcher) Handler(events []*proto.TelemetryEvent) {
     b.mu.Lock()
     defer b.mu.Unlock()
+    select {
+    case <-b.closed:
+        return
+    default:
+    }
 
     for _, event := range events {
         b.pending = append(b.pending, event)
 
         if len(b.pending) >= b.maxSize {
+            // This can't call sendTelemetryBatch directly because we already
+            // hold the lock.
             events := b.pending
             b.pending = []*proto.TelemetryEvent{}
+            // Resetting the ticker is best effort. We don't care if the ticker
+            // has already fired or has a pending message, because the only risk
+            // is that we send two telemetry events in short succession (which
+            // is totally fine).
+            b.ticker.Reset(b.frequency)
             // Perform the send in a goroutine to avoid blocking the DRPC call.
-            if b.ticker != nil {
-                b.ticker.Reset(b.frequency)
-            }
             go b.batchFn(events)
         }
     }
Review comment (Contributor), on the go b.batchFn(events) line:

This feels a bit racy. We should probably have a separate channel to signal to the other goroutine that it should tick.

Reply (Member, PR author):

This isn't racy because we hold a lock. If a tick happens while we're handling messages, the pending events list will be emptied, and the periodic batcher goroutine will do nothing because there are no events. If we somehow went above the batch size anyway and overflowed, it would just send two telemetry events in short succession, which is acceptable. I've added a comment noting that the ticker reset is best effort.
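To make the thread's argument concrete, here is a minimal, runnable sketch of the lock-guarded overflow flush and best-effort ticker reset. Names are illustrative throughout, and the standard library's time.Ticker stands in for the clock abstraction used by the real batcher:

package main

import (
	"fmt"
	"sync"
	"time"
)

// batcher is a trimmed-down analogue of NetworkTelemetryBatcher.
type batcher struct {
	mu      sync.Mutex
	pending []string
	maxSize int
	freq    time.Duration
	ticker  *time.Ticker
	batchFn func([]string)
}

// add mirrors Handler's overflow path: everything happens under the lock,
// so a concurrent tick sees the pending slice either before or after the
// swap, never a partial state.
func (b *batcher) add(ev string) {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.pending = append(b.pending, ev)
	if len(b.pending) >= b.maxSize {
		events := b.pending
		b.pending = nil
		// Best-effort reset: if the ticker has already fired, the periodic
		// flush simply finds nothing to send, or sends one small extra batch.
		b.ticker.Reset(b.freq)
		go b.batchFn(events)
	}
}

// flush mirrors sendTelemetryBatch and runs on each tick.
func (b *batcher) flush() {
	b.mu.Lock()
	defer b.mu.Unlock()
	if len(b.pending) == 0 {
		return
	}
	events := b.pending
	b.pending = nil
	b.batchFn(events)
}

func main() {
	b := &batcher{
		maxSize: 2,
		freq:    50 * time.Millisecond,
		batchFn: func(evs []string) { fmt.Println("sent batch:", evs) },
	}
	b.ticker = time.NewTicker(b.freq)
	defer b.ticker.Stop()

	b.add("a")
	b.add("b") // hits maxSize: flushes now and resets the ticker
	b.add("c") // stays pending until the next tick
	b.flush()  // stand-in for the tick firing
	time.Sleep(10 * time.Millisecond) // let the overflow goroutine print
}

Because add and flush both run under the mutex, a tick can only observe the pending slice before or after the swap, which is why the worst case is two small batches in quick succession rather than lost or duplicated events.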