Skip to content
This repository was archived by the owner on Aug 30, 2024. It is now read-only.

Add logs to wsnet listener #388

Merged
merged 4 commits into from
Jul 20, 2021
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Add more logging to wsnet listener
  • Loading branch information
deansheather committed Jul 20, 2021
commit 6af71930785cb7f55414c6061570db1a1cebad46
68 changes: 62 additions & 6 deletions wsnet/listen.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"net"
"net/url"
"sync"
"sync/atomic"
"time"

"github.com/hashicorp/yamux"
Expand Down Expand Up @@ -61,6 +62,8 @@ func Listen(ctx context.Context, log slog.Logger, broker string, turnProxyAuthTo
for {
err := <-ch
if errors.Is(err, io.EOF) || errors.Is(err, yamux.ErrKeepAliveTimeout) {
l.log.Warn(ctx, "disconnected from broker", slog.Error(err))

// If we hit an EOF, then the connection to the broker
// was interrupted. We'll take a short break then dial
// again.
Expand Down Expand Up @@ -97,12 +100,16 @@ type listener struct {
ws *websocket.Conn
connClosers []io.Closer
connClosersMut sync.Mutex

nextConnNumber int64
}

func (l *listener) dial(ctx context.Context) (<-chan error, error) {
l.log.Info(ctx, "connecting to broker", slog.F("broker_url", l.broker))
if l.ws != nil {
_ = l.ws.Close(websocket.StatusNormalClosure, "new connection inbound")
}

conn, resp, err := websocket.Dial(ctx, l.broker, nil)
if err != nil {
if resp != nil {
Expand All @@ -111,13 +118,16 @@ func (l *listener) dial(ctx context.Context) (<-chan error, error) {
return nil, err
}
l.ws = conn

nconn := websocket.NetConn(ctx, conn, websocket.MessageBinary)
config := yamux.DefaultConfig()
config.LogOutput = io.Discard
session, err := yamux.Server(nconn, config)
if err != nil {
return nil, fmt.Errorf("create multiplex: %w", err)
}

l.log.Info(ctx, "broker connection established")
errCh := make(chan error)
go func() {
defer close(errCh)
Expand All @@ -127,19 +137,21 @@ func (l *listener) dial(ctx context.Context) (<-chan error, error) {
errCh <- err
break
}
go l.negotiate(conn)
go l.negotiate(ctx, conn)
}
}()

return errCh, nil
}

// Negotiates the handshake protocol over the connection provided.
// This functions control-flow is important to readability,
// so the cognitive overload linter has been disabled.
// nolint:gocognit,nestif
func (l *listener) negotiate(conn net.Conn) {
func (l *listener) negotiate(ctx context.Context, conn net.Conn) {
var (
err error
id = atomic.AddInt64(&l.nextConnNumber, 1)
decoder = json.NewDecoder(conn)
rtc *webrtc.PeerConnection
// If candidates are sent before an offer, we place them here.
Expand All @@ -149,6 +161,8 @@ func (l *listener) negotiate(conn net.Conn) {
// Sends the error provided then closes the connection.
// If RTC isn't connected, we'll close it.
closeError = func(err error) {
l.log.Warn(ctx, "negotiation error, closing connection", slog.Error(err))

d, _ := json.Marshal(&BrokerMessage{
Error: err.Error(),
})
Expand All @@ -163,13 +177,17 @@ func (l *listener) negotiate(conn net.Conn) {
}
)

ctx = slog.With(ctx, slog.F("conn_id", id))
l.log.Info(ctx, "accepted new session from broker connection, negotiating")

for {
var msg BrokerMessage
err = decoder.Decode(&msg)
if err != nil {
closeError(err)
return
}
l.log.Debug(ctx, "received broker message", slog.F("msg", msg))

if msg.Candidate != "" {
c := webrtc.ICECandidateInit{
Expand All @@ -181,6 +199,7 @@ func (l *listener) negotiate(conn net.Conn) {
continue
}

l.log.Debug(ctx, "adding ICE candidate", slog.F("c", c))
err = rtc.AddICECandidate(c)
if err != nil {
closeError(fmt.Errorf("accept ice candidate: %w", err))
Expand All @@ -199,12 +218,15 @@ func (l *listener) negotiate(conn net.Conn) {
// so it will not validate.
continue
}

l.log.Debug(ctx, "validating ICE server", slog.F("s", server))
err = DialICE(server, nil)
if err != nil {
closeError(fmt.Errorf("dial server %+v: %w", server.URLs, err))
return
}
}

var turnProxy proxy.Dialer
if msg.TURNProxyURL != "" {
u, err := url.Parse(msg.TURNProxyURL)
Expand All @@ -223,47 +245,58 @@ func (l *listener) negotiate(conn net.Conn) {
return
}
rtc.OnConnectionStateChange(func(pcs webrtc.PeerConnectionState) {
l.log.Debug(ctx, "connection state change", slog.F("state", pcs.String()))
if pcs == webrtc.PeerConnectionStateConnecting {
return
}
_ = conn.Close()
})

flushCandidates := proxyICECandidates(rtc, conn)
l.connClosersMut.Lock()
l.connClosers = append(l.connClosers, rtc)
l.connClosersMut.Unlock()
rtc.OnDataChannel(l.handle(msg))
rtc.OnDataChannel(l.handle(ctx, msg))

l.log.Debug(ctx, "set remote description", slog.F("offer", *msg.Offer))
err = rtc.SetRemoteDescription(*msg.Offer)
if err != nil {
closeError(fmt.Errorf("apply offer: %w", err))
return
}

answer, err := rtc.CreateAnswer(nil)
if err != nil {
closeError(fmt.Errorf("create answer: %w", err))
return
}

l.log.Debug(ctx, "set local description", slog.F("answer", answer))
err = rtc.SetLocalDescription(answer)
if err != nil {
closeError(fmt.Errorf("set local answer: %w", err))
return
}
flushCandidates()

data, err := json.Marshal(&BrokerMessage{
bmsg := &BrokerMessage{
Answer: rtc.LocalDescription(),
})
}
data, err := json.Marshal(bmsg)
if err != nil {
closeError(fmt.Errorf("marshal: %w", err))
return
}

l.log.Debug(ctx, "writing message", slog.F("msg", bmsg))
_, err = conn.Write(data)
if err != nil {
closeError(fmt.Errorf("write: %w", err))
return
}

for _, candidate := range pendingCandidates {
l.log.Debug(ctx, "adding pending ICE candidate", slog.F("c", candidate))
err = rtc.AddICECandidate(candidate)
if err != nil {
closeError(fmt.Errorf("add pending candidate: %w", err))
Expand All @@ -275,19 +308,25 @@ func (l *listener) negotiate(conn net.Conn) {
}
}

func (l *listener) handle(msg BrokerMessage) func(dc *webrtc.DataChannel) {
// nolint:gocognit
func (l *listener) handle(ctx context.Context, msg BrokerMessage) func(dc *webrtc.DataChannel) {
return func(dc *webrtc.DataChannel) {
if dc.Protocol() == controlChannel {
// The control channel handles pings.
dc.OnOpen(func() {
l.log.Debug(ctx, "control channel open")
rw, err := dc.Detach()
if err != nil {
return
}
// We'll read and write back a single byte for ping/pongin'.
d := make([]byte, 1)
for {
l.log.Debug(ctx, "sending ping")
_, err = rw.Read(d)
if err != nil {
l.log.Debug(ctx, "reading ping response failed", slog.Error(err))
}
if errors.Is(err, io.EOF) {
return
}
Expand All @@ -300,25 +339,36 @@ func (l *listener) handle(msg BrokerMessage) func(dc *webrtc.DataChannel) {
return
}

ctx = slog.With(ctx,
slog.F("dc_id", dc.ID()),
slog.F("dc_label", dc.Label()),
slog.F("dc_proto", dc.Protocol()),
)

dc.OnOpen(func() {
l.log.Info(ctx, "data channel opened")
rw, err := dc.Detach()
if err != nil {
return
}

var init DialChannelResponse
sendInitMessage := func() {
l.log.Debug(ctx, "sending dc init message", slog.F("msg", init))
initData, err := json.Marshal(&init)
if err != nil {
l.log.Debug(ctx, "failed to marshal dc init message", slog.Error(err))
rw.Close()
return
}
_, err = rw.Write(initData)
if err != nil {
l.log.Debug(ctx, "failed to write dc init message", slog.Error(err))
return
}
if init.Err != "" {
// If an error occurred, we're safe to close the connection.
l.log.Debug(ctx, "closing data channel due to error", slog.F("msg", init.Err))
dc.Close()
return
}
Expand All @@ -336,8 +386,10 @@ func (l *listener) handle(msg BrokerMessage) func(dc *webrtc.DataChannel) {
return
}

l.log.Debug(ctx, "dialing remote address", slog.F("network", network), slog.F("addr", addr))
nc, err := net.Dial(network, addr)
if err != nil {
l.log.Debug(ctx, "failed to dial remote address")
init.Code = CodeDialErr
init.Err = err.Error()
if op, ok := err.(*net.OpError); ok {
Expand All @@ -349,8 +401,10 @@ func (l *listener) handle(msg BrokerMessage) func(dc *webrtc.DataChannel) {
if init.Err != "" {
return
}

// Must wrap the data channel inside this connection
// for buffering from the dialed endpoint to the client.
l.log.Debug(ctx, "data channel initialized, tunnelling")
co := &dataChannelConn{
addr: nil,
dc: dc,
Expand All @@ -370,6 +424,8 @@ func (l *listener) handle(msg BrokerMessage) func(dc *webrtc.DataChannel) {

// Close closes the broker socket and all created RTC connections.
func (l *listener) Close() error {
l.log.Info(context.Background(), "listener closed")

l.connClosersMut.Lock()
for _, connCloser := range l.connClosers {
// We can ignore the error here... it doesn't
Expand Down