Skip to content

Commit db448d7

Browse files
committed
Merge branch 'mes/one-way-ws-01' into mes/one-way-ws-02
2 parents 0824dd4 + bcd1429 commit db448d7

File tree

4 files changed

+216
-54
lines changed

4 files changed

+216
-54
lines changed

coderd/httpapi/httpapi.go

+11-22
Original file line numberDiff line numberDiff line change
@@ -285,7 +285,7 @@ func WebsocketCloseSprintf(format string, vars ...any) string {
285285
return msg
286286
}
287287

288-
type InitializeConnectionCallback func(rw http.ResponseWriter, r *http.Request) (
288+
type EventSender func(rw http.ResponseWriter, r *http.Request) (
289289
sendEvent func(sse codersdk.ServerSentEvent) error,
290290
done <-chan struct{},
291291
err error,
@@ -326,16 +326,13 @@ func ServerSentEventSender(rw http.ResponseWriter, r *http.Request) (
326326
// Synchronized handling of events (no guarantee of order).
327327
go func() {
328328
defer close(closed)
329-
330-
// Send a heartbeat every 15 seconds to avoid the connection being killed.
331-
ticker := time.NewTicker(time.Second * 15)
329+
ticker := time.NewTicker(HeartbeatInterval)
332330
defer ticker.Stop()
333331

334332
for {
335333
var event sseEvent
336-
337334
select {
338-
case <-r.Context().Done():
335+
case <-ctx.Done():
339336
return
340337
case event = <-eventC:
341338
case <-ticker.C:
@@ -357,8 +354,6 @@ func ServerSentEventSender(rw http.ResponseWriter, r *http.Request) (
357354

358355
sendEvent := func(newEvent codersdk.ServerSentEvent) error {
359356
buf := &bytes.Buffer{}
360-
enc := json.NewEncoder(buf)
361-
362357
_, err := buf.WriteString(fmt.Sprintf("event: %s\n", newEvent.Type))
363358
if err != nil {
364359
return err
@@ -369,6 +364,8 @@ func ServerSentEventSender(rw http.ResponseWriter, r *http.Request) (
369364
if err != nil {
370365
return err
371366
}
367+
368+
enc := json.NewEncoder(buf)
372369
err = enc.Encode(newEvent.Data)
373370
if err != nil {
374371
return err
@@ -386,8 +383,6 @@ func ServerSentEventSender(rw http.ResponseWriter, r *http.Request) (
386383
}
387384

388385
select {
389-
case <-r.Context().Done():
390-
return r.Context().Err()
391386
case <-ctx.Done():
392387
return ctx.Err()
393388
case <-closed:
@@ -397,8 +392,6 @@ func ServerSentEventSender(rw http.ResponseWriter, r *http.Request) (
397392
// for early exit. We don't check closed here because it
398393
// can't happen while processing the event.
399394
select {
400-
case <-r.Context().Done():
401-
return r.Context().Err()
402395
case <-ctx.Done():
403396
return ctx.Err()
404397
case err := <-event.errC:
@@ -410,8 +403,8 @@ func ServerSentEventSender(rw http.ResponseWriter, r *http.Request) (
410403
return sendEvent, closed, nil
411404
}
412405

413-
// OneWayWebSocket establishes a new WebSocket connection that enforces one-way
414-
// communication from the server to the client.
406+
// OneWayWebSocketEventSender establishes a new WebSocket connection that
407+
// enforces one-way communication from the server to the client.
415408
//
416409
// The function returned allows you to send a single message to the client,
417410
// while the channel lets you listen for when the connection closes.
@@ -422,7 +415,7 @@ func ServerSentEventSender(rw http.ResponseWriter, r *http.Request) (
422415
// open a workspace in multiple tabs, the entire UI can start to lock up.
423416
// WebSockets have no such limitation, no matter what HTTP protocol was used to
424417
// establish the connection.
425-
func OneWayWebSocket(rw http.ResponseWriter, r *http.Request) (
418+
func OneWayWebSocketEventSender(rw http.ResponseWriter, r *http.Request) (
426419
func(event codersdk.ServerSentEvent) error,
427420
<-chan struct{},
428421
error,
@@ -436,12 +429,8 @@ func OneWayWebSocket(rw http.ResponseWriter, r *http.Request) (
436429
}
437430
go Heartbeat(ctx, socket)
438431

439-
type SocketError struct {
440-
Code websocket.StatusCode
441-
Reason string
442-
}
443432
eventC := make(chan codersdk.ServerSentEvent)
444-
socketErrC := make(chan SocketError, 1)
433+
socketErrC := make(chan websocket.CloseError, 1)
445434
closed := make(chan struct{})
446435
go func() {
447436
defer cancel()
@@ -477,13 +466,13 @@ func OneWayWebSocket(rw http.ResponseWriter, r *http.Request) (
477466
return
478467
}
479468
if err != nil {
480-
socketErrC <- SocketError{
469+
socketErrC <- websocket.CloseError{
481470
Code: websocket.StatusInternalError,
482471
Reason: "Unable to process invalid message from client",
483472
}
484473
return
485474
}
486-
socketErrC <- SocketError{
475+
socketErrC <- websocket.CloseError{
487476
Code: websocket.StatusProtocolError,
488477
Reason: "Clients cannot send messages for one-way WebSockets",
489478
}

0 commit comments

Comments
 (0)