174 changes: 92 additions & 82 deletions coderd/workspaceagents.go
@@ -6,7 +6,6 @@ import (
"database/sql"
"encoding/json"
"errors"
"flag"
"fmt"
"io"
"net"
@@ -23,10 +22,9 @@ import (

"github.com/go-chi/chi/v5"
"github.com/google/uuid"
"golang.org/x/exp/slices"
"golang.org/x/exp/maps"
"golang.org/x/mod/semver"
"golang.org/x/sync/errgroup"
"golang.org/x/time/rate"
"golang.org/x/xerrors"
"nhooyr.io/websocket"
"tailscale.com/tailcfg"
@@ -39,7 +37,6 @@ import (
"github.com/coder/coder/v2/coderd/httpmw"
"github.com/coder/coder/v2/coderd/rbac"
"github.com/coder/coder/v2/coderd/util/ptr"
"github.com/coder/coder/v2/coderd/util/slice"
"github.com/coder/coder/v2/codersdk"
"github.com/coder/coder/v2/codersdk/agentsdk"
"github.com/coder/coder/v2/tailnet"
@@ -1524,7 +1521,11 @@ func (api *API) workspaceAgentPostMetadata(rw http.ResponseWriter, r *http.Reque
key := chi.URLParam(r, "key")

const (
maxValueLen = 32 << 10
// maxValueLen is set to 2048 to stay under the 8000-byte Postgres
// NOTIFY limit. Since both value and error can be set, the real
// payload limit is 2 * 2048 * 4/3 (base64 expansion) = 5461 bytes,
// plus a few hundred bytes for JSON syntax, key names, and metadata.
maxValueLen = 2048
maxErrorLen = maxValueLen
)

Expand Down Expand Up @@ -1567,7 +1568,13 @@ func (api *API) workspaceAgentPostMetadata(rw http.ResponseWriter, r *http.Reque
slog.F("value", ellipse(datum.Value, 16)),
)

err = api.Pubsub.Publish(watchWorkspaceAgentMetadataChannel(workspaceAgent.ID), []byte(datum.Key))
datumJSON, err := json.Marshal(datum)
if err != nil {
httpapi.InternalServerError(rw, err)
return
}

err = api.Pubsub.Publish(watchWorkspaceAgentMetadataChannel(workspaceAgent.ID), datumJSON)
Member:
I don't know how limited the pubsub capacity is wrt payload size, but since we don't control the size (of e.g. values), we'll still need a different approach to propagate the value. Not against adding a few more fields to the publish though if they're helpful.

I see a test also caught this.

Member Author:
Hm I think I left a comment but it was lost.

The 32k limit was too much anyway, so I dropped it down to 4k and used gob encoding to avoid inflation.
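
For readers skimming the thread, here is a minimal standalone sketch of the size budget being discussed; it is not code from this PR. It marshals a worst-case datum, with value and error both at their 2048-byte caps, and compares the result against the 8000-byte default Postgres NOTIFY payload limit. The `datum` struct and its JSON tags are illustrative stand-ins for the params the handler actually publishes.

```go
package main

import (
	"encoding/json"
	"fmt"
	"strings"
	"time"
)

// datum is an illustrative stand-in for the metadata params published by
// the handler; the real struct lives in the database package.
type datum struct {
	Key         string    `json:"key"`
	Value       string    `json:"value"`
	Error       string    `json:"error"`
	CollectedAt time.Time `json:"collected_at"`
}

func main() {
	const (
		maxValueLen = 2048
		maxErrorLen = maxValueLen
		notifyLimit = 8000 // default Postgres NOTIFY payload limit, in bytes
	)

	// Worst case: both value and error are at their caps.
	d := datum{
		Key:         "example_key",
		Value:       strings.Repeat("a", maxValueLen),
		Error:       strings.Repeat("e", maxErrorLen),
		CollectedAt: time.Now(),
	}
	payload, err := json.Marshal(d)
	if err != nil {
		panic(err)
	}
	// Plain ASCII content stays well under the limit; the 4/3 factor in the
	// code comment is headroom for content that gets base64-expanded or
	// escaped before publishing.
	fmt.Printf("worst-case payload: %d bytes (NOTIFY limit: %d)\n", len(payload), notifyLimit)
}
```

The exact field names and tags in the PR may differ; the point is only that the 2048-byte caps keep the worst-case publish comfortably inside the NOTIFY budget.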

if err != nil {
httpapi.InternalServerError(rw, err)
return
@@ -1593,7 +1600,42 @@ func (api *API) watchWorkspaceAgentMetadata(rw http.ResponseWriter, r *http.Requ
)
)

sendEvent, senderClosed, err := httpapi.ServerSentEventSender(rw, r)
// We avoid channel-based synchronization here to prevent backpressure problems.
var (
metadataMapMu sync.Mutex
metadataMap = make(map[string]database.WorkspaceAgentMetadatum)
// pendingChanges must only be mutated when metadataMapMu is held.
pendingChanges atomic.Bool
)

// Send metadata on updates. We must subscribe before sending the initial
// metadata to guarantee that events in between are not missed.
cancelSub, err := api.Pubsub.Subscribe(watchWorkspaceAgentMetadataChannel(workspaceAgent.ID), func(_ context.Context, byt []byte) {
var update database.UpdateWorkspaceAgentMetadataParams
err := json.Unmarshal(byt, &update)
if err != nil {
api.Logger.Error(ctx, "failed to unmarshal pubsub message", slog.Error(err))
return
}

log.Debug(ctx, "received metadata update", "key", update.Key)

metadataMapMu.Lock()
defer metadataMapMu.Unlock()
md := metadataMap[update.Key]
md.Value = update.Value
md.Error = update.Error
md.CollectedAt = update.CollectedAt
metadataMap[update.Key] = md
pendingChanges.Store(true)
})
if err != nil {
httpapi.InternalServerError(rw, err)
return
}
defer cancelSub()

sseSendEvent, sseSenderClosed, err := httpapi.ServerSentEventSender(rw, r)
if err != nil {
httpapi.Write(ctx, rw, http.StatusInternalServerError, codersdk.Response{
Message: "Internal error setting up server-sent events.",
@@ -1603,97 +1645,61 @@ func (api *API) watchWorkspaceAgentMetadata(rw http.ResponseWriter, r *http.Requ
}
// Prevent handler from returning until the sender is closed.
defer func() {
<-senderClosed
<-sseSenderClosed
}()

const refreshInterval = time.Second * 5
refreshTicker := time.NewTicker(refreshInterval)
defer refreshTicker.Stop()
// We send updates exactly every second.
const sendInterval = time.Second * 1
sendTicker := time.NewTicker(sendInterval)
Member:
A NewTimer might be better, coupled with Reset, if we want to be literal with the comment above (at most every second). The next tick could already be pending, depending on how long the previous send took.

Member Author:
Exactly every second was my intention — updated the comment.
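
To make the ticker-versus-timer distinction concrete, here is a small standalone sketch of the NewTimer-plus-Reset pattern the reviewer describes ("at most every second"), which guarantees at least a full interval between sends. The PR keeps NewTicker, which fires on a fixed cadence regardless of how long the send takes. The helper name and signature below are illustrative, not part of the PR.

```go
package main

import (
	"fmt"
	"time"
)

// sendAtMostEvery re-arms the timer only after send returns, so consecutive
// sends are always at least `interval` apart. A Ticker, by contrast, keeps a
// fixed cadence, so a slow send can be followed almost immediately by the
// next tick.
func sendAtMostEvery(interval time.Duration, send func(), done <-chan struct{}) {
	timer := time.NewTimer(interval)
	defer timer.Stop()
	for {
		select {
		case <-timer.C:
			send()
			timer.Reset(interval)
		case <-done:
			return
		}
	}
}

func main() {
	done := make(chan struct{})
	go func() {
		time.Sleep(3500 * time.Millisecond)
		close(done)
	}()
	sendAtMostEvery(time.Second, func() {
		fmt.Println("send at", time.Now().Format(time.StampMilli))
	}, done)
}
```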

defer sendTicker.Stop()

var (
lastDBMetaMu sync.Mutex
lastDBMeta []database.WorkspaceAgentMetadatum
)
// We always use the original Request context because it contains
// the RBAC actor.
md, err := api.Database.GetWorkspaceAgentMetadata(ctx, workspaceAgent.ID)
if err != nil {
// If we can't successfully pull the initial metadata, pubsub
// updates will be no-ops, so we may as well terminate the
// connection early.
httpapi.InternalServerError(rw, err)
return
}

sendMetadata := func(pull bool) {
log.Debug(ctx, "sending metadata update", "pull", pull)
lastDBMetaMu.Lock()
defer lastDBMetaMu.Unlock()
metadataMapMu.Lock()
for _, datum := range md {
metadataMap[datum.Key] = datum
Member:
We should probably hold the mutex lock here?

Member Author:
ah yeah 🤦🏽‍♂️
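
For anyone wondering what the locking fix above guards against, here is a standalone sketch of the race; it is not code from this PR. A goroutine stands in for the pubsub callback and a loop stands in for the initial populate from the database. Remove the Lock/Unlock around the loop and `go run -race` flags the concurrent map write.

```go
package main

import (
	"fmt"
	"sync"
)

// metadatum is an illustrative stand-in for database.WorkspaceAgentMetadatum.
type metadatum struct {
	Key, Value string
}

func main() {
	var (
		mu sync.Mutex
		m  = make(map[string]metadatum)
		wg sync.WaitGroup
	)

	// Stand-in for the pubsub callback updating a key concurrently.
	wg.Add(1)
	go func() {
		defer wg.Done()
		mu.Lock()
		defer mu.Unlock()
		m["cpu"] = metadatum{Key: "cpu", Value: "42%"}
	}()

	// Stand-in for the initial populate from GetWorkspaceAgentMetadata.
	// Without this Lock/Unlock pair the writes below race with the
	// goroutine above.
	initial := []metadatum{{Key: "cpu", Value: "0%"}, {Key: "mem", Value: "1G"}}
	mu.Lock()
	for _, d := range initial {
		m[d.Key] = d
	}
	mu.Unlock()

	wg.Wait()
	fmt.Println(len(m), "entries")
}
```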

}
metadataMapMu.Unlock()

var err error
if pull {
// We always use the original Request context because it contains
// the RBAC actor.
lastDBMeta, err = api.Database.GetWorkspaceAgentMetadata(ctx, workspaceAgent.ID)
if err != nil {
_ = sendEvent(ctx, codersdk.ServerSentEvent{
Type: codersdk.ServerSentEventTypeError,
Data: codersdk.Response{
Message: "Internal error getting metadata.",
Detail: err.Error(),
},
})
return
}
slices.SortFunc(lastDBMeta, func(a, b database.WorkspaceAgentMetadatum) int {
return slice.Ascending(a.Key, b.Key)
})
// Send initial metadata.

// Avoid sending refresh if the client is about to get a
// fresh update.
refreshTicker.Reset(refreshInterval)
}
var lastSend time.Time
sendMetadata := func() {
metadataMapMu.Lock()
values := maps.Values(metadataMap)
pendingChanges.Store(false)
metadataMapMu.Unlock()

_ = sendEvent(ctx, codersdk.ServerSentEvent{
lastSend = time.Now()
_ = sseSendEvent(ctx, codersdk.ServerSentEvent{
Type: codersdk.ServerSentEventTypeData,
Data: convertWorkspaceAgentMetadata(lastDBMeta),
Data: convertWorkspaceAgentMetadata(values),
})
}

// Note: we previously used a debounce here, but when the rate of metadata updates was too
// high the debounce would never fire.
//
// The rate-limit has its own caveat. If the agent sends a burst of metadata
// but then goes quiet, we will never pull the new metadata and the frontend
// will go stale until refresh. This would only happen if the agent was
// under extreme load. Under normal operations, the interval between metadata
// updates is constant so there is no burst phenomenon.
pubsubRatelimit := rate.NewLimiter(rate.Every(time.Second), 2)
if flag.Lookup("test.v") != nil {
// We essentially disable the rate-limit in tests for determinism.
pubsubRatelimit = rate.NewLimiter(rate.Every(time.Second*100), 100)
}

// Send metadata on updates. We must subscribe before sending the initial
// metadata to guarantee that events in between are not missed.
cancelSub, err := api.Pubsub.Subscribe(watchWorkspaceAgentMetadataChannel(workspaceAgent.ID), func(_ context.Context, _ []byte) {
allow := pubsubRatelimit.Allow()
log.Debug(ctx, "received metadata update", "allow", allow)
if allow {
sendMetadata(true)
}
})
if err != nil {
httpapi.InternalServerError(rw, err)
return
}
defer cancelSub()

// Send initial metadata.
sendMetadata(true)
sendMetadata()

for {
select {
case <-senderClosed:
case <-sendTicker.C:
// Even if there's no change, we send an update every 5 seconds to
// ensure that the frontend always has an accurate "Result.Age".
if !pendingChanges.Load() && time.Since(lastSend) < time.Second*5 {
continue
}
sendMetadata()
case <-sseSenderClosed:
return
case <-refreshTicker.C:
}

// Avoid spamming the DB with reads when we know there are no updates. We want
// to continue sending updates to the frontend so that "Result.Age"
// is always accurate. This way, the frontend doesn't need to
// sync its own clock with the backend.
sendMetadata(false)
}
}

Expand All @@ -1717,6 +1723,10 @@ func convertWorkspaceAgentMetadata(db []database.WorkspaceAgentMetadatum) []code
},
})
}
// Sorting prevents the metadata from jumping around in the frontend.
sort.Slice(result, func(i, j int) bool {
return result[i].Description.Key < result[j].Description.Key
})
return result
}

2 changes: 1 addition & 1 deletion coderd/workspaceagents_test.go
@@ -1153,7 +1153,7 @@ func TestWorkspaceAgent_Metadata(t *testing.T) {
require.Len(t, update, 3)
check(wantMetadata1, update[0], true)

const maxValueLen = 32 << 10
const maxValueLen = 2048
tooLongValueMetadata := wantMetadata1
tooLongValueMetadata.Value = strings.Repeat("a", maxValueLen*2)
tooLongValueMetadata.Error = ""