6
6
"database/sql"
7
7
"encoding/json"
8
8
"errors"
9
- "flag"
10
9
"fmt"
11
10
"io"
12
11
"net"
@@ -23,10 +22,9 @@ import (
23
22
24
23
"github.com/go-chi/chi/v5"
25
24
"github.com/google/uuid"
26
- "golang.org/x/exp/slices "
25
+ "golang.org/x/exp/maps "
27
26
"golang.org/x/mod/semver"
28
27
"golang.org/x/sync/errgroup"
29
- "golang.org/x/time/rate"
30
28
"golang.org/x/xerrors"
31
29
"nhooyr.io/websocket"
32
30
"tailscale.com/tailcfg"
@@ -39,7 +37,6 @@ import (
39
37
"github.com/coder/coder/v2/coderd/httpmw"
40
38
"github.com/coder/coder/v2/coderd/rbac"
41
39
"github.com/coder/coder/v2/coderd/util/ptr"
42
- "github.com/coder/coder/v2/coderd/util/slice"
43
40
"github.com/coder/coder/v2/codersdk"
44
41
"github.com/coder/coder/v2/codersdk/agentsdk"
45
42
"github.com/coder/coder/v2/tailnet"
@@ -1528,7 +1525,11 @@ func (api *API) workspaceAgentPostMetadata(rw http.ResponseWriter, r *http.Reque
1528
1525
key := chi .URLParam (r , "key" )
1529
1526
1530
1527
const (
1531
- maxValueLen = 32 << 10
1528
+ // maxValueLen is set to 2048 to stay under the 8000 byte Postgres
1529
+ // NOTIFY limit. Since both value and error can be set, the real
1530
+ // payload limit is 2 * 2048 * 4/3 <base64 expansion> = 5461 bytes + a few hundred bytes for JSON
1531
+ // syntax, key names, and metadata.
1532
+ maxValueLen = 2048
1532
1533
maxErrorLen = maxValueLen
1533
1534
)
1534
1535
@@ -1571,7 +1572,13 @@ func (api *API) workspaceAgentPostMetadata(rw http.ResponseWriter, r *http.Reque
1571
1572
slog .F ("value" , ellipse (datum .Value , 16 )),
1572
1573
)
1573
1574
1574
- err = api .Pubsub .Publish (watchWorkspaceAgentMetadataChannel (workspaceAgent .ID ), []byte (datum .Key ))
1575
+ datumJSON , err := json .Marshal (datum )
1576
+ if err != nil {
1577
+ httpapi .InternalServerError (rw , err )
1578
+ return
1579
+ }
1580
+
1581
+ err = api .Pubsub .Publish (watchWorkspaceAgentMetadataChannel (workspaceAgent .ID ), datumJSON )
1575
1582
if err != nil {
1576
1583
httpapi .InternalServerError (rw , err )
1577
1584
return
@@ -1597,7 +1604,42 @@ func (api *API) watchWorkspaceAgentMetadata(rw http.ResponseWriter, r *http.Requ
1597
1604
)
1598
1605
)
1599
1606
1600
- sendEvent , senderClosed , err := httpapi .ServerSentEventSender (rw , r )
1607
+ // We avoid channel-based synchronization here to avoid backpressure problems.
1608
+ var (
1609
+ metadataMapMu sync.Mutex
1610
+ metadataMap = make (map [string ]database.WorkspaceAgentMetadatum )
1611
+ // pendingChanges must only be mutated when metadataMapMu is held.
1612
+ pendingChanges atomic.Bool
1613
+ )
1614
+
1615
+ // Send metadata on updates, we must ensure subscription before sending
1616
+ // initial metadata to guarantee that events in-between are not missed.
1617
+ cancelSub , err := api .Pubsub .Subscribe (watchWorkspaceAgentMetadataChannel (workspaceAgent .ID ), func (_ context.Context , byt []byte ) {
1618
+ var update database.UpdateWorkspaceAgentMetadataParams
1619
+ err := json .Unmarshal (byt , & update )
1620
+ if err != nil {
1621
+ api .Logger .Error (ctx , "failed to unmarshal pubsub message" , slog .Error (err ))
1622
+ return
1623
+ }
1624
+
1625
+ log .Debug (ctx , "received metadata update" , "key" , update .Key )
1626
+
1627
+ metadataMapMu .Lock ()
1628
+ defer metadataMapMu .Unlock ()
1629
+ md := metadataMap [update .Key ]
1630
+ md .Value = update .Value
1631
+ md .Error = update .Error
1632
+ md .CollectedAt = update .CollectedAt
1633
+ metadataMap [update .Key ] = md
1634
+ pendingChanges .Store (true )
1635
+ })
1636
+ if err != nil {
1637
+ httpapi .InternalServerError (rw , err )
1638
+ return
1639
+ }
1640
+ defer cancelSub ()
1641
+
1642
+ sseSendEvent , sseSenderClosed , err := httpapi .ServerSentEventSender (rw , r )
1601
1643
if err != nil {
1602
1644
httpapi .Write (ctx , rw , http .StatusInternalServerError , codersdk.Response {
1603
1645
Message : "Internal error setting up server-sent events." ,
@@ -1607,97 +1649,61 @@ func (api *API) watchWorkspaceAgentMetadata(rw http.ResponseWriter, r *http.Requ
1607
1649
}
1608
1650
// Prevent handler from returning until the sender is closed.
1609
1651
defer func () {
1610
- <- senderClosed
1652
+ <- sseSenderClosed
1611
1653
}()
1612
1654
1613
- const refreshInterval = time .Second * 5
1614
- refreshTicker := time .NewTicker (refreshInterval )
1615
- defer refreshTicker .Stop ()
1655
+ // We send updates exactly every second.
1656
+ const sendInterval = time .Second * 1
1657
+ sendTicker := time .NewTicker (sendInterval )
1658
+ defer sendTicker .Stop ()
1616
1659
1617
- var (
1618
- lastDBMetaMu sync.Mutex
1619
- lastDBMeta []database.WorkspaceAgentMetadatum
1620
- )
1660
+ // We always use the original Request context because it contains
1661
+ // the RBAC actor.
1662
+ md , err := api .Database .GetWorkspaceAgentMetadata (ctx , workspaceAgent .ID )
1663
+ if err != nil {
1664
+ // If we can't successfully pull the initial metadata, pubsub
1665
+ // updates will be no-op so we may as well terminate the
1666
+ // connection early.
1667
+ httpapi .InternalServerError (rw , err )
1668
+ return
1669
+ }
1621
1670
1622
- sendMetadata := func (pull bool ) {
1623
- log .Debug (ctx , "sending metadata update" , "pull" , pull )
1624
- lastDBMetaMu .Lock ()
1625
- defer lastDBMetaMu .Unlock ()
1671
+ metadataMapMu .Lock ()
1672
+ for _ , datum := range md {
1673
+ metadataMap [datum .Key ] = datum
1674
+ }
1675
+ metadataMapMu .Unlock ()
1626
1676
1627
- var err error
1628
- if pull {
1629
- // We always use the original Request context because it contains
1630
- // the RBAC actor.
1631
- lastDBMeta , err = api .Database .GetWorkspaceAgentMetadata (ctx , workspaceAgent .ID )
1632
- if err != nil {
1633
- _ = sendEvent (ctx , codersdk.ServerSentEvent {
1634
- Type : codersdk .ServerSentEventTypeError ,
1635
- Data : codersdk.Response {
1636
- Message : "Internal error getting metadata." ,
1637
- Detail : err .Error (),
1638
- },
1639
- })
1640
- return
1641
- }
1642
- slices .SortFunc (lastDBMeta , func (a , b database.WorkspaceAgentMetadatum ) int {
1643
- return slice .Ascending (a .Key , b .Key )
1644
- })
1677
+ // Send initial metadata.
1645
1678
1646
- // Avoid sending refresh if the client is about to get a
1647
- // fresh update.
1648
- refreshTicker .Reset (refreshInterval )
1649
- }
1679
+ var lastSend time.Time
1680
+ sendMetadata := func () {
1681
+ metadataMapMu .Lock ()
1682
+ values := maps .Values (metadataMap )
1683
+ pendingChanges .Store (false )
1684
+ metadataMapMu .Unlock ()
1650
1685
1651
- _ = sendEvent (ctx , codersdk.ServerSentEvent {
1686
+ lastSend = time .Now ()
1687
+ _ = sseSendEvent (ctx , codersdk.ServerSentEvent {
1652
1688
Type : codersdk .ServerSentEventTypeData ,
1653
- Data : convertWorkspaceAgentMetadata (lastDBMeta ),
1689
+ Data : convertWorkspaceAgentMetadata (values ),
1654
1690
})
1655
1691
}
1656
1692
1657
- // Note: we previously used a debounce here, but when the rate of metadata updates was too
1658
- // high the debounce would never fire.
1659
- //
1660
- // The rate-limit has its own caveat. If the agent sends a burst of metadata
1661
- // but then goes quiet, we will never pull the new metadata and the frontend
1662
- // will go stale until refresh. This would only happen if the agent was
1663
- // under extreme load. Under normal operations, the interval between metadata
1664
- // updates is constant so there is no burst phenomenon.
1665
- pubsubRatelimit := rate .NewLimiter (rate .Every (time .Second ), 2 )
1666
- if flag .Lookup ("test.v" ) != nil {
1667
- // We essentially disable the rate-limit in tests for determinism.
1668
- pubsubRatelimit = rate .NewLimiter (rate .Every (time .Second * 100 ), 100 )
1669
- }
1670
-
1671
- // Send metadata on updates, we must ensure subscription before sending
1672
- // initial metadata to guarantee that events in-between are not missed.
1673
- cancelSub , err := api .Pubsub .Subscribe (watchWorkspaceAgentMetadataChannel (workspaceAgent .ID ), func (_ context.Context , _ []byte ) {
1674
- allow := pubsubRatelimit .Allow ()
1675
- log .Debug (ctx , "received metadata update" , "allow" , allow )
1676
- if allow {
1677
- sendMetadata (true )
1678
- }
1679
- })
1680
- if err != nil {
1681
- httpapi .InternalServerError (rw , err )
1682
- return
1683
- }
1684
- defer cancelSub ()
1685
-
1686
- // Send initial metadata.
1687
- sendMetadata (true )
1693
+ sendMetadata ()
1688
1694
1689
1695
for {
1690
1696
select {
1691
- case <- senderClosed :
1697
+ case <- sendTicker .C :
1698
+ // We send an update even if there's no change every 5 seconds
1699
+ // to ensure that the frontend always has an accurate "Result.Age".
1700
+ if ! pendingChanges .Load () && time .Since (lastSend ) < time .Second * 5 {
1701
+ continue
1702
+ }
1703
+ sendMetadata ()
1704
+ case <- sseSenderClosed :
1692
1705
return
1693
- case <- refreshTicker .C :
1694
1706
}
1695
-
1696
- // Avoid spamming the DB with reads we know there are no updates. We want
1697
- // to continue sending updates to the frontend so that "Result.Age"
1698
- // is always accurate. This way, the frontend doesn't need to
1699
- // sync its own clock with the backend.
1700
- sendMetadata (false )
1701
1707
}
1702
1708
}
1703
1709
@@ -1721,6 +1727,10 @@ func convertWorkspaceAgentMetadata(db []database.WorkspaceAgentMetadatum) []code
1721
1727
},
1722
1728
})
1723
1729
}
1730
+ // Sorting prevents the metadata from jumping around in the frontend.
1731
+ sort .Slice (result , func (i , j int ) bool {
1732
+ return result [i ].Description .Key < result [j ].Description .Key
1733
+ })
1724
1734
return result
1725
1735
}
1726
1736
0 commit comments