Skip to content

Commit 5065455

Browse files
committed
chore: delete ServerSentEventSender
1 parent 0eba148 commit 5065455

File tree

3 files changed

+16
-124
lines changed

3 files changed

+16
-124
lines changed

coderd/httpapi/httpapi.go

Lines changed: 0 additions & 108 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
package httpapi
22

33
import (
4-
"bytes"
54
"context"
65
"database/sql"
76
"encoding/json"
@@ -11,7 +10,6 @@ import (
1110
"net/http"
1211
"reflect"
1312
"strings"
14-
"time"
1513

1614
"github.com/go-playground/validator/v10"
1715
"golang.org/x/xerrors"
@@ -358,109 +356,3 @@ func OneWayWebSocket[T any](rw http.ResponseWriter, r *http.Request) (
358356

359357
return sendEvent, closed, nil
360358
}
361-
362-
func ServerSentEventSender(rw http.ResponseWriter, r *http.Request) (sendEvent func(ctx context.Context, sse codersdk.ServerSentEvent) error, closed chan struct{}, err error) {
363-
h := rw.Header()
364-
h.Set("Content-Type", "text/event-stream")
365-
h.Set("Cache-Control", "no-cache")
366-
h.Set("Connection", "keep-alive")
367-
h.Set("X-Accel-Buffering", "no")
368-
369-
f, ok := rw.(http.Flusher)
370-
if !ok {
371-
panic("http.ResponseWriter is not http.Flusher")
372-
}
373-
374-
closed = make(chan struct{})
375-
type sseEvent struct {
376-
payload []byte
377-
errC chan error
378-
}
379-
eventC := make(chan sseEvent)
380-
381-
// Synchronized handling of events (no guarantee of order).
382-
go func() {
383-
defer close(closed)
384-
385-
// Send a heartbeat every 15 seconds to avoid the connection being killed.
386-
ticker := time.NewTicker(time.Second * 15)
387-
defer ticker.Stop()
388-
389-
for {
390-
var event sseEvent
391-
392-
select {
393-
case <-r.Context().Done():
394-
return
395-
case event = <-eventC:
396-
case <-ticker.C:
397-
event = sseEvent{
398-
payload: []byte(fmt.Sprintf("event: %s\n\n", codersdk.ServerSentEventTypePing)),
399-
}
400-
}
401-
402-
_, err := rw.Write(event.payload)
403-
if event.errC != nil {
404-
event.errC <- err
405-
}
406-
if err != nil {
407-
return
408-
}
409-
f.Flush()
410-
}
411-
}()
412-
413-
sendEvent = func(ctx context.Context, sse codersdk.ServerSentEvent) error {
414-
buf := &bytes.Buffer{}
415-
enc := json.NewEncoder(buf)
416-
417-
_, err := buf.WriteString(fmt.Sprintf("event: %s\n", sse.Type))
418-
if err != nil {
419-
return err
420-
}
421-
422-
if sse.Data != nil {
423-
_, err = buf.WriteString("data: ")
424-
if err != nil {
425-
return err
426-
}
427-
err = enc.Encode(sse.Data)
428-
if err != nil {
429-
return err
430-
}
431-
}
432-
433-
err = buf.WriteByte('\n')
434-
if err != nil {
435-
return err
436-
}
437-
438-
event := sseEvent{
439-
payload: buf.Bytes(),
440-
errC: make(chan error, 1), // Buffered to prevent deadlock.
441-
}
442-
443-
select {
444-
case <-r.Context().Done():
445-
return r.Context().Err()
446-
case <-ctx.Done():
447-
return ctx.Err()
448-
case <-closed:
449-
return xerrors.New("server sent event sender closed")
450-
case eventC <- event:
451-
// Re-check closure signals after sending the event to allow
452-
// for early exit. We don't check closed here because it
453-
// can't happen while processing the event.
454-
select {
455-
case <-r.Context().Done():
456-
return r.Context().Err()
457-
case <-ctx.Done():
458-
return ctx.Err()
459-
case err := <-event.errC:
460-
return err
461-
}
462-
}
463-
}
464-
465-
return sendEvent, closed, nil
466-
}

coderd/workspaceagents.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1159,7 +1159,7 @@ func (api *API) watchWorkspaceAgentMetadata(rw http.ResponseWriter, r *http.Requ
11591159
//nolint:ineffassign // Release memory.
11601160
initialMD = nil
11611161

1162-
sseSendEvent, sseSenderClosed, err := httpapi.ServerSentEventSender(rw, r)
1162+
sendWsEvent, wsClosed, err := httpapi.OneWayWebSocket[codersdk.ServerSentEvent](rw, r)
11631163
if err != nil {
11641164
httpapi.Write(ctx, rw, http.StatusInternalServerError, codersdk.Response{
11651165
Message: "Internal error setting up server-sent events.",
@@ -1170,14 +1170,14 @@ func (api *API) watchWorkspaceAgentMetadata(rw http.ResponseWriter, r *http.Requ
11701170
// Prevent handler from returning until the sender is closed.
11711171
defer func() {
11721172
cancel()
1173-
<-sseSenderClosed
1173+
<-wsClosed
11741174
}()
11751175
// Synchronize cancellation from SSE -> context, this lets us simplify the
11761176
// cancellation logic.
11771177
go func() {
11781178
select {
11791179
case <-ctx.Done():
1180-
case <-sseSenderClosed:
1180+
case <-wsClosed:
11811181
cancel()
11821182
}
11831183
}()
@@ -1189,7 +1189,7 @@ func (api *API) watchWorkspaceAgentMetadata(rw http.ResponseWriter, r *http.Requ
11891189

11901190
log.Debug(ctx, "sending metadata", "num", len(values))
11911191

1192-
_ = sseSendEvent(ctx, codersdk.ServerSentEvent{
1192+
_ = sendWsEvent(codersdk.ServerSentEvent{
11931193
Type: codersdk.ServerSentEventTypeData,
11941194
Data: convertWorkspaceAgentMetadata(values),
11951195
})
@@ -1221,7 +1221,7 @@ func (api *API) watchWorkspaceAgentMetadata(rw http.ResponseWriter, r *http.Requ
12211221
if err != nil {
12221222
if !database.IsQueryCanceledError(err) {
12231223
log.Error(ctx, "failed to get metadata", slog.Error(err))
1224-
_ = sseSendEvent(ctx, codersdk.ServerSentEvent{
1224+
_ = sendWsEvent(codersdk.ServerSentEvent{
12251225
Type: codersdk.ServerSentEventTypeError,
12261226
Data: codersdk.Response{
12271227
Message: "Failed to get metadata.",

coderd/workspaces.go

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1723,7 +1723,7 @@ func (api *API) watchWorkspace(rw http.ResponseWriter, r *http.Request) {
17231723
workspace := httpmw.WorkspaceParam(r)
17241724
apiKey := httpmw.APIKey(r)
17251725

1726-
sendEvent, senderClosed, err := httpapi.ServerSentEventSender(rw, r)
1726+
sendWsEvent, wsClosed, err := httpapi.OneWayWebSocket[codersdk.ServerSentEvent](rw, r)
17271727
if err != nil {
17281728
httpapi.Write(ctx, rw, http.StatusInternalServerError, codersdk.Response{
17291729
Message: "Internal error setting up server-sent events.",
@@ -1733,13 +1733,13 @@ func (api *API) watchWorkspace(rw http.ResponseWriter, r *http.Request) {
17331733
}
17341734
// Prevent handler from returning until the sender is closed.
17351735
defer func() {
1736-
<-senderClosed
1736+
<-wsClosed
17371737
}()
17381738

17391739
sendUpdate := func(_ context.Context, _ []byte) {
17401740
workspace, err := api.Database.GetWorkspaceByID(ctx, workspace.ID)
17411741
if err != nil {
1742-
_ = sendEvent(ctx, codersdk.ServerSentEvent{
1742+
_ = sendWsEvent(codersdk.ServerSentEvent{
17431743
Type: codersdk.ServerSentEventTypeError,
17441744
Data: codersdk.Response{
17451745
Message: "Internal error fetching workspace.",
@@ -1751,7 +1751,7 @@ func (api *API) watchWorkspace(rw http.ResponseWriter, r *http.Request) {
17511751

17521752
data, err := api.workspaceData(ctx, []database.Workspace{workspace})
17531753
if err != nil {
1754-
_ = sendEvent(ctx, codersdk.ServerSentEvent{
1754+
_ = sendWsEvent(codersdk.ServerSentEvent{
17551755
Type: codersdk.ServerSentEventTypeError,
17561756
Data: codersdk.Response{
17571757
Message: "Internal error fetching workspace data.",
@@ -1761,7 +1761,7 @@ func (api *API) watchWorkspace(rw http.ResponseWriter, r *http.Request) {
17611761
return
17621762
}
17631763
if len(data.templates) == 0 {
1764-
_ = sendEvent(ctx, codersdk.ServerSentEvent{
1764+
_ = sendWsEvent(codersdk.ServerSentEvent{
17651765
Type: codersdk.ServerSentEventTypeError,
17661766
Data: codersdk.Response{
17671767
Message: "Forbidden reading template of selected workspace.",
@@ -1778,15 +1778,15 @@ func (api *API) watchWorkspace(rw http.ResponseWriter, r *http.Request) {
17781778
api.Options.AllowWorkspaceRenames,
17791779
)
17801780
if err != nil {
1781-
_ = sendEvent(ctx, codersdk.ServerSentEvent{
1781+
_ = sendWsEvent(codersdk.ServerSentEvent{
17821782
Type: codersdk.ServerSentEventTypeError,
17831783
Data: codersdk.Response{
17841784
Message: "Internal error converting workspace.",
17851785
Detail: err.Error(),
17861786
},
17871787
})
17881788
}
1789-
_ = sendEvent(ctx, codersdk.ServerSentEvent{
1789+
_ = sendWsEvent(codersdk.ServerSentEvent{
17901790
Type: codersdk.ServerSentEventTypeData,
17911791
Data: w,
17921792
})
@@ -1804,7 +1804,7 @@ func (api *API) watchWorkspace(rw http.ResponseWriter, r *http.Request) {
18041804
sendUpdate(ctx, nil)
18051805
}))
18061806
if err != nil {
1807-
_ = sendEvent(ctx, codersdk.ServerSentEvent{
1807+
_ = sendWsEvent(codersdk.ServerSentEvent{
18081808
Type: codersdk.ServerSentEventTypeError,
18091809
Data: codersdk.Response{
18101810
Message: "Internal error subscribing to workspace events.",
@@ -1818,7 +1818,7 @@ func (api *API) watchWorkspace(rw http.ResponseWriter, r *http.Request) {
18181818
// This is required to show whether the workspace is up-to-date.
18191819
cancelTemplateSubscribe, err := api.Pubsub.Subscribe(watchTemplateChannel(workspace.TemplateID), sendUpdate)
18201820
if err != nil {
1821-
_ = sendEvent(ctx, codersdk.ServerSentEvent{
1821+
_ = sendWsEvent(codersdk.ServerSentEvent{
18221822
Type: codersdk.ServerSentEventTypeError,
18231823
Data: codersdk.Response{
18241824
Message: "Internal error subscribing to template events.",
@@ -1831,7 +1831,7 @@ func (api *API) watchWorkspace(rw http.ResponseWriter, r *http.Request) {
18311831

18321832
// An initial ping signals to the request that the server is now ready
18331833
// and the client can begin servicing a channel with data.
1834-
_ = sendEvent(ctx, codersdk.ServerSentEvent{
1834+
_ = sendWsEvent(codersdk.ServerSentEvent{
18351835
Type: codersdk.ServerSentEventTypePing,
18361836
})
18371837
// Send updated workspace info after connection is established. This avoids
@@ -1842,7 +1842,7 @@ func (api *API) watchWorkspace(rw http.ResponseWriter, r *http.Request) {
18421842
select {
18431843
case <-ctx.Done():
18441844
return
1845-
case <-senderClosed:
1845+
case <-wsClosed:
18461846
return
18471847
}
18481848
}

0 commit comments

Comments
 (0)