Skip to content

Commit b40ed05

Browse files
committed
fix(coderd): remove rate limits from agent metadata
Include the full update message in the PubSub notification so that we don't have to refresh metadata from the DB and can avoid rate limiting.
1 parent af939d1 commit b40ed05

File tree

2 files changed

+93
-85
lines changed

2 files changed

+93
-85
lines changed

coderd/workspaceagents.go

Lines changed: 92 additions & 84 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@ import (
66
"database/sql"
77
"encoding/json"
88
"errors"
9-
"flag"
109
"fmt"
1110
"io"
1211
"net"
@@ -23,10 +22,9 @@ import (
2322

2423
"github.com/go-chi/chi/v5"
2524
"github.com/google/uuid"
26-
"golang.org/x/exp/slices"
25+
"golang.org/x/exp/maps"
2726
"golang.org/x/mod/semver"
2827
"golang.org/x/sync/errgroup"
29-
"golang.org/x/time/rate"
3028
"golang.org/x/xerrors"
3129
"nhooyr.io/websocket"
3230
"tailscale.com/tailcfg"
@@ -39,7 +37,6 @@ import (
3937
"github.com/coder/coder/v2/coderd/httpmw"
4038
"github.com/coder/coder/v2/coderd/rbac"
4139
"github.com/coder/coder/v2/coderd/util/ptr"
42-
"github.com/coder/coder/v2/coderd/util/slice"
4340
"github.com/coder/coder/v2/codersdk"
4441
"github.com/coder/coder/v2/codersdk/agentsdk"
4542
"github.com/coder/coder/v2/tailnet"
@@ -1524,7 +1521,11 @@ func (api *API) workspaceAgentPostMetadata(rw http.ResponseWriter, r *http.Reque
15241521
key := chi.URLParam(r, "key")
15251522

15261523
const (
1527-
maxValueLen = 32 << 10
1524+
// maxValueLen is set to 2048 to stay under the 8000 byte Postgres
1525+
// NOTIFY limit. Since both value and error can be set, the real
1526+
// payload limit is 2 * 2048 * 4/3 <base64 expansion> = 5461 bytes + a few hundred bytes for JSON
1527+
// syntax, key names, and metadata.
1528+
maxValueLen = 2048
15281529
maxErrorLen = maxValueLen
15291530
)
15301531

@@ -1567,7 +1568,15 @@ func (api *API) workspaceAgentPostMetadata(rw http.ResponseWriter, r *http.Reque
15671568
slog.F("value", ellipse(datum.Value, 16)),
15681569
)
15691570

1570-
err = api.Pubsub.Publish(watchWorkspaceAgentMetadataChannel(workspaceAgent.ID), []byte(datum.Key))
1571+
// gob is used instead of JSON due to the potential large performance
1572+
// overhead of agent metadata.
1573+
datumJSON, err := json.Marshal(datum)
1574+
if err != nil {
1575+
httpapi.InternalServerError(rw, err)
1576+
return
1577+
}
1578+
1579+
err = api.Pubsub.Publish(watchWorkspaceAgentMetadataChannel(workspaceAgent.ID), datumJSON)
15711580
if err != nil {
15721581
httpapi.InternalServerError(rw, err)
15731582
return
@@ -1593,7 +1602,42 @@ func (api *API) watchWorkspaceAgentMetadata(rw http.ResponseWriter, r *http.Requ
15931602
)
15941603
)
15951604

1596-
sendEvent, senderClosed, err := httpapi.ServerSentEventSender(rw, r)
1605+
// We avoid channel-based synchronization here to avoid backpressure problems.
1606+
var (
1607+
metadataMapMu sync.Mutex
1608+
metadataMap = make(map[string]database.WorkspaceAgentMetadatum, 128)
1609+
// pendingChanges must only be mutated when metadataMapMu is held.
1610+
pendingChanges atomic.Bool
1611+
)
1612+
1613+
// Send metadata on updates, we must ensure subscription before sending
1614+
// initial metadata to guarantee that events in-between are not missed.
1615+
cancelSub, err := api.Pubsub.Subscribe(watchWorkspaceAgentMetadataChannel(workspaceAgent.ID), func(_ context.Context, byt []byte) {
1616+
var update database.UpdateWorkspaceAgentMetadataParams
1617+
err := json.Unmarshal(byt, &update)
1618+
if err != nil {
1619+
api.Logger.Error(ctx, "failed to unmarshal pubsub message", slog.Error(err))
1620+
return
1621+
}
1622+
1623+
log.Debug(ctx, "received metadata update", "key", update.Key)
1624+
1625+
metadataMapMu.Lock()
1626+
defer metadataMapMu.Unlock()
1627+
md := metadataMap[update.Key]
1628+
md.Value = update.Value
1629+
md.Error = update.Error
1630+
md.CollectedAt = update.CollectedAt
1631+
metadataMap[update.Key] = md
1632+
pendingChanges.Store(true)
1633+
})
1634+
if err != nil {
1635+
httpapi.InternalServerError(rw, err)
1636+
return
1637+
}
1638+
defer cancelSub()
1639+
1640+
sseSendEvent, sseSenderClosed, err := httpapi.ServerSentEventSender(rw, r)
15971641
if err != nil {
15981642
httpapi.Write(ctx, rw, http.StatusInternalServerError, codersdk.Response{
15991643
Message: "Internal error setting up server-sent events.",
@@ -1603,97 +1647,57 @@ func (api *API) watchWorkspaceAgentMetadata(rw http.ResponseWriter, r *http.Requ
16031647
}
16041648
// Prevent handler from returning until the sender is closed.
16051649
defer func() {
1606-
<-senderClosed
1650+
<-sseSenderClosed
16071651
}()
16081652

1609-
const refreshInterval = time.Second * 5
1610-
refreshTicker := time.NewTicker(refreshInterval)
1611-
defer refreshTicker.Stop()
1653+
// We send updates at most every second.
1654+
const sendInterval = time.Second * 1
1655+
sendTicker := time.NewTicker(sendInterval)
1656+
defer sendTicker.Stop()
16121657

1613-
var (
1614-
lastDBMetaMu sync.Mutex
1615-
lastDBMeta []database.WorkspaceAgentMetadatum
1616-
)
1617-
1618-
sendMetadata := func(pull bool) {
1619-
log.Debug(ctx, "sending metadata update", "pull", pull)
1620-
lastDBMetaMu.Lock()
1621-
defer lastDBMetaMu.Unlock()
1622-
1623-
var err error
1624-
if pull {
1625-
// We always use the original Request context because it contains
1626-
// the RBAC actor.
1627-
lastDBMeta, err = api.Database.GetWorkspaceAgentMetadata(ctx, workspaceAgent.ID)
1628-
if err != nil {
1629-
_ = sendEvent(ctx, codersdk.ServerSentEvent{
1630-
Type: codersdk.ServerSentEventTypeError,
1631-
Data: codersdk.Response{
1632-
Message: "Internal error getting metadata.",
1633-
Detail: err.Error(),
1634-
},
1635-
})
1636-
return
1637-
}
1638-
slices.SortFunc(lastDBMeta, func(a, b database.WorkspaceAgentMetadatum) int {
1639-
return slice.Ascending(a.Key, b.Key)
1640-
})
1641-
1642-
// Avoid sending refresh if the client is about to get a
1643-
// fresh update.
1644-
refreshTicker.Reset(refreshInterval)
1645-
}
1646-
1647-
_ = sendEvent(ctx, codersdk.ServerSentEvent{
1648-
Type: codersdk.ServerSentEventTypeData,
1649-
Data: convertWorkspaceAgentMetadata(lastDBMeta),
1650-
})
1651-
}
1652-
1653-
// Note: we previously used a debounce here, but when the rate of metadata updates was too
1654-
// high the debounce would never fire.
1655-
//
1656-
// The rate-limit has its own caveat. If the agent sends a burst of metadata
1657-
// but then goes quiet, we will never pull the new metadata and the frontend
1658-
// will go stale until refresh. This would only happen if the agent was
1659-
// under extreme load. Under normal operations, the interval between metadata
1660-
// updates is constant so there is no burst phenomenon.
1661-
pubsubRatelimit := rate.NewLimiter(rate.Every(time.Second), 2)
1662-
if flag.Lookup("test.v") != nil {
1663-
// We essentially disable the rate-limit in tests for determinism.
1664-
pubsubRatelimit = rate.NewLimiter(rate.Every(time.Second*100), 100)
1665-
}
1666-
1667-
// Send metadata on updates, we must ensure subscription before sending
1668-
// initial metadata to guarantee that events in-between are not missed.
1669-
cancelSub, err := api.Pubsub.Subscribe(watchWorkspaceAgentMetadataChannel(workspaceAgent.ID), func(_ context.Context, _ []byte) {
1670-
allow := pubsubRatelimit.Allow()
1671-
log.Debug(ctx, "received metadata update", "allow", allow)
1672-
if allow {
1673-
sendMetadata(true)
1674-
}
1675-
})
1658+
// We always use the original Request context because it contains
1659+
// the RBAC actor.
1660+
md, err := api.Database.GetWorkspaceAgentMetadata(ctx, workspaceAgent.ID)
16761661
if err != nil {
1662+
// If we can't successfully pull the initial metadata, pubsub
1663+
// updates will be no-op so we may as well terminate the
1664+
// connection early.
16771665
httpapi.InternalServerError(rw, err)
16781666
return
16791667
}
1680-
defer cancelSub()
1668+
for _, datum := range md {
1669+
metadataMap[datum.Key] = datum
1670+
}
16811671

16821672
// Send initial metadata.
1683-
sendMetadata(true)
1673+
1674+
var lastSend time.Time
1675+
sendMetadata := func() {
1676+
lastSend = time.Now()
1677+
metadataMapMu.Lock()
1678+
values := maps.Values(metadataMap)
1679+
pendingChanges.Store(false)
1680+
metadataMapMu.Unlock()
1681+
_ = sseSendEvent(ctx, codersdk.ServerSentEvent{
1682+
Type: codersdk.ServerSentEventTypeData,
1683+
Data: convertWorkspaceAgentMetadata(values),
1684+
})
1685+
}
1686+
1687+
sendMetadata()
16841688

16851689
for {
16861690
select {
1687-
case <-senderClosed:
1691+
case <-sendTicker.C:
1692+
// We send an update even if there's no change every 5 seconds
1693+
// to ensure that the frontend always has an accurate "Result.Age".
1694+
if !pendingChanges.Load() && time.Since(lastSend) < time.Second*5 {
1695+
continue
1696+
}
1697+
sendMetadata()
1698+
case <-sseSenderClosed:
16881699
return
1689-
case <-refreshTicker.C:
16901700
}
1691-
1692-
// Avoid spamming the DB with reads we know there are no updates. We want
1693-
// to continue sending updates to the frontend so that "Result.Age"
1694-
// is always accurate. This way, the frontend doesn't need to
1695-
// sync its own clock with the backend.
1696-
sendMetadata(false)
16971701
}
16981702
}
16991703

@@ -1717,6 +1721,10 @@ func convertWorkspaceAgentMetadata(db []database.WorkspaceAgentMetadatum) []code
17171721
},
17181722
})
17191723
}
1724+
// Sorting prevents the metadata from jumping around in the frontend.
1725+
sort.Slice(result, func(i, j int) bool {
1726+
return result[i].Description.Key < result[j].Description.Key
1727+
})
17201728
return result
17211729
}
17221730

coderd/workspaceagents_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1153,7 +1153,7 @@ func TestWorkspaceAgent_Metadata(t *testing.T) {
11531153
require.Len(t, update, 3)
11541154
check(wantMetadata1, update[0], true)
11551155

1156-
const maxValueLen = 32 << 10
1156+
const maxValueLen = 2048
11571157
tooLongValueMetadata := wantMetadata1
11581158
tooLongValueMetadata.Value = strings.Repeat("a", maxValueLen*2)
11591159
tooLongValueMetadata.Error = ""

0 commit comments

Comments
 (0)