@@ -13,7 +13,10 @@ import (
13
13
"github.com/hashicorp/yamux"
14
14
"github.com/stretchr/testify/assert"
15
15
"github.com/stretchr/testify/require"
16
+ "golang.org/x/xerrors"
16
17
"nhooyr.io/websocket"
18
+ "storj.io/drpc"
19
+ "storj.io/drpc/drpcerr"
17
20
"tailscale.com/tailcfg"
18
21
19
22
"cdr.dev/slog"
@@ -139,6 +142,140 @@ func TestTailnetAPIConnector_UplevelVersion(t *testing.T) {
139
142
require .NotEmpty (t , sdkErr .Helper )
140
143
}
141
144
145
+ func TestTailnetAPIConnector_TelemetrySuccess (t * testing.T ) {
146
+ t .Parallel ()
147
+ ctx := testutil .Context (t , testutil .WaitShort )
148
+ logger := slogtest .Make (t , nil ).Leveled (slog .LevelDebug )
149
+ agentID := uuid.UUID {0x55 }
150
+ clientID := uuid.UUID {0x66 }
151
+ fCoord := tailnettest .NewFakeCoordinator ()
152
+ var coord tailnet.Coordinator = fCoord
153
+ coordPtr := atomic.Pointer [tailnet.Coordinator ]{}
154
+ coordPtr .Store (& coord )
155
+ derpMapCh := make (chan * tailcfg.DERPMap )
156
+ defer close (derpMapCh )
157
+ eventCh := make (chan []* proto.TelemetryEvent , 1 )
158
+ svc , err := tailnet .NewClientService (tailnet.ClientServiceOptions {
159
+ Logger : logger ,
160
+ CoordPtr : & coordPtr ,
161
+ DERPMapUpdateFrequency : time .Millisecond ,
162
+ DERPMapFn : func () * tailcfg.DERPMap { return <- derpMapCh },
163
+ NetworkTelemetryHandler : func (batch []* proto.TelemetryEvent ) {
164
+ eventCh <- batch
165
+ },
166
+ })
167
+ require .NoError (t , err )
168
+
169
+ svr := httptest .NewServer (http .HandlerFunc (func (w http.ResponseWriter , r * http.Request ) {
170
+ sws , err := websocket .Accept (w , r , nil )
171
+ if ! assert .NoError (t , err ) {
172
+ return
173
+ }
174
+ ctx , nc := codersdk .WebsocketNetConn (r .Context (), sws , websocket .MessageBinary )
175
+ err = svc .ServeConnV2 (ctx , nc , tailnet.StreamID {
176
+ Name : "client" ,
177
+ ID : clientID ,
178
+ Auth : tailnet.ClientCoordinateeAuth {AgentID : agentID },
179
+ })
180
+ assert .NoError (t , err )
181
+ }))
182
+
183
+ fConn := newFakeTailnetConn ()
184
+
185
+ uut := newTailnetAPIConnector (ctx , logger , agentID , svr .URL , & websocket.DialOptions {})
186
+ uut .runConnector (fConn )
187
+ require .Eventually (t , func () bool {
188
+ uut .clientMu .Lock ()
189
+ defer uut .clientMu .Unlock ()
190
+ return uut .client != nil
191
+ }, testutil .WaitShort , testutil .IntervalFast )
192
+
193
+ uut .SendTelemetryEvent (& proto.TelemetryEvent {
194
+ Id : []byte ("test event" ),
195
+ })
196
+
197
+ testEvents := testutil .RequireRecvCtx (ctx , t , eventCh )
198
+
199
+ require .Len (t , testEvents , 1 )
200
+ require .Equal (t , []byte ("test event" ), testEvents [0 ].Id )
201
+ }
202
+
203
+ // Server doesn't support telemetry / server unimplemented telemetry
204
+
205
+ func TestTailnetAPIConnector_TelemetryUnimplemented (t * testing.T ) {
206
+ t .Parallel ()
207
+ ctx := testutil .Context (t , testutil .WaitShort )
208
+ logger := slogtest .Make (t , nil ).Leveled (slog .LevelDebug )
209
+ agentID := uuid.UUID {0x55 }
210
+ fConn := newFakeTailnetConn ()
211
+
212
+ fakeDRPCClient := newFakeDRPCClient ()
213
+ uut := & tailnetAPIConnector {
214
+ ctx : ctx ,
215
+ logger : logger ,
216
+ agentID : agentID ,
217
+ coordinateURL : "" ,
218
+ dialOptions : & websocket.DialOptions {},
219
+ conn : nil ,
220
+ connected : make (chan error , 1 ),
221
+ closed : make (chan struct {}),
222
+ customDialFn : func () (proto.DRPCTailnetClient , error ) {
223
+ return fakeDRPCClient , nil
224
+ },
225
+ }
226
+ uut .runConnector (fConn )
227
+ require .Eventually (t , func () bool {
228
+ uut .clientMu .Lock ()
229
+ defer uut .clientMu .Unlock ()
230
+ return uut .client != nil
231
+ }, testutil .WaitShort , testutil .IntervalFast )
232
+
233
+ fakeDRPCClient .telemeteryErorr = drpcerr .WithCode (xerrors .New ("Unimplemented" ), 0 )
234
+ uut .SendTelemetryEvent (& proto.TelemetryEvent {})
235
+ require .False (t , uut .telemetryUnavailable .Load ())
236
+
237
+ fakeDRPCClient .telemeteryErorr = drpcerr .WithCode (xerrors .New ("Unimplemented" ), drpcerr .Unimplemented )
238
+ uut .SendTelemetryEvent (& proto.TelemetryEvent {})
239
+ require .True (t , uut .telemetryUnavailable .Load ())
240
+ }
241
+
242
+ func TestTailnetAPIConnector_TelemetryNotRecognised (t * testing.T ) {
243
+ t .Parallel ()
244
+ ctx := testutil .Context (t , testutil .WaitShort )
245
+ logger := slogtest .Make (t , nil ).Leveled (slog .LevelDebug )
246
+ agentID := uuid.UUID {0x55 }
247
+ fConn := newFakeTailnetConn ()
248
+
249
+ fakeDRPCClient := newFakeDRPCClient ()
250
+ uut := & tailnetAPIConnector {
251
+ ctx : ctx ,
252
+ logger : logger ,
253
+ agentID : agentID ,
254
+ coordinateURL : "" ,
255
+ dialOptions : & websocket.DialOptions {},
256
+ conn : nil ,
257
+ connected : make (chan error , 1 ),
258
+ closed : make (chan struct {}),
259
+ customDialFn : func () (proto.DRPCTailnetClient , error ) {
260
+ return fakeDRPCClient , nil
261
+ },
262
+ }
263
+ uut .runConnector (fConn )
264
+ require .Eventually (t , func () bool {
265
+ uut .clientMu .Lock ()
266
+ defer uut .clientMu .Unlock ()
267
+ return uut .client != nil
268
+ }, testutil .WaitShort , testutil .IntervalFast )
269
+
270
+ fakeDRPCClient .telemeteryErorr = drpc .ProtocolError .New ("Protocol Error" )
271
+ uut .SendTelemetryEvent (& proto.TelemetryEvent {})
272
+ require .False (t , uut .telemetryUnavailable .Load ())
273
+
274
+ fakeDRPCClient .telemeteryErorr = drpc .ProtocolError .New ("unknown rpc: /coder.tailnet.v2.Tailnet/PostTelemetry" )
275
+ uut .SendTelemetryEvent (& proto.TelemetryEvent {})
276
+ require .True (t , uut .telemetryUnavailable .Load ())
277
+ }
278
+
142
279
type fakeTailnetConn struct {}
143
280
144
281
func (* fakeTailnetConn ) UpdatePeers ([]* proto.CoordinateResponse_PeerUpdate ) error {
@@ -157,3 +294,120 @@ func (*fakeTailnetConn) SetTunnelDestination(uuid.UUID) {}
157
294
func newFakeTailnetConn () * fakeTailnetConn {
158
295
return & fakeTailnetConn {}
159
296
}
297
+
298
+ type fakeDRPCClient struct {
299
+ telemeteryErorr error
300
+ fakeDRPPCMapStream
301
+ }
302
+
303
+ var _ proto.DRPCTailnetClient = & fakeDRPCClient {}
304
+
305
+ func newFakeDRPCClient () * fakeDRPCClient {
306
+ return & fakeDRPCClient {
307
+ fakeDRPPCMapStream : fakeDRPPCMapStream {
308
+ fakeDRPCStream : fakeDRPCStream {
309
+ ch : make (chan struct {}),
310
+ },
311
+ },
312
+ }
313
+ }
314
+
315
+ // Coordinate implements proto.DRPCTailnetClient.
316
+ func (f * fakeDRPCClient ) Coordinate (_ context.Context ) (proto.DRPCTailnet_CoordinateClient , error ) {
317
+ return & f .fakeDRPCStream , nil
318
+ }
319
+
320
+ // DRPCConn implements proto.DRPCTailnetClient.
321
+ func (* fakeDRPCClient ) DRPCConn () drpc.Conn {
322
+ return & fakeDRPCConn {}
323
+ }
324
+
325
+ // PostTelemetry implements proto.DRPCTailnetClient.
326
+ func (f * fakeDRPCClient ) PostTelemetry (_ context.Context , in * proto.TelemetryRequest ) (* proto.TelemetryResponse , error ) {
327
+ return nil , f .telemeteryErorr
328
+ }
329
+
330
+ // StreamDERPMaps implements proto.DRPCTailnetClient.
331
+ func (f * fakeDRPCClient ) StreamDERPMaps (_ context.Context , _ * proto.StreamDERPMapsRequest ) (proto.DRPCTailnet_StreamDERPMapsClient , error ) {
332
+ return & f .fakeDRPPCMapStream , nil
333
+ }
334
+
335
+ type fakeDRPCConn struct {}
336
+
337
+ var _ drpc.Conn = & fakeDRPCConn {}
338
+
339
+ // Close implements drpc.Conn.
340
+ func (* fakeDRPCConn ) Close () error {
341
+ return nil
342
+ }
343
+
344
+ // Closed implements drpc.Conn.
345
+ func (* fakeDRPCConn ) Closed () <- chan struct {} {
346
+ return nil
347
+ }
348
+
349
+ // Invoke implements drpc.Conn.
350
+ func (* fakeDRPCConn ) Invoke (_ context.Context , _ string , _ drpc.Encoding , _ drpc.Message , _ drpc.Message ) error {
351
+ return nil
352
+ }
353
+
354
+ // NewStream implements drpc.Conn.
355
+ func (* fakeDRPCConn ) NewStream (_ context.Context , _ string , _ drpc.Encoding ) (drpc.Stream , error ) {
356
+ return nil , nil
357
+ }
358
+
359
+ type fakeDRPCStream struct {
360
+ ch chan struct {}
361
+ }
362
+
363
+ var _ proto.DRPCTailnet_CoordinateClient = & fakeDRPCStream {}
364
+
365
+ // Close implements proto.DRPCTailnet_CoordinateClient.
366
+ func (f * fakeDRPCStream ) Close () error {
367
+ close (f .ch )
368
+ return nil
369
+ }
370
+
371
+ // CloseSend implements proto.DRPCTailnet_CoordinateClient.
372
+ func (* fakeDRPCStream ) CloseSend () error {
373
+ return nil
374
+ }
375
+
376
+ // Context implements proto.DRPCTailnet_CoordinateClient.
377
+ func (* fakeDRPCStream ) Context () context.Context {
378
+ return nil
379
+ }
380
+
381
+ // MsgRecv implements proto.DRPCTailnet_CoordinateClient.
382
+ func (* fakeDRPCStream ) MsgRecv (_ drpc.Message , _ drpc.Encoding ) error {
383
+ return nil
384
+ }
385
+
386
+ // MsgSend implements proto.DRPCTailnet_CoordinateClient.
387
+ func (* fakeDRPCStream ) MsgSend (_ drpc.Message , _ drpc.Encoding ) error {
388
+ return nil
389
+ }
390
+
391
+ // Recv implements proto.DRPCTailnet_CoordinateClient.
392
+ func (f * fakeDRPCStream ) Recv () (* proto.CoordinateResponse , error ) {
393
+ <- f .ch
394
+ return & proto.CoordinateResponse {}, nil
395
+ }
396
+
397
+ // Send implements proto.DRPCTailnet_CoordinateClient.
398
+ func (f * fakeDRPCStream ) Send (* proto.CoordinateRequest ) error {
399
+ <- f .ch
400
+ return nil
401
+ }
402
+
403
+ type fakeDRPPCMapStream struct {
404
+ fakeDRPCStream
405
+ }
406
+
407
+ var _ proto.DRPCTailnet_StreamDERPMapsClient = & fakeDRPPCMapStream {}
408
+
409
+ // Recv implements proto.DRPCTailnet_StreamDERPMapsClient.
410
+ func (f * fakeDRPPCMapStream ) Recv () (* proto.DERPMap , error ) {
411
+ <- f .fakeDRPCStream .ch
412
+ return & proto.DERPMap {}, nil
413
+ }
0 commit comments