Skip to content
This repository was archived by the owner on Aug 30, 2024. It is now read-only.

Commit f85e054

Browse files
authored
feat: Add Benchmarks to test WebRTC connections (#368)
* feat: Add Benchmarks to test WebRTC connections * Isolate race condition further * Add race condition 😥 * Add benchmark step * Fix linting * Suppress TURN output * Remove bench action
1 parent 177ee84 commit f85e054

File tree

7 files changed

+151
-9
lines changed

7 files changed

+151
-9
lines changed

go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ require (
1616
github.com/pion/datachannel v1.4.21
1717
github.com/pion/dtls/v2 v2.0.9
1818
github.com/pion/ice/v2 v2.1.7
19+
github.com/pion/logging v0.2.2
1920
github.com/pion/turn/v2 v2.0.5
2021
github.com/pion/webrtc/v3 v3.0.29
2122
github.com/pkg/browser v0.0.0-20180916011732-0a3d74bf9ce4

wsnet/conn.go

Lines changed: 54 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,13 +4,22 @@ import (
44
"fmt"
55
"net"
66
"net/url"
7+
"sync"
78
"time"
89

910
"github.com/pion/datachannel"
11+
"github.com/pion/webrtc/v3"
1012
)
1113

1214
const (
1315
httpScheme = "http"
16+
17+
bufferedAmountLowThreshold uint64 = 512 * 1024 // 512 KB
18+
maxBufferedAmount uint64 = 1024 * 1024 // 1 MB
19+
// For some reason messages larger just don't work...
20+
// This shouldn't be a huge deal for real-world usage.
21+
// See: https://github.com/pion/datachannel/issues/59
22+
maxMessageLength = 32 * 1024 // 32 KB
1423
)
1524

1625
// TURNEndpoint returns the TURN address for a Coder baseURL.
@@ -43,19 +52,63 @@ func ConnectEndpoint(baseURL *url.URL, workspace, token string) string {
4352

4453
type conn struct {
4554
addr *net.UnixAddr
55+
dc *webrtc.DataChannel
4656
rw datachannel.ReadWriteCloser
57+
58+
sendMore chan struct{}
59+
closedMutex sync.RWMutex
60+
closed bool
61+
62+
writeMutex sync.Mutex
63+
}
64+
65+
func (c *conn) init() {
66+
c.sendMore = make(chan struct{}, 1)
67+
c.dc.SetBufferedAmountLowThreshold(bufferedAmountLowThreshold)
68+
c.dc.OnBufferedAmountLow(func() {
69+
c.closedMutex.RLock()
70+
defer c.closedMutex.RUnlock()
71+
if c.closed {
72+
return
73+
}
74+
select {
75+
case c.sendMore <- struct{}{}:
76+
default:
77+
}
78+
})
4779
}
4880

4981
func (c *conn) Read(b []byte) (n int, err error) {
5082
return c.rw.Read(b)
5183
}
5284

5385
func (c *conn) Write(b []byte) (n int, err error) {
86+
c.writeMutex.Lock()
87+
defer c.writeMutex.Unlock()
88+
if len(b) > maxMessageLength {
89+
return 0, fmt.Errorf("outbound packet larger than maximum message size: %d", maxMessageLength)
90+
}
91+
if c.dc.BufferedAmount()+uint64(len(b)) >= maxBufferedAmount {
92+
<-c.sendMore
93+
}
94+
// TODO (@kyle): There's an obvious race-condition here.
95+
// This is an edge-case, as most-frequently data won't
96+
// be pooled so synchronously, but is definitely possible.
97+
//
98+
// See: https://github.com/pion/sctp/issues/181
99+
time.Sleep(time.Microsecond)
100+
54101
return c.rw.Write(b)
55102
}
56103

57104
func (c *conn) Close() error {
58-
return c.rw.Close()
105+
c.closedMutex.Lock()
106+
defer c.closedMutex.Unlock()
107+
if !c.closed {
108+
c.closed = true
109+
close(c.sendMore)
110+
}
111+
return c.dc.Close()
59112
}
60113

61114
func (c *conn) LocalAddr() net.Addr {

wsnet/dial.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -249,11 +249,14 @@ func (d *Dialer) DialContext(ctx context.Context, network, address string) (net.
249249
return nil, ctx.Err()
250250
}
251251

252-
return &conn{
252+
c := &conn{
253253
addr: &net.UnixAddr{
254254
Name: address,
255255
Net: network,
256256
},
257+
dc: dc,
257258
rw: rw,
258-
}, nil
259+
}
260+
c.init()
261+
return c, nil
259262
}

wsnet/dial_test.go

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,11 @@ package wsnet
33
import (
44
"bytes"
55
"context"
6+
"crypto/rand"
67
"errors"
78
"io"
89
"net"
10+
"strconv"
911
"testing"
1012

1113
"github.com/pion/webrtc/v3"
@@ -160,3 +162,70 @@ func TestDial(t *testing.T) {
160162
}
161163
})
162164
}
165+
166+
func BenchmarkThroughput(b *testing.B) {
167+
sizes := []int64{
168+
4,
169+
16,
170+
128,
171+
256,
172+
1024,
173+
4096,
174+
16384,
175+
32768,
176+
}
177+
178+
listener, err := net.Listen("tcp", "0.0.0.0:0")
179+
if err != nil {
180+
b.Error(err)
181+
return
182+
}
183+
go func() {
184+
for {
185+
conn, err := listener.Accept()
186+
if err != nil {
187+
b.Error(err)
188+
return
189+
}
190+
go func() {
191+
_, _ = io.Copy(io.Discard, conn)
192+
}()
193+
}
194+
}()
195+
connectAddr, listenAddr := createDumbBroker(b)
196+
_, err = Listen(context.Background(), listenAddr)
197+
if err != nil {
198+
b.Error(err)
199+
return
200+
}
201+
202+
dialer, err := DialWebsocket(context.Background(), connectAddr, nil)
203+
if err != nil {
204+
b.Error(err)
205+
return
206+
}
207+
for _, size := range sizes {
208+
size := size
209+
bytes := make([]byte, size)
210+
_, _ = rand.Read(bytes)
211+
b.Run("Rand"+strconv.Itoa(int(size)), func(b *testing.B) {
212+
b.SetBytes(size)
213+
b.ReportAllocs()
214+
215+
conn, err := dialer.DialContext(context.Background(), listener.Addr().Network(), listener.Addr().String())
216+
if err != nil {
217+
b.Error(err)
218+
return
219+
}
220+
defer conn.Close()
221+
222+
for i := 0; i < b.N; i++ {
223+
_, err := conn.Write(bytes)
224+
if err != nil {
225+
b.Error(err)
226+
break
227+
}
228+
}
229+
})
230+
}
231+
}

wsnet/listen.go

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -311,7 +311,7 @@ func (l *listener) handle(msg BrokerMessage) func(dc *webrtc.DataChannel) {
311311
return
312312
}
313313

314-
conn, err := net.Dial(network, addr)
314+
nc, err := net.Dial(network, addr)
315315
if err != nil {
316316
init.Code = CodeDialErr
317317
init.Err = err.Error()
@@ -324,13 +324,21 @@ func (l *listener) handle(msg BrokerMessage) func(dc *webrtc.DataChannel) {
324324
if init.Err != "" {
325325
return
326326
}
327-
defer conn.Close()
328-
defer dc.Close()
327+
// Must wrap the data channel inside this connection
328+
// for buffering from the dialed endpoint to the client.
329+
co := &conn{
330+
addr: nil,
331+
dc: dc,
332+
rw: rw,
333+
}
334+
co.init()
335+
defer co.Close()
336+
defer nc.Close()
329337

330338
go func() {
331-
_, _ = io.Copy(rw, conn)
339+
_, _ = io.Copy(co, nc)
332340
}()
333-
_, _ = io.Copy(conn, rw)
341+
_, _ = io.Copy(nc, co)
334342
})
335343
}
336344
}

wsnet/rtc.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import (
1414

1515
"github.com/pion/dtls/v2"
1616
"github.com/pion/ice/v2"
17+
"github.com/pion/logging"
1718
"github.com/pion/turn/v2"
1819
"github.com/pion/webrtc/v3"
1920
)
@@ -159,6 +160,9 @@ func newPeerConnection(servers []webrtc.ICEServer) (*webrtc.PeerConnection, erro
159160
se.SetSrflxAcceptanceMinWait(0)
160161
se.DetachDataChannels()
161162
se.SetICETimeouts(time.Second*5, time.Second*5, time.Second*2)
163+
lf := logging.NewDefaultLoggerFactory()
164+
lf.DefaultLogLevel = logging.LogLevelDisabled
165+
se.LoggerFactory = lf
162166

163167
// If one server is provided and we know it's TURN, we can set the
164168
// relay acceptable so the connection starts immediately.

wsnet/wsnet_test.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,13 +20,14 @@ import (
2020
"cdr.dev/slog/sloggers/slogtest/assert"
2121
"github.com/hashicorp/yamux"
2222
"github.com/pion/ice/v2"
23+
"github.com/pion/logging"
2324
"github.com/pion/turn/v2"
2425
"nhooyr.io/websocket"
2526
)
2627

2728
// createDumbBroker proxies sockets between /listen and /connect
2829
// to emulate an authenticated WebSocket pair.
29-
func createDumbBroker(t *testing.T) (connectAddr string, listenAddr string) {
30+
func createDumbBroker(t testing.TB) (connectAddr string, listenAddr string) {
3031
listener, err := net.Listen("tcp4", "127.0.0.1:0")
3132
if err != nil {
3233
t.Error(err)
@@ -128,13 +129,16 @@ func createTURNServer(t *testing.T, server ice.SchemeType, pass string) string {
128129
}}
129130
}
130131

132+
lf := logging.NewDefaultLoggerFactory()
133+
lf.DefaultLogLevel = logging.LogLevelDisabled
131134
srv, err := turn.NewServer(turn.ServerConfig{
132135
PacketConnConfigs: pcListeners,
133136
ListenerConfigs: listeners,
134137
Realm: "coder",
135138
AuthHandler: func(username, realm string, srcAddr net.Addr) (key []byte, ok bool) {
136139
return turn.GenerateAuthKey(username, realm, pass), true
137140
},
141+
LoggerFactory: lf,
138142
})
139143
if err != nil {
140144
t.Error(err)

0 commit comments

Comments
 (0)