Skip to content
This repository was archived by the owner on Aug 30, 2024. It is now read-only.

Commit 6af7193

Browse files
committed
Add more logging to wsnet listener
1 parent b87b4c6 commit 6af7193

File tree

1 file changed

+62
-6
lines changed

1 file changed

+62
-6
lines changed

wsnet/listen.go

Lines changed: 62 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"net"
1010
"net/url"
1111
"sync"
12+
"sync/atomic"
1213
"time"
1314

1415
"github.com/hashicorp/yamux"
@@ -61,6 +62,8 @@ func Listen(ctx context.Context, log slog.Logger, broker string, turnProxyAuthTo
6162
for {
6263
err := <-ch
6364
if errors.Is(err, io.EOF) || errors.Is(err, yamux.ErrKeepAliveTimeout) {
65+
l.log.Warn(ctx, "disconnected from broker", slog.Error(err))
66+
6467
// If we hit an EOF, then the connection to the broker
6568
// was interrupted. We'll take a short break then dial
6669
// again.
@@ -97,12 +100,16 @@ type listener struct {
97100
ws *websocket.Conn
98101
connClosers []io.Closer
99102
connClosersMut sync.Mutex
103+
104+
nextConnNumber int64
100105
}
101106

102107
func (l *listener) dial(ctx context.Context) (<-chan error, error) {
108+
l.log.Info(ctx, "connecting to broker", slog.F("broker_url", l.broker))
103109
if l.ws != nil {
104110
_ = l.ws.Close(websocket.StatusNormalClosure, "new connection inbound")
105111
}
112+
106113
conn, resp, err := websocket.Dial(ctx, l.broker, nil)
107114
if err != nil {
108115
if resp != nil {
@@ -111,13 +118,16 @@ func (l *listener) dial(ctx context.Context) (<-chan error, error) {
111118
return nil, err
112119
}
113120
l.ws = conn
121+
114122
nconn := websocket.NetConn(ctx, conn, websocket.MessageBinary)
115123
config := yamux.DefaultConfig()
116124
config.LogOutput = io.Discard
117125
session, err := yamux.Server(nconn, config)
118126
if err != nil {
119127
return nil, fmt.Errorf("create multiplex: %w", err)
120128
}
129+
130+
l.log.Info(ctx, "broker connection established")
121131
errCh := make(chan error)
122132
go func() {
123133
defer close(errCh)
@@ -127,19 +137,21 @@ func (l *listener) dial(ctx context.Context) (<-chan error, error) {
127137
errCh <- err
128138
break
129139
}
130-
go l.negotiate(conn)
140+
go l.negotiate(ctx, conn)
131141
}
132142
}()
143+
133144
return errCh, nil
134145
}
135146

136147
// Negotiates the handshake protocol over the connection provided.
137148
// This functions control-flow is important to readability,
138149
// so the cognitive overload linter has been disabled.
139150
// nolint:gocognit,nestif
140-
func (l *listener) negotiate(conn net.Conn) {
151+
func (l *listener) negotiate(ctx context.Context, conn net.Conn) {
141152
var (
142153
err error
154+
id = atomic.AddInt64(&l.nextConnNumber, 1)
143155
decoder = json.NewDecoder(conn)
144156
rtc *webrtc.PeerConnection
145157
// If candidates are sent before an offer, we place them here.
@@ -149,6 +161,8 @@ func (l *listener) negotiate(conn net.Conn) {
149161
// Sends the error provided then closes the connection.
150162
// If RTC isn't connected, we'll close it.
151163
closeError = func(err error) {
164+
l.log.Warn(ctx, "negotiation error, closing connection", slog.Error(err))
165+
152166
d, _ := json.Marshal(&BrokerMessage{
153167
Error: err.Error(),
154168
})
@@ -163,13 +177,17 @@ func (l *listener) negotiate(conn net.Conn) {
163177
}
164178
)
165179

180+
ctx = slog.With(ctx, slog.F("conn_id", id))
181+
l.log.Info(ctx, "accepted new session from broker connection, negotiating")
182+
166183
for {
167184
var msg BrokerMessage
168185
err = decoder.Decode(&msg)
169186
if err != nil {
170187
closeError(err)
171188
return
172189
}
190+
l.log.Debug(ctx, "received broker message", slog.F("msg", msg))
173191

174192
if msg.Candidate != "" {
175193
c := webrtc.ICECandidateInit{
@@ -181,6 +199,7 @@ func (l *listener) negotiate(conn net.Conn) {
181199
continue
182200
}
183201

202+
l.log.Debug(ctx, "adding ICE candidate", slog.F("c", c))
184203
err = rtc.AddICECandidate(c)
185204
if err != nil {
186205
closeError(fmt.Errorf("accept ice candidate: %w", err))
@@ -199,12 +218,15 @@ func (l *listener) negotiate(conn net.Conn) {
199218
// so it will not validate.
200219
continue
201220
}
221+
222+
l.log.Debug(ctx, "validating ICE server", slog.F("s", server))
202223
err = DialICE(server, nil)
203224
if err != nil {
204225
closeError(fmt.Errorf("dial server %+v: %w", server.URLs, err))
205226
return
206227
}
207228
}
229+
208230
var turnProxy proxy.Dialer
209231
if msg.TURNProxyURL != "" {
210232
u, err := url.Parse(msg.TURNProxyURL)
@@ -223,47 +245,58 @@ func (l *listener) negotiate(conn net.Conn) {
223245
return
224246
}
225247
rtc.OnConnectionStateChange(func(pcs webrtc.PeerConnectionState) {
248+
l.log.Debug(ctx, "connection state change", slog.F("state", pcs.String()))
226249
if pcs == webrtc.PeerConnectionStateConnecting {
227250
return
228251
}
229252
_ = conn.Close()
230253
})
254+
231255
flushCandidates := proxyICECandidates(rtc, conn)
232256
l.connClosersMut.Lock()
233257
l.connClosers = append(l.connClosers, rtc)
234258
l.connClosersMut.Unlock()
235-
rtc.OnDataChannel(l.handle(msg))
259+
rtc.OnDataChannel(l.handle(ctx, msg))
260+
261+
l.log.Debug(ctx, "set remote description", slog.F("offer", *msg.Offer))
236262
err = rtc.SetRemoteDescription(*msg.Offer)
237263
if err != nil {
238264
closeError(fmt.Errorf("apply offer: %w", err))
239265
return
240266
}
267+
241268
answer, err := rtc.CreateAnswer(nil)
242269
if err != nil {
243270
closeError(fmt.Errorf("create answer: %w", err))
244271
return
245272
}
273+
274+
l.log.Debug(ctx, "set local description", slog.F("answer", answer))
246275
err = rtc.SetLocalDescription(answer)
247276
if err != nil {
248277
closeError(fmt.Errorf("set local answer: %w", err))
249278
return
250279
}
251280
flushCandidates()
252281

253-
data, err := json.Marshal(&BrokerMessage{
282+
bmsg := &BrokerMessage{
254283
Answer: rtc.LocalDescription(),
255-
})
284+
}
285+
data, err := json.Marshal(bmsg)
256286
if err != nil {
257287
closeError(fmt.Errorf("marshal: %w", err))
258288
return
259289
}
290+
291+
l.log.Debug(ctx, "writing message", slog.F("msg", bmsg))
260292
_, err = conn.Write(data)
261293
if err != nil {
262294
closeError(fmt.Errorf("write: %w", err))
263295
return
264296
}
265297

266298
for _, candidate := range pendingCandidates {
299+
l.log.Debug(ctx, "adding pending ICE candidate", slog.F("c", candidate))
267300
err = rtc.AddICECandidate(candidate)
268301
if err != nil {
269302
closeError(fmt.Errorf("add pending candidate: %w", err))
@@ -275,19 +308,25 @@ func (l *listener) negotiate(conn net.Conn) {
275308
}
276309
}
277310

278-
func (l *listener) handle(msg BrokerMessage) func(dc *webrtc.DataChannel) {
311+
// nolint:gocognit
312+
func (l *listener) handle(ctx context.Context, msg BrokerMessage) func(dc *webrtc.DataChannel) {
279313
return func(dc *webrtc.DataChannel) {
280314
if dc.Protocol() == controlChannel {
281315
// The control channel handles pings.
282316
dc.OnOpen(func() {
317+
l.log.Debug(ctx, "control channel open")
283318
rw, err := dc.Detach()
284319
if err != nil {
285320
return
286321
}
287322
// We'll read and write back a single byte for ping/pongin'.
288323
d := make([]byte, 1)
289324
for {
325+
l.log.Debug(ctx, "sending ping")
290326
_, err = rw.Read(d)
327+
if err != nil {
328+
l.log.Debug(ctx, "reading ping response failed", slog.Error(err))
329+
}
291330
if errors.Is(err, io.EOF) {
292331
return
293332
}
@@ -300,25 +339,36 @@ func (l *listener) handle(msg BrokerMessage) func(dc *webrtc.DataChannel) {
300339
return
301340
}
302341

342+
ctx = slog.With(ctx,
343+
slog.F("dc_id", dc.ID()),
344+
slog.F("dc_label", dc.Label()),
345+
slog.F("dc_proto", dc.Protocol()),
346+
)
347+
303348
dc.OnOpen(func() {
349+
l.log.Info(ctx, "data channel opened")
304350
rw, err := dc.Detach()
305351
if err != nil {
306352
return
307353
}
308354

309355
var init DialChannelResponse
310356
sendInitMessage := func() {
357+
l.log.Debug(ctx, "sending dc init message", slog.F("msg", init))
311358
initData, err := json.Marshal(&init)
312359
if err != nil {
360+
l.log.Debug(ctx, "failed to marshal dc init message", slog.Error(err))
313361
rw.Close()
314362
return
315363
}
316364
_, err = rw.Write(initData)
317365
if err != nil {
366+
l.log.Debug(ctx, "failed to write dc init message", slog.Error(err))
318367
return
319368
}
320369
if init.Err != "" {
321370
// If an error occurred, we're safe to close the connection.
371+
l.log.Debug(ctx, "closing data channel due to error", slog.F("msg", init.Err))
322372
dc.Close()
323373
return
324374
}
@@ -336,8 +386,10 @@ func (l *listener) handle(msg BrokerMessage) func(dc *webrtc.DataChannel) {
336386
return
337387
}
338388

389+
l.log.Debug(ctx, "dialing remote address", slog.F("network", network), slog.F("addr", addr))
339390
nc, err := net.Dial(network, addr)
340391
if err != nil {
392+
l.log.Debug(ctx, "failed to dial remote address")
341393
init.Code = CodeDialErr
342394
init.Err = err.Error()
343395
if op, ok := err.(*net.OpError); ok {
@@ -349,8 +401,10 @@ func (l *listener) handle(msg BrokerMessage) func(dc *webrtc.DataChannel) {
349401
if init.Err != "" {
350402
return
351403
}
404+
352405
// Must wrap the data channel inside this connection
353406
// for buffering from the dialed endpoint to the client.
407+
l.log.Debug(ctx, "data channel initialized, tunnelling")
354408
co := &dataChannelConn{
355409
addr: nil,
356410
dc: dc,
@@ -370,6 +424,8 @@ func (l *listener) handle(msg BrokerMessage) func(dc *webrtc.DataChannel) {
370424

371425
// Close closes the broker socket and all created RTC connections.
372426
func (l *listener) Close() error {
427+
l.log.Info(context.Background(), "listener closed")
428+
373429
l.connClosersMut.Lock()
374430
for _, connCloser := range l.connClosers {
375431
// We can ignore the error here... it doesn't

0 commit comments

Comments
 (0)