@@ -1785,10 +1785,10 @@ func (api *API) workspaceAgentUpdateMetadata(ctx context.Context, workspaceAgent
1785
1785
1786
1786
datum := database.UpdateWorkspaceAgentMetadataParams {
1787
1787
WorkspaceAgentID : workspaceAgent .ID ,
1788
- Key : []string {} ,
1789
- Value : []string {} ,
1790
- Error : []string {} ,
1791
- CollectedAt : []time.Time {} ,
1788
+ Key : make ( []string , 0 , len ( req . Metadata )) ,
1789
+ Value : make ( []string , 0 , len ( req . Metadata )) ,
1790
+ Error : make ( []string , 0 , len ( req . Metadata )) ,
1791
+ CollectedAt : make ( []time.Time , 0 , len ( req . Metadata )) ,
1792
1792
}
1793
1793
1794
1794
for _ , md := range req .Metadata {
@@ -1853,41 +1853,36 @@ func (api *API) workspaceAgentUpdateMetadata(ctx context.Context, workspaceAgent
1853
1853
// @Router /workspaceagents/{workspaceagent}/watch-metadata [get]
1854
1854
// @x-apidocgen {"skip": true}
1855
1855
func (api * API ) watchWorkspaceAgentMetadata (rw http.ResponseWriter , r * http.Request ) {
1856
- var (
1857
- ctx = r .Context ()
1858
- workspaceAgent = httpmw .WorkspaceAgentParam (r )
1859
- log = api .Logger .Named ("workspace_metadata_watcher" ).With (
1860
- slog .F ("workspace_agent_id" , workspaceAgent .ID ),
1861
- )
1856
+ // Allow us to interrupt watch via cancel.
1857
+ ctx , cancel := context .WithCancel (r .Context ())
1858
+ defer cancel ()
1859
+ r = r .WithContext (ctx ) // Rewire context for SSE cancellation.
1860
+
1861
+ workspaceAgent := httpmw .WorkspaceAgentParam (r )
1862
+ log := api .Logger .Named ("workspace_metadata_watcher" ).With (
1863
+ slog .F ("workspace_agent_id" , workspaceAgent .ID ),
1862
1864
)
1863
1865
1864
1866
// Send metadata on updates, we must ensure subscription before sending
1865
1867
// initial metadata to guarantee that events in-between are not missed.
1866
1868
update := make (chan workspaceAgentMetadataChannelPayload , 1 )
1867
1869
cancelSub , err := api .Pubsub .Subscribe (watchWorkspaceAgentMetadataChannel (workspaceAgent .ID ), func (_ context.Context , byt []byte ) {
1870
+ if ctx .Err () != nil {
1871
+ return
1872
+ }
1873
+
1868
1874
var payload workspaceAgentMetadataChannelPayload
1869
1875
err := json .Unmarshal (byt , & payload )
1870
1876
if err != nil {
1871
- api . Logger .Error (ctx , "failed to unmarshal pubsub message" , slog .Error (err ))
1877
+ log .Error (ctx , "failed to unmarshal pubsub message" , slog .Error (err ))
1872
1878
return
1873
1879
}
1874
1880
1875
1881
log .Debug (ctx , "received metadata update" , "payload" , payload )
1876
1882
1877
1883
select {
1878
1884
case prev := <- update :
1879
- // This update wasn't consumed yet, merge the keys.
1880
- newKeysSet := make (map [string ]struct {})
1881
- for _ , key := range payload .Keys {
1882
- newKeysSet [key ] = struct {}{}
1883
- }
1884
- keys := prev .Keys
1885
- for _ , key := range prev .Keys {
1886
- if _ , ok := newKeysSet [key ]; ! ok {
1887
- keys = append (keys , key )
1888
- }
1889
- }
1890
- payload .Keys = keys
1885
+ payload .Keys = appendUnique (prev .Keys , payload .Keys )
1891
1886
default :
1892
1887
}
1893
1888
// This can never block since we pop and merge beforehand.
@@ -1899,22 +1894,9 @@ func (api *API) watchWorkspaceAgentMetadata(rw http.ResponseWriter, r *http.Requ
1899
1894
}
1900
1895
defer cancelSub ()
1901
1896
1902
- sseSendEvent , sseSenderClosed , err := httpapi .ServerSentEventSender (rw , r )
1903
- if err != nil {
1904
- httpapi .Write (ctx , rw , http .StatusInternalServerError , codersdk.Response {
1905
- Message : "Internal error setting up server-sent events." ,
1906
- Detail : err .Error (),
1907
- })
1908
- return
1909
- }
1910
- // Prevent handler from returning until the sender is closed.
1911
- defer func () {
1912
- <- sseSenderClosed
1913
- }()
1914
-
1915
1897
// We always use the original Request context because it contains
1916
1898
// the RBAC actor.
1917
- md , err := api .Database .GetWorkspaceAgentMetadata (ctx , database.GetWorkspaceAgentMetadataParams {
1899
+ initialMD , err := api .Database .GetWorkspaceAgentMetadata (ctx , database.GetWorkspaceAgentMetadataParams {
1918
1900
WorkspaceAgentID : workspaceAgent .ID ,
1919
1901
Keys : nil ,
1920
1902
})
@@ -1926,15 +1908,45 @@ func (api *API) watchWorkspaceAgentMetadata(rw http.ResponseWriter, r *http.Requ
1926
1908
return
1927
1909
}
1928
1910
1929
- metadataMap := make (map [string ]database.WorkspaceAgentMetadatum )
1930
- for _ , datum := range md {
1911
+ log .Debug (ctx , "got initial metadata" , "num" , len (initialMD ))
1912
+
1913
+ metadataMap := make (map [string ]database.WorkspaceAgentMetadatum , len (initialMD ))
1914
+ for _ , datum := range initialMD {
1931
1915
metadataMap [datum .Key ] = datum
1932
1916
}
1917
+ //nolint:ineffassign // Release memory.
1918
+ initialMD = nil
1919
+
1920
+ sseSendEvent , sseSenderClosed , err := httpapi .ServerSentEventSender (rw , r )
1921
+ if err != nil {
1922
+ httpapi .Write (ctx , rw , http .StatusInternalServerError , codersdk.Response {
1923
+ Message : "Internal error setting up server-sent events." ,
1924
+ Detail : err .Error (),
1925
+ })
1926
+ return
1927
+ }
1928
+ // Prevent handler from returning until the sender is closed.
1929
+ defer func () {
1930
+ cancel ()
1931
+ <- sseSenderClosed
1932
+ }()
1933
+ // Synchronize cancellation from SSE -> context, this lets us simplify the
1934
+ // cancellation logic.
1935
+ go func () {
1936
+ select {
1937
+ case <- ctx .Done ():
1938
+ case <- sseSenderClosed :
1939
+ cancel ()
1940
+ }
1941
+ }()
1933
1942
1934
1943
var lastSend time.Time
1935
1944
sendMetadata := func () {
1936
1945
lastSend = time .Now ()
1937
1946
values := maps .Values (metadataMap )
1947
+
1948
+ log .Debug (ctx , "sending metadata" , "num" , len (values ))
1949
+
1938
1950
_ = sseSendEvent (ctx , codersdk.ServerSentEvent {
1939
1951
Type : codersdk .ServerSentEventTypeData ,
1940
1952
Data : convertWorkspaceAgentMetadata (values ),
@@ -1953,10 +1965,11 @@ func (api *API) watchWorkspaceAgentMetadata(rw http.ResponseWriter, r *http.Requ
1953
1965
fetchedMetadata := make (chan []database.WorkspaceAgentMetadatum )
1954
1966
go func () {
1955
1967
defer close (fetchedMetadata )
1968
+ defer cancel ()
1956
1969
1957
1970
for {
1958
1971
select {
1959
- case <- sseSenderClosed :
1972
+ case <- ctx . Done () :
1960
1973
return
1961
1974
case payload := <- update :
1962
1975
md , err := api .Database .GetWorkspaceAgentMetadata (ctx , database.GetWorkspaceAgentMetadataParams {
@@ -1966,24 +1979,35 @@ func (api *API) watchWorkspaceAgentMetadata(rw http.ResponseWriter, r *http.Requ
1966
1979
if err != nil {
1967
1980
if ! errors .Is (err , context .Canceled ) {
1968
1981
log .Error (ctx , "failed to get metadata" , slog .Error (err ))
1982
+ _ = sseSendEvent (ctx , codersdk.ServerSentEvent {
1983
+ Type : codersdk .ServerSentEventTypeError ,
1984
+ Data : codersdk.Response {
1985
+ Message : "Failed to get metadata." ,
1986
+ Detail : err .Error (),
1987
+ },
1988
+ })
1969
1989
}
1970
1990
return
1971
1991
}
1972
1992
select {
1973
- case <- sseSenderClosed :
1993
+ case <- ctx . Done () :
1974
1994
return
1975
1995
// We want to block here to avoid constantly pinging the
1976
1996
// database when the metadata isn't being processed.
1977
1997
case fetchedMetadata <- md :
1998
+ log .Debug (ctx , "fetched metadata update for keys" , "keys" , payload .Keys , "num" , len (md ))
1978
1999
}
1979
2000
}
1980
2001
}
1981
2002
}()
2003
+ defer func () {
2004
+ <- fetchedMetadata
2005
+ }()
1982
2006
1983
2007
pendingChanges := true
1984
2008
for {
1985
2009
select {
1986
- case <- sseSenderClosed :
2010
+ case <- ctx . Done () :
1987
2011
return
1988
2012
case md , ok := <- fetchedMetadata :
1989
2013
if ! ok {
@@ -2007,9 +2031,24 @@ func (api *API) watchWorkspaceAgentMetadata(rw http.ResponseWriter, r *http.Requ
2007
2031
}
2008
2032
}
2009
2033
2034
+ // appendUnique is like append and adds elements from src to dst,
2035
+ // skipping any elements that already exist in dst.
2036
+ func appendUnique [T comparable ](dst , src []T ) []T {
2037
+ exists := make (map [T ]struct {}, len (dst ))
2038
+ for _ , key := range dst {
2039
+ exists [key ] = struct {}{}
2040
+ }
2041
+ for _ , key := range src {
2042
+ if _ , ok := exists [key ]; ! ok {
2043
+ dst = append (dst , key )
2044
+ }
2045
+ }
2046
+ return dst
2047
+ }
2048
+
2010
2049
func convertWorkspaceAgentMetadata (db []database.WorkspaceAgentMetadatum ) []codersdk.WorkspaceAgentMetadata {
2011
2050
// An empty array is easier for clients to handle than a null.
2012
- result := []codersdk.WorkspaceAgentMetadata {}
2051
+ result := make ( []codersdk.WorkspaceAgentMetadata , 0 , len ( db ))
2013
2052
for _ , datum := range db {
2014
2053
result = append (result , codersdk.WorkspaceAgentMetadata {
2015
2054
Result : codersdk.WorkspaceAgentMetadataResult {
0 commit comments