Skip to content

Rewrite with compression support #163

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 56 commits into from
Feb 16, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
56 commits
Select commit Hold shift + click to select a range
8604dee
Increase TestWASM timeout
nhooyr Oct 15, 2019
e55ac18
Document compression API
nhooyr Oct 14, 2019
e142e08
Improve compression docs
nhooyr Nov 12, 2019
53c1aea
Implement compression extension negotiation
nhooyr Nov 12, 2019
2cf6c28
Implement compression writer and reader pooling
nhooyr Nov 12, 2019
a01afea
Support x-webkit-deflate-frame extension for Safari
nhooyr Nov 12, 2019
531d4fa
Improve general compression API and write docs
nhooyr Nov 12, 2019
d0a8049
Rewrite core
nhooyr Nov 19, 2019
dd107dd
Update CI
nhooyr Nov 28, 2019
6c6b8e9
Cleanup wspb and wsjson
nhooyr Nov 28, 2019
6b782a3
Run make fmt
nhooyr Nov 28, 2019
989ba2f
Change websocket to WebSocket in docs/errors
nhooyr Nov 28, 2019
9f15963
Simplify dial.go
nhooyr Nov 28, 2019
120911b
Remove use of math/rand.Init
nhooyr Nov 28, 2019
7ad1514
Update README.md comparison
nhooyr Nov 29, 2019
746140b
Further improve README
nhooyr Nov 29, 2019
43cb01e
Refactor read.go/write.go
nhooyr Nov 29, 2019
e8dfe27
Make CI pass
nhooyr Nov 29, 2019
f6137f3
Add minor improvements
nhooyr Dec 6, 2019
6f6fa43
Refactor autobahn
nhooyr Dec 31, 2019
8c87970
Add slidingWindowReader
nhooyr Jan 4, 2020
aaf4b45
Up test coverage of accept.go to 100%
nhooyr Jan 26, 2020
6b76536
Up dial coverage to 100%
nhooyr Jan 30, 2020
0f115ed
Add Go 1.12 support
nhooyr Jan 31, 2020
b6b56b7
Both modes seem to work :)
nhooyr Feb 5, 2020
9e32354
Fix randString method in tests
nhooyr Feb 6, 2020
78da35e
Get test with multiple messages working
nhooyr Feb 7, 2020
d092686
Autobahn tests fully pass :)
nhooyr Feb 8, 2020
6975801
Fix race in tests
nhooyr Feb 9, 2020
bbaf469
Fix test step
nhooyr Feb 9, 2020
faadcc9
Simplify tests
nhooyr Feb 9, 2020
3f2589f
Remove quite a bit of slog
nhooyr Feb 9, 2020
b53f306
Get Wasm tests working
nhooyr Feb 9, 2020
69ff675
More tests and fixes
nhooyr Feb 9, 2020
085e671
Get coverage to 85%
nhooyr Feb 9, 2020
51769b3
Add wspb test
nhooyr Feb 9, 2020
670be05
Merge in handshake improvements from master
nhooyr Feb 9, 2020
988b8f2
Merge remote-tracking branch 'origin/master' into compress
nhooyr Feb 9, 2020
3a526d8
Fix bug in closeHandshake
nhooyr Feb 9, 2020
999b812
Fix race in msgReader
nhooyr Feb 9, 2020
4b84d25
Fix a race with c.closed
nhooyr Feb 9, 2020
85f249d
Up timeouts
nhooyr Feb 9, 2020
6b38ebb
Test fixes
nhooyr Feb 9, 2020
6770421
Fix goroutine leak from deadlock when closing
nhooyr Feb 12, 2020
c752365
Make flateThreshold work
nhooyr Feb 12, 2020
0ea9466
Cleanup writeMu and flateThreshold
nhooyr Feb 12, 2020
b33d48c
Minor cleanup
nhooyr Feb 12, 2020
9c5bfab
Simplifications of conn_test.go
nhooyr Feb 13, 2020
3673c2c
Use basic test assertions
nhooyr Feb 13, 2020
c5b0a00
Fix badPing test duration
nhooyr Feb 13, 2020
1c7c14e
Pool sliding windows
nhooyr Feb 13, 2020
503b469
Simplify sliding window API
nhooyr Feb 13, 2020
dff4af3
Add conn benchmark
nhooyr Feb 13, 2020
2377cca
Switch to klauspost/compress
nhooyr Feb 15, 2020
d57b253
Report how efficient compression is in BenchmarkConn
nhooyr Feb 16, 2020
1bc100d
Update docs and random little issues
nhooyr Feb 16, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Get test with multiple messages working
  • Loading branch information
nhooyr committed Feb 9, 2020
commit 78da35ec5b221d5ec664ee9cbf0a8fb034d46f4c
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ go get nhooyr.io/websocket
- [net.Conn](https://godoc.org/nhooyr.io/websocket#NetConn) wrapper
- [Ping pong](https://godoc.org/nhooyr.io/websocket#Conn.Ping) API
- [RFC 7692](https://tools.ietf.org/html/rfc7692) permessage-deflate compression
- Can target [Wasm](https://godoc.org/nhooyr.io/websocket#hdr-Wasm)
- Compile to [Wasm](https://godoc.org/nhooyr.io/websocket#hdr-Wasm)

## Roadmap

Expand Down
79 changes: 59 additions & 20 deletions assert_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,15 @@ package websocket_test
import (
"context"
"crypto/rand"
"fmt"
"net/http"
"net/http/httptest"
"strings"
"testing"
"time"

"cdr.dev/slog"
"cdr.dev/slog/sloggers/slogtest"
"cdr.dev/slog/sloggers/slogtest/assert"

"nhooyr.io/websocket"
Expand All @@ -20,26 +25,31 @@ func randBytes(t *testing.T, n int) []byte {
return b
}

func assertJSONEcho(t *testing.T, ctx context.Context, c *websocket.Conn, n int) {
t.Helper()
defer c.Close(websocket.StatusInternalError, "")
func echoJSON(t *testing.T, c *websocket.Conn, n int) {
slog.Helper()

exp := randString(t, n)
err := wsjson.Write(ctx, c, exp)
assert.Success(t, "wsjson.Write", err)
s := randString(t, n)
writeJSON(t, c, s)
readJSON(t, c, s)
}

assertJSONRead(t, ctx, c, exp)
func writeJSON(t *testing.T, c *websocket.Conn, v interface{}) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
defer cancel()

c.Close(websocket.StatusNormalClosure, "")
err := wsjson.Write(ctx, c, v)
assert.Success(t, "wsjson.Write", err)
}

func assertJSONRead(t *testing.T, ctx context.Context, c *websocket.Conn, exp interface{}) {
func readJSON(t *testing.T, c *websocket.Conn, exp interface{}) {
slog.Helper()

ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
defer cancel()

var act interface{}
err := wsjson.Read(ctx, c, &act)
assert.Success(t, "wsjson.Read", err)

assert.Equal(t, "json", exp, act)
}

Expand All @@ -58,7 +68,7 @@ func randString(t *testing.T, n int) string {
}

func assertEcho(t *testing.T, ctx context.Context, c *websocket.Conn, typ websocket.MessageType, n int) {
t.Helper()
slog.Helper()

p := randBytes(t, n)
err := c.Write(ctx, typ, p)
Expand All @@ -72,17 +82,46 @@ func assertEcho(t *testing.T, ctx context.Context, c *websocket.Conn, typ websoc
}

func assertSubprotocol(t *testing.T, c *websocket.Conn, exp string) {
t.Helper()
slog.Helper()

assert.Equal(t, "subprotocol", exp, c.Subprotocol())
}

func assertCloseStatus(t *testing.T, exp websocket.StatusCode, err error) {
t.Helper()
defer func() {
if t.Failed() {
t.Logf("error: %+v", err)
}
}()
assert.Equal(t, "closeStatus", exp, websocket.CloseStatus(err))
func assertCloseStatus(t testing.TB, exp websocket.StatusCode, err error) {
slog.Helper()

if websocket.CloseStatus(err) == -1 {
slogtest.Fatal(t, "expected websocket.CloseError", slogType(err), slog.Error(err))
}
if websocket.CloseStatus(err) != exp {
slogtest.Error(t, "unexpected close status",
slog.F("exp", exp),
slog.F("act", err),
)
}

}

func acceptWebSocket(t testing.TB, r *http.Request, w http.ResponseWriter, opts *websocket.AcceptOptions) *websocket.Conn {
c, err := websocket.Accept(w, r, opts)
assert.Success(t, "websocket.Accept", err)
return c
}

func dialWebSocket(t testing.TB, s *httptest.Server, opts *websocket.DialOptions) (*websocket.Conn, *http.Response) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
defer cancel()

if opts == nil {
opts = &websocket.DialOptions{}
}
opts.HTTPClient = s.Client()

c, resp, err := websocket.Dial(ctx, wsURL(s), opts)
assert.Success(t, "websocket.Dial", err)
return c, resp
}

func slogType(v interface{}) slog.Field {
return slog.F("type", fmt.Sprintf("%T", v))
}
16 changes: 10 additions & 6 deletions autobahn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"io/ioutil"
"net"
"net/http"
"net/http/httptest"
"os"
"os/exec"
"strconv"
Expand Down Expand Up @@ -53,15 +54,18 @@ func TestAutobahn(t *testing.T) {
func testServerAutobahn(t *testing.T) {
t.Parallel()

s, closeFn := testServer(t, func(w http.ResponseWriter, r *http.Request) {
c, err := websocket.Accept(w, r, &websocket.AcceptOptions{
s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
c := acceptWebSocket(t, r, w, &websocket.AcceptOptions{
Subprotocols: []string{"echo"},
})
assert.Success(t, "accept", err)
err = echoLoop(r.Context(), c)
err := echoLoop(r.Context(), c)
assertCloseStatus(t, websocket.StatusNormalClosure, err)
}, false)
defer closeFn()
}))
closeFn := wsgrace(s.Config)
defer func() {
err := closeFn()
assert.Success(t, "closeFn", err)
}()

specFile, err := tempJSONFile(map[string]interface{}{
"outdir": "ci/out/wstestServerReports",
Expand Down
89 changes: 31 additions & 58 deletions conn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@ package websocket_test

import (
"context"
"crypto/rand"
"io"
"math/big"
"net/http"
"net/http/httptest"
"strings"
Expand All @@ -18,77 +20,32 @@ import (
"nhooyr.io/websocket"
)

func TestFuzz(t *testing.T) {
t.Parallel()

s, closeFn := testServer(t, func(w http.ResponseWriter, r *http.Request) {
c, err := websocket.Accept(w, r, &websocket.AcceptOptions{
CompressionOptions: websocket.CompressionOptions{
Mode: websocket.CompressionContextTakeover,
},
})
assert.Success(t, "accept", err)
defer c.Close(websocket.StatusInternalError, "")

err = echoLoop(r.Context(), c)
assertCloseStatus(t, websocket.StatusNormalClosure, err)
}, false)
defer closeFn()

ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
defer cancel()

opts := &websocket.DialOptions{
CompressionOptions: websocket.CompressionOptions{
Mode: websocket.CompressionContextTakeover,
},
}
opts.HTTPClient = s.Client()

c, _, err := websocket.Dial(ctx, wsURL(s), opts)
assert.Success(t, "dial", err)
assertJSONEcho(t, ctx, c, 8393)
}

func TestConn(t *testing.T) {
t.Parallel()

t.Run("json", func(t *testing.T) {
s, closeFn := testServer(t, func(w http.ResponseWriter, r *http.Request) {
c, err := websocket.Accept(w, r, &websocket.AcceptOptions{
Subprotocols: []string{"echo"},
CompressionOptions: websocket.CompressionOptions{
Mode: websocket.CompressionContextTakeover,
},
})
assert.Success(t, "accept", err)
defer c.Close(websocket.StatusInternalError, "")

err = echoLoop(r.Context(), c)
assertCloseStatus(t, websocket.StatusNormalClosure, err)
}, false)
t.Parallel()

s, closeFn := testEchoLoop(t)
defer closeFn()

ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
defer cancel()
c, _ := dialWebSocket(t, s, nil)
defer c.Close(websocket.StatusInternalError, "")

opts := &websocket.DialOptions{
Subprotocols: []string{"echo"},
CompressionOptions: websocket.CompressionOptions{
Mode: websocket.CompressionContextTakeover,
},
c.SetReadLimit(1 << 30)

for i := 0; i < 10; i++ {
n := randInt(t, 1_048_576)
echoJSON(t, c, n)
}
opts.HTTPClient = s.Client()

c, _, err := websocket.Dial(ctx, wsURL(s), opts)
assert.Success(t, "dial", err)
assertJSONEcho(t, ctx, c, 8393)
c.Close(websocket.StatusNormalClosure, "")
})
}

func testServer(tb testing.TB, fn func(w http.ResponseWriter, r *http.Request), tls bool) (s *httptest.Server, closeFn func()) {
func testServer(tb testing.TB, fn func(w http.ResponseWriter, r *http.Request)) (s *httptest.Server, closeFn func()) {
h := http.HandlerFunc(fn)
if tls {
if randInt(tb, 2) == 1 {
s = httptest.NewTLSServer(h)
} else {
s = httptest.NewServer(h)
Expand Down Expand Up @@ -179,3 +136,19 @@ func echoLoop(ctx context.Context, c *websocket.Conn) error {
func wsURL(s *httptest.Server) string {
return strings.Replace(s.URL, "http", "ws", 1)
}

func testEchoLoop(t testing.TB) (*httptest.Server, func()) {
return testServer(t, func(w http.ResponseWriter, r *http.Request) {
c := acceptWebSocket(t, r, w, nil)
defer c.Close(websocket.StatusInternalError, "")

err := echoLoop(r.Context(), c)
assertCloseStatus(t, websocket.StatusNormalClosure, err)
})
}

func randInt(t testing.TB, max int) int {
x, err := rand.Int(rand.Reader, big.NewInt(int64(max)))
assert.Success(t, "rand.Int", err)
return int(x.Int64())
}
2 changes: 0 additions & 2 deletions example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,6 @@ func ExampleAccept() {
return
}

log.Printf("received: %v", v)

c.Close(websocket.StatusNormalClosure, "")
})

Expand Down
11 changes: 6 additions & 5 deletions read.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ func (mr *msgReader) ensureFlate() {
mr.flateReader = getFlateReader(readerFunc(mr.read), nil)
}
mr.limitReader.r = mr.flateReader
mr.flateTail.Reset(deflateMessageTail)
}

func (mr *msgReader) returnFlateReader() {
Expand Down Expand Up @@ -328,12 +329,12 @@ type msgReader struct {
func (mr *msgReader) reset(ctx context.Context, h header) {
mr.ctx = ctx
mr.flate = h.rsv1
mr.limitReader.reset(readerFunc(mr.read))

if mr.flate {
mr.ensureFlate()
mr.flateTail.Reset(deflateMessageTail)
}

mr.limitReader.reset()
mr.setFrame(h)
}

Expand Down Expand Up @@ -423,13 +424,13 @@ func newLimitReader(c *Conn, r io.Reader, limit int64) *limitReader {
c: c,
}
lr.limit.Store(limit)
lr.r = r
lr.reset()
lr.reset(r)
return lr
}

func (lr *limitReader) reset() {
func (lr *limitReader) reset(r io.Reader) {
lr.n = lr.limit.Load()
lr.r = r
}

func (lr *limitReader) Read(p []byte) (int, error) {
Expand Down
1 change: 1 addition & 0 deletions write.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ func (mw *msgWriter) ensureFlate() {
w: writerFunc(mw.write),
}
}
mw.trimWriter.reset()

mw.flateWriter = getFlateWriter(mw.trimWriter)
mw.flate = true
Expand Down
2 changes: 1 addition & 1 deletion ws_js_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ func TestEcho(t *testing.T) {

assertSubprotocol(t, c, "echo")
assert.Equalf(t, &http.Response{}, resp, "http.Response")
assertJSONEcho(t, ctx, c, 1024)
echoJSON(t, ctx, c, 1024)
assertEcho(t, ctx, c, websocket.MessageBinary, 1024)

err = c.Close(websocket.StatusNormalClosure, "")
Expand Down