Skip to content

Commit 198b56c

Browse files
authored
fix(coderd): fix memory leak in watchWorkspaceAgentMetadata (coder#10685)
Fixes coder#10550
1 parent c130f8d commit 198b56c

File tree

4 files changed

+269
-43
lines changed

4 files changed

+269
-43
lines changed

coderd/workspaceagents.go

Lines changed: 82 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -1785,10 +1785,10 @@ func (api *API) workspaceAgentUpdateMetadata(ctx context.Context, workspaceAgent
17851785

17861786
datum := database.UpdateWorkspaceAgentMetadataParams{
17871787
WorkspaceAgentID: workspaceAgent.ID,
1788-
Key: []string{},
1789-
Value: []string{},
1790-
Error: []string{},
1791-
CollectedAt: []time.Time{},
1788+
Key: make([]string, 0, len(req.Metadata)),
1789+
Value: make([]string, 0, len(req.Metadata)),
1790+
Error: make([]string, 0, len(req.Metadata)),
1791+
CollectedAt: make([]time.Time, 0, len(req.Metadata)),
17921792
}
17931793

17941794
for _, md := range req.Metadata {
@@ -1853,41 +1853,36 @@ func (api *API) workspaceAgentUpdateMetadata(ctx context.Context, workspaceAgent
18531853
// @Router /workspaceagents/{workspaceagent}/watch-metadata [get]
18541854
// @x-apidocgen {"skip": true}
18551855
func (api *API) watchWorkspaceAgentMetadata(rw http.ResponseWriter, r *http.Request) {
1856-
var (
1857-
ctx = r.Context()
1858-
workspaceAgent = httpmw.WorkspaceAgentParam(r)
1859-
log = api.Logger.Named("workspace_metadata_watcher").With(
1860-
slog.F("workspace_agent_id", workspaceAgent.ID),
1861-
)
1856+
// Allow us to interrupt watch via cancel.
1857+
ctx, cancel := context.WithCancel(r.Context())
1858+
defer cancel()
1859+
r = r.WithContext(ctx) // Rewire context for SSE cancellation.
1860+
1861+
workspaceAgent := httpmw.WorkspaceAgentParam(r)
1862+
log := api.Logger.Named("workspace_metadata_watcher").With(
1863+
slog.F("workspace_agent_id", workspaceAgent.ID),
18621864
)
18631865

18641866
// Send metadata on updates, we must ensure subscription before sending
18651867
// initial metadata to guarantee that events in-between are not missed.
18661868
update := make(chan workspaceAgentMetadataChannelPayload, 1)
18671869
cancelSub, err := api.Pubsub.Subscribe(watchWorkspaceAgentMetadataChannel(workspaceAgent.ID), func(_ context.Context, byt []byte) {
1870+
if ctx.Err() != nil {
1871+
return
1872+
}
1873+
18681874
var payload workspaceAgentMetadataChannelPayload
18691875
err := json.Unmarshal(byt, &payload)
18701876
if err != nil {
1871-
api.Logger.Error(ctx, "failed to unmarshal pubsub message", slog.Error(err))
1877+
log.Error(ctx, "failed to unmarshal pubsub message", slog.Error(err))
18721878
return
18731879
}
18741880

18751881
log.Debug(ctx, "received metadata update", "payload", payload)
18761882

18771883
select {
18781884
case prev := <-update:
1879-
// This update wasn't consumed yet, merge the keys.
1880-
newKeysSet := make(map[string]struct{})
1881-
for _, key := range payload.Keys {
1882-
newKeysSet[key] = struct{}{}
1883-
}
1884-
keys := prev.Keys
1885-
for _, key := range prev.Keys {
1886-
if _, ok := newKeysSet[key]; !ok {
1887-
keys = append(keys, key)
1888-
}
1889-
}
1890-
payload.Keys = keys
1885+
payload.Keys = appendUnique(prev.Keys, payload.Keys)
18911886
default:
18921887
}
18931888
// This can never block since we pop and merge beforehand.
@@ -1899,22 +1894,9 @@ func (api *API) watchWorkspaceAgentMetadata(rw http.ResponseWriter, r *http.Requ
18991894
}
19001895
defer cancelSub()
19011896

1902-
sseSendEvent, sseSenderClosed, err := httpapi.ServerSentEventSender(rw, r)
1903-
if err != nil {
1904-
httpapi.Write(ctx, rw, http.StatusInternalServerError, codersdk.Response{
1905-
Message: "Internal error setting up server-sent events.",
1906-
Detail: err.Error(),
1907-
})
1908-
return
1909-
}
1910-
// Prevent handler from returning until the sender is closed.
1911-
defer func() {
1912-
<-sseSenderClosed
1913-
}()
1914-
19151897
// We always use the original Request context because it contains
19161898
// the RBAC actor.
1917-
md, err := api.Database.GetWorkspaceAgentMetadata(ctx, database.GetWorkspaceAgentMetadataParams{
1899+
initialMD, err := api.Database.GetWorkspaceAgentMetadata(ctx, database.GetWorkspaceAgentMetadataParams{
19181900
WorkspaceAgentID: workspaceAgent.ID,
19191901
Keys: nil,
19201902
})
@@ -1926,15 +1908,45 @@ func (api *API) watchWorkspaceAgentMetadata(rw http.ResponseWriter, r *http.Requ
19261908
return
19271909
}
19281910

1929-
metadataMap := make(map[string]database.WorkspaceAgentMetadatum)
1930-
for _, datum := range md {
1911+
log.Debug(ctx, "got initial metadata", "num", len(initialMD))
1912+
1913+
metadataMap := make(map[string]database.WorkspaceAgentMetadatum, len(initialMD))
1914+
for _, datum := range initialMD {
19311915
metadataMap[datum.Key] = datum
19321916
}
1917+
//nolint:ineffassign // Release memory.
1918+
initialMD = nil
1919+
1920+
sseSendEvent, sseSenderClosed, err := httpapi.ServerSentEventSender(rw, r)
1921+
if err != nil {
1922+
httpapi.Write(ctx, rw, http.StatusInternalServerError, codersdk.Response{
1923+
Message: "Internal error setting up server-sent events.",
1924+
Detail: err.Error(),
1925+
})
1926+
return
1927+
}
1928+
// Prevent handler from returning until the sender is closed.
1929+
defer func() {
1930+
cancel()
1931+
<-sseSenderClosed
1932+
}()
1933+
// Synchronize cancellation from SSE -> context, this lets us simplify the
1934+
// cancellation logic.
1935+
go func() {
1936+
select {
1937+
case <-ctx.Done():
1938+
case <-sseSenderClosed:
1939+
cancel()
1940+
}
1941+
}()
19331942

19341943
var lastSend time.Time
19351944
sendMetadata := func() {
19361945
lastSend = time.Now()
19371946
values := maps.Values(metadataMap)
1947+
1948+
log.Debug(ctx, "sending metadata", "num", len(values))
1949+
19381950
_ = sseSendEvent(ctx, codersdk.ServerSentEvent{
19391951
Type: codersdk.ServerSentEventTypeData,
19401952
Data: convertWorkspaceAgentMetadata(values),
@@ -1953,10 +1965,11 @@ func (api *API) watchWorkspaceAgentMetadata(rw http.ResponseWriter, r *http.Requ
19531965
fetchedMetadata := make(chan []database.WorkspaceAgentMetadatum)
19541966
go func() {
19551967
defer close(fetchedMetadata)
1968+
defer cancel()
19561969

19571970
for {
19581971
select {
1959-
case <-sseSenderClosed:
1972+
case <-ctx.Done():
19601973
return
19611974
case payload := <-update:
19621975
md, err := api.Database.GetWorkspaceAgentMetadata(ctx, database.GetWorkspaceAgentMetadataParams{
@@ -1966,24 +1979,35 @@ func (api *API) watchWorkspaceAgentMetadata(rw http.ResponseWriter, r *http.Requ
19661979
if err != nil {
19671980
if !errors.Is(err, context.Canceled) {
19681981
log.Error(ctx, "failed to get metadata", slog.Error(err))
1982+
_ = sseSendEvent(ctx, codersdk.ServerSentEvent{
1983+
Type: codersdk.ServerSentEventTypeError,
1984+
Data: codersdk.Response{
1985+
Message: "Failed to get metadata.",
1986+
Detail: err.Error(),
1987+
},
1988+
})
19691989
}
19701990
return
19711991
}
19721992
select {
1973-
case <-sseSenderClosed:
1993+
case <-ctx.Done():
19741994
return
19751995
// We want to block here to avoid constantly pinging the
19761996
// database when the metadata isn't being processed.
19771997
case fetchedMetadata <- md:
1998+
log.Debug(ctx, "fetched metadata update for keys", "keys", payload.Keys, "num", len(md))
19781999
}
19792000
}
19802001
}
19812002
}()
2003+
defer func() {
2004+
<-fetchedMetadata
2005+
}()
19822006

19832007
pendingChanges := true
19842008
for {
19852009
select {
1986-
case <-sseSenderClosed:
2010+
case <-ctx.Done():
19872011
return
19882012
case md, ok := <-fetchedMetadata:
19892013
if !ok {
@@ -2007,9 +2031,24 @@ func (api *API) watchWorkspaceAgentMetadata(rw http.ResponseWriter, r *http.Requ
20072031
}
20082032
}
20092033

2034+
// appendUnique is like append and adds elements from src to dst,
2035+
// skipping any elements that already exist in dst.
2036+
func appendUnique[T comparable](dst, src []T) []T {
2037+
exists := make(map[T]struct{}, len(dst))
2038+
for _, key := range dst {
2039+
exists[key] = struct{}{}
2040+
}
2041+
for _, key := range src {
2042+
if _, ok := exists[key]; !ok {
2043+
dst = append(dst, key)
2044+
}
2045+
}
2046+
return dst
2047+
}
2048+
20102049
func convertWorkspaceAgentMetadata(db []database.WorkspaceAgentMetadatum) []codersdk.WorkspaceAgentMetadata {
20112050
// An empty array is easier for clients to handle than a null.
2012-
result := []codersdk.WorkspaceAgentMetadata{}
2051+
result := make([]codersdk.WorkspaceAgentMetadata, 0, len(db))
20132052
for _, datum := range db {
20142053
result = append(result, codersdk.WorkspaceAgentMetadata{
20152054
Result: codersdk.WorkspaceAgentMetadataResult{

0 commit comments

Comments
 (0)