Skip to content
This repository was archived by the owner on Aug 30, 2024. It is now read-only.

Commit 27567c4

Browse files
committed
Add more logging to wsnet listener
1 parent 38b6e71 commit 27567c4

File tree

1 file changed

+62
-6
lines changed

1 file changed

+62
-6
lines changed

wsnet/listen.go

Lines changed: 62 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"io"
99
"net"
1010
"sync"
11+
"sync/atomic"
1112
"time"
1213

1314
"github.com/hashicorp/yamux"
@@ -60,6 +61,8 @@ func Listen(ctx context.Context, log slog.Logger, broker string, tcpProxy proxy.
6061
for {
6162
err := <-ch
6263
if errors.Is(err, io.EOF) || errors.Is(err, yamux.ErrKeepAliveTimeout) {
64+
l.log.Warn(ctx, "disconnected from broker", slog.Error(err))
65+
6366
// If we hit an EOF, then the connection to the broker
6467
// was interrupted. We'll take a short break then dial
6568
// again.
@@ -96,12 +99,16 @@ type listener struct {
9699
ws *websocket.Conn
97100
connClosers []io.Closer
98101
connClosersMut sync.Mutex
102+
103+
nextConnNumber int64
99104
}
100105

101106
func (l *listener) dial(ctx context.Context) (<-chan error, error) {
107+
l.log.Info(ctx, "connecting to broker", slog.F("broker_url", l.broker))
102108
if l.ws != nil {
103109
_ = l.ws.Close(websocket.StatusNormalClosure, "new connection inbound")
104110
}
111+
105112
conn, resp, err := websocket.Dial(ctx, l.broker, nil)
106113
if err != nil {
107114
if resp != nil {
@@ -110,13 +117,16 @@ func (l *listener) dial(ctx context.Context) (<-chan error, error) {
110117
return nil, err
111118
}
112119
l.ws = conn
120+
113121
nconn := websocket.NetConn(ctx, conn, websocket.MessageBinary)
114122
config := yamux.DefaultConfig()
115123
config.LogOutput = io.Discard
116124
session, err := yamux.Server(nconn, config)
117125
if err != nil {
118126
return nil, fmt.Errorf("create multiplex: %w", err)
119127
}
128+
129+
l.log.Info(ctx, "broker connection established")
120130
errCh := make(chan error)
121131
go func() {
122132
defer close(errCh)
@@ -126,19 +136,21 @@ func (l *listener) dial(ctx context.Context) (<-chan error, error) {
126136
errCh <- err
127137
break
128138
}
129-
go l.negotiate(conn)
139+
go l.negotiate(ctx, conn)
130140
}
131141
}()
142+
132143
return errCh, nil
133144
}
134145

135146
// Negotiates the handshake protocol over the connection provided.
136147
// This functions control-flow is important to readability,
137148
// so the cognitive overload linter has been disabled.
138149
// nolint:gocognit,nestif
139-
func (l *listener) negotiate(conn net.Conn) {
150+
func (l *listener) negotiate(ctx context.Context, conn net.Conn) {
140151
var (
141152
err error
153+
id = atomic.AddInt64(&l.nextConnNumber, 1)
142154
decoder = json.NewDecoder(conn)
143155
rtc *webrtc.PeerConnection
144156
// If candidates are sent before an offer, we place them here.
@@ -148,6 +160,8 @@ func (l *listener) negotiate(conn net.Conn) {
148160
// Sends the error provided then closes the connection.
149161
// If RTC isn't connected, we'll close it.
150162
closeError = func(err error) {
163+
l.log.Warn(ctx, "negotiation error, closing connection", slog.Error(err))
164+
151165
d, _ := json.Marshal(&BrokerMessage{
152166
Error: err.Error(),
153167
})
@@ -162,13 +176,17 @@ func (l *listener) negotiate(conn net.Conn) {
162176
}
163177
)
164178

179+
ctx = slog.With(ctx, slog.F("conn_id", id))
180+
l.log.Info(ctx, "accepted new session from broker connection, negotiating")
181+
165182
for {
166183
var msg BrokerMessage
167184
err = decoder.Decode(&msg)
168185
if err != nil {
169186
closeError(err)
170187
return
171188
}
189+
l.log.Debug(ctx, "received broker message", slog.F("msg", msg))
172190

173191
if msg.Candidate != "" {
174192
c := webrtc.ICECandidateInit{
@@ -180,6 +198,7 @@ func (l *listener) negotiate(conn net.Conn) {
180198
continue
181199
}
182200

201+
l.log.Debug(ctx, "adding ICE candidate", slog.F("c", c))
183202
err = rtc.AddICECandidate(c)
184203
if err != nil {
185204
closeError(fmt.Errorf("accept ice candidate: %w", err))
@@ -198,59 +217,73 @@ func (l *listener) negotiate(conn net.Conn) {
198217
// so it will not validate.
199218
continue
200219
}
220+
221+
l.log.Debug(ctx, "validating ICE server", slog.F("s", server))
201222
err = DialICE(server, nil)
202223
if err != nil {
203224
closeError(fmt.Errorf("dial server %+v: %w", server.URLs, err))
204225
return
205226
}
206227
}
228+
207229
rtc, err = newPeerConnection(msg.Servers, l.tcpProxy)
208230
if err != nil {
209231
closeError(err)
210232
return
211233
}
212234
rtc.OnConnectionStateChange(func(pcs webrtc.PeerConnectionState) {
235+
l.log.Debug(ctx, "connection state change", slog.F("state", pcs.String()))
213236
if pcs == webrtc.PeerConnectionStateConnecting {
214237
return
215238
}
216239
_ = conn.Close()
217240
})
241+
218242
flushCandidates := proxyICECandidates(rtc, conn)
219243
l.connClosersMut.Lock()
220244
l.connClosers = append(l.connClosers, rtc)
221245
l.connClosersMut.Unlock()
222-
rtc.OnDataChannel(l.handle(msg))
246+
rtc.OnDataChannel(l.handle(ctx, msg))
247+
248+
l.log.Debug(ctx, "set remote description", slog.F("offer", *msg.Offer))
223249
err = rtc.SetRemoteDescription(*msg.Offer)
224250
if err != nil {
225251
closeError(fmt.Errorf("apply offer: %w", err))
226252
return
227253
}
254+
228255
answer, err := rtc.CreateAnswer(nil)
229256
if err != nil {
230257
closeError(fmt.Errorf("create answer: %w", err))
231258
return
232259
}
260+
261+
l.log.Debug(ctx, "set local description", slog.F("answer", answer))
233262
err = rtc.SetLocalDescription(answer)
234263
if err != nil {
235264
closeError(fmt.Errorf("set local answer: %w", err))
236265
return
237266
}
238267
flushCandidates()
239268

240-
data, err := json.Marshal(&BrokerMessage{
269+
bmsg := &BrokerMessage{
241270
Answer: rtc.LocalDescription(),
242-
})
271+
}
272+
data, err := json.Marshal(bmsg)
243273
if err != nil {
244274
closeError(fmt.Errorf("marshal: %w", err))
245275
return
246276
}
277+
278+
l.log.Debug(ctx, "writing message", slog.F("msg", bmsg))
247279
_, err = conn.Write(data)
248280
if err != nil {
249281
closeError(fmt.Errorf("write: %w", err))
250282
return
251283
}
252284

253285
for _, candidate := range pendingCandidates {
286+
l.log.Debug(ctx, "adding pending ICE candidate", slog.F("c", candidate))
254287
err = rtc.AddICECandidate(candidate)
255288
if err != nil {
256289
closeError(fmt.Errorf("add pending candidate: %w", err))
@@ -262,19 +295,25 @@ func (l *listener) negotiate(conn net.Conn) {
262295
}
263296
}
264297

265-
func (l *listener) handle(msg BrokerMessage) func(dc *webrtc.DataChannel) {
298+
// nolint:gocognit
299+
func (l *listener) handle(ctx context.Context, msg BrokerMessage) func(dc *webrtc.DataChannel) {
266300
return func(dc *webrtc.DataChannel) {
267301
if dc.Protocol() == controlChannel {
268302
// The control channel handles pings.
269303
dc.OnOpen(func() {
304+
l.log.Debug(ctx, "control channel open")
270305
rw, err := dc.Detach()
271306
if err != nil {
272307
return
273308
}
274309
// We'll read and write back a single byte for ping/pongin'.
275310
d := make([]byte, 1)
276311
for {
312+
l.log.Debug(ctx, "sending ping")
277313
_, err = rw.Read(d)
314+
if err != nil {
315+
l.log.Debug(ctx, "reading ping response failed", slog.Error(err))
316+
}
278317
if errors.Is(err, io.EOF) {
279318
return
280319
}
@@ -287,25 +326,36 @@ func (l *listener) handle(msg BrokerMessage) func(dc *webrtc.DataChannel) {
287326
return
288327
}
289328

329+
ctx = slog.With(ctx,
330+
slog.F("dc_id", dc.ID()),
331+
slog.F("dc_label", dc.Label()),
332+
slog.F("dc_proto", dc.Protocol()),
333+
)
334+
290335
dc.OnOpen(func() {
336+
l.log.Info(ctx, "data channel opened")
291337
rw, err := dc.Detach()
292338
if err != nil {
293339
return
294340
}
295341

296342
var init DialChannelResponse
297343
sendInitMessage := func() {
344+
l.log.Debug(ctx, "sending dc init message", slog.F("msg", init))
298345
initData, err := json.Marshal(&init)
299346
if err != nil {
347+
l.log.Debug(ctx, "failed to marshal dc init message", slog.Error(err))
300348
rw.Close()
301349
return
302350
}
303351
_, err = rw.Write(initData)
304352
if err != nil {
353+
l.log.Debug(ctx, "failed to write dc init message", slog.Error(err))
305354
return
306355
}
307356
if init.Err != "" {
308357
// If an error occurred, we're safe to close the connection.
358+
l.log.Debug(ctx, "closing data channel due to error", slog.F("msg", init.Err))
309359
dc.Close()
310360
return
311361
}
@@ -323,8 +373,10 @@ func (l *listener) handle(msg BrokerMessage) func(dc *webrtc.DataChannel) {
323373
return
324374
}
325375

376+
l.log.Debug(ctx, "dialing remote address", slog.F("network", network), slog.F("addr", addr))
326377
nc, err := net.Dial(network, addr)
327378
if err != nil {
379+
l.log.Debug(ctx, "failed to dial remote address")
328380
init.Code = CodeDialErr
329381
init.Err = err.Error()
330382
if op, ok := err.(*net.OpError); ok {
@@ -336,8 +388,10 @@ func (l *listener) handle(msg BrokerMessage) func(dc *webrtc.DataChannel) {
336388
if init.Err != "" {
337389
return
338390
}
391+
339392
// Must wrap the data channel inside this connection
340393
// for buffering from the dialed endpoint to the client.
394+
l.log.Debug(ctx, "data channel initialized, tunnelling")
341395
co := &dataChannelConn{
342396
addr: nil,
343397
dc: dc,
@@ -357,6 +411,8 @@ func (l *listener) handle(msg BrokerMessage) func(dc *webrtc.DataChannel) {
357411

358412
// Close closes the broker socket and all created RTC connections.
359413
func (l *listener) Close() error {
414+
l.log.Info(context.Background(), "listener closed")
415+
360416
l.connClosersMut.Lock()
361417
for _, connCloser := range l.connClosers {
362418
// We can ignore the error here... it doesn't

0 commit comments

Comments
 (0)