Skip to content

Add *Conn.CloseRead for WASM #144

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Sep 22, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 0 additions & 21 deletions conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -406,27 +406,6 @@ func (c *Conn) reader(ctx context.Context) (MessageType, io.Reader, error) {
return MessageType(h.opcode), r, nil
}

// CloseRead will start a goroutine to read from the connection until it is closed or a data message
// is received. If a data message is received, the connection will be closed with StatusPolicyViolation.
// Since CloseRead reads from the connection, it will respond to ping, pong and close frames.
// After calling this method, you cannot read any data messages from the connection.
// The returned context will be cancelled when the connection is closed.
//
// Use this when you do not want to read data messages from the connection anymore but will
// want to write messages to it.
func (c *Conn) CloseRead(ctx context.Context) context.Context {
atomic.StoreInt64(&c.readClosed, 1)

ctx, cancel := context.WithCancel(ctx)
go func() {
defer cancel()
// We use the unexported reader so that we don't get the read closed error.
c.reader(ctx)
c.Close(StatusPolicyViolation, "unexpected data message")
}()
return ctx
}

// messageReader enables reading a data frame from the WebSocket connection.
type messageReader struct {
c *Conn
Expand Down
6 changes: 3 additions & 3 deletions doc.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,10 @@
//
// See https://developer.mozilla.org/en-US/docs/Web/API/WebSocket
//
// Thus the unsupported features when compiling to WASM are:
// Thus the unsupported features (not compiled in) for WASM are:
// - Accept and AcceptOptions
// - Conn's Reader, Writer, SetReadLimit, Ping methods
// - HTTPClient and HTTPHeader dial options
// - Conn's Reader, Writer, SetReadLimit and Ping methods
// - HTTPClient and HTTPHeader fields in DialOptions
//
// The *http.Response returned by Dial will always either be nil or &http.Response{} as
// we do not have access to the handshake response in the browser.
Expand Down
25 changes: 25 additions & 0 deletions netconn.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"io"
"math"
"net"
"sync/atomic"
"time"
)

Expand Down Expand Up @@ -159,3 +160,27 @@ func (c *netConn) SetReadDeadline(t time.Time) error {
}
return nil
}

// CloseRead will start a goroutine to read from the connection until it is closed or a data message
// is received. If a data message is received, the connection will be closed with StatusPolicyViolation.
// Since CloseRead reads from the connection, it will respond to ping, pong and close frames.
// After calling this method, you cannot read any data messages from the connection.
// The returned context will be cancelled when the connection is closed.
//
// Use this when you do not want to read data messages from the connection anymore but will
// want to write messages to it.
func (c *Conn) CloseRead(ctx context.Context) context.Context {
atomic.StoreInt64(&c.readClosed, 1)

ctx, cancel := context.WithCancel(ctx)
go func() {
defer cancel()
// We use the unexported reader method so that we don't get the read closed error.
c.reader(ctx)
// Either the connection is already closed since there was a read error
// or the context was cancelled or a message was read and we should close
// the connection.
c.Close(StatusPolicyViolation, "unexpected data message")
}()
return ctx
}
19 changes: 16 additions & 3 deletions websocket_js.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"reflect"
"runtime"
"sync"
"sync/atomic"
"syscall/js"

"nhooyr.io/websocket/internal/wsjs"
Expand All @@ -19,9 +20,10 @@ import (
type Conn struct {
ws wsjs.WebSocket

closeOnce sync.Once
closed chan struct{}
closeErr error
readClosed int64
closeOnce sync.Once
closed chan struct{}
closeErr error

releaseOnClose func()
releaseOnMessage func()
Expand Down Expand Up @@ -67,6 +69,10 @@ func (c *Conn) init() {
// Read attempts to read a message from the connection.
// The maximum time spent waiting is bounded by the context.
func (c *Conn) Read(ctx context.Context) (MessageType, []byte, error) {
if atomic.LoadInt64(&c.readClosed) == 1 {
return 0, nil, fmt.Errorf("websocket connection read closed")
}

typ, p, err := c.read(ctx)
if err != nil {
return 0, nil, fmt.Errorf("failed to read: %w", err)
Expand All @@ -78,6 +84,7 @@ func (c *Conn) read(ctx context.Context) (MessageType, []byte, error) {
var me wsjs.MessageEvent
select {
case <-ctx.Done():
c.Close(StatusPolicyViolation, "read timed out")
return 0, nil, ctx.Err()
case me = <-c.readch:
case <-c.closed:
Expand Down Expand Up @@ -198,6 +205,7 @@ func dial(ctx context.Context, url string, opts *DialOptions) (*Conn, *http.Resp

select {
case <-ctx.Done():
c.Close(StatusPolicyViolation, "dial timed out")
return nil, nil, ctx.Err()
case <-opench:
case <-c.closed:
Expand All @@ -215,3 +223,8 @@ func (c *netConn) netConnReader(ctx context.Context) (MessageType, io.Reader, er
}
return typ, bytes.NewReader(p), nil
}

// Only implemented for use by *Conn.CloseRead in netconn.go
func (c *Conn) reader(ctx context.Context) {
c.read(ctx)
}