Skip to content

Commit 78b2f60

Browse files
committed
fix(coderd): remove rate limits from agent metadata
Include the full update message in the PubSub notification so that we don't have to refresh metadata from the DB and can avoid rate limiting.
1 parent af939d1 commit 78b2f60

File tree

1 file changed

+50
-47
lines changed

1 file changed

+50
-47
lines changed

coderd/workspaceagents.go

Lines changed: 50 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@ import (
66
"database/sql"
77
"encoding/json"
88
"errors"
9-
"flag"
109
"fmt"
1110
"io"
1211
"net"
@@ -17,7 +16,6 @@ import (
1716
"sort"
1817
"strconv"
1918
"strings"
20-
"sync"
2119
"sync/atomic"
2220
"time"
2321

@@ -26,7 +24,6 @@ import (
2624
"golang.org/x/exp/slices"
2725
"golang.org/x/mod/semver"
2826
"golang.org/x/sync/errgroup"
29-
"golang.org/x/time/rate"
3027
"golang.org/x/xerrors"
3128
"nhooyr.io/websocket"
3229
"tailscale.com/tailcfg"
@@ -1567,7 +1564,13 @@ func (api *API) workspaceAgentPostMetadata(rw http.ResponseWriter, r *http.Reque
15671564
slog.F("value", ellipse(datum.Value, 16)),
15681565
)
15691566

1570-
err = api.Pubsub.Publish(watchWorkspaceAgentMetadataChannel(workspaceAgent.ID), []byte(datum.Key))
1567+
datumJSON, err := json.Marshal(datum)
1568+
if err != nil {
1569+
httpapi.InternalServerError(rw, err)
1570+
return
1571+
}
1572+
1573+
err = api.Pubsub.Publish(watchWorkspaceAgentMetadataChannel(workspaceAgent.ID), datumJSON)
15711574
if err != nil {
15721575
httpapi.InternalServerError(rw, err)
15731576
return
@@ -1593,7 +1596,7 @@ func (api *API) watchWorkspaceAgentMetadata(rw http.ResponseWriter, r *http.Requ
15931596
)
15941597
)
15951598

1596-
sendEvent, senderClosed, err := httpapi.ServerSentEventSender(rw, r)
1599+
sseSendEvent, sseSenderClosed, err := httpapi.ServerSentEventSender(rw, r)
15971600
if err != nil {
15981601
httpapi.Write(ctx, rw, http.StatusInternalServerError, codersdk.Response{
15991602
Message: "Internal error setting up server-sent events.",
@@ -1603,30 +1606,25 @@ func (api *API) watchWorkspaceAgentMetadata(rw http.ResponseWriter, r *http.Requ
16031606
}
16041607
// Prevent handler from returning until the sender is closed.
16051608
defer func() {
1606-
<-senderClosed
1609+
<-sseSenderClosed
16071610
}()
16081611

16091612
const refreshInterval = time.Second * 5
16101613
refreshTicker := time.NewTicker(refreshInterval)
16111614
defer refreshTicker.Stop()
16121615

1613-
var (
1614-
lastDBMetaMu sync.Mutex
1615-
lastDBMeta []database.WorkspaceAgentMetadatum
1616-
)
1616+
var lastMetadata []database.WorkspaceAgentMetadatum
16171617

1618-
sendMetadata := func(pull bool) {
1619-
log.Debug(ctx, "sending metadata update", "pull", pull)
1620-
lastDBMetaMu.Lock()
1621-
defer lastDBMetaMu.Unlock()
1618+
sendMetadata := func(pullDB bool) {
1619+
log.Debug(ctx, "sending metadata update", "pull_db", pullDB)
16221620

1623-
var err error
1624-
if pull {
1621+
if pullDB {
1622+
var err error
16251623
// We always use the original Request context because it contains
16261624
// the RBAC actor.
1627-
lastDBMeta, err = api.Database.GetWorkspaceAgentMetadata(ctx, workspaceAgent.ID)
1625+
lastMetadata, err = api.Database.GetWorkspaceAgentMetadata(ctx, workspaceAgent.ID)
16281626
if err != nil {
1629-
_ = sendEvent(ctx, codersdk.ServerSentEvent{
1627+
_ = sseSendEvent(ctx, codersdk.ServerSentEvent{
16301628
Type: codersdk.ServerSentEventTypeError,
16311629
Data: codersdk.Response{
16321630
Message: "Internal error getting metadata.",
@@ -1635,7 +1633,7 @@ func (api *API) watchWorkspaceAgentMetadata(rw http.ResponseWriter, r *http.Requ
16351633
})
16361634
return
16371635
}
1638-
slices.SortFunc(lastDBMeta, func(a, b database.WorkspaceAgentMetadatum) int {
1636+
slices.SortFunc(lastMetadata, func(a, b database.WorkspaceAgentMetadatum) int {
16391637
return slice.Ascending(a.Key, b.Key)
16401638
})
16411639

@@ -1644,34 +1642,26 @@ func (api *API) watchWorkspaceAgentMetadata(rw http.ResponseWriter, r *http.Requ
16441642
refreshTicker.Reset(refreshInterval)
16451643
}
16461644

1647-
_ = sendEvent(ctx, codersdk.ServerSentEvent{
1645+
_ = sseSendEvent(ctx, codersdk.ServerSentEvent{
16481646
Type: codersdk.ServerSentEventTypeData,
1649-
Data: convertWorkspaceAgentMetadata(lastDBMeta),
1647+
Data: convertWorkspaceAgentMetadata(lastMetadata),
16501648
})
16511649
}
16521650

1653-
// Note: we previously used a debounce here, but when the rate of metadata updates was too
1654-
// high the debounce would never fire.
1655-
//
1656-
// The rate-limit has its own caveat. If the agent sends a burst of metadata
1657-
// but then goes quiet, we will never pull the new metadata and the frontend
1658-
// will go stale until refresh. This would only happen if the agent was
1659-
// under extreme load. Under normal operations, the interval between metadata
1660-
// updates is constant so there is no burst phenomenon.
1661-
pubsubRatelimit := rate.NewLimiter(rate.Every(time.Second), 2)
1662-
if flag.Lookup("test.v") != nil {
1663-
// We essentially disable the rate-limit in tests for determinism.
1664-
pubsubRatelimit = rate.NewLimiter(rate.Every(time.Second*100), 100)
1665-
}
1651+
// Note: we tried using a ratelimit and debounce here before but it was
1652+
// pretty buggy.
1653+
pubsubUpdates := make(chan database.UpdateWorkspaceAgentMetadataParams, 8)
16661654

16671655
// Send metadata on updates, we must ensure subscription before sending
16681656
// initial metadata to guarantee that events in-between are not missed.
1669-
cancelSub, err := api.Pubsub.Subscribe(watchWorkspaceAgentMetadataChannel(workspaceAgent.ID), func(_ context.Context, _ []byte) {
1670-
allow := pubsubRatelimit.Allow()
1671-
log.Debug(ctx, "received metadata update", "allow", allow)
1672-
if allow {
1673-
sendMetadata(true)
1657+
cancelSub, err := api.Pubsub.Subscribe(watchWorkspaceAgentMetadataChannel(workspaceAgent.ID), func(_ context.Context, byt []byte) {
1658+
var update database.UpdateWorkspaceAgentMetadataParams
1659+
err = json.Unmarshal(byt, &update)
1660+
if err != nil {
1661+
api.Logger.Error(ctx, "failed to unmarshal pubsub message", slog.Error(err))
1662+
return
16741663
}
1664+
pubsubUpdates <- update
16751665
})
16761666
if err != nil {
16771667
httpapi.InternalServerError(rw, err)
@@ -1684,16 +1674,29 @@ func (api *API) watchWorkspaceAgentMetadata(rw http.ResponseWriter, r *http.Requ
16841674

16851675
for {
16861676
select {
1687-
case <-senderClosed:
1688-
return
1677+
case update := <-pubsubUpdates:
1678+
// There can only be 128 metadatums so this will always be fast.
1679+
for i, datum := range lastMetadata {
1680+
if datum.Key != update.Key {
1681+
continue
1682+
}
1683+
1684+
lastMetadata[i] = database.WorkspaceAgentMetadatum{
1685+
Value: update.Value,
1686+
Error: update.Error,
1687+
CollectedAt: update.CollectedAt,
1688+
}
1689+
}
1690+
sendMetadata(false)
16891691
case <-refreshTicker.C:
1692+
// Avoid spamming the DB with reads we know there are no updates. We want
1693+
// to continue sending updates to the frontend so that "Result.Age"
1694+
// is always accurate. This way, the frontend doesn't need to
1695+
// sync its own clock with the backend.
1696+
sendMetadata(false)
1697+
case <-sseSenderClosed:
1698+
return
16901699
}
1691-
1692-
// Avoid spamming the DB with reads we know there are no updates. We want
1693-
// to continue sending updates to the frontend so that "Result.Age"
1694-
// is always accurate. This way, the frontend doesn't need to
1695-
// sync its own clock with the backend.
1696-
sendMetadata(false)
16971700
}
16981701
}
16991702

0 commit comments

Comments
 (0)