69 changes: 43 additions & 26 deletions coderd/httpapi/httpapi.go
@@ -6,14 +6,13 @@ import (
"encoding/json"
"errors"
"fmt"
"io"
"net/http"
"reflect"
"strings"
"sync"
"time"

"github.com/go-playground/validator/v10"
"golang.org/x/xerrors"

"github.com/coder/coder/coderd/tracing"
"github.com/coder/coder/codersdk"
@@ -174,8 +173,7 @@ func WebsocketCloseSprintf(format string, vars ...any) string {
return msg
}

func ServerSentEventSender(rw http.ResponseWriter, r *http.Request) (func(ctx context.Context, sse codersdk.ServerSentEvent) error, error) {
var mu sync.Mutex
func ServerSentEventSender(rw http.ResponseWriter, r *http.Request) (sendEvent func(ctx context.Context, sse codersdk.ServerSentEvent) error, closed chan struct{}, err error) {
Comment (Member):
It seems like we can eliminate the error return from this, as it's never set to a non-nil value.

Reply (Member Author):
Not really relevant for this PR, but that's a good point. I'd like to see it kept, and to remove the panic in favor of an error, though.

h := rw.Header()
h.Set("Content-Type", "text/event-stream")
h.Set("Cache-Control", "no-cache")
@@ -187,37 +185,50 @@ func ServerSentEventSender(rw http.ResponseWriter, r *http.Request) (func(ctx co
panic("http.ResponseWriter is not http.Flusher")
}

// Send a heartbeat every 15 seconds to avoid the connection being killed.
closed = make(chan struct{})
type sseEvent struct {
payload []byte
errC chan error
}
eventC := make(chan sseEvent)

// Synchronized handling of events (no guarantee of order).
go func() {
defer close(closed)

// Send a heartbeat every 15 seconds to avoid the connection being killed.
ticker := time.NewTicker(time.Second * 15)
defer ticker.Stop()

for {
var event sseEvent

select {
case <-r.Context().Done():
return
Comment (Member, on lines 207 to 208):
Why doesn't this prevent the write/flush after finish? We're not hijacking the connection, so the context should still be cancelled on finish, right?

Reply (Member Author):
Because this code had the following race condition:

  1. Say watchWorkspace encountered an error and exited (this begins the process of all handlers and middleware exiting; r.Context() is not yet cancelled).
  2. The heartbeat timer fires.
  3. We write to rw, which still succeeds, as the teardown is still happening.
  4. Teardown finishes and the context is cancelled, but we are not currently listening for this signal.
  5. We hit Flush after teardown, so we get a nil pointer dereference.

Reply (Contributor):
Awesome explanation, thank you.
case event = <-eventC:
case <-ticker.C:
mu.Lock()
_, err := io.WriteString(rw, fmt.Sprintf("event: %s\n\n", codersdk.ServerSentEventTypePing))
if err != nil {
mu.Unlock()
return
event = sseEvent{
payload: []byte(fmt.Sprintf("event: %s\n\n", codersdk.ServerSentEventTypePing)),
}
f.Flush()
mu.Unlock()
}
}
}()

sendEvent := func(ctx context.Context, sse codersdk.ServerSentEvent) error {
if ctx.Err() != nil {
return ctx.Err()
_, err := rw.Write(event.payload)
if event.errC != nil {
event.errC <- err
}
if err != nil {
return
}
f.Flush()
}
}()

sendEvent = func(ctx context.Context, sse codersdk.ServerSentEvent) error {
buf := &bytes.Buffer{}
enc := json.NewEncoder(buf)

_, err := buf.Write([]byte(fmt.Sprintf("event: %s\ndata: ", sse.Type)))
_, err := buf.WriteString(fmt.Sprintf("event: %s\ndata: ", sse.Type))
if err != nil {
return err
}
@@ -232,16 +243,22 @@ func ServerSentEventSender(rw http.ResponseWriter, r *http.Request) (func(ctx co
return err
}

mu.Lock()
defer mu.Unlock()
_, err = rw.Write(buf.Bytes())
if err != nil {
return err
event := sseEvent{
payload: buf.Bytes(),
errC: make(chan error, 1),
}
Comment (Member):
Did you have to move the writing into the goroutine to fix the bug, or is this just a separate change you did? The old flow was simpler to understand, as it didn't involve extra write channels and response channels, but if it helps fix the bug then 👍

Reply (Member Author):
It's not mandatory per se, but I think we'd want to do it if we service-ified this implementation anyway. That said, asynchronous code will always introduce complexity somewhere; this change moves the core logic into one place, which IMO simplifies it. It's easier to reason about where and in what order mutations/writes are happening.

This change also has the benefit of allowing us to listen for both r.Context() and ctx completion, which we didn't before, and which is more true to the given API (function signature).

Reply (Member Author):
Oh yeah, forgot to mention that this logic change also stops writes after an error is encountered, allowing the caller (if errors are checked) to stop using the sender.

Previously we exited the goroutine on write error, meaning keepalives were no longer sent, so it makes sense to disable writes at that point.

Reply (Member):
The asynchronous logic introduces channel reads that can leave the goroutine hanging, waiting for data indefinitely. For example, if you wrote an event to the events channel and the sender goroutine got closed before reading that event, you would be permanently stuck waiting for a response from the response channel, as far as I can tell.

I think this logic should be kept the way it was to avoid the complexities and extra bugs introduced by making it asynchronous.

Reply (Member):
Wait, never mind, I forgot it's an unbuffered channel.

Reply (Member):
Case in point: it's hard to reason about this logic since it's a chain of passing messages around when it could just be a simple write to the writer. The goroutine could be refactored to use this same function for writing messages, so the safety checks are all in the same place and duplication is reduced.

Reply (Member Author):
I don't see why that's a problem? The previous implementation was already asynchronous and required mutex locking and taking care not to use the resources after close. Here I've fixed those issues and ensured there are no hangs by listening on, e.g., <-closed, which is closed when there is no longer a listener for events. If you're considering a future developer making changes, I'd venture those changes would be "dangerous" in the previous implementation as well?

A motivation to keep this change is that, AFAIK, we'll be using SSEs more in the future, and we'll want to send different types of events on a single channel (due to browser limitations). At that point we'll need a service (sendEvent called from multiple places), and the previous implementation of sendEvent becomes unsafe (the same bug we're fixing here).

Reply (Member):
If we're rewriting this soon like you say, then I think this is fine. I still feel a safe implementation can be achieved in a less complicated way and that we should investigate it in the future, but I won't block your PR for it since this fixes a panic.

Reply (Member Author):
I honestly don't know when we're rewriting/refactoring this; @f0ssel may have some idea. Personally I don't feel channels are any more dangerous than mutexes (you have deadlocks and missing unlocks to think about there), but I guess this is in the eye of the beholder. I'll certainly consider your input if I'm the one rewriting this 👍🏻

f.Flush()

return nil
select {
case <-r.Context().Done():
return r.Context().Err()
case <-ctx.Done():
return ctx.Err()
case eventC <- event:
return <-event.errC
case <-closed:
return xerrors.New("server sent event sender closed")
}
}

return sendEvent, nil
return sendEvent, closed, nil
}
8 changes: 7 additions & 1 deletion coderd/workspaces.go
@@ -867,14 +867,18 @@ func (api *API) watchWorkspace(rw http.ResponseWriter, r *http.Request) {
return
}

sendEvent, err := httpapi.ServerSentEventSender(rw, r)
sendEvent, senderClosed, err := httpapi.ServerSentEventSender(rw, r)
if err != nil {
httpapi.Write(ctx, rw, http.StatusInternalServerError, codersdk.Response{
Message: "Internal error setting up server-sent events.",
Detail: err.Error(),
})
return
}
// Prevent handler from returning until the sender is closed.
defer func() {
<-senderClosed
}()

// Ignore all trace spans after this, they're not too useful.
ctx = trace.ContextWithSpan(ctx, tracing.NoopSpan)
@@ -885,6 +889,8 @@ func (api *API) watchWorkspace(rw http.ResponseWriter, r *http.Request) {
select {
case <-ctx.Done():
return
case <-senderClosed:
return
case <-t.C:
workspace, err := api.Database.GetWorkspaceByID(ctx, workspace.ID)
if err != nil {