From 5ff49026d3118f8647ceaa4c895ba1980a2ed233 Mon Sep 17 00:00:00 2001 From: Kyle Carberry Date: Tue, 15 Jun 2021 05:37:06 +0000 Subject: [PATCH 1/7] feat: Add Benchmarks to test WebRTC connections --- wsnet/conn.go | 41 ++++++++++++++++++++++++++- wsnet/dial.go | 7 +++-- wsnet/dial_test.go | 68 +++++++++++++++++++++++++++++++++++++++++++++ wsnet/listen.go | 25 +++++++++++++++++ wsnet/rtc.go | 4 +++ wsnet/wsnet_test.go | 2 +- 6 files changed, 143 insertions(+), 4 deletions(-) diff --git a/wsnet/conn.go b/wsnet/conn.go index 7e18723b..787bf8bb 100644 --- a/wsnet/conn.go +++ b/wsnet/conn.go @@ -4,13 +4,22 @@ import ( "fmt" "net" "net/url" + "sync" "time" "github.com/pion/datachannel" + "github.com/pion/webrtc/v3" ) const ( httpScheme = "http" + + bufferedAmountLowThreshold uint64 = 512 * 1024 // 512 KB + maxBufferedAmount uint64 = 1024 * 1024 // 1 MB + // For some reason messages larger just don't work... + // This shouldn't be a huge deal for real-world usage. + // See: https://github.com/pion/datachannel/issues/59 + maxMessageLength = 32 * 1024 // 32 KB ) // TURNEndpoint returns the TURN address for a Coder baseURL. @@ -43,7 +52,25 @@ func ConnectEndpoint(baseURL *url.URL, workspace, token string) string { type conn struct { addr *net.UnixAddr + dc *webrtc.DataChannel rw datachannel.ReadWriteCloser + + sendMore chan struct{} + closedMutex sync.Mutex + closed bool +} + +func (c *conn) init() { + c.sendMore = make(chan struct{}) + c.dc.SetBufferedAmountLowThreshold(bufferedAmountLowThreshold) + c.dc.OnBufferedAmountLow(func() { + c.closedMutex.Lock() + defer c.closedMutex.Unlock() + if c.closed { + return + } + c.sendMore <- struct{}{} + }) } func (c *conn) Read(b []byte) (n int, err error) { @@ -51,11 +78,23 @@ func (c *conn) Read(b []byte) (n int, err error) { } func (c *conn) Write(b []byte) (n int, err error) { + if len(b) > maxMessageLength { + return 0, fmt.Errorf("outbound packet larger than maximum message size: %d", maxMessageLength) + } + if c.dc.BufferedAmount()+uint64(len(b)) >= maxBufferedAmount { + <-c.sendMore + } return c.rw.Write(b) } func (c *conn) Close() error { - return c.rw.Close() + c.closedMutex.Lock() + defer c.closedMutex.Unlock() + if !c.closed { + c.closed = true + close(c.sendMore) + } + return c.dc.Close() } func (c *conn) LocalAddr() net.Addr { diff --git a/wsnet/dial.go b/wsnet/dial.go index 23581eaf..9debf47a 100644 --- a/wsnet/dial.go +++ b/wsnet/dial.go @@ -249,11 +249,14 @@ func (d *Dialer) DialContext(ctx context.Context, network, address string) (net. return nil, ctx.Err() } - return &conn{ + c := &conn{ addr: &net.UnixAddr{ Name: address, Net: network, }, + dc: dc, rw: rw, - }, nil + } + c.init() + return c, nil } diff --git a/wsnet/dial_test.go b/wsnet/dial_test.go index 584c37d5..42392598 100644 --- a/wsnet/dial_test.go +++ b/wsnet/dial_test.go @@ -3,9 +3,11 @@ package wsnet import ( "bytes" "context" + "crypto/rand" "errors" "io" "net" + "strconv" "testing" "github.com/pion/webrtc/v3" @@ -160,3 +162,69 @@ func TestDial(t *testing.T) { } }) } + +func BenchmarkThroughput(b *testing.B) { + sizes := []int64{ + // 4, + // 16, + // 256, + // 1024, + // 4096, + // 16384, + 32768, + } + + listener, err := net.Listen("tcp", "0.0.0.0:0") + if err != nil { + b.Error(err) + return + } + go func() { + for { + conn, err := listener.Accept() + if err != nil { + b.Error(err) + return + } + go func() { + _, _ = io.Copy(io.Discard, conn) + }() + } + }() + connectAddr, listenAddr := createDumbBroker(b) + _, err = Listen(context.Background(), listenAddr) + if err != nil { + b.Error(err) + return + } + + dialer, err := DialWebsocket(context.Background(), connectAddr, nil) + if err != nil { + b.Error(err) + return + } + for _, size := range sizes { + size := size + bytes := make([]byte, size) + _, _ = rand.Read(bytes) + b.Run("Rand"+strconv.Itoa(int(size)), func(b *testing.B) { + b.SetBytes(size) + b.ReportAllocs() + + conn, err := dialer.DialContext(context.Background(), listener.Addr().Network(), listener.Addr().String()) + if err != nil { + b.Error(err) + return + } + defer conn.Close() + + for i := 0; i < b.N; i++ { + _, err := conn.Write(bytes) + if err != nil { + b.Error(err) + break + } + } + }) + } +} diff --git a/wsnet/listen.go b/wsnet/listen.go index 1496e19c..74e34f2c 100644 --- a/wsnet/listen.go +++ b/wsnet/listen.go @@ -331,6 +331,31 @@ func (l *listener) handle(msg BrokerMessage) func(dc *webrtc.DataChannel) { _, _ = io.Copy(rw, conn) }() _, _ = io.Copy(conn, rw) + + // bufs := make(chan []byte, 32) + // go func() { + // defer close(bufs) + + // for { + // buf := <-bufs + // _, _ = conn.Write(buf) + // } + // }() + + // buf := make([]byte, maxMessageLength) + // for { + // nr, err := rw.Read(buf) + // if nr > 0 { + // select { + // case bufs <- buf[0:nr]: + // default: + // } + + // } + // if err != nil { + // break + // } + // } }) } } diff --git a/wsnet/rtc.go b/wsnet/rtc.go index f5c7c5f3..5bf922a7 100644 --- a/wsnet/rtc.go +++ b/wsnet/rtc.go @@ -14,6 +14,7 @@ import ( "github.com/pion/dtls/v2" "github.com/pion/ice/v2" + "github.com/pion/logging" "github.com/pion/turn/v2" "github.com/pion/webrtc/v3" ) @@ -159,6 +160,9 @@ func newPeerConnection(servers []webrtc.ICEServer) (*webrtc.PeerConnection, erro se.SetSrflxAcceptanceMinWait(0) se.DetachDataChannels() se.SetICETimeouts(time.Second*5, time.Second*5, time.Second*2) + lf := logging.NewDefaultLoggerFactory() + lf.DefaultLogLevel = logging.LogLevelDebug + se.LoggerFactory = lf // If one server is provided and we know it's TURN, we can set the // relay acceptable so the connection starts immediately. diff --git a/wsnet/wsnet_test.go b/wsnet/wsnet_test.go index 8452015d..93346f1e 100644 --- a/wsnet/wsnet_test.go +++ b/wsnet/wsnet_test.go @@ -26,7 +26,7 @@ import ( // createDumbBroker proxies sockets between /listen and /connect // to emulate an authenticated WebSocket pair. -func createDumbBroker(t *testing.T) (connectAddr string, listenAddr string) { +func createDumbBroker(t testing.TB) (connectAddr string, listenAddr string) { listener, err := net.Listen("tcp4", "127.0.0.1:0") if err != nil { t.Error(err) From 0dc0e56cfef10b2b1cb85603c2da7cd7e6a42a47 Mon Sep 17 00:00:00 2001 From: Kyle Carberry Date: Tue, 15 Jun 2021 18:10:57 +0000 Subject: [PATCH 2/7] Isolate race condition further --- wsnet/conn.go | 19 ++++++++++++++----- wsnet/dial_test.go | 3 ++- wsnet/rtc.go | 2 +- 3 files changed, 17 insertions(+), 7 deletions(-) diff --git a/wsnet/conn.go b/wsnet/conn.go index 787bf8bb..11a3abfa 100644 --- a/wsnet/conn.go +++ b/wsnet/conn.go @@ -56,20 +56,25 @@ type conn struct { rw datachannel.ReadWriteCloser sendMore chan struct{} - closedMutex sync.Mutex + closedMutex sync.RWMutex closed bool + + writeMutex sync.Mutex } func (c *conn) init() { - c.sendMore = make(chan struct{}) + c.sendMore = make(chan struct{}, 1) c.dc.SetBufferedAmountLowThreshold(bufferedAmountLowThreshold) c.dc.OnBufferedAmountLow(func() { - c.closedMutex.Lock() - defer c.closedMutex.Unlock() + c.closedMutex.RLock() + defer c.closedMutex.RUnlock() if c.closed { return } - c.sendMore <- struct{}{} + select { + case c.sendMore <- struct{}{}: + default: + } }) } @@ -78,12 +83,16 @@ func (c *conn) Read(b []byte) (n int, err error) { } func (c *conn) Write(b []byte) (n int, err error) { + c.writeMutex.Lock() + defer c.writeMutex.Unlock() if len(b) > maxMessageLength { return 0, fmt.Errorf("outbound packet larger than maximum message size: %d", maxMessageLength) } if c.dc.BufferedAmount()+uint64(len(b)) >= maxBufferedAmount { <-c.sendMore } + // Uncomment this line for it to work. + // time.Sleep(time.Microsecond) return c.rw.Write(b) } diff --git a/wsnet/dial_test.go b/wsnet/dial_test.go index 42392598..7dc3b612 100644 --- a/wsnet/dial_test.go +++ b/wsnet/dial_test.go @@ -167,11 +167,12 @@ func BenchmarkThroughput(b *testing.B) { sizes := []int64{ // 4, // 16, + 128, // 256, // 1024, // 4096, // 16384, - 32768, + // 32768, } listener, err := net.Listen("tcp", "0.0.0.0:0") diff --git a/wsnet/rtc.go b/wsnet/rtc.go index 5bf922a7..0605fa29 100644 --- a/wsnet/rtc.go +++ b/wsnet/rtc.go @@ -161,7 +161,7 @@ func newPeerConnection(servers []webrtc.ICEServer) (*webrtc.PeerConnection, erro se.DetachDataChannels() se.SetICETimeouts(time.Second*5, time.Second*5, time.Second*2) lf := logging.NewDefaultLoggerFactory() - lf.DefaultLogLevel = logging.LogLevelDebug + lf.DefaultLogLevel = logging.LogLevelDisabled se.LoggerFactory = lf // If one server is provided and we know it's TURN, we can set the From c7b604cbc3a479039710e0696d115e0f123b28cf Mon Sep 17 00:00:00 2001 From: Kyle Carberry Date: Tue, 15 Jun 2021 19:15:09 +0000 Subject: [PATCH 3/7] =?UTF-8?q?Add=20race=20condition=20=F0=9F=98=A5?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- go.mod | 1 + wsnet/conn.go | 9 +++++++-- wsnet/dial_test.go | 14 +++++++------- 3 files changed, 15 insertions(+), 9 deletions(-) diff --git a/go.mod b/go.mod index 3823f149..3891770e 100644 --- a/go.mod +++ b/go.mod @@ -16,6 +16,7 @@ require ( github.com/pion/datachannel v1.4.21 github.com/pion/dtls/v2 v2.0.9 github.com/pion/ice/v2 v2.1.7 + github.com/pion/logging v0.2.2 github.com/pion/turn/v2 v2.0.5 github.com/pion/webrtc/v3 v3.0.29 github.com/pkg/browser v0.0.0-20180916011732-0a3d74bf9ce4 diff --git a/wsnet/conn.go b/wsnet/conn.go index 11a3abfa..b5dea0a5 100644 --- a/wsnet/conn.go +++ b/wsnet/conn.go @@ -91,8 +91,13 @@ func (c *conn) Write(b []byte) (n int, err error) { if c.dc.BufferedAmount()+uint64(len(b)) >= maxBufferedAmount { <-c.sendMore } - // Uncomment this line for it to work. - // time.Sleep(time.Microsecond) + // TODO (@kyle): There's an obvious race-condition here. + // This is an edge-case, as most-frequently data won't + // be pooled so synchronously, but is definitely possible. + // + // See: https://github.com/pion/sctp/issues/181 + time.Sleep(time.Microsecond) + return c.rw.Write(b) } diff --git a/wsnet/dial_test.go b/wsnet/dial_test.go index 7dc3b612..50bdd938 100644 --- a/wsnet/dial_test.go +++ b/wsnet/dial_test.go @@ -165,14 +165,14 @@ func TestDial(t *testing.T) { func BenchmarkThroughput(b *testing.B) { sizes := []int64{ - // 4, - // 16, + 4, + 16, 128, - // 256, - // 1024, - // 4096, - // 16384, - // 32768, + 256, + 1024, + 4096, + 16384, + 32768, } listener, err := net.Listen("tcp", "0.0.0.0:0") From 43c11b0669bc7764d7025e7a2f9c177bf1fc3cd8 Mon Sep 17 00:00:00 2001 From: Kyle Carberry Date: Tue, 15 Jun 2021 19:20:54 +0000 Subject: [PATCH 4/7] Add benchmark step --- .github/workflows/test.yaml | 39 +++++++++++++++++++++++++++++++++++++ 1 file changed, 39 insertions(+) diff --git a/.github/workflows/test.yaml b/.github/workflows/test.yaml index 79a324a9..a0076df8 100644 --- a/.github/workflows/test.yaml +++ b/.github/workflows/test.yaml @@ -54,6 +54,45 @@ jobs: with: args: make -j test/coverage + bench: + runs-on: ubuntu-20.04 + steps: + - uses: actions/checkout@v2 + + - uses: actions/cache@v2 + with: + path: ~/go/pkg/mod + key: ${{ runner.os }}-go-${{ hashFiles('**/go.sum') }} + restore-keys: | + ${{ runner.os }}-go- + + - uses: actions/cache@v2 + with: + path: ./cache + key: ${{ runner.os }}-go-bench + restore-keys: | + ${{ runner.os }}-go-bench- + + - name: bench + uses: ./ci/image + env: + COVERALLS_TOKEN: ${{ secrets.GITHUB_TOKEN }} + CODER_URL: ${{ secrets.CODER_URL }} + CODER_EMAIL: ${{ secrets.CODER_EMAIL }} + CODER_PASSWORD: ${{ secrets.CODER_PASSWORD }} + with: + args: sh -c 'go test -bench=. cdr.dev/coder-cli/wsnet | tee output.txt' + + - name: Store benchmark result + uses: rhysd/github-action-benchmark@v1 + with: + tool: 'go' + output-file-path: output.txt + external-data-json-path: ./cache/benchmark-data.json + fail-on-alert: true + github-token: ${{ secrets.GITHUB_TOKEN }} + comment-on-alert: true + gendocs: runs-on: ubuntu-20.04 steps: From 7a78d111f2322dc70f91246736066b2e45fa0483 Mon Sep 17 00:00:00 2001 From: Kyle Carberry Date: Tue, 15 Jun 2021 19:24:52 +0000 Subject: [PATCH 5/7] Fix linting --- .github/workflows/test.yaml | 2 +- wsnet/listen.go | 26 +------------------------- 2 files changed, 2 insertions(+), 26 deletions(-) diff --git a/.github/workflows/test.yaml b/.github/workflows/test.yaml index a0076df8..5a0c5cd4 100644 --- a/.github/workflows/test.yaml +++ b/.github/workflows/test.yaml @@ -81,7 +81,7 @@ jobs: CODER_EMAIL: ${{ secrets.CODER_EMAIL }} CODER_PASSWORD: ${{ secrets.CODER_PASSWORD }} with: - args: sh -c 'go test -bench=. cdr.dev/coder-cli/wsnet | tee output.txt' + args: sh -c "go test -bench=. cdr.dev/coder-cli/wsnet | tee output.txt" - name: Store benchmark result uses: rhysd/github-action-benchmark@v1 diff --git a/wsnet/listen.go b/wsnet/listen.go index 74e34f2c..57080eae 100644 --- a/wsnet/listen.go +++ b/wsnet/listen.go @@ -326,36 +326,12 @@ func (l *listener) handle(msg BrokerMessage) func(dc *webrtc.DataChannel) { } defer conn.Close() defer dc.Close() + defer rw.Close() go func() { _, _ = io.Copy(rw, conn) }() _, _ = io.Copy(conn, rw) - - // bufs := make(chan []byte, 32) - // go func() { - // defer close(bufs) - - // for { - // buf := <-bufs - // _, _ = conn.Write(buf) - // } - // }() - - // buf := make([]byte, maxMessageLength) - // for { - // nr, err := rw.Read(buf) - // if nr > 0 { - // select { - // case bufs <- buf[0:nr]: - // default: - // } - - // } - // if err != nil { - // break - // } - // } }) } } From df35d18608f85e9d4dd7319d97b74949c6aa5da4 Mon Sep 17 00:00:00 2001 From: Kyle Carberry Date: Tue, 15 Jun 2021 19:29:48 +0000 Subject: [PATCH 6/7] Suppress TURN output --- wsnet/listen.go | 19 +++++++++++++------ wsnet/wsnet_test.go | 4 ++++ 2 files changed, 17 insertions(+), 6 deletions(-) diff --git a/wsnet/listen.go b/wsnet/listen.go index 57080eae..3a6735f0 100644 --- a/wsnet/listen.go +++ b/wsnet/listen.go @@ -311,7 +311,7 @@ func (l *listener) handle(msg BrokerMessage) func(dc *webrtc.DataChannel) { return } - conn, err := net.Dial(network, addr) + nc, err := net.Dial(network, addr) if err != nil { init.Code = CodeDialErr init.Err = err.Error() @@ -324,14 +324,21 @@ func (l *listener) handle(msg BrokerMessage) func(dc *webrtc.DataChannel) { if init.Err != "" { return } - defer conn.Close() - defer dc.Close() - defer rw.Close() + // Must wrap the data channel inside this connection + // for buffering from the dialed endpoint to the client. + co := &conn{ + addr: nil, + dc: dc, + rw: rw, + } + co.init() + defer co.Close() + defer nc.Close() go func() { - _, _ = io.Copy(rw, conn) + _, _ = io.Copy(co, nc) }() - _, _ = io.Copy(conn, rw) + _, _ = io.Copy(nc, co) }) } } diff --git a/wsnet/wsnet_test.go b/wsnet/wsnet_test.go index 93346f1e..fc14cd3c 100644 --- a/wsnet/wsnet_test.go +++ b/wsnet/wsnet_test.go @@ -20,6 +20,7 @@ import ( "cdr.dev/slog/sloggers/slogtest/assert" "github.com/hashicorp/yamux" "github.com/pion/ice/v2" + "github.com/pion/logging" "github.com/pion/turn/v2" "nhooyr.io/websocket" ) @@ -128,6 +129,8 @@ func createTURNServer(t *testing.T, server ice.SchemeType, pass string) string { }} } + lf := logging.NewDefaultLoggerFactory() + lf.DefaultLogLevel = logging.LogLevelDisabled srv, err := turn.NewServer(turn.ServerConfig{ PacketConnConfigs: pcListeners, ListenerConfigs: listeners, @@ -135,6 +138,7 @@ func createTURNServer(t *testing.T, server ice.SchemeType, pass string) string { AuthHandler: func(username, realm string, srcAddr net.Addr) (key []byte, ok bool) { return turn.GenerateAuthKey(username, realm, pass), true }, + LoggerFactory: lf, }) if err != nil { t.Error(err) From ed109f93d0776bf1b24db02c45a8503cf8669afe Mon Sep 17 00:00:00 2001 From: Kyle Carberry Date: Tue, 15 Jun 2021 19:36:17 +0000 Subject: [PATCH 7/7] Remove bench action --- .github/workflows/test.yaml | 39 ------------------------------------- 1 file changed, 39 deletions(-) diff --git a/.github/workflows/test.yaml b/.github/workflows/test.yaml index 5a0c5cd4..79a324a9 100644 --- a/.github/workflows/test.yaml +++ b/.github/workflows/test.yaml @@ -54,45 +54,6 @@ jobs: with: args: make -j test/coverage - bench: - runs-on: ubuntu-20.04 - steps: - - uses: actions/checkout@v2 - - - uses: actions/cache@v2 - with: - path: ~/go/pkg/mod - key: ${{ runner.os }}-go-${{ hashFiles('**/go.sum') }} - restore-keys: | - ${{ runner.os }}-go- - - - uses: actions/cache@v2 - with: - path: ./cache - key: ${{ runner.os }}-go-bench - restore-keys: | - ${{ runner.os }}-go-bench- - - - name: bench - uses: ./ci/image - env: - COVERALLS_TOKEN: ${{ secrets.GITHUB_TOKEN }} - CODER_URL: ${{ secrets.CODER_URL }} - CODER_EMAIL: ${{ secrets.CODER_EMAIL }} - CODER_PASSWORD: ${{ secrets.CODER_PASSWORD }} - with: - args: sh -c "go test -bench=. cdr.dev/coder-cli/wsnet | tee output.txt" - - - name: Store benchmark result - uses: rhysd/github-action-benchmark@v1 - with: - tool: 'go' - output-file-path: output.txt - external-data-json-path: ./cache/benchmark-data.json - fail-on-alert: true - github-token: ${{ secrets.GITHUB_TOKEN }} - comment-on-alert: true - gendocs: runs-on: ubuntu-20.04 steps: