8
8
"io"
9
9
"net"
10
10
"sync"
11
+ "sync/atomic"
11
12
"time"
12
13
13
14
"github.com/hashicorp/yamux"
@@ -60,6 +61,8 @@ func Listen(ctx context.Context, log slog.Logger, broker string, tcpProxy proxy.
60
61
for {
61
62
err := <- ch
62
63
if errors .Is (err , io .EOF ) || errors .Is (err , yamux .ErrKeepAliveTimeout ) {
64
+ l .log .Warn (ctx , "disconnected from broker" , slog .Error (err ))
65
+
63
66
// If we hit an EOF, then the connection to the broker
64
67
// was interrupted. We'll take a short break then dial
65
68
// again.
@@ -96,12 +99,16 @@ type listener struct {
96
99
ws * websocket.Conn
97
100
connClosers []io.Closer
98
101
connClosersMut sync.Mutex
102
+
103
+ nextConnNumber int64
99
104
}
100
105
101
106
func (l * listener ) dial (ctx context.Context ) (<- chan error , error ) {
107
+ l .log .Info (ctx , "connecting to broker" , slog .F ("broker_url" , l .broker ))
102
108
if l .ws != nil {
103
109
_ = l .ws .Close (websocket .StatusNormalClosure , "new connection inbound" )
104
110
}
111
+
105
112
conn , resp , err := websocket .Dial (ctx , l .broker , nil )
106
113
if err != nil {
107
114
if resp != nil {
@@ -110,13 +117,16 @@ func (l *listener) dial(ctx context.Context) (<-chan error, error) {
110
117
return nil , err
111
118
}
112
119
l .ws = conn
120
+
113
121
nconn := websocket .NetConn (ctx , conn , websocket .MessageBinary )
114
122
config := yamux .DefaultConfig ()
115
123
config .LogOutput = io .Discard
116
124
session , err := yamux .Server (nconn , config )
117
125
if err != nil {
118
126
return nil , fmt .Errorf ("create multiplex: %w" , err )
119
127
}
128
+
129
+ l .log .Info (ctx , "broker connection established" )
120
130
errCh := make (chan error )
121
131
go func () {
122
132
defer close (errCh )
@@ -126,19 +136,21 @@ func (l *listener) dial(ctx context.Context) (<-chan error, error) {
126
136
errCh <- err
127
137
break
128
138
}
129
- go l .negotiate (conn )
139
+ go l .negotiate (ctx , conn )
130
140
}
131
141
}()
142
+
132
143
return errCh , nil
133
144
}
134
145
135
146
// Negotiates the handshake protocol over the connection provided.
136
147
// This functions control-flow is important to readability,
137
148
// so the cognitive overload linter has been disabled.
138
149
// nolint:gocognit,nestif
139
- func (l * listener ) negotiate (conn net.Conn ) {
150
+ func (l * listener ) negotiate (ctx context. Context , conn net.Conn ) {
140
151
var (
141
152
err error
153
+ id = atomic .AddInt64 (& l .nextConnNumber , 1 )
142
154
decoder = json .NewDecoder (conn )
143
155
rtc * webrtc.PeerConnection
144
156
// If candidates are sent before an offer, we place them here.
@@ -148,6 +160,8 @@ func (l *listener) negotiate(conn net.Conn) {
148
160
// Sends the error provided then closes the connection.
149
161
// If RTC isn't connected, we'll close it.
150
162
closeError = func (err error ) {
163
+ l .log .Warn (ctx , "negotiation error, closing connection" , slog .Error (err ))
164
+
151
165
d , _ := json .Marshal (& BrokerMessage {
152
166
Error : err .Error (),
153
167
})
@@ -162,13 +176,17 @@ func (l *listener) negotiate(conn net.Conn) {
162
176
}
163
177
)
164
178
179
+ ctx = slog .With (ctx , slog .F ("conn_id" , id ))
180
+ l .log .Info (ctx , "accepted new session from broker connection, negotiating" )
181
+
165
182
for {
166
183
var msg BrokerMessage
167
184
err = decoder .Decode (& msg )
168
185
if err != nil {
169
186
closeError (err )
170
187
return
171
188
}
189
+ l .log .Debug (ctx , "received broker message" , slog .F ("msg" , msg ))
172
190
173
191
if msg .Candidate != "" {
174
192
c := webrtc.ICECandidateInit {
@@ -180,6 +198,7 @@ func (l *listener) negotiate(conn net.Conn) {
180
198
continue
181
199
}
182
200
201
+ l .log .Debug (ctx , "adding ICE candidate" , slog .F ("c" , c ))
183
202
err = rtc .AddICECandidate (c )
184
203
if err != nil {
185
204
closeError (fmt .Errorf ("accept ice candidate: %w" , err ))
@@ -198,59 +217,73 @@ func (l *listener) negotiate(conn net.Conn) {
198
217
// so it will not validate.
199
218
continue
200
219
}
220
+
221
+ l .log .Debug (ctx , "validating ICE server" , slog .F ("s" , server ))
201
222
err = DialICE (server , nil )
202
223
if err != nil {
203
224
closeError (fmt .Errorf ("dial server %+v: %w" , server .URLs , err ))
204
225
return
205
226
}
206
227
}
228
+
207
229
rtc , err = newPeerConnection (msg .Servers , l .tcpProxy )
208
230
if err != nil {
209
231
closeError (err )
210
232
return
211
233
}
212
234
rtc .OnConnectionStateChange (func (pcs webrtc.PeerConnectionState ) {
235
+ l .log .Debug (ctx , "connection state change" , slog .F ("state" , pcs .String ()))
213
236
if pcs == webrtc .PeerConnectionStateConnecting {
214
237
return
215
238
}
216
239
_ = conn .Close ()
217
240
})
241
+
218
242
flushCandidates := proxyICECandidates (rtc , conn )
219
243
l .connClosersMut .Lock ()
220
244
l .connClosers = append (l .connClosers , rtc )
221
245
l .connClosersMut .Unlock ()
222
- rtc .OnDataChannel (l .handle (msg ))
246
+ rtc .OnDataChannel (l .handle (ctx , msg ))
247
+
248
+ l .log .Debug (ctx , "set remote description" , slog .F ("offer" , * msg .Offer ))
223
249
err = rtc .SetRemoteDescription (* msg .Offer )
224
250
if err != nil {
225
251
closeError (fmt .Errorf ("apply offer: %w" , err ))
226
252
return
227
253
}
254
+
228
255
answer , err := rtc .CreateAnswer (nil )
229
256
if err != nil {
230
257
closeError (fmt .Errorf ("create answer: %w" , err ))
231
258
return
232
259
}
260
+
261
+ l .log .Debug (ctx , "set local description" , slog .F ("answer" , answer ))
233
262
err = rtc .SetLocalDescription (answer )
234
263
if err != nil {
235
264
closeError (fmt .Errorf ("set local answer: %w" , err ))
236
265
return
237
266
}
238
267
flushCandidates ()
239
268
240
- data , err := json . Marshal ( & BrokerMessage {
269
+ bmsg := & BrokerMessage {
241
270
Answer : rtc .LocalDescription (),
242
- })
271
+ }
272
+ data , err := json .Marshal (bmsg )
243
273
if err != nil {
244
274
closeError (fmt .Errorf ("marshal: %w" , err ))
245
275
return
246
276
}
277
+
278
+ l .log .Debug (ctx , "writing message" , slog .F ("msg" , bmsg ))
247
279
_ , err = conn .Write (data )
248
280
if err != nil {
249
281
closeError (fmt .Errorf ("write: %w" , err ))
250
282
return
251
283
}
252
284
253
285
for _ , candidate := range pendingCandidates {
286
+ l .log .Debug (ctx , "adding pending ICE candidate" , slog .F ("c" , candidate ))
254
287
err = rtc .AddICECandidate (candidate )
255
288
if err != nil {
256
289
closeError (fmt .Errorf ("add pending candidate: %w" , err ))
@@ -262,19 +295,25 @@ func (l *listener) negotiate(conn net.Conn) {
262
295
}
263
296
}
264
297
265
- func (l * listener ) handle (msg BrokerMessage ) func (dc * webrtc.DataChannel ) {
298
+ // nolint:gocognit
299
+ func (l * listener ) handle (ctx context.Context , msg BrokerMessage ) func (dc * webrtc.DataChannel ) {
266
300
return func (dc * webrtc.DataChannel ) {
267
301
if dc .Protocol () == controlChannel {
268
302
// The control channel handles pings.
269
303
dc .OnOpen (func () {
304
+ l .log .Debug (ctx , "control channel open" )
270
305
rw , err := dc .Detach ()
271
306
if err != nil {
272
307
return
273
308
}
274
309
// We'll read and write back a single byte for ping/pongin'.
275
310
d := make ([]byte , 1 )
276
311
for {
312
+ l .log .Debug (ctx , "sending ping" )
277
313
_ , err = rw .Read (d )
314
+ if err != nil {
315
+ l .log .Debug (ctx , "reading ping response failed" , slog .Error (err ))
316
+ }
278
317
if errors .Is (err , io .EOF ) {
279
318
return
280
319
}
@@ -287,25 +326,36 @@ func (l *listener) handle(msg BrokerMessage) func(dc *webrtc.DataChannel) {
287
326
return
288
327
}
289
328
329
+ ctx = slog .With (ctx ,
330
+ slog .F ("dc_id" , dc .ID ()),
331
+ slog .F ("dc_label" , dc .Label ()),
332
+ slog .F ("dc_proto" , dc .Protocol ()),
333
+ )
334
+
290
335
dc .OnOpen (func () {
336
+ l .log .Info (ctx , "data channel opened" )
291
337
rw , err := dc .Detach ()
292
338
if err != nil {
293
339
return
294
340
}
295
341
296
342
var init DialChannelResponse
297
343
sendInitMessage := func () {
344
+ l .log .Debug (ctx , "sending dc init message" , slog .F ("msg" , init ))
298
345
initData , err := json .Marshal (& init )
299
346
if err != nil {
347
+ l .log .Debug (ctx , "failed to marshal dc init message" , slog .Error (err ))
300
348
rw .Close ()
301
349
return
302
350
}
303
351
_ , err = rw .Write (initData )
304
352
if err != nil {
353
+ l .log .Debug (ctx , "failed to write dc init message" , slog .Error (err ))
305
354
return
306
355
}
307
356
if init .Err != "" {
308
357
// If an error occurred, we're safe to close the connection.
358
+ l .log .Debug (ctx , "closing data channel due to error" , slog .F ("msg" , init .Err ))
309
359
dc .Close ()
310
360
return
311
361
}
@@ -323,8 +373,10 @@ func (l *listener) handle(msg BrokerMessage) func(dc *webrtc.DataChannel) {
323
373
return
324
374
}
325
375
376
+ l .log .Debug (ctx , "dialing remote address" , slog .F ("network" , network ), slog .F ("addr" , addr ))
326
377
nc , err := net .Dial (network , addr )
327
378
if err != nil {
379
+ l .log .Debug (ctx , "failed to dial remote address" )
328
380
init .Code = CodeDialErr
329
381
init .Err = err .Error ()
330
382
if op , ok := err .(* net.OpError ); ok {
@@ -336,8 +388,10 @@ func (l *listener) handle(msg BrokerMessage) func(dc *webrtc.DataChannel) {
336
388
if init .Err != "" {
337
389
return
338
390
}
391
+
339
392
// Must wrap the data channel inside this connection
340
393
// for buffering from the dialed endpoint to the client.
394
+ l .log .Debug (ctx , "data channel initialized, tunnelling" )
341
395
co := & dataChannelConn {
342
396
addr : nil ,
343
397
dc : dc ,
@@ -357,6 +411,8 @@ func (l *listener) handle(msg BrokerMessage) func(dc *webrtc.DataChannel) {
357
411
358
412
// Close closes the broker socket and all created RTC connections.
359
413
func (l * listener ) Close () error {
414
+ l .log .Info (context .Background (), "listener closed" )
415
+
360
416
l .connClosersMut .Lock ()
361
417
for _ , connCloser := range l .connClosers {
362
418
// We can ignore the error here... it doesn't
0 commit comments