chore: add DRPC server implementation for network telemetry #13675
Changes from 1 commit
@@ -267,32 +267,44 @@ func NewNetworkTelemetryBatcher(clk clock.Clock, frequency time.Duration, maxSiz

 func (b *NetworkTelemetryBatcher) Close() error {
 	close(b.closed)
-	<-b.done
+	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+	defer cancel()
+	select {
+	case <-ctx.Done():
+		return xerrors.New("timed out waiting for batcher to close")
+	case <-b.done:
+	}
 	return nil
 }

 func (b *NetworkTelemetryBatcher) sendTelemetryBatch() {
 	b.mu.Lock()
 	defer b.mu.Unlock()
 	events := b.pending
 	if len(events) == 0 {
 		return
 	}
 	b.pending = []*proto.TelemetryEvent{}
-	go b.batchFn(events)
+	b.batchFn(events)
 }

 func (b *NetworkTelemetryBatcher) start() {
 	b.mu.Lock()
 	defer b.mu.Unlock()
-	ticker := b.clock.NewTicker(b.frequency)
-	b.ticker = ticker
+	b.ticker = b.clock.NewTicker(b.frequency)

 	go func() {
-		defer close(b.done)
-		defer ticker.Stop()
+		defer func() {
+			// The lock prevents Handler from racing with Close.
+			b.mu.Lock()
+			defer b.mu.Unlock()
+			close(b.done)
+			b.ticker.Stop()
+		}()

 		for {
 			select {
-			case <-ticker.C:
+			case <-b.ticker.C:
 				b.sendTelemetryBatch()
+				b.ticker.Reset(b.frequency)
 			case <-b.closed:
 				// Send any remaining telemetry events before exiting.
 				b.sendTelemetryBatch()
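The new Close follows a common Go shutdown pattern: signal the worker goroutine over one channel, then wait on its done channel with a deadline so a stuck worker cannot block shutdown forever. Below is a minimal, self-contained sketch of that pattern, assuming a hypothetical worker type and using the standard library's errors package in place of xerrors; it is an illustration of the technique, not the PR's actual batcher.

package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// worker stands in for the batcher: closed signals shutdown, and the
// background goroutine closes done once it has finished its work.
type worker struct {
	closed chan struct{}
	done   chan struct{}
}

func newWorker() *worker {
	w := &worker{closed: make(chan struct{}), done: make(chan struct{})}
	go func() {
		defer close(w.done)
		<-w.closed // a real worker would flush remaining work here
	}()
	return w
}

// Close signals the goroutine to stop and waits for it, but gives up
// after five seconds instead of blocking forever.
func (w *worker) Close() error {
	close(w.closed)
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	select {
	case <-ctx.Done():
		return errors.New("timed out waiting for worker to close")
	case <-w.done:
		return nil
	}
}

func main() {
	fmt.Println(newWorker().Close()) // prints <nil>
}

Dropping the go in sendTelemetryBatch appears to complement this: the final flush on the closed path now completes before done is closed, so Close only returns once the last batch has been handed to batchFn.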
@@ -305,17 +317,26 @@ func (b *NetworkTelemetryBatcher) start() {
 func (b *NetworkTelemetryBatcher) Handler(events []*proto.TelemetryEvent) {
 	b.mu.Lock()
 	defer b.mu.Unlock()
+	select {
+	case <-b.closed:
+		return
+	default:
+	}

 	for _, event := range events {
 		b.pending = append(b.pending, event)

 		if len(b.pending) >= b.maxSize {
 			// This can't call sendTelemetryBatch directly because we already
 			// hold the lock.
 			events := b.pending
 			b.pending = []*proto.TelemetryEvent{}
-			b.ticker.Reset(b.frequency)
+			// Resetting the ticker is best effort. We don't care if the ticker
+			// has already fired or has a pending message, because the only risk
+			// is that we send two telemetry events in short succession (which
+			// is totally fine).
+			if b.ticker != nil {
+				b.ticker.Reset(b.frequency)
+			}
 			// Perform the send in a goroutine to avoid blocking the DRPC call.
 			go b.batchFn(events)
Review comment:

This feels a bit racey. We should probably have a separate channel to signal to the other goroutine that it should tick.

Reply:

This isn't racey because we hold the lock. If a tick happens while we're handling messages, the pending events list will be emptied and the periodic batcher goroutine will do nothing because there are no events. If we somehow overflow past the batch size anyway, it'll just send two telemetry events in short succession, which is acceptable. I've added a comment that the ticker reset is best effort.
 		}
 	}
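To make the reply's argument concrete, here is a stripped-down sketch of the two paths contending for the same mutex: the handler's overflow path and the periodic flush. All names here are hypothetical and the types are simplified; this is not the coder/coder code. Because both paths take the lock and swap out the pending slice before sending, a tick that lands just after an overflow send finds nothing to flush, and the worst case is two smaller batches sent back to back.

package main

import (
	"fmt"
	"sync"
)

// batcher is a hypothetical, simplified stand-in for the telemetry batcher.
type batcher struct {
	mu      sync.Mutex
	pending []string
	maxSize int
	batchFn func([]string)
}

// handle mirrors Handler's overflow path: append under the lock and, once
// maxSize is reached, swap out the pending slice before sending.
func (b *batcher) handle(events ...string) {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.pending = append(b.pending, events...)
	if len(b.pending) >= b.maxSize {
		batch := b.pending
		b.pending = nil
		go b.batchFn(batch) // send without blocking the caller
	}
}

// flush mirrors sendTelemetryBatch: the periodic tick takes the same lock,
// so a tick that lands right after an overflow send has nothing to do.
func (b *batcher) flush() {
	b.mu.Lock()
	defer b.mu.Unlock()
	if len(b.pending) == 0 {
		return
	}
	batch := b.pending
	b.pending = nil
	b.batchFn(batch)
}

func main() {
	var wg sync.WaitGroup
	wg.Add(1)
	b := &batcher{maxSize: 2, batchFn: func(batch []string) {
		fmt.Println("sent batch:", batch)
		wg.Done()
	}}
	b.handle("a", "b") // overflow path sends [a b] in a goroutine
	b.flush()          // a "tick" immediately afterwards sends nothing
	wg.Wait()
}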