@@ -2,11 +2,12 @@ package coderd
2
2
3
3
import (
4
4
"bufio"
5
+ "bytes"
5
6
"context"
6
7
"database/sql"
8
+ "encoding/gob"
7
9
"encoding/json"
8
10
"errors"
9
- "flag"
10
11
"fmt"
11
12
"io"
12
13
"net"
@@ -17,7 +18,6 @@ import (
17
18
"sort"
18
19
"strconv"
19
20
"strings"
20
- "sync"
21
21
"sync/atomic"
22
22
"time"
23
23
@@ -26,7 +26,6 @@ import (
26
26
"golang.org/x/exp/slices"
27
27
"golang.org/x/mod/semver"
28
28
"golang.org/x/sync/errgroup"
29
- "golang.org/x/time/rate"
30
29
"golang.org/x/xerrors"
31
30
"nhooyr.io/websocket"
32
31
"tailscale.com/tailcfg"
@@ -1492,6 +1491,19 @@ func ellipse(v string, n int) string {
1492
1491
return v
1493
1492
}
1494
1493
1494
// marshalGob encodes v with encoding/gob and returns the encoded bytes.
//
// A fresh encoder is used on every call, so the output is self-contained
// and can be decoded independently with unmarshalGob. On failure (gob
// rejects, for example, top-level channels and functions) it returns a nil
// slice and an error wrapping the encoder's error.
func marshalGob(v any) ([]byte, error) {
	var buf bytes.Buffer
	if err := gob.NewEncoder(&buf).Encode(v); err != nil {
		// Include the dynamic type so a failing payload is identifiable from
		// the message alone; %w keeps the underlying error inspectable.
		return nil, fmt.Errorf("gob encode %T: %w", v, err)
	}
	return buf.Bytes(), nil
}
1502
+
1503
// unmarshalGob decodes a single gob-encoded value from data into v.
//
// v must be a pointer to a value of a type compatible with what was
// encoded. Any decoder failure (empty or truncated input, type mismatch,
// non-pointer destination) is returned wrapped with context.
func unmarshalGob(data []byte, v any) error {
	if err := gob.NewDecoder(bytes.NewReader(data)).Decode(v); err != nil {
		// %w keeps the underlying error (e.g. io.EOF) reachable via errors.Is.
		return fmt.Errorf("gob decode into %T: %w", v, err)
	}
	return nil
}
1506
+
1495
1507
// @Summary Submit workspace agent metadata
1496
1508
// @ID submit-workspace-agent-metadata
1497
1509
// @Security CoderSessionToken
@@ -1524,7 +1536,9 @@ func (api *API) workspaceAgentPostMetadata(rw http.ResponseWriter, r *http.Reque
1524
1536
key := chi .URLParam (r , "key" )
1525
1537
1526
1538
const (
1527
- maxValueLen = 32 << 10
1539
+ // maxValueLen is set to 4096 to stay under the 8000 byte Postgres
1540
+ // NOTIFY limit.
1541
+ maxValueLen = 4096
1528
1542
maxErrorLen = maxValueLen
1529
1543
)
1530
1544
@@ -1567,7 +1581,15 @@ func (api *API) workspaceAgentPostMetadata(rw http.ResponseWriter, r *http.Reque
1567
1581
slog .F ("value" , ellipse (datum .Value , 16 )),
1568
1582
)
1569
1583
1570
- err = api .Pubsub .Publish (watchWorkspaceAgentMetadataChannel (workspaceAgent .ID ), []byte (datum .Key ))
1584
+ // gob is used instead of JSON due to the potentially large performance
1585
+ // overhead of agent metadata.
1586
+ datumGob , err := marshalGob (datum )
1587
+ if err != nil {
1588
+ httpapi .InternalServerError (rw , err )
1589
+ return
1590
+ }
1591
+
1592
+ err = api .Pubsub .Publish (watchWorkspaceAgentMetadataChannel (workspaceAgent .ID ), datumGob )
1571
1593
if err != nil {
1572
1594
httpapi .InternalServerError (rw , err )
1573
1595
return
@@ -1593,7 +1615,7 @@ func (api *API) watchWorkspaceAgentMetadata(rw http.ResponseWriter, r *http.Requ
1593
1615
)
1594
1616
)
1595
1617
1596
- sendEvent , senderClosed , err := httpapi .ServerSentEventSender (rw , r )
1618
+ sseSendEvent , sseSenderClosed , err := httpapi .ServerSentEventSender (rw , r )
1597
1619
if err != nil {
1598
1620
httpapi .Write (ctx , rw , http .StatusInternalServerError , codersdk.Response {
1599
1621
Message : "Internal error setting up server-sent events." ,
@@ -1603,30 +1625,25 @@ func (api *API) watchWorkspaceAgentMetadata(rw http.ResponseWriter, r *http.Requ
1603
1625
}
1604
1626
// Prevent handler from returning until the sender is closed.
1605
1627
defer func () {
1606
- <- senderClosed
1628
+ <- sseSenderClosed
1607
1629
}()
1608
1630
1609
1631
const refreshInterval = time .Second * 5
1610
1632
refreshTicker := time .NewTicker (refreshInterval )
1611
1633
defer refreshTicker .Stop ()
1612
1634
1613
- var (
1614
- lastDBMetaMu sync.Mutex
1615
- lastDBMeta []database.WorkspaceAgentMetadatum
1616
- )
1635
+ var lastMetadata []database.WorkspaceAgentMetadatum
1617
1636
1618
- sendMetadata := func (pull bool ) {
1619
- log .Debug (ctx , "sending metadata update" , "pull" , pull )
1620
- lastDBMetaMu .Lock ()
1621
- defer lastDBMetaMu .Unlock ()
1637
+ sendMetadata := func (pullDB bool ) {
1638
+ log .Debug (ctx , "sending metadata update" , "pull_db" , pullDB )
1622
1639
1623
- var err error
1624
- if pull {
1640
+ if pullDB {
1641
+ var err error
1625
1642
// We always use the original Request context because it contains
1626
1643
// the RBAC actor.
1627
- lastDBMeta , err = api .Database .GetWorkspaceAgentMetadata (ctx , workspaceAgent .ID )
1644
+ lastMetadata , err = api .Database .GetWorkspaceAgentMetadata (ctx , workspaceAgent .ID )
1628
1645
if err != nil {
1629
- _ = sendEvent (ctx , codersdk.ServerSentEvent {
1646
+ _ = sseSendEvent (ctx , codersdk.ServerSentEvent {
1630
1647
Type : codersdk .ServerSentEventTypeError ,
1631
1648
Data : codersdk.Response {
1632
1649
Message : "Internal error getting metadata." ,
@@ -1635,7 +1652,7 @@ func (api *API) watchWorkspaceAgentMetadata(rw http.ResponseWriter, r *http.Requ
1635
1652
})
1636
1653
return
1637
1654
}
1638
- slices .SortFunc (lastDBMeta , func (a , b database.WorkspaceAgentMetadatum ) int {
1655
+ slices .SortFunc (lastMetadata , func (a , b database.WorkspaceAgentMetadatum ) int {
1639
1656
return slice .Ascending (a .Key , b .Key )
1640
1657
})
1641
1658
@@ -1644,34 +1661,26 @@ func (api *API) watchWorkspaceAgentMetadata(rw http.ResponseWriter, r *http.Requ
1644
1661
refreshTicker .Reset (refreshInterval )
1645
1662
}
1646
1663
1647
- _ = sendEvent (ctx , codersdk.ServerSentEvent {
1664
+ _ = sseSendEvent (ctx , codersdk.ServerSentEvent {
1648
1665
Type : codersdk .ServerSentEventTypeData ,
1649
- Data : convertWorkspaceAgentMetadata (lastDBMeta ),
1666
+ Data : convertWorkspaceAgentMetadata (lastMetadata ),
1650
1667
})
1651
1668
}
1652
1669
1653
- // Note: we previously used a debounce here, but when the rate of metadata updates was too
1654
- // high the debounce would never fire.
1655
- //
1656
- // The rate-limit has its own caveat. If the agent sends a burst of metadata
1657
- // but then goes quiet, we will never pull the new metadata and the frontend
1658
- // will go stale until refresh. This would only happen if the agent was
1659
- // under extreme load. Under normal operations, the interval between metadata
1660
- // updates is constant so there is no burst phenomenon.
1661
- pubsubRatelimit := rate .NewLimiter (rate .Every (time .Second ), 2 )
1662
- if flag .Lookup ("test.v" ) != nil {
1663
- // We essentially disable the rate-limit in tests for determinism.
1664
- pubsubRatelimit = rate .NewLimiter (rate .Every (time .Second * 100 ), 100 )
1665
- }
1670
+ // Note: we tried using a ratelimit and debounce here before but it was
1671
+ // pretty buggy.
1672
+ pubsubUpdates := make (chan database.UpdateWorkspaceAgentMetadataParams , 8 )
1666
1673
1667
1674
// Send metadata on updates, we must ensure subscription before sending
1668
1675
// initial metadata to guarantee that events in-between are not missed.
1669
- cancelSub , err := api .Pubsub .Subscribe (watchWorkspaceAgentMetadataChannel (workspaceAgent .ID ), func (_ context.Context , _ []byte ) {
1670
- allow := pubsubRatelimit .Allow ()
1671
- log .Debug (ctx , "received metadata update" , "allow" , allow )
1672
- if allow {
1673
- sendMetadata (true )
1676
+ cancelSub , err := api .Pubsub .Subscribe (watchWorkspaceAgentMetadataChannel (workspaceAgent .ID ), func (_ context.Context , byt []byte ) {
1677
+ var update database.UpdateWorkspaceAgentMetadataParams
1678
+ err = unmarshalGob (byt , & update )
1679
+ if err != nil {
1680
+ api .Logger .Error (ctx , "failed to unmarshal pubsub message" , slog .Error (err ))
1681
+ return
1674
1682
}
1683
+ pubsubUpdates <- update
1675
1684
})
1676
1685
if err != nil {
1677
1686
httpapi .InternalServerError (rw , err )
@@ -1684,16 +1693,29 @@ func (api *API) watchWorkspaceAgentMetadata(rw http.ResponseWriter, r *http.Requ
1684
1693
1685
1694
for {
1686
1695
select {
1687
- case <- senderClosed :
1688
- return
1696
+ case update := <- pubsubUpdates :
1697
+ // There can only be 128 metadatums so this will always be fast.
1698
+ for i , datum := range lastMetadata {
1699
+ if datum .Key != update .Key {
1700
+ continue
1701
+ }
1702
+
1703
+ lastMetadata [i ] = database.WorkspaceAgentMetadatum {
1704
+ Value : update .Value ,
1705
+ Error : update .Error ,
1706
+ CollectedAt : update .CollectedAt ,
1707
+ }
1708
+ }
1709
+ sendMetadata (false )
1689
1710
case <- refreshTicker .C :
1711
+ // Avoid spamming the DB with reads when we know there are no updates. We want
1712
+ // to continue sending updates to the frontend so that "Result.Age"
1713
+ // is always accurate. This way, the frontend doesn't need to
1714
+ // sync its own clock with the backend.
1715
+ sendMetadata (false )
1716
+ case <- sseSenderClosed :
1717
+ return
1690
1718
}
1691
-
1692
- // Avoid spamming the DB with reads we know there are no updates. We want
1693
- // to continue sending updates to the frontend so that "Result.Age"
1694
- // is always accurate. This way, the frontend doesn't need to
1695
- // sync its own clock with the backend.
1696
- sendMetadata (false )
1697
1719
}
1698
1720
}
1699
1721
0 commit comments