fix(coderd): remove rate limits from agent metadata #9308
```diff
@@ -6,7 +6,6 @@ import (
 	"database/sql"
 	"encoding/json"
 	"errors"
-	"flag"
 	"fmt"
 	"io"
 	"net"
```
```diff
@@ -23,10 +22,9 @@ import (
 
 	"github.com/go-chi/chi/v5"
 	"github.com/google/uuid"
-	"golang.org/x/exp/slices"
+	"golang.org/x/exp/maps"
 	"golang.org/x/mod/semver"
 	"golang.org/x/sync/errgroup"
-	"golang.org/x/time/rate"
 	"golang.org/x/xerrors"
 	"nhooyr.io/websocket"
 	"tailscale.com/tailcfg"
```
```diff
@@ -39,7 +37,6 @@ import (
 	"github.com/coder/coder/v2/coderd/httpmw"
 	"github.com/coder/coder/v2/coderd/rbac"
 	"github.com/coder/coder/v2/coderd/util/ptr"
-	"github.com/coder/coder/v2/coderd/util/slice"
 	"github.com/coder/coder/v2/codersdk"
 	"github.com/coder/coder/v2/codersdk/agentsdk"
 	"github.com/coder/coder/v2/tailnet"
```
```diff
@@ -1524,7 +1521,11 @@ func (api *API) workspaceAgentPostMetadata(rw http.ResponseWriter, r *http.Request
 	key := chi.URLParam(r, "key")
 
 	const (
-		maxValueLen = 32 << 10
+		// maxValueLen is set to 2048 to stay under the 8000 byte Postgres
+		// NOTIFY limit. Since both value and error can be set, the real
+		// payload limit is 2 * 2048 * 4/3 <base64 expansion> = 5461 bytes,
+		// plus a few hundred bytes for JSON syntax, key names, and metadata.
+		maxValueLen = 2048
 		maxErrorLen = maxValueLen
 	)
```
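As a sanity check on the arithmetic in that new comment: base64 expands every 3 input bytes into 4 output characters, so a full value plus a full error comes to roughly 2 * 2048 * 4/3 bytes once encoded. A small standalone sketch (not part of the diff) that confirms the bound:

```go
package main

import (
	"encoding/base64"
	"fmt"
)

func main() {
	const maxValueLen = 2048
	// Both value and error can be filled, so the raw payload is up to 2*2048 bytes.
	raw := 2 * maxValueLen
	// base64 emits 4 output characters per 3 input bytes (rounded up, with padding).
	encoded := base64.StdEncoding.EncodedLen(raw)
	fmt.Println(encoded) // 5464: matches the ~5461 estimate and sits well under 8000
}
```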
```diff
@@ -1567,7 +1568,13 @@ func (api *API) workspaceAgentPostMetadata(rw http.ResponseWriter, r *http.Request
 		slog.F("value", ellipse(datum.Value, 16)),
 	)
 
-	err = api.Pubsub.Publish(watchWorkspaceAgentMetadataChannel(workspaceAgent.ID), []byte(datum.Key))
+	datumJSON, err := json.Marshal(datum)
+	if err != nil {
+		httpapi.InternalServerError(rw, err)
+		return
+	}
+
+	err = api.Pubsub.Publish(watchWorkspaceAgentMetadataChannel(workspaceAgent.ID), datumJSON)
 	if err != nil {
 		httpapi.InternalServerError(rw, err)
 		return
```
```diff
@@ -1593,7 +1600,42 @@ func (api *API) watchWorkspaceAgentMetadata(rw http.ResponseWriter, r *http.Request
 		)
 	)
 
-	sendEvent, senderClosed, err := httpapi.ServerSentEventSender(rw, r)
+	// We avoid channel-based synchronization here to avoid backpressure problems.
+	var (
+		metadataMapMu sync.Mutex
+		metadataMap   = make(map[string]database.WorkspaceAgentMetadatum)
+		// pendingChanges must only be mutated when metadataMapMu is held.
+		pendingChanges atomic.Bool
+	)
+
+	// Send metadata on updates. We must ensure subscription before sending
+	// initial metadata to guarantee that events in-between are not missed.
+	cancelSub, err := api.Pubsub.Subscribe(watchWorkspaceAgentMetadataChannel(workspaceAgent.ID), func(_ context.Context, byt []byte) {
+		var update database.UpdateWorkspaceAgentMetadataParams
+		err := json.Unmarshal(byt, &update)
+		if err != nil {
+			api.Logger.Error(ctx, "failed to unmarshal pubsub message", slog.Error(err))
+			return
+		}
+
+		log.Debug(ctx, "received metadata update", "key", update.Key)
+
+		metadataMapMu.Lock()
+		defer metadataMapMu.Unlock()
+		md := metadataMap[update.Key]
+		md.Value = update.Value
+		md.Error = update.Error
+		md.CollectedAt = update.CollectedAt
+		metadataMap[update.Key] = md
+		pendingChanges.Store(true)
+	})
+	if err != nil {
+		httpapi.InternalServerError(rw, err)
+		return
+	}
+	defer cancelSub()
+
+	sseSendEvent, sseSenderClosed, err := httpapi.ServerSentEventSender(rw, r)
 	if err != nil {
 		httpapi.Write(ctx, rw, http.StatusInternalServerError, codersdk.Response{
 			Message: "Internal error setting up server-sent events.",
```
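The comment above captures a classic ordering rule for watch endpoints: subscribe before taking the initial snapshot, otherwise updates landing between the snapshot read and the subscription are lost for good. A distilled sketch of the pattern, with hypothetical `subscribe` and `loadSnapshot` stand-ins for `Pubsub.Subscribe` and `GetWorkspaceAgentMetadata`:

```go
package example

import "sync"

// watch subscribes first, then loads the initial snapshot, so any event
// published between the two steps is still delivered and applied rather
// than silently missed.
func watch(
	subscribe func(onUpdate func(key, value string)) (cancel func(), err error),
	loadSnapshot func() (map[string]string, error),
) (map[string]string, func(), error) {
	var (
		mu    sync.Mutex
		state = map[string]string{}
	)

	// 1. Subscribe first: from this point on, no update is dropped.
	cancel, err := subscribe(func(key, value string) {
		mu.Lock()
		defer mu.Unlock()
		state[key] = value
	})
	if err != nil {
		return nil, nil, err
	}

	// 2. Then load the snapshot. A racing update may be briefly overwritten
	// with older data here, but a later update for that key restores it.
	snap, err := loadSnapshot()
	if err != nil {
		cancel()
		return nil, nil, err
	}
	mu.Lock()
	for k, v := range snap {
		state[k] = v
	}
	mu.Unlock()

	return state, cancel, nil
}
```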
@@ -1603,97 +1645,61 @@ func (api *API) watchWorkspaceAgentMetadata(rw http.ResponseWriter, r *http.Requ | |
} | ||
// Prevent handler from returning until the sender is closed. | ||
defer func() { | ||
<-senderClosed | ||
<-sseSenderClosed | ||
}() | ||
|
||
const refreshInterval = time.Second * 5 | ||
refreshTicker := time.NewTicker(refreshInterval) | ||
defer refreshTicker.Stop() | ||
// We send updates exactly every second. | ||
const sendInterval = time.Second * 1 | ||
sendTicker := time.NewTicker(sendInterval) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. A There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Exactly every second was my intention — updated the comment. |
||
defer sendTicker.Stop() | ||
|
||
var ( | ||
lastDBMetaMu sync.Mutex | ||
lastDBMeta []database.WorkspaceAgentMetadatum | ||
) | ||
// We always use the original Request context because it contains | ||
// the RBAC actor. | ||
md, err := api.Database.GetWorkspaceAgentMetadata(ctx, workspaceAgent.ID) | ||
if err != nil { | ||
// If we can't successfully pull the initial metadata, pubsub | ||
// updates will be no-op so we may as well terminate the | ||
// connection early. | ||
httpapi.InternalServerError(rw, err) | ||
return | ||
} | ||
|
||
sendMetadata := func(pull bool) { | ||
log.Debug(ctx, "sending metadata update", "pull", pull) | ||
lastDBMetaMu.Lock() | ||
defer lastDBMetaMu.Unlock() | ||
metadataMapMu.Lock() | ||
for _, datum := range md { | ||
metadataMap[datum.Key] = datum | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We should probably hold the mutex lock here? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ah yeah 🤦🏽♂️ |
||
} | ||
metadataMapMu.Unlock() | ||
|
||
var err error | ||
if pull { | ||
// We always use the original Request context because it contains | ||
// the RBAC actor. | ||
lastDBMeta, err = api.Database.GetWorkspaceAgentMetadata(ctx, workspaceAgent.ID) | ||
if err != nil { | ||
_ = sendEvent(ctx, codersdk.ServerSentEvent{ | ||
Type: codersdk.ServerSentEventTypeError, | ||
Data: codersdk.Response{ | ||
Message: "Internal error getting metadata.", | ||
Detail: err.Error(), | ||
}, | ||
}) | ||
return | ||
} | ||
slices.SortFunc(lastDBMeta, func(a, b database.WorkspaceAgentMetadatum) int { | ||
return slice.Ascending(a.Key, b.Key) | ||
}) | ||
// Send initial metadata. | ||
|
||
// Avoid sending refresh if the client is about to get a | ||
// fresh update. | ||
refreshTicker.Reset(refreshInterval) | ||
} | ||
var lastSend time.Time | ||
sendMetadata := func() { | ||
metadataMapMu.Lock() | ||
values := maps.Values(metadataMap) | ||
pendingChanges.Store(false) | ||
metadataMapMu.Unlock() | ||
|
||
_ = sendEvent(ctx, codersdk.ServerSentEvent{ | ||
lastSend = time.Now() | ||
_ = sseSendEvent(ctx, codersdk.ServerSentEvent{ | ||
Type: codersdk.ServerSentEventTypeData, | ||
Data: convertWorkspaceAgentMetadata(lastDBMeta), | ||
Data: convertWorkspaceAgentMetadata(values), | ||
}) | ||
} | ||
|
||
// Note: we previously used a debounce here, but when the rate of metadata updates was too | ||
// high the debounce would never fire. | ||
// | ||
// The rate-limit has its own caveat. If the agent sends a burst of metadata | ||
// but then goes quiet, we will never pull the new metadata and the frontend | ||
// will go stale until refresh. This would only happen if the agent was | ||
// under extreme load. Under normal operations, the interval between metadata | ||
// updates is constant so there is no burst phenomenon. | ||
pubsubRatelimit := rate.NewLimiter(rate.Every(time.Second), 2) | ||
if flag.Lookup("test.v") != nil { | ||
// We essentially disable the rate-limit in tests for determinism. | ||
pubsubRatelimit = rate.NewLimiter(rate.Every(time.Second*100), 100) | ||
} | ||
|
||
// Send metadata on updates, we must ensure subscription before sending | ||
// initial metadata to guarantee that events in-between are not missed. | ||
cancelSub, err := api.Pubsub.Subscribe(watchWorkspaceAgentMetadataChannel(workspaceAgent.ID), func(_ context.Context, _ []byte) { | ||
allow := pubsubRatelimit.Allow() | ||
log.Debug(ctx, "received metadata update", "allow", allow) | ||
if allow { | ||
sendMetadata(true) | ||
} | ||
}) | ||
if err != nil { | ||
httpapi.InternalServerError(rw, err) | ||
return | ||
} | ||
defer cancelSub() | ||
|
||
// Send initial metadata. | ||
sendMetadata(true) | ||
sendMetadata() | ||
|
||
for { | ||
select { | ||
case <-senderClosed: | ||
case <-sendTicker.C: | ||
// We send an update even if there's no change every 5 seconds | ||
// to ensure that the frontend always has an accurate "Result.Age". | ||
if !pendingChanges.Load() && time.Since(lastSend) < time.Second*5 { | ||
continue | ||
} | ||
sendMetadata() | ||
case <-sseSenderClosed: | ||
return | ||
case <-refreshTicker.C: | ||
} | ||
|
||
// Avoid spamming the DB with reads we know there are no updates. We want | ||
// to continue sending updates to the frontend so that "Result.Age" | ||
// is always accurate. This way, the frontend doesn't need to | ||
// sync its own clock with the backend. | ||
sendMetadata(false) | ||
} | ||
} | ||
|
||
|
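The removed comment is worth unpacking: a trailing-edge debounce resets its timer on every event, so a steady stream of updates arriving faster than the debounce window means the send callback never runs at all. A minimal illustration of that failure mode (generic helper, not from the codebase):

```go
package example

import (
	"sync"
	"time"
)

// debounce returns a trigger that runs fn only after d of quiet time.
// Every call resets the timer, so with events arriving more often than
// every d, fn never fires: the starvation the removed comment describes,
// and the reason the PR moves to a fixed one-second send ticker instead.
func debounce(d time.Duration, fn func()) (trigger func()) {
	var (
		mu sync.Mutex
		t  *time.Timer
	)
	return func() {
		mu.Lock()
		defer mu.Unlock()
		if t != nil {
			t.Stop()
		}
		t = time.AfterFunc(d, fn)
	}
}
```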
```diff
@@ -1717,6 +1723,10 @@ func convertWorkspaceAgentMetadata(db []database.WorkspaceAgentMetadatum) []codersdk.WorkspaceAgentMetadata {
 			},
 		})
 	}
+	// Sorting prevents the metadata from jumping around in the frontend.
+	sort.Slice(result, func(i, j int) bool {
+		return result[i].Description.Key < result[j].Description.Key
+	})
 	return result
 }
```
Review comment: I don't know how limited the pubsub capacity is wrt payload size, but since we don't control the size (of e.g. values), we'll still need a different approach to propagate the value. Not against adding a few more fields to the publish though if they're helpful.

I see a test also caught this.

Reply: Hm, I think I left a comment but it was lost. The 32k limit was too much anyways, so I dropped it down to 4k and used gob encoding to avoid inflation.
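On the JSON-versus-gob point in that reply: encoding/json renders []byte fields as base64 strings, inflating them by about a third, while gob writes the bytes through unencoded, which is presumably the inflation being avoided. A standalone comparison under that assumption (the `datum` shape here is hypothetical):

```go
package main

import (
	"bytes"
	"encoding/gob"
	"encoding/json"
	"fmt"
)

// datum mimics a metadata update carrying value and error payloads.
type datum struct {
	Value []byte
	Error []byte
}

func main() {
	d := datum{
		Value: bytes.Repeat([]byte{0xff}, 2048),
		Error: bytes.Repeat([]byte{0xff}, 2048),
	}

	j, _ := json.Marshal(d) // JSON base64-encodes []byte: ~4/3 inflation plus quoting
	var g bytes.Buffer
	_ = gob.NewEncoder(&g).Encode(d) // gob writes the raw bytes plus small type headers

	// The JSON form is roughly a third larger than the gob form.
	fmt.Printf("json=%d bytes, gob=%d bytes\n", len(j), g.Len())
}
```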