@@ -20,6 +20,7 @@ import (
20
20
"github.com/coder/coder/v2/coderd/tracing"
21
21
"github.com/coder/coder/v2/codersdk"
22
22
"github.com/coder/websocket"
23
+ "github.com/coder/websocket/wsjson"
23
24
)
24
25
25
26
var Validate * validator.Validate
@@ -285,7 +286,7 @@ type WebSocketEvent[T any] struct {
285
286
}
286
287
287
288
func OneWayWebSocket [T any ](rw http.ResponseWriter , r * http.Request ) (
288
- sendEvent func (ctx context. Context , wsEvent T ) error ,
289
+ sendEvent func (wsEvent T ) error ,
289
290
closed chan struct {},
290
291
err error ,
291
292
) {
@@ -302,17 +303,29 @@ func OneWayWebSocket[T any](rw http.ResponseWriter, r *http.Request) (
302
303
Code websocket.StatusCode
303
304
Reason string
304
305
}
306
+
307
+ eventC := make (chan T )
305
308
socketErrC := make (chan SocketError , 1 )
306
- closed = make (chan struct {}, 1 )
309
+ closed = make (chan struct {})
307
310
go func () {
308
- select {
309
- case err := <- socketErrC :
310
- _ = socket .Close (err .Code , err .Reason )
311
- case <- ctx .Done ():
312
- _ = socket .Close (websocket .StatusNormalClosure , "Connection closed" )
311
+ defer cancel ()
312
+ defer close (closed )
313
+
314
+ for {
315
+ select {
316
+ case event := <- eventC :
317
+ err := wsjson .Write (ctx , socket , event )
318
+ if err == nil {
319
+ continue
320
+ }
321
+ _ = socket .Close (websocket .StatusInternalError , "Unable to send newest message" )
322
+ case err := <- socketErrC :
323
+ _ = socket .Close (err .Code , err .Reason )
324
+ case <- ctx .Done ():
325
+ _ = socket .Close (websocket .StatusNormalClosure , "Connection closed" )
326
+ }
327
+ return
313
328
}
314
- cancel ()
315
- close (closed )
316
329
}()
317
330
318
331
// We have some tools in the UI code to help enforce one-way WebSocket
@@ -321,7 +334,7 @@ func OneWayWebSocket[T any](rw http.ResponseWriter, r *http.Request) (
321
334
// forgot to use those tools, and communication probably can't be trusted.
322
335
// Better to just close the socket and force the UI to fix its mess
323
336
go func () {
324
- msgType , _ , err := socket .Read (ctx )
337
+ _ , _ , err := socket .Read (ctx )
325
338
if errors .Is (err , context .Canceled ) {
326
339
return
327
340
}
@@ -332,33 +345,18 @@ func OneWayWebSocket[T any](rw http.ResponseWriter, r *http.Request) (
332
345
}
333
346
return
334
347
}
335
- switch msgType {
336
- case websocket .MessageBinary , websocket .MessageText :
337
- socketErrC <- SocketError {
338
- Code : websocket .StatusProtocolError ,
339
- Reason : "Clients cannot send messages for one-way WebSockets" ,
340
- }
348
+ socketErrC <- SocketError {
349
+ Code : websocket .StatusProtocolError ,
350
+ Reason : "Clients cannot send messages for one-way WebSockets" ,
341
351
}
342
352
}()
343
353
344
- sendEvent = func (context.Context , T ) error {
345
- // Using multiple selects because we want possible errors to be
346
- // processed deterministically
347
- select {
348
- case _ , open := <- ctx .Done ():
349
- if ! open {
350
- return xerrors .New ("connection closed from client" )
351
- }
352
- default :
353
- }
354
+ sendEvent = func (event T ) error {
354
355
select {
355
- case _ , open := <- closed :
356
- if ! open {
357
- return xerrors .New ("connection closed internally" )
358
- }
359
- default :
356
+ case eventC <- event :
357
+ case <- ctx .Done ():
358
+ return ctx .Err ()
360
359
}
361
-
362
360
return nil
363
361
}
364
362
0 commit comments