Skip to content

Commit 9ea6948

Browse files
committed
fix(coderd): remove rate limits from agent metadata
Include the full update message in the PubSub notification so that we don't have to refresh metadata from the DB and can avoid rate limiting.
1 parent af939d1 commit 9ea6948

File tree

2 files changed

+61
-55
lines changed

2 files changed

+61
-55
lines changed

coderd/workspaceagents.go

Lines changed: 60 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@ import (
66
"database/sql"
77
"encoding/json"
88
"errors"
9-
"flag"
109
"fmt"
1110
"io"
1211
"net"
@@ -17,7 +16,6 @@ import (
1716
"sort"
1817
"strconv"
1918
"strings"
20-
"sync"
2119
"sync/atomic"
2220
"time"
2321

@@ -26,7 +24,6 @@ import (
2624
"golang.org/x/exp/slices"
2725
"golang.org/x/mod/semver"
2826
"golang.org/x/sync/errgroup"
29-
"golang.org/x/time/rate"
3027
"golang.org/x/xerrors"
3128
"nhooyr.io/websocket"
3229
"tailscale.com/tailcfg"
@@ -1524,7 +1521,11 @@ func (api *API) workspaceAgentPostMetadata(rw http.ResponseWriter, r *http.Reque
15241521
key := chi.URLParam(r, "key")
15251522

15261523
const (
1527-
maxValueLen = 32 << 10
1524+
// maxValueLen is set to 2048 to stay under the 8000 byte Postgres
1525+
// NOTIFY limit. Since both value and error can be set, the real
1526+
// payload limit is 2 * 2048 * 4/3 <base64 expansion> = 5461 bytes + a few hundred bytes for JSON
1527+
// syntax, key names, and metadata.
1528+
maxValueLen = 2048
15281529
maxErrorLen = maxValueLen
15291530
)
15301531

@@ -1567,7 +1568,15 @@ func (api *API) workspaceAgentPostMetadata(rw http.ResponseWriter, r *http.Reque
15671568
slog.F("value", ellipse(datum.Value, 16)),
15681569
)
15691570

1570-
err = api.Pubsub.Publish(watchWorkspaceAgentMetadataChannel(workspaceAgent.ID), []byte(datum.Key))
1571+
// gob is used instead of JSON due to the potential large performance
1572+
// overhead of agent metadata.
1573+
datumJSON, err := json.Marshal(datum)
1574+
if err != nil {
1575+
httpapi.InternalServerError(rw, err)
1576+
return
1577+
}
1578+
1579+
err = api.Pubsub.Publish(watchWorkspaceAgentMetadataChannel(workspaceAgent.ID), datumJSON)
15711580
if err != nil {
15721581
httpapi.InternalServerError(rw, err)
15731582
return
@@ -1593,7 +1602,7 @@ func (api *API) watchWorkspaceAgentMetadata(rw http.ResponseWriter, r *http.Requ
15931602
)
15941603
)
15951604

1596-
sendEvent, senderClosed, err := httpapi.ServerSentEventSender(rw, r)
1605+
sseSendEvent, sseSenderClosed, err := httpapi.ServerSentEventSender(rw, r)
15971606
if err != nil {
15981607
httpapi.Write(ctx, rw, http.StatusInternalServerError, codersdk.Response{
15991608
Message: "Internal error setting up server-sent events.",
@@ -1603,39 +1612,31 @@ func (api *API) watchWorkspaceAgentMetadata(rw http.ResponseWriter, r *http.Requ
16031612
}
16041613
// Prevent handler from returning until the sender is closed.
16051614
defer func() {
1606-
<-senderClosed
1615+
<-sseSenderClosed
16071616
}()
16081617

16091618
const refreshInterval = time.Second * 5
16101619
refreshTicker := time.NewTicker(refreshInterval)
16111620
defer refreshTicker.Stop()
16121621

1613-
var (
1614-
lastDBMetaMu sync.Mutex
1615-
lastDBMeta []database.WorkspaceAgentMetadatum
1616-
)
1622+
var lastMetadata []database.WorkspaceAgentMetadatum
16171623

1618-
sendMetadata := func(pull bool) {
1619-
log.Debug(ctx, "sending metadata update", "pull", pull)
1620-
lastDBMetaMu.Lock()
1621-
defer lastDBMetaMu.Unlock()
1624+
sendMetadata := func(pullDB bool) {
1625+
log.Debug(ctx, "sending metadata update", "pull_db", pullDB)
16221626

1623-
var err error
1624-
if pull {
1627+
if pullDB {
1628+
var err error
16251629
// We always use the original Request context because it contains
16261630
// the RBAC actor.
1627-
lastDBMeta, err = api.Database.GetWorkspaceAgentMetadata(ctx, workspaceAgent.ID)
1631+
lastMetadata, err = api.Database.GetWorkspaceAgentMetadata(ctx, workspaceAgent.ID)
16281632
if err != nil {
1629-
_ = sendEvent(ctx, codersdk.ServerSentEvent{
1630-
Type: codersdk.ServerSentEventTypeError,
1631-
Data: codersdk.Response{
1632-
Message: "Internal error getting metadata.",
1633-
Detail: err.Error(),
1634-
},
1635-
})
1633+
// If we can't successfully pull the initial metadata, pubsub
1634+
// updates will be no-op so we may as well terminate the
1635+
// connection early.
1636+
httpapi.InternalServerError(rw, err)
16361637
return
16371638
}
1638-
slices.SortFunc(lastDBMeta, func(a, b database.WorkspaceAgentMetadatum) int {
1639+
slices.SortFunc(lastMetadata, func(a, b database.WorkspaceAgentMetadatum) int {
16391640
return slice.Ascending(a.Key, b.Key)
16401641
})
16411642

@@ -1644,34 +1645,26 @@ func (api *API) watchWorkspaceAgentMetadata(rw http.ResponseWriter, r *http.Requ
16441645
refreshTicker.Reset(refreshInterval)
16451646
}
16461647

1647-
_ = sendEvent(ctx, codersdk.ServerSentEvent{
1648+
_ = sseSendEvent(ctx, codersdk.ServerSentEvent{
16481649
Type: codersdk.ServerSentEventTypeData,
1649-
Data: convertWorkspaceAgentMetadata(lastDBMeta),
1650+
Data: convertWorkspaceAgentMetadata(lastMetadata),
16501651
})
16511652
}
16521653

1653-
// Note: we previously used a debounce here, but when the rate of metadata updates was too
1654-
// high the debounce would never fire.
1655-
//
1656-
// The rate-limit has its own caveat. If the agent sends a burst of metadata
1657-
// but then goes quiet, we will never pull the new metadata and the frontend
1658-
// will go stale until refresh. This would only happen if the agent was
1659-
// under extreme load. Under normal operations, the interval between metadata
1660-
// updates is constant so there is no burst phenomenon.
1661-
pubsubRatelimit := rate.NewLimiter(rate.Every(time.Second), 2)
1662-
if flag.Lookup("test.v") != nil {
1663-
// We essentially disable the rate-limit in tests for determinism.
1664-
pubsubRatelimit = rate.NewLimiter(rate.Every(time.Second*100), 100)
1665-
}
1654+
// Note: we tried using a ratelimit and debounce here before but it was
1655+
// pretty buggy.
1656+
pubsubUpdates := make(chan database.UpdateWorkspaceAgentMetadataParams, 8)
16661657

16671658
// Send metadata on updates, we must ensure subscription before sending
16681659
// initial metadata to guarantee that events in-between are not missed.
1669-
cancelSub, err := api.Pubsub.Subscribe(watchWorkspaceAgentMetadataChannel(workspaceAgent.ID), func(_ context.Context, _ []byte) {
1670-
allow := pubsubRatelimit.Allow()
1671-
log.Debug(ctx, "received metadata update", "allow", allow)
1672-
if allow {
1673-
sendMetadata(true)
1660+
cancelSub, err := api.Pubsub.Subscribe(watchWorkspaceAgentMetadataChannel(workspaceAgent.ID), func(_ context.Context, byt []byte) {
1661+
var update database.UpdateWorkspaceAgentMetadataParams
1662+
err = json.Unmarshal(byt, &update)
1663+
if err != nil {
1664+
api.Logger.Error(ctx, "failed to unmarshal pubsub message", slog.Error(err))
1665+
return
16741666
}
1667+
pubsubUpdates <- update
16751668
})
16761669
if err != nil {
16771670
httpapi.InternalServerError(rw, err)
@@ -1684,16 +1677,29 @@ func (api *API) watchWorkspaceAgentMetadata(rw http.ResponseWriter, r *http.Requ
16841677

16851678
for {
16861679
select {
1687-
case <-senderClosed:
1688-
return
1680+
case update := <-pubsubUpdates:
1681+
// There can only be 128 metadatums so this will always be fast.
1682+
for i, datum := range lastMetadata {
1683+
if datum.Key != update.Key {
1684+
continue
1685+
}
1686+
1687+
lastMetadata[i] = database.WorkspaceAgentMetadatum{
1688+
Value: update.Value,
1689+
Error: update.Error,
1690+
CollectedAt: update.CollectedAt,
1691+
}
1692+
}
1693+
sendMetadata(false)
16891694
case <-refreshTicker.C:
1695+
// Avoid spamming the DB with reads we know there are no updates. We want
1696+
// to continue sending updates to the frontend so that "Result.Age"
1697+
// is always accurate. This way, the frontend doesn't need to
1698+
// sync its own clock with the backend.
1699+
sendMetadata(false)
1700+
case <-sseSenderClosed:
1701+
return
16901702
}
1691-
1692-
// Avoid spamming the DB with reads we know there are no updates. We want
1693-
// to continue sending updates to the frontend so that "Result.Age"
1694-
// is always accurate. This way, the frontend doesn't need to
1695-
// sync its own clock with the backend.
1696-
sendMetadata(false)
16971703
}
16981704
}
16991705

coderd/workspaceagents_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1153,7 +1153,7 @@ func TestWorkspaceAgent_Metadata(t *testing.T) {
11531153
require.Len(t, update, 3)
11541154
check(wantMetadata1, update[0], true)
11551155

1156-
const maxValueLen = 32 << 10
1156+
const maxValueLen = 2048
11571157
tooLongValueMetadata := wantMetadata1
11581158
tooLongValueMetadata.Value = strings.Repeat("a", maxValueLen*2)
11591159
tooLongValueMetadata.Error = ""

0 commit comments

Comments
 (0)