From dc89ae8aa181747a7ffa0f6de48bfa0a9f8ffb03 Mon Sep 17 00:00:00 2001 From: Mathias Fredriksson Date: Tue, 3 Oct 2023 13:59:54 +0000 Subject: [PATCH] feat(coderd): add support for sending batched agent metadata --- agent/agent.go | 2 +- agent/agent_test.go | 4 +- agent/agenttest/client.go | 8 +- coderd/apidoc/docs.go | 76 ++++++- coderd/apidoc/swagger.json | 72 ++++++- coderd/coderd.go | 3 +- coderd/database/dbauthz/dbauthz.go | 6 +- coderd/database/dbfake/dbfake.go | 33 +-- coderd/database/dbmetrics/dbmetrics.go | 2 +- coderd/database/dbmock/dbmock.go | 2 +- coderd/database/querier.go | 2 +- coderd/database/queries.sql.go | 49 +++-- coderd/database/queries/workspaceagents.sql | 24 ++- coderd/deprecated.go | 44 ++++ coderd/workspaceagents.go | 226 ++++++++++++-------- coderd/wsconncache/wsconncache_test.go | 2 +- codersdk/agentsdk/agentsdk.go | 13 +- docs/api/schemas.md | 44 ++++ 18 files changed, 469 insertions(+), 143 deletions(-) diff --git a/agent/agent.go b/agent/agent.go index 7a50aa1fe1756..68c82eea2d1b1 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -90,7 +90,7 @@ type Client interface { PostLifecycle(ctx context.Context, state agentsdk.PostLifecycleRequest) error PostAppHealth(ctx context.Context, req agentsdk.PostAppHealthsRequest) error PostStartup(ctx context.Context, req agentsdk.PostStartupRequest) error - PostMetadata(ctx context.Context, key string, req agentsdk.PostMetadataRequest) error + PostMetadata(ctx context.Context, key string, req agentsdk.PostMetadataRequestDeprecated) error PatchLogs(ctx context.Context, req agentsdk.PatchLogs) error GetServiceBanner(ctx context.Context) (codersdk.ServiceBannerConfig, error) } diff --git a/agent/agent_test.go b/agent/agent_test.go index a609e779c96f0..22734678ccb7e 100644 --- a/agent/agent_test.go +++ b/agent/agent_test.go @@ -1079,7 +1079,7 @@ func TestAgent_Metadata(t *testing.T) { opts.ReportMetadataInterval = 100 * time.Millisecond }) - var gotMd map[string]agentsdk.PostMetadataRequest + var gotMd map[string]agentsdk.PostMetadataRequestDeprecated require.Eventually(t, func() bool { gotMd = client.GetMetadata() return len(gotMd) == 1 @@ -1112,7 +1112,7 @@ func TestAgent_Metadata(t *testing.T) { opts.ReportMetadataInterval = testutil.IntervalFast }) - var gotMd map[string]agentsdk.PostMetadataRequest + var gotMd map[string]agentsdk.PostMetadataRequestDeprecated require.Eventually(t, func() bool { gotMd = client.GetMetadata() return len(gotMd) == 1 diff --git a/agent/agenttest/client.go b/agent/agenttest/client.go index f8c69bf408869..ac2b447518f90 100644 --- a/agent/agenttest/client.go +++ b/agent/agenttest/client.go @@ -45,7 +45,7 @@ type Client struct { logger slog.Logger agentID uuid.UUID manifest agentsdk.Manifest - metadata map[string]agentsdk.PostMetadataRequest + metadata map[string]agentsdk.PostMetadataRequestDeprecated statsChan chan *agentsdk.Stats coordinator tailnet.Coordinator LastWorkspaceAgent func() @@ -136,17 +136,17 @@ func (c *Client) GetStartup() agentsdk.PostStartupRequest { return c.startup } -func (c *Client) GetMetadata() map[string]agentsdk.PostMetadataRequest { +func (c *Client) GetMetadata() map[string]agentsdk.PostMetadataRequestDeprecated { c.mu.Lock() defer c.mu.Unlock() return maps.Clone(c.metadata) } -func (c *Client) PostMetadata(ctx context.Context, key string, req agentsdk.PostMetadataRequest) error { +func (c *Client) PostMetadata(ctx context.Context, key string, req agentsdk.PostMetadataRequestDeprecated) error { c.mu.Lock() defer c.mu.Unlock() if c.metadata == nil { - c.metadata = make(map[string]agentsdk.PostMetadataRequest) + c.metadata = make(map[string]agentsdk.PostMetadataRequestDeprecated) } c.metadata[key] = req c.logger.Debug(ctx, "post metadata", slog.F("key", key), slog.F("req", req)) diff --git a/coderd/apidoc/docs.go b/coderd/apidoc/docs.go index 71065f4ad1371..ea5eae748b255 100644 --- a/coderd/apidoc/docs.go +++ b/coderd/apidoc/docs.go @@ -4867,7 +4867,7 @@ const docTemplate = `{ } } }, - "/workspaceagents/me/metadata/{key}": { + "/workspaceagents/me/metadata": { "post": { "security": [ { @@ -4889,7 +4889,46 @@ const docTemplate = `{ "in": "body", "required": true, "schema": { - "$ref": "#/definitions/agentsdk.PostMetadataRequest" + "type": "array", + "items": { + "$ref": "#/definitions/agentsdk.PostMetadataRequest" + } + } + } + ], + "responses": { + "204": { + "description": "Success" + } + }, + "x-apidocgen": { + "skip": true + } + } + }, + "/workspaceagents/me/metadata/{key}": { + "post": { + "security": [ + { + "CoderSessionToken": [] + } + ], + "consumes": [ + "application/json" + ], + "tags": [ + "Agents" + ], + "summary": "Removed: Submit workspace agent metadata", + "operationId": "removed-submit-workspace-agent-metadata", + "parameters": [ + { + "description": "Workspace agent metadata request", + "name": "request", + "in": "body", + "required": true, + "schema": { + "$ref": "#/definitions/agentsdk.PostMetadataRequestDeprecated" } }, { @@ -6708,6 +6747,28 @@ const docTemplate = `{ } } }, + "agentsdk.Metadata": { + "type": "object", + "properties": { + "age": { + "description": "Age is the number of seconds since the metadata was collected.\nIt is provided in addition to CollectedAt to protect against clock skew.", + "type": "integer" + }, + "collected_at": { + "type": "string", + "format": "date-time" + }, + "error": { + "type": "string" + }, + "key": { + "type": "string" + }, + "value": { + "type": "string" + } + } + }, "agentsdk.PatchLogs": { "type": "object", "properties": { @@ -6746,6 +6807,17 @@ const docTemplate = `{ } }, "agentsdk.PostMetadataRequest": { + "type": "object", + "properties": { + "metadata": { + "type": "array", + "items": { + "$ref": "#/definitions/agentsdk.Metadata" + } + } + } + }, + "agentsdk.PostMetadataRequestDeprecated": { "type": "object", "properties": { "age": { diff --git a/coderd/apidoc/swagger.json b/coderd/apidoc/swagger.json index 12b388130c289..5b908c90b176a 100644 --- a/coderd/apidoc/swagger.json +++ b/coderd/apidoc/swagger.json @@ -4281,7 +4281,7 @@ } } }, - "/workspaceagents/me/metadata/{key}": { + "/workspaceagents/me/metadata": { "post": { "security": [ { @@ -4299,7 +4299,42 @@ "in": "body", "required": true, "schema": { - "$ref": "#/definitions/agentsdk.PostMetadataRequest" + "type": "array", + "items": { + "$ref": "#/definitions/agentsdk.PostMetadataRequest" + } + } + } + ], + "responses": { + "204": { + "description": "Success" + } + }, + "x-apidocgen": { + "skip": true + } + } + }, + "/workspaceagents/me/metadata/{key}": { + "post": { + "security": [ + { + "CoderSessionToken": [] + } + ], + "consumes": ["application/json"], + "tags": ["Agents"], + "summary": "Removed: Submit workspace agent metadata", + "operationId": "removed-submit-workspace-agent-metadata", + "parameters": [ + { + "description": "Workspace agent metadata request", + "name": "request", + "in": "body", + "required": true, + "schema": { + "$ref": "#/definitions/agentsdk.PostMetadataRequestDeprecated" } }, { @@ -5922,6 +5957,28 @@ } } }, + "agentsdk.Metadata": { + "type": "object", + "properties": { + "age": { + "description": "Age is the number of seconds since the metadata was collected.\nIt is provided in addition to CollectedAt to protect against clock skew.", + "type": "integer" + }, + "collected_at": { + "type": "string", + "format": "date-time" + }, + "error": { + "type": "string" + }, + "key": { + "type": "string" + }, + "value": { + "type": "string" + } + } + }, "agentsdk.PatchLogs": { "type": "object", "properties": { @@ -5960,6 +6017,17 @@ } }, "agentsdk.PostMetadataRequest": { + "type": "object", + "properties": { + "metadata": { + "type": "array", + "items": { + "$ref": "#/definitions/agentsdk.Metadata" + } + } + } + }, + "agentsdk.PostMetadataRequestDeprecated": { "type": "object", "properties": { "age": { diff --git a/coderd/coderd.go b/coderd/coderd.go index d0aaee5b1ce98..780386c4a5db1 100644 --- a/coderd/coderd.go +++ b/coderd/coderd.go @@ -821,7 +821,8 @@ func New(options *Options) *API { r.Get("/coordinate", api.workspaceAgentCoordinate) r.Post("/report-stats", api.workspaceAgentReportStats) r.Post("/report-lifecycle", api.workspaceAgentReportLifecycle) - r.Post("/metadata/{key}", api.workspaceAgentPostMetadata) + r.Post("/metadata", api.workspaceAgentPostMetadata) + r.Post("/metadata/{key}", api.workspaceAgentPostMetadataDeprecated) }) r.Route("/{workspaceagent}", func(r chi.Router) { r.Use( diff --git a/coderd/database/dbauthz/dbauthz.go b/coderd/database/dbauthz/dbauthz.go index d71dccb20baaf..038f4e0c92807 100644 --- a/coderd/database/dbauthz/dbauthz.go +++ b/coderd/database/dbauthz/dbauthz.go @@ -1643,8 +1643,8 @@ func (q *querier) GetWorkspaceAgentLogsAfter(ctx context.Context, arg database.G return q.db.GetWorkspaceAgentLogsAfter(ctx, arg) } -func (q *querier) GetWorkspaceAgentMetadata(ctx context.Context, workspaceAgentID uuid.UUID) ([]database.WorkspaceAgentMetadatum, error) { - workspace, err := q.db.GetWorkspaceByAgentID(ctx, workspaceAgentID) +func (q *querier) GetWorkspaceAgentMetadata(ctx context.Context, arg database.GetWorkspaceAgentMetadataParams) ([]database.WorkspaceAgentMetadatum, error) { + workspace, err := q.db.GetWorkspaceByAgentID(ctx, arg.WorkspaceAgentID) if err != nil { return nil, err } @@ -1654,7 +1654,7 @@ func (q *querier) GetWorkspaceAgentMetadata(ctx context.Context, workspaceAgentI return nil, err } - return q.db.GetWorkspaceAgentMetadata(ctx, workspaceAgentID) + return q.db.GetWorkspaceAgentMetadata(ctx, arg) } func (q *querier) GetWorkspaceAgentScriptsByAgentIDs(ctx context.Context, ids []uuid.UUID) ([]database.WorkspaceAgentScript, error) { diff --git a/coderd/database/dbfake/dbfake.go b/coderd/database/dbfake/dbfake.go index 17ab51f443977..4a2aeb62ef048 100644 --- a/coderd/database/dbfake/dbfake.go +++ b/coderd/database/dbfake/dbfake.go @@ -3518,13 +3518,20 @@ func (q *FakeQuerier) GetWorkspaceAgentLogsAfter(_ context.Context, arg database return logs, nil } -func (q *FakeQuerier) GetWorkspaceAgentMetadata(_ context.Context, workspaceAgentID uuid.UUID) ([]database.WorkspaceAgentMetadatum, error) { +func (q *FakeQuerier) GetWorkspaceAgentMetadata(_ context.Context, arg database.GetWorkspaceAgentMetadataParams) ([]database.WorkspaceAgentMetadatum, error) { + if err := validateDatabaseType(arg); err != nil { + return nil, err + } + q.mutex.RLock() defer q.mutex.RUnlock() metadata := make([]database.WorkspaceAgentMetadatum, 0) for _, m := range q.workspaceAgentMetadata { - if m.WorkspaceAgentID == workspaceAgentID { + if m.WorkspaceAgentID == arg.WorkspaceAgentID { + if len(arg.Keys) > 0 && !slices.Contains(arg.Keys, m.Key) { + continue + } metadata = append(metadata, m) } } @@ -6133,19 +6140,17 @@ func (q *FakeQuerier) UpdateWorkspaceAgentMetadata(_ context.Context, arg databa q.mutex.Lock() defer q.mutex.Unlock() - //nolint:gosimple - updated := database.WorkspaceAgentMetadatum{ - WorkspaceAgentID: arg.WorkspaceAgentID, - Key: arg.Key, - Value: arg.Value, - Error: arg.Error, - CollectedAt: arg.CollectedAt, - } - for i, m := range q.workspaceAgentMetadata { - if m.WorkspaceAgentID == arg.WorkspaceAgentID && m.Key == arg.Key { - q.workspaceAgentMetadata[i] = updated - return nil + if m.WorkspaceAgentID != arg.WorkspaceAgentID { + continue + } + for j := 0; j < len(arg.Key); j++ { + if m.Key == arg.Key[j] { + q.workspaceAgentMetadata[i].Value = arg.Value[j] + q.workspaceAgentMetadata[i].Error = arg.Error[j] + q.workspaceAgentMetadata[i].CollectedAt = arg.CollectedAt[j] + return nil + } } } diff --git a/coderd/database/dbmetrics/dbmetrics.go b/coderd/database/dbmetrics/dbmetrics.go index 1702b95513490..ece7020139b0f 100644 --- a/coderd/database/dbmetrics/dbmetrics.go +++ b/coderd/database/dbmetrics/dbmetrics.go @@ -900,7 +900,7 @@ func (m metricsStore) GetWorkspaceAgentLogsAfter(ctx context.Context, arg databa return r0, r1 } -func (m metricsStore) GetWorkspaceAgentMetadata(ctx context.Context, workspaceAgentID uuid.UUID) ([]database.WorkspaceAgentMetadatum, error) { +func (m metricsStore) GetWorkspaceAgentMetadata(ctx context.Context, workspaceAgentID database.GetWorkspaceAgentMetadataParams) ([]database.WorkspaceAgentMetadatum, error) { start := time.Now() metadata, err := m.s.GetWorkspaceAgentMetadata(ctx, workspaceAgentID) m.queryLatencies.WithLabelValues("GetWorkspaceAgentMetadata").Observe(time.Since(start).Seconds()) diff --git a/coderd/database/dbmock/dbmock.go b/coderd/database/dbmock/dbmock.go index 8a4c3a298efb5..31614be3ae919 100644 --- a/coderd/database/dbmock/dbmock.go +++ b/coderd/database/dbmock/dbmock.go @@ -1869,7 +1869,7 @@ func (mr *MockStoreMockRecorder) GetWorkspaceAgentLogsAfter(arg0, arg1 interface } // GetWorkspaceAgentMetadata mocks base method. -func (m *MockStore) GetWorkspaceAgentMetadata(arg0 context.Context, arg1 uuid.UUID) ([]database.WorkspaceAgentMetadatum, error) { +func (m *MockStore) GetWorkspaceAgentMetadata(arg0 context.Context, arg1 database.GetWorkspaceAgentMetadataParams) ([]database.WorkspaceAgentMetadatum, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "GetWorkspaceAgentMetadata", arg0, arg1) ret0, _ := ret[0].([]database.WorkspaceAgentMetadatum) diff --git a/coderd/database/querier.go b/coderd/database/querier.go index cbf954c287e39..99503ba40e3d6 100644 --- a/coderd/database/querier.go +++ b/coderd/database/querier.go @@ -192,7 +192,7 @@ type sqlcQuerier interface { GetWorkspaceAgentLifecycleStateByID(ctx context.Context, id uuid.UUID) (GetWorkspaceAgentLifecycleStateByIDRow, error) GetWorkspaceAgentLogSourcesByAgentIDs(ctx context.Context, ids []uuid.UUID) ([]WorkspaceAgentLogSource, error) GetWorkspaceAgentLogsAfter(ctx context.Context, arg GetWorkspaceAgentLogsAfterParams) ([]WorkspaceAgentLog, error) - GetWorkspaceAgentMetadata(ctx context.Context, workspaceAgentID uuid.UUID) ([]WorkspaceAgentMetadatum, error) + GetWorkspaceAgentMetadata(ctx context.Context, arg GetWorkspaceAgentMetadataParams) ([]WorkspaceAgentMetadatum, error) GetWorkspaceAgentScriptsByAgentIDs(ctx context.Context, ids []uuid.UUID) ([]WorkspaceAgentScript, error) GetWorkspaceAgentStats(ctx context.Context, createdAt time.Time) ([]GetWorkspaceAgentStatsRow, error) GetWorkspaceAgentStatsAndLabels(ctx context.Context, createdAt time.Time) ([]GetWorkspaceAgentStatsAndLabelsRow, error) diff --git a/coderd/database/queries.sql.go b/coderd/database/queries.sql.go index 15880f8dbe441..2d41e3e29e270 100644 --- a/coderd/database/queries.sql.go +++ b/coderd/database/queries.sql.go @@ -7318,10 +7318,16 @@ FROM workspace_agent_metadata WHERE workspace_agent_id = $1 + AND CASE WHEN COALESCE(array_length($2::text[], 1), 0) > 0 THEN key = ANY($2::text[]) ELSE TRUE END ` -func (q *sqlQuerier) GetWorkspaceAgentMetadata(ctx context.Context, workspaceAgentID uuid.UUID) ([]WorkspaceAgentMetadatum, error) { - rows, err := q.db.QueryContext(ctx, getWorkspaceAgentMetadata, workspaceAgentID) +type GetWorkspaceAgentMetadataParams struct { + WorkspaceAgentID uuid.UUID `db:"workspace_agent_id" json:"workspace_agent_id"` + Keys []string `db:"keys" json:"keys"` +} + +func (q *sqlQuerier) GetWorkspaceAgentMetadata(ctx context.Context, arg GetWorkspaceAgentMetadataParams) ([]WorkspaceAgentMetadatum, error) { + rows, err := q.db.QueryContext(ctx, getWorkspaceAgentMetadata, arg.WorkspaceAgentID, pq.Array(arg.Keys)) if err != nil { return nil, err } @@ -7880,32 +7886,41 @@ func (q *sqlQuerier) UpdateWorkspaceAgentLogOverflowByID(ctx context.Context, ar } const updateWorkspaceAgentMetadata = `-- name: UpdateWorkspaceAgentMetadata :exec +WITH metadata AS ( + SELECT + unnest($2::text[]) AS key, + unnest($3::text[]) AS value, + unnest($4::text[]) AS error, + unnest($5::timestamptz[]) AS collected_at +) UPDATE - workspace_agent_metadata + workspace_agent_metadata wam SET - value = $3, - error = $4, - collected_at = $5 + value = m.value, + error = m.error, + collected_at = m.collected_at +FROM + metadata m WHERE - workspace_agent_id = $1 - AND key = $2 + wam.workspace_agent_id = $1 + AND wam.key = m.key ` type UpdateWorkspaceAgentMetadataParams struct { - WorkspaceAgentID uuid.UUID `db:"workspace_agent_id" json:"workspace_agent_id"` - Key string `db:"key" json:"key"` - Value string `db:"value" json:"value"` - Error string `db:"error" json:"error"` - CollectedAt time.Time `db:"collected_at" json:"collected_at"` + WorkspaceAgentID uuid.UUID `db:"workspace_agent_id" json:"workspace_agent_id"` + Key []string `db:"key" json:"key"` + Value []string `db:"value" json:"value"` + Error []string `db:"error" json:"error"` + CollectedAt []time.Time `db:"collected_at" json:"collected_at"` } func (q *sqlQuerier) UpdateWorkspaceAgentMetadata(ctx context.Context, arg UpdateWorkspaceAgentMetadataParams) error { _, err := q.db.ExecContext(ctx, updateWorkspaceAgentMetadata, arg.WorkspaceAgentID, - arg.Key, - arg.Value, - arg.Error, - arg.CollectedAt, + pq.Array(arg.Key), + pq.Array(arg.Value), + pq.Array(arg.Error), + pq.Array(arg.CollectedAt), ) return err } diff --git a/coderd/database/queries/workspaceagents.sql b/coderd/database/queries/workspaceagents.sql index 0e9ec08152a69..9ecede976c372 100644 --- a/coderd/database/queries/workspaceagents.sql +++ b/coderd/database/queries/workspaceagents.sql @@ -108,15 +108,24 @@ VALUES ($1, $2, $3, $4, $5, $6); -- name: UpdateWorkspaceAgentMetadata :exec +WITH metadata AS ( + SELECT + unnest(sqlc.arg('key')::text[]) AS key, + unnest(sqlc.arg('value')::text[]) AS value, + unnest(sqlc.arg('error')::text[]) AS error, + unnest(sqlc.arg('collected_at')::timestamptz[]) AS collected_at +) UPDATE - workspace_agent_metadata + workspace_agent_metadata wam SET - value = $3, - error = $4, - collected_at = $5 + value = m.value, + error = m.error, + collected_at = m.collected_at +FROM + metadata m WHERE - workspace_agent_id = $1 - AND key = $2; + wam.workspace_agent_id = $1 + AND wam.key = m.key; -- name: GetWorkspaceAgentMetadata :many SELECT @@ -124,7 +133,8 @@ SELECT FROM workspace_agent_metadata WHERE - workspace_agent_id = $1; + workspace_agent_id = $1 + AND CASE WHEN COALESCE(array_length(sqlc.arg('keys')::text[], 1), 0) > 0 THEN key = ANY(sqlc.arg('keys')::text[]) ELSE TRUE END; -- name: UpdateWorkspaceAgentLogOverflowByID :exec UPDATE diff --git a/coderd/deprecated.go b/coderd/deprecated.go index 0b7b0b14a2762..3d37351efa3a9 100644 --- a/coderd/deprecated.go +++ b/coderd/deprecated.go @@ -3,7 +3,12 @@ package coderd import ( "net/http" + "github.com/go-chi/chi/v5" + + "cdr.dev/slog" "github.com/coder/coder/v2/coderd/httpapi" + "github.com/coder/coder/v2/coderd/httpmw" + "github.com/coder/coder/v2/codersdk/agentsdk" ) // @Summary Removed: Get parameters by template version @@ -70,3 +75,42 @@ func (api *API) workspaceAgentLogsDeprecated(rw http.ResponseWriter, r *http.Req func (api *API) workspaceAgentsGitAuth(rw http.ResponseWriter, r *http.Request) { api.workspaceAgentsExternalAuth(rw, r) } + +// @Summary Removed: Submit workspace agent metadata +// @ID removed-submit-workspace-agent-metadata +// @Security CoderSessionToken +// @Accept json +// @Tags Agents +// @Param request body agentsdk.PostMetadataRequestDeprecated true "Workspace agent metadata request" +// @Param key path string true "metadata key" format(string) +// @Success 204 "Success" +// @Router /workspaceagents/me/metadata/{key} [post] +// @x-apidocgen {"skip": true} +func (api *API) workspaceAgentPostMetadataDeprecated(rw http.ResponseWriter, r *http.Request) { + ctx := r.Context() + + var req agentsdk.PostMetadataRequestDeprecated + if !httpapi.Read(ctx, rw, r, &req) { + return + } + + workspaceAgent := httpmw.WorkspaceAgent(r) + + key := chi.URLParam(r, "key") + + err := api.workspaceAgentUpdateMetadata(ctx, workspaceAgent, agentsdk.PostMetadataRequest{ + Metadata: []agentsdk.Metadata{ + { + Key: key, + WorkspaceAgentMetadataResult: req, + }, + }, + }) + if err != nil { + api.Logger.Error(ctx, "failed to handle metadata request", slog.Error(err)) + httpapi.InternalServerError(rw, err) + return + } + + httpapi.Write(ctx, rw, http.StatusNoContent, nil) +} diff --git a/coderd/workspaceagents.go b/coderd/workspaceagents.go index 3a6240e79e9f8..d6f3a44371e1a 100644 --- a/coderd/workspaceagents.go +++ b/coderd/workspaceagents.go @@ -16,11 +16,9 @@ import ( "sort" "strconv" "strings" - "sync" "sync/atomic" "time" - "github.com/go-chi/chi/v5" "github.com/google/uuid" "github.com/sqlc-dev/pqtype" "golang.org/x/exp/maps" @@ -181,7 +179,10 @@ func (api *API) workspaceAgentManifest(rw http.ResponseWriter, r *http.Request) return err }) eg.Go(func() (err error) { - metadata, err = api.Database.GetWorkspaceAgentMetadata(ctx, workspaceAgent.ID) + metadata, err = api.Database.GetWorkspaceAgentMetadata(ctx, database.GetWorkspaceAgentMetadataParams{ + WorkspaceAgentID: workspaceAgent.ID, + Keys: nil, + }) return err }) eg.Go(func() (err error) { @@ -1723,10 +1724,9 @@ func ellipse(v string, n int) string { // @Security CoderSessionToken // @Accept json // @Tags Agents -// @Param request body agentsdk.PostMetadataRequest true "Workspace agent metadata request" -// @Param key path string true "metadata key" format(string) +// @Param request body []agentsdk.PostMetadataRequest true "Workspace agent metadata request" // @Success 204 "Success" -// @Router /workspaceagents/me/metadata/{key} [post] +// @Router /workspaceagents/me/metadata [post] // @x-apidocgen {"skip": true} func (api *API) workspaceAgentPostMetadata(rw http.ResponseWriter, r *http.Request) { ctx := r.Context() @@ -1738,17 +1738,18 @@ func (api *API) workspaceAgentPostMetadata(rw http.ResponseWriter, r *http.Reque workspaceAgent := httpmw.WorkspaceAgent(r) - workspace, err := api.Database.GetWorkspaceByAgentID(ctx, workspaceAgent.ID) + // Split into function to allow call by deprecated handler. + err := api.workspaceAgentUpdateMetadata(ctx, workspaceAgent, req) if err != nil { - httpapi.Write(ctx, rw, http.StatusBadRequest, codersdk.Response{ - Message: "Failed to get workspace.", - Detail: err.Error(), - }) + api.Logger.Error(ctx, "failed to handle metadata request", slog.Error(err)) + httpapi.InternalServerError(rw, err) return } - key := chi.URLParam(r, "key") + httpapi.Write(ctx, rw, http.StatusNoContent, nil) +} +func (api *API) workspaceAgentUpdateMetadata(ctx context.Context, workspaceAgent database.WorkspaceAgent, req agentsdk.PostMetadataRequest) error { const ( // maxValueLen is set to 2048 to stay under the 8000 byte Postgres // NOTIFY limit. Since both value and error can be set, the real @@ -1758,58 +1759,67 @@ func (api *API) workspaceAgentPostMetadata(rw http.ResponseWriter, r *http.Reque maxErrorLen = maxValueLen ) - metadataError := req.Error + collectedAt := time.Now() - // We overwrite the error if the provided payload is too long. - if len(req.Value) > maxValueLen { - metadataError = fmt.Sprintf("value of %d bytes exceeded %d bytes", len(req.Value), maxValueLen) - req.Value = req.Value[:maxValueLen] + datum := database.UpdateWorkspaceAgentMetadataParams{ + WorkspaceAgentID: workspaceAgent.ID, + Key: []string{}, + Value: []string{}, + Error: []string{}, + CollectedAt: []time.Time{}, } - if len(req.Error) > maxErrorLen { - metadataError = fmt.Sprintf("error of %d bytes exceeded %d bytes", len(req.Error), maxErrorLen) - req.Error = req.Error[:maxErrorLen] - } + for _, md := range req.Metadata { + metadataError := md.Error + + // We overwrite the error if the provided payload is too long. + if len(md.Value) > maxValueLen { + metadataError = fmt.Sprintf("value of %d bytes exceeded %d bytes", len(md.Value), maxValueLen) + md.Value = md.Value[:maxValueLen] + } + + if len(md.Error) > maxErrorLen { + metadataError = fmt.Sprintf("error of %d bytes exceeded %d bytes", len(md.Error), maxErrorLen) + md.Error = md.Error[:maxErrorLen] + } - datum := database.UpdateWorkspaceAgentMetadataParams{ - WorkspaceAgentID: workspaceAgent.ID, // We don't want a misconfigured agent to fill the database. - Key: key, - Value: req.Value, - Error: metadataError, + datum.Key = append(datum.Key, md.Key) + datum.Value = append(datum.Value, md.Value) + datum.Error = append(datum.Error, metadataError) // We ignore the CollectedAt from the agent to avoid bugs caused by // clock skew. - CollectedAt: time.Now(), + datum.CollectedAt = append(datum.CollectedAt, collectedAt) + + api.Logger.Debug( + ctx, "accepted metadata report", + slog.F("workspace_agent_id", workspaceAgent.ID), + slog.F("collected_at", collectedAt), + slog.F("original_collected_at", md.CollectedAt), + slog.F("key", md.Key), + slog.F("value", ellipse(md.Value, 16)), + ) } - err = api.Database.UpdateWorkspaceAgentMetadata(ctx, datum) + payload, err := json.Marshal(workspaceAgentMetadataChannelPayload{ + CollectedAt: collectedAt, + Keys: datum.Key, + }) if err != nil { - httpapi.InternalServerError(rw, err) - return + return err } - api.Logger.Debug( - ctx, "accepted metadata report", - slog.F("workspace_agent_id", workspaceAgent.ID), - slog.F("workspace_id", workspace.ID), - slog.F("collected_at", datum.CollectedAt), - slog.F("key", datum.Key), - slog.F("value", ellipse(datum.Value, 16)), - ) - - datumJSON, err := json.Marshal(datum) + err = api.Database.UpdateWorkspaceAgentMetadata(ctx, datum) if err != nil { - httpapi.InternalServerError(rw, err) - return + return err } - err = api.Pubsub.Publish(watchWorkspaceAgentMetadataChannel(workspaceAgent.ID), datumJSON) + err = api.Pubsub.Publish(watchWorkspaceAgentMetadataChannel(workspaceAgent.ID), payload) if err != nil { - httpapi.InternalServerError(rw, err) - return + return err } - httpapi.Write(ctx, rw, http.StatusNoContent, nil) + return nil } // @Summary Watch for workspace agent metadata updates @@ -1829,34 +1839,37 @@ func (api *API) watchWorkspaceAgentMetadata(rw http.ResponseWriter, r *http.Requ ) ) - // We avoid channel-based synchronization here to avoid backpressure problems. - var ( - metadataMapMu sync.Mutex - metadataMap = make(map[string]database.WorkspaceAgentMetadatum) - // pendingChanges must only be mutated when metadataMapMu is held. - pendingChanges atomic.Bool - ) - // Send metadata on updates, we must ensure subscription before sending // initial metadata to guarantee that events in-between are not missed. + update := make(chan workspaceAgentMetadataChannelPayload, 1) cancelSub, err := api.Pubsub.Subscribe(watchWorkspaceAgentMetadataChannel(workspaceAgent.ID), func(_ context.Context, byt []byte) { - var update database.UpdateWorkspaceAgentMetadataParams - err := json.Unmarshal(byt, &update) + var payload workspaceAgentMetadataChannelPayload + err := json.Unmarshal(byt, &payload) if err != nil { api.Logger.Error(ctx, "failed to unmarshal pubsub message", slog.Error(err)) return } - log.Debug(ctx, "received metadata update", "key", update.Key) + log.Debug(ctx, "received metadata update", "payload", payload) - metadataMapMu.Lock() - defer metadataMapMu.Unlock() - md := metadataMap[update.Key] - md.Value = update.Value - md.Error = update.Error - md.CollectedAt = update.CollectedAt - metadataMap[update.Key] = md - pendingChanges.Store(true) + select { + case prev := <-update: + // This update wasn't consumed yet, merge the keys. + newKeysSet := make(map[string]struct{}) + for _, key := range payload.Keys { + newKeysSet[key] = struct{}{} + } + keys := prev.Keys + for _, key := range prev.Keys { + if _, ok := newKeysSet[key]; !ok { + keys = append(keys, key) + } + } + payload.Keys = keys + default: + } + // This can never block since we pop and merge beforehand. + update <- payload }) if err != nil { httpapi.InternalServerError(rw, err) @@ -1877,14 +1890,12 @@ func (api *API) watchWorkspaceAgentMetadata(rw http.ResponseWriter, r *http.Requ <-sseSenderClosed }() - // We send updates exactly every second. - const sendInterval = time.Second * 1 - sendTicker := time.NewTicker(sendInterval) - defer sendTicker.Stop() - // We always use the original Request context because it contains // the RBAC actor. - md, err := api.Database.GetWorkspaceAgentMetadata(ctx, workspaceAgent.ID) + md, err := api.Database.GetWorkspaceAgentMetadata(ctx, database.GetWorkspaceAgentMetadataParams{ + WorkspaceAgentID: workspaceAgent.ID, + Keys: nil, + }) if err != nil { // If we can't successfully pull the initial metadata, pubsub // updates will be no-op so we may as well terminate the @@ -1893,42 +1904,84 @@ func (api *API) watchWorkspaceAgentMetadata(rw http.ResponseWriter, r *http.Requ return } - metadataMapMu.Lock() + metadataMap := make(map[string]database.WorkspaceAgentMetadatum) for _, datum := range md { metadataMap[datum.Key] = datum } - metadataMapMu.Unlock() - - // Send initial metadata. var lastSend time.Time sendMetadata := func() { - metadataMapMu.Lock() - values := maps.Values(metadataMap) - pendingChanges.Store(false) - metadataMapMu.Unlock() - lastSend = time.Now() + values := maps.Values(metadataMap) _ = sseSendEvent(ctx, codersdk.ServerSentEvent{ Type: codersdk.ServerSentEventTypeData, Data: convertWorkspaceAgentMetadata(values), }) } + // We send updates exactly every second. + const sendInterval = time.Second * 1 + sendTicker := time.NewTicker(sendInterval) + defer sendTicker.Stop() + + // Send initial metadata. sendMetadata() + // Fetch updated metadata keys as they come in. + fetchedMetadata := make(chan []database.WorkspaceAgentMetadatum) + go func() { + defer close(fetchedMetadata) + + for { + select { + case <-sseSenderClosed: + return + case payload := <-update: + md, err := api.Database.GetWorkspaceAgentMetadata(ctx, database.GetWorkspaceAgentMetadataParams{ + WorkspaceAgentID: workspaceAgent.ID, + Keys: payload.Keys, + }) + if err != nil { + if !errors.Is(err, context.Canceled) { + log.Error(ctx, "failed to get metadata", slog.Error(err)) + } + return + } + select { + case <-sseSenderClosed: + return + // We want to block here to avoid constantly pinging the + // database when the metadata isn't being processed. + case fetchedMetadata <- md: + } + } + } + }() + + pendingChanges := true for { select { + case <-sseSenderClosed: + return + case md, ok := <-fetchedMetadata: + if !ok { + return + } + for _, datum := range md { + metadataMap[datum.Key] = datum + } + pendingChanges = true + continue case <-sendTicker.C: // We send an update even if there's no change every 5 seconds // to ensure that the frontend always has an accurate "Result.Age". - if !pendingChanges.Load() && time.Since(lastSend) < time.Second*5 { + if !pendingChanges && time.Since(lastSend) < 5*time.Second { continue } - sendMetadata() - case <-sseSenderClosed: - return + pendingChanges = false } + + sendMetadata() } } @@ -1959,6 +2012,11 @@ func convertWorkspaceAgentMetadata(db []database.WorkspaceAgentMetadatum) []code return result } +type workspaceAgentMetadataChannelPayload struct { + CollectedAt time.Time `json:"collected_at"` + Keys []string `json:"keys"` +} + func watchWorkspaceAgentMetadataChannel(id uuid.UUID) string { return "workspace_agent_metadata:" + id.String() } diff --git a/coderd/wsconncache/wsconncache_test.go b/coderd/wsconncache/wsconncache_test.go index 68e41b17517fa..e429e90a38ecc 100644 --- a/coderd/wsconncache/wsconncache_test.go +++ b/coderd/wsconncache/wsconncache_test.go @@ -267,7 +267,7 @@ func (*client) PostAppHealth(_ context.Context, _ agentsdk.PostAppHealthsRequest return nil } -func (*client) PostMetadata(_ context.Context, _ string, _ agentsdk.PostMetadataRequest) error { +func (*client) PostMetadata(_ context.Context, _ string, _ agentsdk.PostMetadataRequestDeprecated) error { return nil } diff --git a/codersdk/agentsdk/agentsdk.go b/codersdk/agentsdk/agentsdk.go index d7d0a182d0357..370f325ce28ae 100644 --- a/codersdk/agentsdk/agentsdk.go +++ b/codersdk/agentsdk/agentsdk.go @@ -69,11 +69,20 @@ func (c *Client) GitSSHKey(ctx context.Context) (GitSSHKey, error) { return gitSSHKey, json.NewDecoder(res.Body).Decode(&gitSSHKey) } +type Metadata struct { + Key string `json:"key"` + codersdk.WorkspaceAgentMetadataResult +} + +type PostMetadataRequest struct { + Metadata []Metadata `json:"metadata"` +} + // In the future, we may want to support sending back multiple values for // performance. -type PostMetadataRequest = codersdk.WorkspaceAgentMetadataResult +type PostMetadataRequestDeprecated = codersdk.WorkspaceAgentMetadataResult -func (c *Client) PostMetadata(ctx context.Context, key string, req PostMetadataRequest) error { +func (c *Client) PostMetadata(ctx context.Context, key string, req PostMetadataRequestDeprecated) error { res, err := c.SDK.Request(ctx, http.MethodPost, "/api/v2/workspaceagents/me/metadata/"+key, req) if err != nil { return xerrors.Errorf("execute request: %w", err) diff --git a/docs/api/schemas.md b/docs/api/schemas.md index f90c37603eb10..7a2a6280b9838 100644 --- a/docs/api/schemas.md +++ b/docs/api/schemas.md @@ -317,6 +317,28 @@ | `scripts` | array of [codersdk.WorkspaceAgentScript](#codersdkworkspaceagentscript) | false | | | | `vscode_port_proxy_uri` | string | false | | | +## agentsdk.Metadata + +```json +{ + "age": 0, + "collected_at": "2019-08-24T14:15:22Z", + "error": "string", + "key": "string", + "value": "string" +} +``` + +### Properties + +| Name | Type | Required | Restrictions | Description | +| -------------- | ------- | -------- | ------------ | --------------------------------------------------------------------------------------------------------------------------------------- | +| `age` | integer | false | | Age is the number of seconds since the metadata was collected. It is provided in addition to CollectedAt to protect against clock skew. | +| `collected_at` | string | false | | | +| `error` | string | false | | | +| `key` | string | false | | | +| `value` | string | false | | | + ## agentsdk.PatchLogs ```json @@ -375,6 +397,28 @@ ## agentsdk.PostMetadataRequest +```json +{ + "metadata": [ + { + "age": 0, + "collected_at": "2019-08-24T14:15:22Z", + "error": "string", + "key": "string", + "value": "string" + } + ] +} +``` + +### Properties + +| Name | Type | Required | Restrictions | Description | +| ---------- | ----------------------------------------------- | -------- | ------------ | ----------- | +| `metadata` | array of [agentsdk.Metadata](#agentsdkmetadata) | false | | | + +## agentsdk.PostMetadataRequestDeprecated + ```json { "age": 0,