Skip to content

Commit 630ec55

Browse files
authored
fix(coderd): remove rate limits from agent metadata (coder#9308)
Include the full update message in the PubSub notification so that we don't have to refresh metadata from the DB and can avoid rate limiting.
1 parent 7f14b50 commit 630ec55

File tree

2 files changed

+93
-83
lines changed

2 files changed

+93
-83
lines changed

coderd/workspaceagents.go

+92-82
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@ import (
66
"database/sql"
77
"encoding/json"
88
"errors"
9-
"flag"
109
"fmt"
1110
"io"
1211
"net"
@@ -23,10 +22,9 @@ import (
2322

2423
"github.com/go-chi/chi/v5"
2524
"github.com/google/uuid"
26-
"golang.org/x/exp/slices"
25+
"golang.org/x/exp/maps"
2726
"golang.org/x/mod/semver"
2827
"golang.org/x/sync/errgroup"
29-
"golang.org/x/time/rate"
3028
"golang.org/x/xerrors"
3129
"nhooyr.io/websocket"
3230
"tailscale.com/tailcfg"
@@ -39,7 +37,6 @@ import (
3937
"github.com/coder/coder/v2/coderd/httpmw"
4038
"github.com/coder/coder/v2/coderd/rbac"
4139
"github.com/coder/coder/v2/coderd/util/ptr"
42-
"github.com/coder/coder/v2/coderd/util/slice"
4340
"github.com/coder/coder/v2/codersdk"
4441
"github.com/coder/coder/v2/codersdk/agentsdk"
4542
"github.com/coder/coder/v2/tailnet"
@@ -1528,7 +1525,11 @@ func (api *API) workspaceAgentPostMetadata(rw http.ResponseWriter, r *http.Reque
15281525
key := chi.URLParam(r, "key")
15291526

15301527
const (
1531-
maxValueLen = 32 << 10
1528+
// maxValueLen is set to 2048 to stay under the 8000 byte Postgres
1529+
// NOTIFY limit. Since both value and error can be set, the real
1530+
// payload limit is 2 * 2048 * 4/3 <base64 expansion> = 5461 bytes + a few hundred bytes for JSON
1531+
// syntax, key names, and metadata.
1532+
maxValueLen = 2048
15321533
maxErrorLen = maxValueLen
15331534
)
15341535

@@ -1571,7 +1572,13 @@ func (api *API) workspaceAgentPostMetadata(rw http.ResponseWriter, r *http.Reque
15711572
slog.F("value", ellipse(datum.Value, 16)),
15721573
)
15731574

1574-
err = api.Pubsub.Publish(watchWorkspaceAgentMetadataChannel(workspaceAgent.ID), []byte(datum.Key))
1575+
datumJSON, err := json.Marshal(datum)
1576+
if err != nil {
1577+
httpapi.InternalServerError(rw, err)
1578+
return
1579+
}
1580+
1581+
err = api.Pubsub.Publish(watchWorkspaceAgentMetadataChannel(workspaceAgent.ID), datumJSON)
15751582
if err != nil {
15761583
httpapi.InternalServerError(rw, err)
15771584
return
@@ -1597,7 +1604,42 @@ func (api *API) watchWorkspaceAgentMetadata(rw http.ResponseWriter, r *http.Requ
15971604
)
15981605
)
15991606

1600-
sendEvent, senderClosed, err := httpapi.ServerSentEventSender(rw, r)
1607+
// We avoid channel-based synchronization here to avoid backpressure problems.
1608+
var (
1609+
metadataMapMu sync.Mutex
1610+
metadataMap = make(map[string]database.WorkspaceAgentMetadatum)
1611+
// pendingChanges must only be mutated when metadataMapMu is held.
1612+
pendingChanges atomic.Bool
1613+
)
1614+
1615+
// Send metadata on updates, we must ensure subscription before sending
1616+
// initial metadata to guarantee that events in-between are not missed.
1617+
cancelSub, err := api.Pubsub.Subscribe(watchWorkspaceAgentMetadataChannel(workspaceAgent.ID), func(_ context.Context, byt []byte) {
1618+
var update database.UpdateWorkspaceAgentMetadataParams
1619+
err := json.Unmarshal(byt, &update)
1620+
if err != nil {
1621+
api.Logger.Error(ctx, "failed to unmarshal pubsub message", slog.Error(err))
1622+
return
1623+
}
1624+
1625+
log.Debug(ctx, "received metadata update", "key", update.Key)
1626+
1627+
metadataMapMu.Lock()
1628+
defer metadataMapMu.Unlock()
1629+
md := metadataMap[update.Key]
1630+
md.Value = update.Value
1631+
md.Error = update.Error
1632+
md.CollectedAt = update.CollectedAt
1633+
metadataMap[update.Key] = md
1634+
pendingChanges.Store(true)
1635+
})
1636+
if err != nil {
1637+
httpapi.InternalServerError(rw, err)
1638+
return
1639+
}
1640+
defer cancelSub()
1641+
1642+
sseSendEvent, sseSenderClosed, err := httpapi.ServerSentEventSender(rw, r)
16011643
if err != nil {
16021644
httpapi.Write(ctx, rw, http.StatusInternalServerError, codersdk.Response{
16031645
Message: "Internal error setting up server-sent events.",
@@ -1607,97 +1649,61 @@ func (api *API) watchWorkspaceAgentMetadata(rw http.ResponseWriter, r *http.Requ
16071649
}
16081650
// Prevent handler from returning until the sender is closed.
16091651
defer func() {
1610-
<-senderClosed
1652+
<-sseSenderClosed
16111653
}()
16121654

1613-
const refreshInterval = time.Second * 5
1614-
refreshTicker := time.NewTicker(refreshInterval)
1615-
defer refreshTicker.Stop()
1655+
// We send updates exactly every second.
1656+
const sendInterval = time.Second * 1
1657+
sendTicker := time.NewTicker(sendInterval)
1658+
defer sendTicker.Stop()
16161659

1617-
var (
1618-
lastDBMetaMu sync.Mutex
1619-
lastDBMeta []database.WorkspaceAgentMetadatum
1620-
)
1660+
// We always use the original Request context because it contains
1661+
// the RBAC actor.
1662+
md, err := api.Database.GetWorkspaceAgentMetadata(ctx, workspaceAgent.ID)
1663+
if err != nil {
1664+
// If we can't successfully pull the initial metadata, pubsub
1665+
// updates will be no-op so we may as well terminate the
1666+
// connection early.
1667+
httpapi.InternalServerError(rw, err)
1668+
return
1669+
}
16211670

1622-
sendMetadata := func(pull bool) {
1623-
log.Debug(ctx, "sending metadata update", "pull", pull)
1624-
lastDBMetaMu.Lock()
1625-
defer lastDBMetaMu.Unlock()
1671+
metadataMapMu.Lock()
1672+
for _, datum := range md {
1673+
metadataMap[datum.Key] = datum
1674+
}
1675+
metadataMapMu.Unlock()
16261676

1627-
var err error
1628-
if pull {
1629-
// We always use the original Request context because it contains
1630-
// the RBAC actor.
1631-
lastDBMeta, err = api.Database.GetWorkspaceAgentMetadata(ctx, workspaceAgent.ID)
1632-
if err != nil {
1633-
_ = sendEvent(ctx, codersdk.ServerSentEvent{
1634-
Type: codersdk.ServerSentEventTypeError,
1635-
Data: codersdk.Response{
1636-
Message: "Internal error getting metadata.",
1637-
Detail: err.Error(),
1638-
},
1639-
})
1640-
return
1641-
}
1642-
slices.SortFunc(lastDBMeta, func(a, b database.WorkspaceAgentMetadatum) int {
1643-
return slice.Ascending(a.Key, b.Key)
1644-
})
1677+
// Send initial metadata.
16451678

1646-
// Avoid sending refresh if the client is about to get a
1647-
// fresh update.
1648-
refreshTicker.Reset(refreshInterval)
1649-
}
1679+
var lastSend time.Time
1680+
sendMetadata := func() {
1681+
metadataMapMu.Lock()
1682+
values := maps.Values(metadataMap)
1683+
pendingChanges.Store(false)
1684+
metadataMapMu.Unlock()
16501685

1651-
_ = sendEvent(ctx, codersdk.ServerSentEvent{
1686+
lastSend = time.Now()
1687+
_ = sseSendEvent(ctx, codersdk.ServerSentEvent{
16521688
Type: codersdk.ServerSentEventTypeData,
1653-
Data: convertWorkspaceAgentMetadata(lastDBMeta),
1689+
Data: convertWorkspaceAgentMetadata(values),
16541690
})
16551691
}
16561692

1657-
// Note: we previously used a debounce here, but when the rate of metadata updates was too
1658-
// high the debounce would never fire.
1659-
//
1660-
// The rate-limit has its own caveat. If the agent sends a burst of metadata
1661-
// but then goes quiet, we will never pull the new metadata and the frontend
1662-
// will go stale until refresh. This would only happen if the agent was
1663-
// under extreme load. Under normal operations, the interval between metadata
1664-
// updates is constant so there is no burst phenomenon.
1665-
pubsubRatelimit := rate.NewLimiter(rate.Every(time.Second), 2)
1666-
if flag.Lookup("test.v") != nil {
1667-
// We essentially disable the rate-limit in tests for determinism.
1668-
pubsubRatelimit = rate.NewLimiter(rate.Every(time.Second*100), 100)
1669-
}
1670-
1671-
// Send metadata on updates, we must ensure subscription before sending
1672-
// initial metadata to guarantee that events in-between are not missed.
1673-
cancelSub, err := api.Pubsub.Subscribe(watchWorkspaceAgentMetadataChannel(workspaceAgent.ID), func(_ context.Context, _ []byte) {
1674-
allow := pubsubRatelimit.Allow()
1675-
log.Debug(ctx, "received metadata update", "allow", allow)
1676-
if allow {
1677-
sendMetadata(true)
1678-
}
1679-
})
1680-
if err != nil {
1681-
httpapi.InternalServerError(rw, err)
1682-
return
1683-
}
1684-
defer cancelSub()
1685-
1686-
// Send initial metadata.
1687-
sendMetadata(true)
1693+
sendMetadata()
16881694

16891695
for {
16901696
select {
1691-
case <-senderClosed:
1697+
case <-sendTicker.C:
1698+
// We send an update even if there's no change every 5 seconds
1699+
// to ensure that the frontend always has an accurate "Result.Age".
1700+
if !pendingChanges.Load() && time.Since(lastSend) < time.Second*5 {
1701+
continue
1702+
}
1703+
sendMetadata()
1704+
case <-sseSenderClosed:
16921705
return
1693-
case <-refreshTicker.C:
16941706
}
1695-
1696-
// Avoid spamming the DB with reads we know there are no updates. We want
1697-
// to continue sending updates to the frontend so that "Result.Age"
1698-
// is always accurate. This way, the frontend doesn't need to
1699-
// sync its own clock with the backend.
1700-
sendMetadata(false)
17011707
}
17021708
}
17031709

@@ -1721,6 +1727,10 @@ func convertWorkspaceAgentMetadata(db []database.WorkspaceAgentMetadatum) []code
17211727
},
17221728
})
17231729
}
1730+
// Sorting prevents the metadata from jumping around in the frontend.
1731+
sort.Slice(result, func(i, j int) bool {
1732+
return result[i].Description.Key < result[j].Description.Key
1733+
})
17241734
return result
17251735
}
17261736

coderd/workspaceagents_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -1153,7 +1153,7 @@ func TestWorkspaceAgent_Metadata(t *testing.T) {
11531153
require.Len(t, update, 3)
11541154
check(wantMetadata1, update[0], true)
11551155

1156-
const maxValueLen = 32 << 10
1156+
const maxValueLen = 2048
11571157
tooLongValueMetadata := wantMetadata1
11581158
tooLongValueMetadata.Value = strings.Repeat("a", maxValueLen*2)
11591159
tooLongValueMetadata.Error = ""

0 commit comments

Comments
 (0)