Skip to content

chore: add agentapi tests #11269

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Jan 26, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Remaining tests
  • Loading branch information
deansheather committed Dec 19, 2023
commit 24f6614db4c8da30117df61e18cf4d45a3d91173
7 changes: 4 additions & 3 deletions coderd/agentapi/activitybump.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,14 @@ func ActivityBumpWorkspace(ctx context.Context, log slog.Logger, db database.Sto
// low priority operations fail first.
ctx, cancel := context.WithTimeout(ctx, time.Second*15)
defer cancel()
if err := db.ActivityBumpWorkspace(ctx, database.ActivityBumpWorkspaceParams{
err := db.ActivityBumpWorkspace(ctx, database.ActivityBumpWorkspaceParams{
NextAutostart: nextAutostart.UTC(),
WorkspaceID: workspaceID,
}); err != nil {
})
if err != nil {
if !xerrors.Is(err, context.Canceled) && !database.IsQueryCanceledError(err) {
// Bump will fail if the context is canceled, but this is ok.
log.Error(ctx, "bump failed", slog.Error(err),
log.Error(ctx, "activity bump failed", slog.Error(err),
slog.F("workspace_id", workspaceID),
)
}
Expand Down
16 changes: 5 additions & 11 deletions coderd/agentapi/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import (

"cdr.dev/slog"
agentproto "github.com/coder/coder/v2/agent/proto"
"github.com/coder/coder/v2/coderd/batchstats"
"github.com/coder/coder/v2/coderd/database"
"github.com/coder/coder/v2/coderd/database/pubsub"
"github.com/coder/coder/v2/coderd/externalauth"
Expand Down Expand Up @@ -58,7 +57,7 @@ type Options struct {
Pubsub pubsub.Pubsub
DerpMapFn func() *tailcfg.DERPMap
TemplateScheduleStore *atomic.Pointer[schedule.TemplateScheduleStore]
StatsBatcher *batchstats.Batcher
StatsBatcher StatsBatcher // *batchstats.Batcher
PublishWorkspaceUpdateFn func(ctx context.Context, workspaceID uuid.UUID)
PublishWorkspaceAgentLogsUpdateFn func(ctx context.Context, workspaceAgentID uuid.UUID, msg agentsdk.LogsNotifyMessage)

Expand Down Expand Up @@ -201,20 +200,15 @@ func (a *API) workspaceID(ctx context.Context, agent *database.WorkspaceAgent) (
agent = &agnt
}

resource, err := a.opts.Database.GetWorkspaceResourceByID(ctx, agent.ResourceID)
getWorkspaceAgentByIDRow, err := a.opts.Database.GetWorkspaceByAgentID(ctx, agent.ID)
if err != nil {
return uuid.Nil, xerrors.Errorf("get workspace agent resource by id %q: %w", agent.ResourceID, err)
}

build, err := a.opts.Database.GetWorkspaceBuildByJobID(ctx, resource.JobID)
if err != nil {
return uuid.Nil, xerrors.Errorf("get workspace build by job id %q: %w", resource.JobID, err)
return uuid.Nil, xerrors.Errorf("get workspace by agent id %q: %w", agent.ID, err)
}

a.mu.Lock()
a.cachedWorkspaceID = build.WorkspaceID
a.cachedWorkspaceID = getWorkspaceAgentByIDRow.Workspace.ID
a.mu.Unlock()
return build.WorkspaceID, nil
return getWorkspaceAgentByIDRow.Workspace.ID, nil
}

func (a *API) publishWorkspaceUpdate(ctx context.Context, agent *database.WorkspaceAgent) error {
Expand Down
75 changes: 55 additions & 20 deletions coderd/agentapi/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"cdr.dev/slog"
agentproto "github.com/coder/coder/v2/agent/proto"
"github.com/coder/coder/v2/coderd/database"
"github.com/coder/coder/v2/coderd/database/dbtime"
"github.com/coder/coder/v2/coderd/database/pubsub"
)

Expand All @@ -20,14 +21,26 @@ type MetadataAPI struct {
Database database.Store
Pubsub pubsub.Pubsub
Log slog.Logger

TimeNowFn func() time.Time // defaults to dbtime.Now()
}

func (a *MetadataAPI) now() time.Time {
if a.TimeNowFn != nil {
return a.TimeNowFn()
}
return dbtime.Now()
}

func (a *MetadataAPI) BatchUpdateMetadata(ctx context.Context, req *agentproto.BatchUpdateMetadataRequest) (*agentproto.BatchUpdateMetadataResponse, error) {
const (
// maxValueLen is set to 2048 to stay under the 8000 byte Postgres
// NOTIFY limit. Since both value and error can be set, the real payload
// limit is 2 * 2048 * 4/3 <base64 expansion> = 5461 bytes + a few
// hundred bytes for JSON syntax, key names, and metadata.
// maxAllKeysLen is the maximum length of all metadata keys. This is
// 6144 to stay below the Postgres NOTIFY limit of 8000 bytes, with some
// headway for the timestamp and JSON encoding. Any values that would
// exceed this limit are discarded (the rest are still inserted) and an
// error is returned.
maxAllKeysLen = 6144 // 1024 * 6

maxValueLen = 2048
maxErrorLen = maxValueLen
)
Expand All @@ -37,18 +50,36 @@ func (a *MetadataAPI) BatchUpdateMetadata(ctx context.Context, req *agentproto.B
return nil, err
}

collectedAt := time.Now()
dbUpdate := database.UpdateWorkspaceAgentMetadataParams{
WorkspaceAgentID: workspaceAgent.ID,
Key: make([]string, 0, len(req.Metadata)),
Value: make([]string, 0, len(req.Metadata)),
Error: make([]string, 0, len(req.Metadata)),
CollectedAt: make([]time.Time, 0, len(req.Metadata)),
}

var (
collectedAt = a.now()
allKeysLen = 0
dbUpdate = database.UpdateWorkspaceAgentMetadataParams{
WorkspaceAgentID: workspaceAgent.ID,
// These need to be `make(x, 0, len(req.Metadata))` instead of
// `make(x, len(req.Metadata))` because we may not insert all
// metadata if the keys are large.
Key: make([]string, 0, len(req.Metadata)),
Value: make([]string, 0, len(req.Metadata)),
Error: make([]string, 0, len(req.Metadata)),
CollectedAt: make([]time.Time, 0, len(req.Metadata)),
}
)
for _, md := range req.Metadata {
metadataError := md.Result.Error

allKeysLen += len(md.Key)
if allKeysLen > maxAllKeysLen {
// We still insert the rest of the metadata, and we return an error
// after the insert.
a.Log.Warn(
ctx, "discarded extra agent metadata due to excessive key length",
slog.F("collected_at", collectedAt),
slog.F("all_keys_len", allKeysLen),
slog.F("max_all_keys_len", maxAllKeysLen),
)
break
}

// We overwrite the error if the provided payload is too long.
if len(md.Result.Value) > maxValueLen {
metadataError = fmt.Sprintf("value of %d bytes exceeded %d bytes", len(md.Result.Value), maxValueLen)
Expand All @@ -71,30 +102,34 @@ func (a *MetadataAPI) BatchUpdateMetadata(ctx context.Context, req *agentproto.B
a.Log.Debug(
ctx, "accepted metadata report",
slog.F("collected_at", collectedAt),
slog.F("original_collected_at", collectedAt),
slog.F("key", md.Key),
slog.F("value", ellipse(md.Result.Value, 16)),
)
}

err = a.Database.UpdateWorkspaceAgentMetadata(ctx, dbUpdate)
if err != nil {
return nil, xerrors.Errorf("update workspace agent metadata in database: %w", err)
}

payload, err := json.Marshal(WorkspaceAgentMetadataChannelPayload{
CollectedAt: collectedAt,
Keys: dbUpdate.Key,
})
if err != nil {
return nil, xerrors.Errorf("marshal workspace agent metadata channel payload: %w", err)
}

err = a.Database.UpdateWorkspaceAgentMetadata(ctx, dbUpdate)
if err != nil {
return nil, xerrors.Errorf("update workspace agent metadata in database: %w", err)
}

err = a.Pubsub.Publish(WatchWorkspaceAgentMetadataChannel(workspaceAgent.ID), payload)
if err != nil {
return nil, xerrors.Errorf("publish workspace agent metadata: %w", err)
}

// If the metadata keys were too large, we return an error so the agent can
// log it.
if allKeysLen > maxAllKeysLen {
return nil, xerrors.Errorf("metadata keys of %d bytes exceeded %d bytes", allKeysLen, maxAllKeysLen)
}

return &agentproto.BatchUpdateMetadataResponse{}, nil
}

Expand Down
Loading