Skip to content

Commit e9b8ba1

Browse files
committed
fix(coderd): remove rate limits from agent metadata
Include the full update message in the PubSub notification so that we don't have to refresh metadata from the DB and can avoid rate limiting.
1 parent af939d1 commit e9b8ba1

File tree

2 files changed

+71
-49
lines changed

2 files changed

+71
-49
lines changed

coderd/workspaceagents.go

Lines changed: 70 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,12 @@ package coderd
22

33
import (
44
"bufio"
5+
"bytes"
56
"context"
67
"database/sql"
8+
"encoding/gob"
79
"encoding/json"
810
"errors"
9-
"flag"
1011
"fmt"
1112
"io"
1213
"net"
@@ -17,7 +18,6 @@ import (
1718
"sort"
1819
"strconv"
1920
"strings"
20-
"sync"
2121
"sync/atomic"
2222
"time"
2323

@@ -26,7 +26,6 @@ import (
2626
"golang.org/x/exp/slices"
2727
"golang.org/x/mod/semver"
2828
"golang.org/x/sync/errgroup"
29-
"golang.org/x/time/rate"
3029
"golang.org/x/xerrors"
3130
"nhooyr.io/websocket"
3231
"tailscale.com/tailcfg"
@@ -1492,6 +1491,19 @@ func ellipse(v string, n int) string {
14921491
return v
14931492
}
14941493

1494+
// marshalGob encodes v using encoding/gob and returns the encoded bytes.
//
// If the encoder rejects v (for example a nil value or a type gob cannot
// describe, such as a channel), a nil slice is returned together with the
// encoder's error wrapped with context. The underlying error remains
// reachable through errors.Is / errors.As.
func marshalGob(v any) ([]byte, error) {
	var buf bytes.Buffer
	if err := gob.NewEncoder(&buf).Encode(v); err != nil {
		return nil, fmt.Errorf("marshal gob: %w", err)
	}
	return buf.Bytes(), nil
}
1502+
1503+
// unmarshalGob decodes a single gob-encoded value from data into v.
//
// v is handed to gob.Decoder.Decode unchanged, so it must be a pointer to a
// value of a type compatible with what was encoded. Any decoding failure
// (including empty input, which surfaces as io.EOF) is returned wrapped with
// context; the underlying error remains reachable through errors.Is /
// errors.As.
func unmarshalGob(data []byte, v any) error {
	if err := gob.NewDecoder(bytes.NewReader(data)).Decode(v); err != nil {
		return fmt.Errorf("unmarshal gob: %w", err)
	}
	return nil
}
1506+
14951507
// @Summary Submit workspace agent metadata
14961508
// @ID submit-workspace-agent-metadata
14971509
// @Security CoderSessionToken
@@ -1524,7 +1536,9 @@ func (api *API) workspaceAgentPostMetadata(rw http.ResponseWriter, r *http.Reque
15241536
key := chi.URLParam(r, "key")
15251537

15261538
const (
1527-
maxValueLen = 32 << 10
1539+
// maxValueLen is set to 4096 to stay under the 8000 byte Postgres
1540+
// NOTIFY limit.
1541+
maxValueLen = 4096
15281542
maxErrorLen = maxValueLen
15291543
)
15301544

@@ -1567,7 +1581,15 @@ func (api *API) workspaceAgentPostMetadata(rw http.ResponseWriter, r *http.Reque
15671581
slog.F("value", ellipse(datum.Value, 16)),
15681582
)
15691583

1570-
err = api.Pubsub.Publish(watchWorkspaceAgentMetadataChannel(workspaceAgent.ID), []byte(datum.Key))
1584+
// gob is used instead of JSON due to the potentially large performance
1585+
// overhead of agent metadata.
1586+
datumGob, err := marshalGob(datum)
1587+
if err != nil {
1588+
httpapi.InternalServerError(rw, err)
1589+
return
1590+
}
1591+
1592+
err = api.Pubsub.Publish(watchWorkspaceAgentMetadataChannel(workspaceAgent.ID), datumGob)
15711593
if err != nil {
15721594
httpapi.InternalServerError(rw, err)
15731595
return
@@ -1593,7 +1615,7 @@ func (api *API) watchWorkspaceAgentMetadata(rw http.ResponseWriter, r *http.Requ
15931615
)
15941616
)
15951617

1596-
sendEvent, senderClosed, err := httpapi.ServerSentEventSender(rw, r)
1618+
sseSendEvent, sseSenderClosed, err := httpapi.ServerSentEventSender(rw, r)
15971619
if err != nil {
15981620
httpapi.Write(ctx, rw, http.StatusInternalServerError, codersdk.Response{
15991621
Message: "Internal error setting up server-sent events.",
@@ -1603,30 +1625,25 @@ func (api *API) watchWorkspaceAgentMetadata(rw http.ResponseWriter, r *http.Requ
16031625
}
16041626
// Prevent handler from returning until the sender is closed.
16051627
defer func() {
1606-
<-senderClosed
1628+
<-sseSenderClosed
16071629
}()
16081630

16091631
const refreshInterval = time.Second * 5
16101632
refreshTicker := time.NewTicker(refreshInterval)
16111633
defer refreshTicker.Stop()
16121634

1613-
var (
1614-
lastDBMetaMu sync.Mutex
1615-
lastDBMeta []database.WorkspaceAgentMetadatum
1616-
)
1635+
var lastMetadata []database.WorkspaceAgentMetadatum
16171636

1618-
sendMetadata := func(pull bool) {
1619-
log.Debug(ctx, "sending metadata update", "pull", pull)
1620-
lastDBMetaMu.Lock()
1621-
defer lastDBMetaMu.Unlock()
1637+
sendMetadata := func(pullDB bool) {
1638+
log.Debug(ctx, "sending metadata update", "pull_db", pullDB)
16221639

1623-
var err error
1624-
if pull {
1640+
if pullDB {
1641+
var err error
16251642
// We always use the original Request context because it contains
16261643
// the RBAC actor.
1627-
lastDBMeta, err = api.Database.GetWorkspaceAgentMetadata(ctx, workspaceAgent.ID)
1644+
lastMetadata, err = api.Database.GetWorkspaceAgentMetadata(ctx, workspaceAgent.ID)
16281645
if err != nil {
1629-
_ = sendEvent(ctx, codersdk.ServerSentEvent{
1646+
_ = sseSendEvent(ctx, codersdk.ServerSentEvent{
16301647
Type: codersdk.ServerSentEventTypeError,
16311648
Data: codersdk.Response{
16321649
Message: "Internal error getting metadata.",
@@ -1635,7 +1652,7 @@ func (api *API) watchWorkspaceAgentMetadata(rw http.ResponseWriter, r *http.Requ
16351652
})
16361653
return
16371654
}
1638-
slices.SortFunc(lastDBMeta, func(a, b database.WorkspaceAgentMetadatum) int {
1655+
slices.SortFunc(lastMetadata, func(a, b database.WorkspaceAgentMetadatum) int {
16391656
return slice.Ascending(a.Key, b.Key)
16401657
})
16411658

@@ -1644,34 +1661,26 @@ func (api *API) watchWorkspaceAgentMetadata(rw http.ResponseWriter, r *http.Requ
16441661
refreshTicker.Reset(refreshInterval)
16451662
}
16461663

1647-
_ = sendEvent(ctx, codersdk.ServerSentEvent{
1664+
_ = sseSendEvent(ctx, codersdk.ServerSentEvent{
16481665
Type: codersdk.ServerSentEventTypeData,
1649-
Data: convertWorkspaceAgentMetadata(lastDBMeta),
1666+
Data: convertWorkspaceAgentMetadata(lastMetadata),
16501667
})
16511668
}
16521669

1653-
// Note: we previously used a debounce here, but when the rate of metadata updates was too
1654-
// high the debounce would never fire.
1655-
//
1656-
// The rate-limit has its own caveat. If the agent sends a burst of metadata
1657-
// but then goes quiet, we will never pull the new metadata and the frontend
1658-
// will go stale until refresh. This would only happen if the agent was
1659-
// under extreme load. Under normal operations, the interval between metadata
1660-
// updates is constant so there is no burst phenomenon.
1661-
pubsubRatelimit := rate.NewLimiter(rate.Every(time.Second), 2)
1662-
if flag.Lookup("test.v") != nil {
1663-
// We essentially disable the rate-limit in tests for determinism.
1664-
pubsubRatelimit = rate.NewLimiter(rate.Every(time.Second*100), 100)
1665-
}
1670+
// Note: we tried using a ratelimit and debounce here before but it was
1671+
// pretty buggy.
1672+
pubsubUpdates := make(chan database.UpdateWorkspaceAgentMetadataParams, 8)
16661673

16671674
// Send metadata on updates, we must ensure subscription before sending
16681675
// initial metadata to guarantee that events in-between are not missed.
1669-
cancelSub, err := api.Pubsub.Subscribe(watchWorkspaceAgentMetadataChannel(workspaceAgent.ID), func(_ context.Context, _ []byte) {
1670-
allow := pubsubRatelimit.Allow()
1671-
log.Debug(ctx, "received metadata update", "allow", allow)
1672-
if allow {
1673-
sendMetadata(true)
1676+
cancelSub, err := api.Pubsub.Subscribe(watchWorkspaceAgentMetadataChannel(workspaceAgent.ID), func(_ context.Context, byt []byte) {
1677+
var update database.UpdateWorkspaceAgentMetadataParams
1678+
err = unmarshalGob(byt, &update)
1679+
if err != nil {
1680+
api.Logger.Error(ctx, "failed to unmarshal pubsub message", slog.Error(err))
1681+
return
16741682
}
1683+
pubsubUpdates <- update
16751684
})
16761685
if err != nil {
16771686
httpapi.InternalServerError(rw, err)
@@ -1684,16 +1693,29 @@ func (api *API) watchWorkspaceAgentMetadata(rw http.ResponseWriter, r *http.Requ
16841693

16851694
for {
16861695
select {
1687-
case <-senderClosed:
1688-
return
1696+
case update := <-pubsubUpdates:
1697+
// There can only be 128 metadatums so this will always be fast.
1698+
for i, datum := range lastMetadata {
1699+
if datum.Key != update.Key {
1700+
continue
1701+
}
1702+
1703+
lastMetadata[i] = database.WorkspaceAgentMetadatum{
1704+
Value: update.Value,
1705+
Error: update.Error,
1706+
CollectedAt: update.CollectedAt,
1707+
}
1708+
}
1709+
sendMetadata(false)
16891710
case <-refreshTicker.C:
1711+
// Avoid spamming the DB with reads when we know there are no updates. We want
1712+
// to continue sending updates to the frontend so that "Result.Age"
1713+
// is always accurate. This way, the frontend doesn't need to
1714+
// sync its own clock with the backend.
1715+
sendMetadata(false)
1716+
case <-sseSenderClosed:
1717+
return
16901718
}
1691-
1692-
// Avoid spamming the DB with reads we know there are no updates. We want
1693-
// to continue sending updates to the frontend so that "Result.Age"
1694-
// is always accurate. This way, the frontend doesn't need to
1695-
// sync its own clock with the backend.
1696-
sendMetadata(false)
16971719
}
16981720
}
16991721

coderd/workspaceagents_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1153,7 +1153,7 @@ func TestWorkspaceAgent_Metadata(t *testing.T) {
11531153
require.Len(t, update, 3)
11541154
check(wantMetadata1, update[0], true)
11551155

1156-
const maxValueLen = 32 << 10
1156+
const maxValueLen = 4096
11571157
tooLongValueMetadata := wantMetadata1
11581158
tooLongValueMetadata.Value = strings.Repeat("a", maxValueLen*2)
11591159
tooLongValueMetadata.Error = ""

0 commit comments

Comments
 (0)