@@ -10,16 +10,14 @@ import (
10
10
11
11
"github.com/google/uuid"
12
12
"github.com/hashicorp/yamux"
13
+ "golang.org/x/xerrors"
13
14
"storj.io/drpc/drpcmux"
14
15
"storj.io/drpc/drpcserver"
15
16
"tailscale.com/tailcfg"
16
17
17
18
"cdr.dev/slog"
18
19
"github.com/coder/coder/v2/apiversion"
19
- "github.com/coder/coder/v2/coderd/telemetry"
20
20
"github.com/coder/coder/v2/tailnet/proto"
21
-
22
- "golang.org/x/xerrors"
23
21
)
24
22
25
23
type streamIDContextKey struct {}
@@ -125,13 +123,14 @@ type DRPCService struct {
125
123
DerpMapFn func () * tailcfg.DERPMap
126
124
NetworkTelemetryBatchFrequency time.Duration
127
125
NetworkTelemetryBatchMaxSize int
128
- NetworkTelemetryBatchFn func (batch []telemetry. NetworkEvent )
126
+ NetworkTelemetryBatchFn func (batch []* proto. TelemetryEvent )
129
127
130
- mu sync.Mutex
131
- pendingNetworkEvents []telemetry.NetworkEvent
128
+ mu sync.Mutex
129
+ networkEventBatchTicker * time.Ticker
130
+ pendingNetworkEvents []* proto.TelemetryEvent
132
131
}
133
132
134
- func (s * DRPCService ) writeTelemetryEvents (events []telemetry. NetworkEvent ) {
133
+ func (s * DRPCService ) writeTelemetryEvents (events []* proto. TelemetryEvent ) {
135
134
if s .NetworkTelemetryBatchFn == nil {
136
135
return
137
136
}
@@ -142,13 +141,14 @@ func (s *DRPCService) sendTelemetryBatch() {
142
141
s .mu .Lock ()
143
142
defer s .mu .Unlock ()
144
143
events := s .pendingNetworkEvents
145
- s .pendingNetworkEvents = []telemetry. NetworkEvent {}
144
+ s .pendingNetworkEvents = []* proto. TelemetryEvent {}
146
145
go s .writeTelemetryEvents (events )
147
146
}
148
147
149
148
// PeriodicTelemetryBatcher starts a goroutine to periodically send telemetry
150
149
// events to the telemetry backend. The returned function is a cleanup function
151
- // that should be called when the service is no longer needed.
150
+ // that should be called when the service is no longer needed. Calling this more
151
+ // than once will panic.
152
152
//
153
153
// Note: calling the returned function does not unblock any in-flight calls to
154
154
// the underlying telemetry backend that come from PostTelemetry due to
@@ -162,9 +162,16 @@ func (s *DRPCService) PeriodicTelemetryBatcher() func() {
162
162
return func () {}
163
163
}
164
164
165
+ s .mu .Lock ()
166
+ defer s .mu .Unlock ()
167
+ if s .networkEventBatchTicker != nil {
168
+ panic ("PeriodicTelemetryBatcher called more than once" )
169
+ }
170
+ ticker := time .NewTicker (s .NetworkTelemetryBatchFrequency )
171
+ s .networkEventBatchTicker = ticker
172
+
165
173
go func () {
166
174
defer close (done )
167
- ticker := time .NewTicker (s .NetworkTelemetryBatchFrequency )
168
175
defer ticker .Stop ()
169
176
170
177
for {
@@ -189,19 +196,16 @@ func (s *DRPCService) PostTelemetry(_ context.Context, req *proto.TelemetryReque
189
196
s .mu .Lock ()
190
197
defer s .mu .Unlock ()
191
198
192
- for _ , pEvent := range req .Events {
193
- tEvent , err := telemetry .NetworkEventFromProto (pEvent )
194
- if err != nil {
195
- // TODO(@deansheather): log? return an error?
196
- continue
197
- }
198
-
199
- s .pendingNetworkEvents = append (s .pendingNetworkEvents , tEvent )
199
+ for _ , event := range req .Events {
200
+ s .pendingNetworkEvents = append (s .pendingNetworkEvents , event )
200
201
201
202
if len (s .pendingNetworkEvents ) >= s .NetworkTelemetryBatchMaxSize {
202
203
events := s .pendingNetworkEvents
203
- s .pendingNetworkEvents = []telemetry. NetworkEvent {}
204
+ s .pendingNetworkEvents = []* proto. TelemetryEvent {}
204
205
// Perform the send in a goroutine to avoid blocking the DRPC call.
206
+ if s .networkEventBatchTicker != nil {
207
+ s .networkEventBatchTicker .Reset (s .NetworkTelemetryBatchFrequency )
208
+ }
205
209
go s .writeTelemetryEvents (events )
206
210
}
207
211
}
0 commit comments