Skip to content

Commit 68b2633

Browse files
committed
fix(coderd): remove rate limits from agent metadata
Include the full update message in the PubSub notification so that we don't have to refresh metadata from the DB and can avoid rate limiting.
1 parent af939d1 commit 68b2633

File tree

2 files changed

+93
-83
lines changed

2 files changed

+93
-83
lines changed

coderd/workspaceagents.go

Lines changed: 92 additions & 82 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@ import (
66
"database/sql"
77
"encoding/json"
88
"errors"
9-
"flag"
109
"fmt"
1110
"io"
1211
"net"
@@ -23,10 +22,9 @@ import (
2322

2423
"github.com/go-chi/chi/v5"
2524
"github.com/google/uuid"
26-
"golang.org/x/exp/slices"
25+
"golang.org/x/exp/maps"
2726
"golang.org/x/mod/semver"
2827
"golang.org/x/sync/errgroup"
29-
"golang.org/x/time/rate"
3028
"golang.org/x/xerrors"
3129
"nhooyr.io/websocket"
3230
"tailscale.com/tailcfg"
@@ -39,7 +37,6 @@ import (
3937
"github.com/coder/coder/v2/coderd/httpmw"
4038
"github.com/coder/coder/v2/coderd/rbac"
4139
"github.com/coder/coder/v2/coderd/util/ptr"
42-
"github.com/coder/coder/v2/coderd/util/slice"
4340
"github.com/coder/coder/v2/codersdk"
4441
"github.com/coder/coder/v2/codersdk/agentsdk"
4542
"github.com/coder/coder/v2/tailnet"
@@ -1524,7 +1521,11 @@ func (api *API) workspaceAgentPostMetadata(rw http.ResponseWriter, r *http.Reque
15241521
key := chi.URLParam(r, "key")
15251522

15261523
const (
1527-
maxValueLen = 32 << 10
1524+
// maxValueLen is set to 2048 to stay under the 8000 byte Postgres
1525+
// NOTIFY limit. Since both value and error can be set, the real
1526+
// payload limit is 2 * 2048 * 4/3 <base64 expansion> = 5461 bytes + a few hundred bytes for JSON
1527+
// syntax, key names, and metadata.
1528+
maxValueLen = 2048
15281529
maxErrorLen = maxValueLen
15291530
)
15301531

@@ -1567,7 +1568,13 @@ func (api *API) workspaceAgentPostMetadata(rw http.ResponseWriter, r *http.Reque
15671568
slog.F("value", ellipse(datum.Value, 16)),
15681569
)
15691570

1570-
err = api.Pubsub.Publish(watchWorkspaceAgentMetadataChannel(workspaceAgent.ID), []byte(datum.Key))
1571+
datumJSON, err := json.Marshal(datum)
1572+
if err != nil {
1573+
httpapi.InternalServerError(rw, err)
1574+
return
1575+
}
1576+
1577+
err = api.Pubsub.Publish(watchWorkspaceAgentMetadataChannel(workspaceAgent.ID), datumJSON)
15711578
if err != nil {
15721579
httpapi.InternalServerError(rw, err)
15731580
return
@@ -1593,7 +1600,42 @@ func (api *API) watchWorkspaceAgentMetadata(rw http.ResponseWriter, r *http.Requ
15931600
)
15941601
)
15951602

1596-
sendEvent, senderClosed, err := httpapi.ServerSentEventSender(rw, r)
1603+
// We avoid channel-based synchronization here to avoid backpressure problems.
1604+
var (
1605+
metadataMapMu sync.Mutex
1606+
metadataMap = make(map[string]database.WorkspaceAgentMetadatum)
1607+
// pendingChanges must only be mutated when metadataMapMu is held.
1608+
pendingChanges atomic.Bool
1609+
)
1610+
1611+
// Send metadata on updates. We must ensure subscription before sending
1612+
// initial metadata to guarantee that events in-between are not missed.
1613+
cancelSub, err := api.Pubsub.Subscribe(watchWorkspaceAgentMetadataChannel(workspaceAgent.ID), func(_ context.Context, byt []byte) {
1614+
var update database.UpdateWorkspaceAgentMetadataParams
1615+
err := json.Unmarshal(byt, &update)
1616+
if err != nil {
1617+
api.Logger.Error(ctx, "failed to unmarshal pubsub message", slog.Error(err))
1618+
return
1619+
}
1620+
1621+
log.Debug(ctx, "received metadata update", "key", update.Key)
1622+
1623+
metadataMapMu.Lock()
1624+
defer metadataMapMu.Unlock()
1625+
md := metadataMap[update.Key]
1626+
md.Value = update.Value
1627+
md.Error = update.Error
1628+
md.CollectedAt = update.CollectedAt
1629+
metadataMap[update.Key] = md
1630+
pendingChanges.Store(true)
1631+
})
1632+
if err != nil {
1633+
httpapi.InternalServerError(rw, err)
1634+
return
1635+
}
1636+
defer cancelSub()
1637+
1638+
sseSendEvent, sseSenderClosed, err := httpapi.ServerSentEventSender(rw, r)
15971639
if err != nil {
15981640
httpapi.Write(ctx, rw, http.StatusInternalServerError, codersdk.Response{
15991641
Message: "Internal error setting up server-sent events.",
@@ -1603,97 +1645,61 @@ func (api *API) watchWorkspaceAgentMetadata(rw http.ResponseWriter, r *http.Requ
16031645
}
16041646
// Prevent handler from returning until the sender is closed.
16051647
defer func() {
1606-
<-senderClosed
1648+
<-sseSenderClosed
16071649
}()
16081650

1609-
const refreshInterval = time.Second * 5
1610-
refreshTicker := time.NewTicker(refreshInterval)
1611-
defer refreshTicker.Stop()
1651+
// We send updates at most once per second.
1652+
const sendInterval = time.Second * 1
1653+
sendTicker := time.NewTicker(sendInterval)
1654+
defer sendTicker.Stop()
16121655

1613-
var (
1614-
lastDBMetaMu sync.Mutex
1615-
lastDBMeta []database.WorkspaceAgentMetadatum
1616-
)
1656+
// We always use the original Request context because it contains
1657+
// the RBAC actor.
1658+
md, err := api.Database.GetWorkspaceAgentMetadata(ctx, workspaceAgent.ID)
1659+
if err != nil {
1660+
// If we can't successfully pull the initial metadata, pubsub
1661+
// updates will be no-op so we may as well terminate the
1662+
// connection early.
1663+
httpapi.InternalServerError(rw, err)
1664+
return
1665+
}
16171666

1618-
sendMetadata := func(pull bool) {
1619-
log.Debug(ctx, "sending metadata update", "pull", pull)
1620-
lastDBMetaMu.Lock()
1621-
defer lastDBMetaMu.Unlock()
1667+
metadataMapMu.Lock()
1668+
for _, datum := range md {
1669+
metadataMap[datum.Key] = datum
1670+
}
1671+
metadataMapMu.Unlock()
16221672

1623-
var err error
1624-
if pull {
1625-
// We always use the original Request context because it contains
1626-
// the RBAC actor.
1627-
lastDBMeta, err = api.Database.GetWorkspaceAgentMetadata(ctx, workspaceAgent.ID)
1628-
if err != nil {
1629-
_ = sendEvent(ctx, codersdk.ServerSentEvent{
1630-
Type: codersdk.ServerSentEventTypeError,
1631-
Data: codersdk.Response{
1632-
Message: "Internal error getting metadata.",
1633-
Detail: err.Error(),
1634-
},
1635-
})
1636-
return
1637-
}
1638-
slices.SortFunc(lastDBMeta, func(a, b database.WorkspaceAgentMetadatum) int {
1639-
return slice.Ascending(a.Key, b.Key)
1640-
})
1673+
// Send initial metadata.
16411674

1642-
// Avoid sending refresh if the client is about to get a
1643-
// fresh update.
1644-
refreshTicker.Reset(refreshInterval)
1645-
}
1675+
var lastSend time.Time
1676+
sendMetadata := func() {
1677+
metadataMapMu.Lock()
1678+
values := maps.Values(metadataMap)
1679+
pendingChanges.Store(false)
1680+
metadataMapMu.Unlock()
16461681

1647-
_ = sendEvent(ctx, codersdk.ServerSentEvent{
1682+
lastSend = time.Now()
1683+
_ = sseSendEvent(ctx, codersdk.ServerSentEvent{
16481684
Type: codersdk.ServerSentEventTypeData,
1649-
Data: convertWorkspaceAgentMetadata(lastDBMeta),
1685+
Data: convertWorkspaceAgentMetadata(values),
16501686
})
16511687
}
16521688

1653-
// Note: we previously used a debounce here, but when the rate of metadata updates was too
1654-
// high the debounce would never fire.
1655-
//
1656-
// The rate-limit has its own caveat. If the agent sends a burst of metadata
1657-
// but then goes quiet, we will never pull the new metadata and the frontend
1658-
// will go stale until refresh. This would only happen if the agent was
1659-
// under extreme load. Under normal operations, the interval between metadata
1660-
// updates is constant so there is no burst phenomenon.
1661-
pubsubRatelimit := rate.NewLimiter(rate.Every(time.Second), 2)
1662-
if flag.Lookup("test.v") != nil {
1663-
// We essentially disable the rate-limit in tests for determinism.
1664-
pubsubRatelimit = rate.NewLimiter(rate.Every(time.Second*100), 100)
1665-
}
1666-
1667-
// Send metadata on updates, we must ensure subscription before sending
1668-
// initial metadata to guarantee that events in-between are not missed.
1669-
cancelSub, err := api.Pubsub.Subscribe(watchWorkspaceAgentMetadataChannel(workspaceAgent.ID), func(_ context.Context, _ []byte) {
1670-
allow := pubsubRatelimit.Allow()
1671-
log.Debug(ctx, "received metadata update", "allow", allow)
1672-
if allow {
1673-
sendMetadata(true)
1674-
}
1675-
})
1676-
if err != nil {
1677-
httpapi.InternalServerError(rw, err)
1678-
return
1679-
}
1680-
defer cancelSub()
1681-
1682-
// Send initial metadata.
1683-
sendMetadata(true)
1689+
sendMetadata()
16841690

16851691
for {
16861692
select {
1687-
case <-senderClosed:
1693+
case <-sendTicker.C:
1694+
// Even when nothing has changed, we still send an update every 5 seconds
1695+
// to ensure that the frontend always has an accurate "Result.Age".
1696+
if !pendingChanges.Load() && time.Since(lastSend) < time.Second*5 {
1697+
continue
1698+
}
1699+
sendMetadata()
1700+
case <-sseSenderClosed:
16881701
return
1689-
case <-refreshTicker.C:
16901702
}
1691-
1692-
// Avoid spamming the DB with reads we know there are no updates. We want
1693-
// to continue sending updates to the frontend so that "Result.Age"
1694-
// is always accurate. This way, the frontend doesn't need to
1695-
// sync its own clock with the backend.
1696-
sendMetadata(false)
16971703
}
16981704
}
16991705

@@ -1717,6 +1723,10 @@ func convertWorkspaceAgentMetadata(db []database.WorkspaceAgentMetadatum) []code
17171723
},
17181724
})
17191725
}
1726+
// Sorting prevents the metadata from jumping around in the frontend.
1727+
sort.Slice(result, func(i, j int) bool {
1728+
return result[i].Description.Key < result[j].Description.Key
1729+
})
17201730
return result
17211731
}
17221732

coderd/workspaceagents_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1153,7 +1153,7 @@ func TestWorkspaceAgent_Metadata(t *testing.T) {
11531153
require.Len(t, update, 3)
11541154
check(wantMetadata1, update[0], true)
11551155

1156-
const maxValueLen = 32 << 10
1156+
const maxValueLen = 2048
11571157
tooLongValueMetadata := wantMetadata1
11581158
tooLongValueMetadata.Value = strings.Repeat("a", maxValueLen*2)
11591159
tooLongValueMetadata.Error = ""

0 commit comments

Comments
 (0)