@@ -31,7 +31,9 @@ type Conn struct {
31
31
releaseOnClose func ()
32
32
releaseOnMessage func ()
33
33
34
- readch chan wsjs.MessageEvent
34
+ readSignal chan struct {}
35
+ readBufMu sync.Mutex
36
+ readBuf []wsjs.MessageEvent
35
37
}
36
38
37
39
func (c * Conn ) close (err error ) {
@@ -45,7 +47,7 @@ func (c *Conn) close(err error) {
45
47
46
48
func (c * Conn ) init () {
47
49
c .closed = make (chan struct {})
48
- c .readch = make (chan wsjs. MessageEvent , 1 )
50
+ c .readSignal = make (chan struct {} , 1 )
49
51
c .msgReadLimit = 32768
50
52
51
53
c .releaseOnClose = c .ws .OnClose (func (e wsjs.CloseEvent ) {
@@ -61,7 +63,16 @@ func (c *Conn) init() {
61
63
})
62
64
63
65
c .releaseOnMessage = c .ws .OnMessage (func (e wsjs.MessageEvent ) {
64
- c .readch <- e
66
+ c .readBufMu .Lock ()
67
+ defer c .readBufMu .Unlock ()
68
+
69
+ c .readBuf = append (c .readBuf , e )
70
+
71
+ // Lets the read goroutine know there is definitely something in readBuf.
72
+ select {
73
+ case c .readSignal <- struct {}{}:
74
+ default :
75
+ }
65
76
})
66
77
67
78
runtime .SetFinalizer (c , func (c * Conn ) {
@@ -89,16 +100,29 @@ func (c *Conn) Read(ctx context.Context) (MessageType, []byte, error) {
89
100
}
90
101
91
102
func (c * Conn ) read (ctx context.Context ) (MessageType , []byte , error ) {
92
- var me wsjs.MessageEvent
93
103
select {
94
104
case <- ctx .Done ():
95
105
c .Close (StatusPolicyViolation , "read timed out" )
96
106
return 0 , nil , ctx .Err ()
97
- case me = <- c .readch :
107
+ case <- c .readSignal :
98
108
case <- c .closed :
99
109
return 0 , nil , c .closeErr
100
110
}
101
111
112
+ c .readBufMu .Lock ()
113
+ defer c .readBufMu .Unlock ()
114
+
115
+ me := c .readBuf [0 ]
116
+ c .readBuf = c .readBuf [1 :]
117
+
118
+ if len (c .readBuf ) > 0 {
119
+ // Next time we read, we'll grab the message.
120
+ select {
121
+ case c .readSignal <- struct {}{}:
122
+ default :
123
+ }
124
+ }
125
+
102
126
switch p := me .Data .(type ) {
103
127
case string :
104
128
return MessageText , []byte (p ), nil
0 commit comments