Skip to content

Commit ae56de6

Browse files
committed
Ensure message order with a buffer
1 parent 480d0eb commit ae56de6

File tree

1 file changed

+29
-5
lines changed

1 file changed

+29
-5
lines changed

websocket_js.go

Lines changed: 29 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,9 @@ type Conn struct {
3131
releaseOnClose func()
3232
releaseOnMessage func()
3333

34-
readch chan wsjs.MessageEvent
34+
readSignal chan struct{}
35+
readBufMu sync.Mutex
36+
readBuf []wsjs.MessageEvent
3537
}
3638

3739
func (c *Conn) close(err error) {
@@ -45,7 +47,7 @@ func (c *Conn) close(err error) {
4547

4648
func (c *Conn) init() {
4749
c.closed = make(chan struct{})
48-
c.readch = make(chan wsjs.MessageEvent, 1)
50+
c.readSignal = make(chan struct{}, 1)
4951
c.msgReadLimit = 32768
5052

5153
c.releaseOnClose = c.ws.OnClose(func(e wsjs.CloseEvent) {
@@ -61,7 +63,16 @@ func (c *Conn) init() {
6163
})
6264

6365
c.releaseOnMessage = c.ws.OnMessage(func(e wsjs.MessageEvent) {
64-
c.readch <- e
66+
c.readBufMu.Lock()
67+
defer c.readBufMu.Unlock()
68+
69+
c.readBuf = append(c.readBuf, e)
70+
71+
// Lets the read goroutine know there is definitely something in readBuf.
72+
select {
73+
case c.readSignal <- struct{}{}:
74+
default:
75+
}
6576
})
6677

6778
runtime.SetFinalizer(c, func(c *Conn) {
@@ -89,16 +100,29 @@ func (c *Conn) Read(ctx context.Context) (MessageType, []byte, error) {
89100
}
90101

91102
func (c *Conn) read(ctx context.Context) (MessageType, []byte, error) {
92-
var me wsjs.MessageEvent
93103
select {
94104
case <-ctx.Done():
95105
c.Close(StatusPolicyViolation, "read timed out")
96106
return 0, nil, ctx.Err()
97-
case me = <-c.readch:
107+
case <-c.readSignal:
98108
case <-c.closed:
99109
return 0, nil, c.closeErr
100110
}
101111

112+
c.readBufMu.Lock()
113+
defer c.readBufMu.Unlock()
114+
115+
me := c.readBuf[0]
116+
c.readBuf = c.readBuf[1:]
117+
118+
if len(c.readBuf) > 0 {
119+
// Next time we read, we'll grab the message.
120+
select {
121+
case c.readSignal <- struct{}{}:
122+
default:
123+
}
124+
}
125+
102126
switch p := me.Data.(type) {
103127
case string:
104128
return MessageText, []byte(p), nil

0 commit comments

Comments
 (0)