Skip to content

Implement more of the API for WASM #146

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Sep 23, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 4 additions & 18 deletions conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ type Conn struct {
msgReadLimit int64

// Used to ensure a previous writer is not used after being closed.
activeWriter *messageWriter
activeWriter atomic.Value
// messageWriter state.
writeMsgOpcode opcode
writeMsgCtx context.Context
Expand Down Expand Up @@ -526,16 +526,6 @@ func (c *Conn) readFramePayload(ctx context.Context, p []byte) (int, error) {
return n, err
}

// SetReadLimit sets the max number of bytes to read for a single message.
// It applies to the Reader and Read methods.
//
// By default, the connection has a message read limit of 32768 bytes.
//
// When the limit is hit, the connection will be closed with StatusMessageTooBig.
func (c *Conn) SetReadLimit(n int64) {
c.msgReadLimit = n
}

// Read is a convenience method to read a single message from the connection.
//
// See the Reader method if you want to be able to reuse buffers or want to stream a message.
Expand Down Expand Up @@ -575,7 +565,7 @@ func (c *Conn) writer(ctx context.Context, typ MessageType) (io.WriteCloser, err
w := &messageWriter{
c: c,
}
c.activeWriter = w
c.activeWriter.Store(w)
return w, nil
}

Expand Down Expand Up @@ -607,7 +597,7 @@ type messageWriter struct {
}

func (w *messageWriter) closed() bool {
return w != w.c.activeWriter
return w != w.c.activeWriter.Load()
}

// Write writes the given bytes to the WebSocket connection.
Expand Down Expand Up @@ -645,7 +635,7 @@ func (w *messageWriter) close() error {
if w.closed() {
return fmt.Errorf("cannot use closed writer")
}
w.c.activeWriter = nil
w.c.activeWriter.Store((*messageWriter)(nil))

_, err := w.c.writeFrame(w.c.writeMsgCtx, true, w.c.writeMsgOpcode, nil)
if err != nil {
Expand Down Expand Up @@ -925,7 +915,3 @@ func (c *Conn) extractBufioWriterBuf(w io.Writer) {

c.bw.Reset(w)
}

func (c *netConn) netConnReader(ctx context.Context) (MessageType, io.Reader, error) {
return c.c.Reader(c.readContext)
}
15 changes: 14 additions & 1 deletion netconn.go → conn_common.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
// This file contains *Conn symbols relevant to both
// WASM and non WASM builds.

package websocket

import (
Expand Down Expand Up @@ -99,7 +102,7 @@ func (c *netConn) Read(p []byte) (int, error) {
}

if c.reader == nil {
typ, r, err := c.netConnReader(c.readContext)
typ, r, err := c.c.Reader(c.readContext)
if err != nil {
var ce CloseError
if errors.As(err, &ce) && (ce.Code == StatusNormalClosure) || (ce.Code == StatusGoingAway) {
Expand Down Expand Up @@ -189,3 +192,13 @@ func (c *Conn) CloseRead(ctx context.Context) context.Context {
}()
return ctx
}

// SetReadLimit sets the max number of bytes to read for a single message.
// It applies to the Reader and Read methods.
//
// By default, the connection has a message read limit of 32768 bytes.
//
// When the limit is hit, the connection will be closed with StatusMessageTooBig.
func (c *Conn) SetReadLimit(n int64) {
c.msgReadLimit = n
}
12 changes: 11 additions & 1 deletion doc.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,23 @@
// See https://developer.mozilla.org/en-US/docs/Web/API/WebSocket
//
// Thus the unsupported features (not compiled in) for WASM are:
//
// - Accept and AcceptOptions
// - Conn's Reader, Writer, SetReadLimit and Ping methods
// - Conn.Ping
// - HTTPClient and HTTPHeader fields in DialOptions
//
// The *http.Response returned by Dial will always either be nil or &http.Response{} as
// we do not have access to the handshake response in the browser.
//
// The Writer method on the Conn buffers everything in memory and then sends it as a message
// when the writer is closed.
//
// The Reader method also reads the entire response and then returns a reader that
// reads from the byte slice.
//
// SetReadLimit cannot actually limit the number of bytes read from the connection so instead
// when a message beyond the limit is fully read, it throws an error.
//
// Writes are also always async so the passed context is no-op.
//
// Everything else is fully supported. This includes the wsjson and wspb helper packages.
Expand Down
66 changes: 64 additions & 2 deletions websocket_js.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,16 @@ import (
"sync/atomic"
"syscall/js"

"nhooyr.io/websocket/internal/bpool"
"nhooyr.io/websocket/internal/wsjs"
)

// Conn provides a wrapper around the browser WebSocket API.
type Conn struct {
ws wsjs.WebSocket

msgReadLimit int64

readClosed int64
closeOnce sync.Once
closed chan struct{}
Expand All @@ -43,6 +46,7 @@ func (c *Conn) close(err error) {
func (c *Conn) init() {
c.closed = make(chan struct{})
c.readch = make(chan wsjs.MessageEvent, 1)
c.msgReadLimit = 32768

c.releaseOnClose = c.ws.OnClose(func(e wsjs.CloseEvent) {
cerr := CloseError{
Expand Down Expand Up @@ -77,6 +81,10 @@ func (c *Conn) Read(ctx context.Context) (MessageType, []byte, error) {
if err != nil {
return 0, nil, fmt.Errorf("failed to read: %w", err)
}
if int64(len(p)) > c.msgReadLimit {
c.Close(StatusMessageTooBig, fmt.Sprintf("read limited at %v bytes", c.msgReadLimit))
return 0, nil, c.closeErr
}
return typ, p, nil
}

Expand Down Expand Up @@ -106,6 +114,11 @@ func (c *Conn) read(ctx context.Context) (MessageType, []byte, error) {
func (c *Conn) Write(ctx context.Context, typ MessageType, p []byte) error {
err := c.write(ctx, typ, p)
if err != nil {
// Have to ensure the WebSocket is closed after a write error
// to match the Go API. It can only error if the message type
// is unexpected or the passed bytes contain invalid UTF-8 for
// MessageText.
c.Close(StatusInternalError, "something went wrong")
return fmt.Errorf("failed to write: %w", err)
}
return nil
Expand Down Expand Up @@ -216,8 +229,10 @@ func dial(ctx context.Context, url string, opts *DialOptions) (*Conn, *http.Resp
return c, &http.Response{}, nil
}

func (c *netConn) netConnReader(ctx context.Context) (MessageType, io.Reader, error) {
typ, p, err := c.c.Read(ctx)
// Reader attempts to read a message from the connection.
// The maximum time spent waiting is bounded by the context.
func (c *Conn) Reader(ctx context.Context) (MessageType, io.Reader, error) {
typ, p, err := c.Read(ctx)
if err != nil {
return 0, nil, err
}
Expand All @@ -228,3 +243,50 @@ func (c *netConn) netConnReader(ctx context.Context) (MessageType, io.Reader, er
func (c *Conn) reader(ctx context.Context) {
c.read(ctx)
}

// Writer returns a writer to write a WebSocket data message to the connection.
// It buffers the entire message in memory and then sends it when the writer
// is closed.
func (c *Conn) Writer(ctx context.Context, typ MessageType) (io.WriteCloser, error) {
return writer{
c: c,
ctx: ctx,
typ: typ,
b: bpool.Get(),
}, nil
}

type writer struct {
closed bool

c *Conn
ctx context.Context
typ MessageType

b *bytes.Buffer
}

func (w writer) Write(p []byte) (int, error) {
if w.closed {
return 0, errors.New("cannot write to closed writer")
}
n, err := w.b.Write(p)
if err != nil {
return n, fmt.Errorf("failed to write message: %w", err)
}
return n, nil
}

func (w writer) Close() error {
if w.closed {
return errors.New("cannot close closed writer")
}
w.closed = true
defer bpool.Put(w.b)

err := w.c.Write(w.ctx, w.typ, w.b.Bytes())
if err != nil {
return fmt.Errorf("failed to close writer: %w", err)
}
return nil
}
6 changes: 1 addition & 5 deletions wsjson/wsjson.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
// +build !js

// Package wsjson provides websocket helpers for JSON messages.
package wsjson // import "nhooyr.io/websocket/wsjson"

Expand Down Expand Up @@ -34,9 +32,7 @@ func read(ctx context.Context, c *websocket.Conn, v interface{}) error {
}

b := bpool.Get()
defer func() {
bpool.Put(b)
}()
defer bpool.Put(b)

_, err = b.ReadFrom(r)
if err != nil {
Expand Down
58 changes: 0 additions & 58 deletions wsjson/wsjson_js.go

This file was deleted.

6 changes: 1 addition & 5 deletions wspb/wspb.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
// +build !js

// Package wspb provides websocket helpers for protobuf messages.
package wspb // import "nhooyr.io/websocket/wspb"

Expand Down Expand Up @@ -36,9 +34,7 @@ func read(ctx context.Context, c *websocket.Conn, v proto.Message) error {
}

b := bpool.Get()
defer func() {
bpool.Put(b)
}()
defer bpool.Put(b)

_, err = b.ReadFrom(r)
if err != nil {
Expand Down
67 changes: 0 additions & 67 deletions wspb/wspb_js.go

This file was deleted.