From 1e35c53c692ffd9cc733de65aa0fd23250a1b401 Mon Sep 17 00:00:00 2001 From: evgeniy-scherbina Date: Wed, 19 Feb 2025 16:05:09 -0500 Subject: [PATCH 01/18] fix(coderd/database): remove linux build tags from db package --- coderd/database/db_test.go | 2 -- coderd/database/dbtestutil/postgres_test.go | 2 -- coderd/database/migrations/migrate_test.go | 2 -- coderd/database/pubsub/pubsub_linux_test.go | 2 -- coderd/database/querier_test.go | 2 -- 5 files changed, 10 deletions(-) diff --git a/coderd/database/db_test.go b/coderd/database/db_test.go index b4580527c843a..68b60a788fd3d 100644 --- a/coderd/database/db_test.go +++ b/coderd/database/db_test.go @@ -1,5 +1,3 @@ -//go:build linux - package database_test import ( diff --git a/coderd/database/dbtestutil/postgres_test.go b/coderd/database/dbtestutil/postgres_test.go index d4aaacdf909d8..49e791a4c735e 100644 --- a/coderd/database/dbtestutil/postgres_test.go +++ b/coderd/database/dbtestutil/postgres_test.go @@ -1,5 +1,3 @@ -//go:build linux - package dbtestutil_test import ( diff --git a/coderd/database/migrations/migrate_test.go b/coderd/database/migrations/migrate_test.go index 716ebe398b6d7..bd347af0be1ea 100644 --- a/coderd/database/migrations/migrate_test.go +++ b/coderd/database/migrations/migrate_test.go @@ -1,5 +1,3 @@ -//go:build linux - package migrations_test import ( diff --git a/coderd/database/pubsub/pubsub_linux_test.go b/coderd/database/pubsub/pubsub_linux_test.go index fe7933c62caee..05bd76232e162 100644 --- a/coderd/database/pubsub/pubsub_linux_test.go +++ b/coderd/database/pubsub/pubsub_linux_test.go @@ -1,5 +1,3 @@ -//go:build linux - package pubsub_test import ( diff --git a/coderd/database/querier_test.go b/coderd/database/querier_test.go index 00b189967f5a6..2897a88b880ff 100644 --- a/coderd/database/querier_test.go +++ b/coderd/database/querier_test.go @@ -1,5 +1,3 @@ -//go:build linux - package database_test import ( From 5c49221ba6a50dd212d5cb2a0fc7c0245a21cca5 Mon Sep 17 00:00:00 2001 From: evgeniy-scherbina Date: Thu, 20 Feb 2025 10:55:32 -0500 Subject: [PATCH 02/18] fix(coderd/database): consider tag sets when calculating queue position --- coderd/database/querier.go | 2 + coderd/database/querier_test.go | 229 +++++++++++++++++++- coderd/database/queries.sql.go | 93 +++++--- coderd/database/queries/provisionerjobs.sql | 85 +++++--- 4 files changed, 347 insertions(+), 62 deletions(-) diff --git a/coderd/database/querier.go b/coderd/database/querier.go index 42b88d855e4c3..37ce0bbfbd96d 100644 --- a/coderd/database/querier.go +++ b/coderd/database/querier.go @@ -214,7 +214,9 @@ type sqlcQuerier interface { GetProvisionerJobByID(ctx context.Context, id uuid.UUID) (ProvisionerJob, error) GetProvisionerJobTimingsByJobID(ctx context.Context, jobID uuid.UUID) ([]ProvisionerJobTiming, error) GetProvisionerJobsByIDs(ctx context.Context, ids []uuid.UUID) ([]ProvisionerJob, error) + // Step 5: Final SELECT with INNER JOIN provisioner_jobs GetProvisionerJobsByIDsWithQueuePosition(ctx context.Context, ids []uuid.UUID) ([]GetProvisionerJobsByIDsWithQueuePositionRow, error) + // Preserve original ID order from upstream GetProvisionerJobsByOrganizationAndStatusWithQueuePositionAndProvisioner(ctx context.Context, arg GetProvisionerJobsByOrganizationAndStatusWithQueuePositionAndProvisionerParams) ([]GetProvisionerJobsByOrganizationAndStatusWithQueuePositionAndProvisionerRow, error) GetProvisionerJobsCreatedAfter(ctx context.Context, createdAt time.Time) ([]ProvisionerJob, error) GetProvisionerKeyByHashedSecret(ctx context.Context, hashedSecret []byte) (ProvisionerKey, error) diff --git a/coderd/database/querier_test.go b/coderd/database/querier_test.go index 2897a88b880ff..e3b68653581f7 100644 --- a/coderd/database/querier_test.go +++ b/coderd/database/querier_test.go @@ -2162,11 +2162,125 @@ func TestGetProvisionerJobsByIDsWithQueuePosition(t *testing.T) { t.SkipNow() } + now := dbtime.Now() + ctx := testutil.Context(t, testutil.WaitShort) + + testCases := []struct { + name string + jobTags []database.StringMap + daemonTags []database.StringMap + queueSizes []int64 + queuePositions []int64 + }{ + { + name: "test-case-1", + jobTags: []database.StringMap{ + {"a": "1", "b": "2"}, + {"a": "1"}, + {"a": "1", "c": "3"}, + }, + daemonTags: []database.StringMap{ + {"a": "1", "b": "2"}, + {"a": "1"}, + }, + queueSizes: []int64{2, 2, 0}, + queuePositions: []int64{1, 1, 0}, + }, + { + name: "test-case-2", + jobTags: []database.StringMap{ + {"a": "1", "b": "2"}, + {"a": "1"}, + {"a": "1", "c": "3"}, + }, + daemonTags: []database.StringMap{ + {"a": "1", "b": "2"}, + {"a": "1"}, + {"a": "1", "b": "2", "c": "3"}, + }, + queueSizes: []int64{3, 3, 3}, + queuePositions: []int64{1, 1, 3}, + }, + } + + for _, tc := range testCases { + db, _ := dbtestutil.NewDB(t) + + // Create provisioner jobs based on provided tags: + allJobs := make([]database.ProvisionerJob, len(tc.jobTags)) + for idx, tags := range tc.jobTags { + // Make sure jobs are stored in correct order, first job should have the earliest createdAt timestamp. + // Example for 3 jobs: + // job_1 createdAt: now - 3 minutes + // job_2 createdAt: now - 2 minutes + // job_3 createdAt: now - 1 minute + timeOffsetInMinutes := len(tc.jobTags) - idx + timeOffset := time.Duration(timeOffsetInMinutes) * time.Minute + createdAt := now.Add(-timeOffset) + + allJobs[idx] = dbgen.ProvisionerJob(t, db, nil, database.ProvisionerJob{ + CreatedAt: createdAt, + Tags: tags, + }) + } + + // Create provisioner daemons based on provided tags: + for idx, tags := range tc.daemonTags { + dbgen.ProvisionerDaemon(t, db, database.ProvisionerDaemon{ + Name: fmt.Sprintf("prov_%v", idx), + Provisioners: []database.ProvisionerType{database.ProvisionerTypeEcho}, + Tags: tags, + }) + } + + // Assert invariant: the jobs are in pending status + for idx, job := range allJobs { + require.Equal(t, database.ProvisionerJobStatusPending, job.JobStatus, "expected job %d to have status %s", idx, database.ProvisionerJobStatusPending) + } + + var jobIDs []uuid.UUID + for _, job := range allJobs { + jobIDs = append(jobIDs, job.ID) + } + + // When: we fetch the jobs by their IDs + actualJobs, err := db.GetProvisionerJobsByIDsWithQueuePosition(ctx, jobIDs) + require.NoError(t, err) + require.Len(t, actualJobs, len(allJobs), "should return all jobs") + + // Then: the jobs should be returned in the correct order (by IDs in the input slice) + for idx, job := range actualJobs { + assert.EqualValues(t, allJobs[idx], job.ProvisionerJob) + } + + // Then: the queue size should be set correctly + var queueSizes []int64 + for _, job := range actualJobs { + queueSizes = append(queueSizes, job.QueueSize) + } + assert.EqualValues(t, tc.queueSizes, queueSizes, "expected queue positions to be set correctly") + + // Then: the queue position should be set correctly: + var queuePositions []int64 + for _, job := range actualJobs { + queuePositions = append(queuePositions, job.QueuePosition) + } + assert.EqualValues(t, tc.queuePositions, queuePositions, "expected queue positions to be set correctly") + } +} + +func TestGetProvisionerJobsByIDsWithQueuePosition_MixedStatuses(t *testing.T) { + t.Parallel() + if !dbtestutil.WillUsePostgres() { + t.SkipNow() + } + db, _ := dbtestutil.NewDB(t) now := dbtime.Now() ctx := testutil.Context(t, testutil.WaitShort) - // Given the following provisioner jobs: + defaultTags := database.StringMap{"a": "1"} + // Create the following provisioner jobs: allJobs := []database.ProvisionerJob{ // Pending. This will be the last in the queue because // it was created most recently. @@ -2176,6 +2290,7 @@ func TestGetProvisionerJobsByIDsWithQueuePosition(t *testing.T) { CanceledAt: sql.NullTime{}, CompletedAt: sql.NullTime{}, Error: sql.NullString{}, + Tags: defaultTags, }), // Another pending. This will come first in the queue @@ -2186,6 +2301,7 @@ func TestGetProvisionerJobsByIDsWithQueuePosition(t *testing.T) { CanceledAt: sql.NullTime{}, CompletedAt: sql.NullTime{}, Error: sql.NullString{}, + Tags: defaultTags, }), // Running @@ -2195,6 +2311,7 @@ func TestGetProvisionerJobsByIDsWithQueuePosition(t *testing.T) { CanceledAt: sql.NullTime{}, CompletedAt: sql.NullTime{}, Error: sql.NullString{}, + Tags: defaultTags, }), // Succeeded @@ -2204,6 +2321,7 @@ func TestGetProvisionerJobsByIDsWithQueuePosition(t *testing.T) { CanceledAt: sql.NullTime{}, CompletedAt: sql.NullTime{Valid: true, Time: now}, Error: sql.NullString{}, + Tags: defaultTags, }), // Canceling @@ -2213,6 +2331,7 @@ func TestGetProvisionerJobsByIDsWithQueuePosition(t *testing.T) { CanceledAt: sql.NullTime{Valid: true, Time: now}, CompletedAt: sql.NullTime{}, Error: sql.NullString{}, + Tags: defaultTags, }), // Canceled @@ -2222,6 +2341,7 @@ func TestGetProvisionerJobsByIDsWithQueuePosition(t *testing.T) { CanceledAt: sql.NullTime{Valid: true, Time: now}, CompletedAt: sql.NullTime{Valid: true, Time: now}, Error: sql.NullString{}, + Tags: defaultTags, }), // Failed @@ -2231,9 +2351,17 @@ func TestGetProvisionerJobsByIDsWithQueuePosition(t *testing.T) { CanceledAt: sql.NullTime{}, CompletedAt: sql.NullTime{}, Error: sql.NullString{String: "failed", Valid: true}, + Tags: defaultTags, }), } + // Create default provisioner daemon: + dbgen.ProvisionerDaemon(t, db, database.ProvisionerDaemon{ + Name: "default_provisioner", + Provisioners: []database.ProvisionerType{database.ProvisionerTypeEcho}, + Tags: defaultTags, + }) + // Assert invariant: the jobs are in the expected order require.Len(t, allJobs, 7, "expected 7 jobs") for idx, status := range []database.ProvisionerJobStatus{ @@ -2264,9 +2392,11 @@ func TestGetProvisionerJobsByIDsWithQueuePosition(t *testing.T) { } // Then: the queue size should be set correctly + var queueSizes []int64 for _, job := range actualJobs { - assert.EqualValues(t, job.QueueSize, 2, "should have queue size 2") + queueSizes = append(queueSizes, job.QueueSize) } + assert.EqualValues(t, []int64{2, 2, 0, 0, 0, 0, 0}, queueSizes, "expected queue positions to be set correctly") // Then: the queue position should be set correctly: var queuePositions []int64 @@ -2276,6 +2406,101 @@ func TestGetProvisionerJobsByIDsWithQueuePosition(t *testing.T) { assert.EqualValues(t, []int64{2, 1, 0, 0, 0, 0, 0}, queuePositions, "expected queue positions to be set correctly") } +func TestGetProvisionerJobsByIDsWithQueuePosition_OrderValidation(t *testing.T) { + t.Parallel() + if !dbtestutil.WillUsePostgres() { + t.SkipNow() + } + + db, _ := dbtestutil.NewDB(t) + now := dbtime.Now() + ctx := testutil.Context(t, testutil.WaitShort) + + defaultTags := database.StringMap{"a": "1"} + // Create the following provisioner jobs: + allJobs := []database.ProvisionerJob{ + dbgen.ProvisionerJob(t, db, nil, database.ProvisionerJob{ + CreatedAt: now.Add(-4 * time.Minute), + Tags: defaultTags, + }), + + dbgen.ProvisionerJob(t, db, nil, database.ProvisionerJob{ + CreatedAt: now.Add(-5 * time.Minute), + Tags: defaultTags, + }), + + dbgen.ProvisionerJob(t, db, nil, database.ProvisionerJob{ + CreatedAt: now.Add(-6 * time.Minute), + Tags: defaultTags, + }), + + dbgen.ProvisionerJob(t, db, nil, database.ProvisionerJob{ + CreatedAt: now.Add(-3 * time.Minute), + Tags: defaultTags, + }), + + dbgen.ProvisionerJob(t, db, nil, database.ProvisionerJob{ + CreatedAt: now.Add(-2 * time.Minute), + Tags: defaultTags, + }), + + dbgen.ProvisionerJob(t, db, nil, database.ProvisionerJob{ + CreatedAt: now.Add(-1 * time.Minute), + Tags: defaultTags, + }), + } + + // Create default provisioner daemon: + dbgen.ProvisionerDaemon(t, db, database.ProvisionerDaemon{ + Name: "default_provisioner", + Provisioners: []database.ProvisionerType{database.ProvisionerTypeEcho}, + Tags: defaultTags, + }) + + // Assert invariant: the jobs are in the expected order + require.Len(t, allJobs, 6, "expected 7 jobs") + for idx, status := range []database.ProvisionerJobStatus{ + database.ProvisionerJobStatusPending, + database.ProvisionerJobStatusPending, + database.ProvisionerJobStatusPending, + database.ProvisionerJobStatusPending, + database.ProvisionerJobStatusPending, + database.ProvisionerJobStatusPending, + } { + require.Equal(t, status, allJobs[idx].JobStatus, "expected job %d to have status %s", idx, status) + } + + var jobIDs []uuid.UUID + for _, job := range allJobs { + jobIDs = append(jobIDs, job.ID) + } + + // When: we fetch the jobs by their IDs + actualJobs, err := db.GetProvisionerJobsByIDsWithQueuePosition(ctx, jobIDs) + require.NoError(t, err) + require.Len(t, actualJobs, len(allJobs), "should return all jobs") + + // Then: the jobs should be returned in the correct order (by IDs in the input slice) + for idx, job := range actualJobs { + assert.EqualValues(t, allJobs[idx], job.ProvisionerJob) + assert.EqualValues(t, allJobs[idx].CreatedAt, job.ProvisionerJob.CreatedAt) + } + + // Then: the queue size should be set correctly + var queueSizes []int64 + for _, job := range actualJobs { + queueSizes = append(queueSizes, job.QueueSize) + } + assert.EqualValues(t, []int64{6, 6, 6, 6, 6, 6}, queueSizes, "expected queue positions to be set correctly") + + // Then: the queue position should be set correctly: + var queuePositions []int64 + for _, job := range actualJobs { + queuePositions = append(queuePositions, job.QueuePosition) + } + assert.EqualValues(t, []int64{3, 2, 1, 4, 5, 6}, queuePositions, "expected queue positions to be set correctly") +} + func TestGroupRemovalTrigger(t *testing.T) { t.Parallel() diff --git a/coderd/database/queries.sql.go b/coderd/database/queries.sql.go index 58722dc152005..1e35296fb45c1 100644 --- a/coderd/database/queries.sql.go +++ b/coderd/database/queries.sql.go @@ -6360,50 +6360,78 @@ func (q *sqlQuerier) GetProvisionerJobsByIDs(ctx context.Context, ids []uuid.UUI } const getProvisionerJobsByIDsWithQueuePosition = `-- name: GetProvisionerJobsByIDsWithQueuePosition :many -WITH pending_jobs AS ( - SELECT - id, created_at - FROM - provisioner_jobs - WHERE - started_at IS NULL - AND - canceled_at IS NULL - AND - completed_at IS NULL - AND - error IS NULL +WITH filtered_provisioner_jobs AS ( + -- Step 1: Filter provisioner_jobs and assign an order from upstream system + SELECT + id, created_at, updated_at, started_at, canceled_at, completed_at, error, organization_id, initiator_id, provisioner, storage_method, type, input, worker_id, file_id, tags, error_code, trace_metadata, job_status, + ROW_NUMBER() OVER () AS ordinality -- Track original order + FROM + provisioner_jobs + WHERE + id = ANY($1 :: uuid [ ]) -- Apply filter early to reduce dataset size before expensive JOINs ), -queue_position AS ( - SELECT - id, - ROW_NUMBER() OVER (ORDER BY created_at ASC) AS queue_position - FROM - pending_jobs +pending_jobs AS ( + -- Step 2: Extract only pending jobs from the already filtered dataset + SELECT id, created_at, updated_at, started_at, canceled_at, completed_at, error, organization_id, initiator_id, provisioner, storage_method, type, input, worker_id, file_id, tags, error_code, trace_metadata, job_status, ordinality + FROM filtered_provisioner_jobs + WHERE started_at IS NULL + AND canceled_at IS NULL + AND completed_at IS NULL + AND error IS NULL ), -queue_size AS ( - SELECT COUNT(*) AS count FROM pending_jobs +ranked_jobs AS ( + -- Step 3: Rank only pending jobs based on provisioner availability + SELECT + pj.id, + pj.created_at, + ROW_NUMBER() OVER (PARTITION BY pd.id ORDER BY pj.created_at ASC) AS queue_position, + COUNT(*) OVER (PARTITION BY pd.id) AS queue_size + FROM + pending_jobs pj + INNER JOIN provisioner_daemons pd + ON provisioner_tagset_contains(pd.tags, pj.tags) -- Join only on the small filtered pending set +), +final_jobs AS ( + -- Step 4: Compute best queue position and max queue size per job + SELECT + fpj.id, + fpj.created_at, + COALESCE(MIN(rj.queue_position), 0) :: BIGINT AS queue_position, -- Best queue position across provisioners + COALESCE(MAX(rj.queue_size), 0) :: BIGINT AS queue_size, -- Max queue size across provisioners + fpj.ordinality -- Preserve original order + FROM + filtered_provisioner_jobs fpj -- Use the pre-filtered dataset instead of full provisioner_jobs + LEFT JOIN ranked_jobs rj + ON fpj.id = rj.id -- Ensure we only keep jobs that have a ranking + GROUP BY + fpj.id, fpj.created_at, fpj.ordinality -- Include ` + "`" + `ordinality` + "`" + ` in GROUP BY ) SELECT + fj.id, + fj.created_at, pj.id, pj.created_at, pj.updated_at, pj.started_at, pj.canceled_at, pj.completed_at, pj.error, pj.organization_id, pj.initiator_id, pj.provisioner, pj.storage_method, pj.type, pj.input, pj.worker_id, pj.file_id, pj.tags, pj.error_code, pj.trace_metadata, pj.job_status, - COALESCE(qp.queue_position, 0) AS queue_position, - COALESCE(qs.count, 0) AS queue_size + fj.queue_position, + fj.queue_size, + fj.ordinality FROM - provisioner_jobs pj -LEFT JOIN - queue_position qp ON qp.id = pj.id -LEFT JOIN - queue_size qs ON TRUE -WHERE - pj.id = ANY($1 :: uuid [ ]) + final_jobs fj + INNER JOIN provisioner_jobs pj + ON fj.id = pj.id -- Ensure we retrieve full details from ` + "`" + `provisioner_jobs` + "`" + `. + -- JOIN with pj is required for sqlc.embed(pj) to compile successfully. +ORDER BY + fj.ordinality ` type GetProvisionerJobsByIDsWithQueuePositionRow struct { + ID uuid.UUID `db:"id" json:"id"` + CreatedAt time.Time `db:"created_at" json:"created_at"` ProvisionerJob ProvisionerJob `db:"provisioner_job" json:"provisioner_job"` QueuePosition int64 `db:"queue_position" json:"queue_position"` QueueSize int64 `db:"queue_size" json:"queue_size"` + Ordinality int64 `db:"ordinality" json:"ordinality"` } +// Step 5: Final SELECT with INNER JOIN provisioner_jobs func (q *sqlQuerier) GetProvisionerJobsByIDsWithQueuePosition(ctx context.Context, ids []uuid.UUID) ([]GetProvisionerJobsByIDsWithQueuePositionRow, error) { rows, err := q.db.QueryContext(ctx, getProvisionerJobsByIDsWithQueuePosition, pq.Array(ids)) if err != nil { @@ -6414,6 +6442,8 @@ func (q *sqlQuerier) GetProvisionerJobsByIDsWithQueuePosition(ctx context.Contex for rows.Next() { var i GetProvisionerJobsByIDsWithQueuePositionRow if err := rows.Scan( + &i.ID, + &i.CreatedAt, &i.ProvisionerJob.ID, &i.ProvisionerJob.CreatedAt, &i.ProvisionerJob.UpdatedAt, @@ -6435,6 +6465,7 @@ func (q *sqlQuerier) GetProvisionerJobsByIDsWithQueuePosition(ctx context.Contex &i.ProvisionerJob.JobStatus, &i.QueuePosition, &i.QueueSize, + &i.Ordinality, ); err != nil { return nil, err } @@ -6450,6 +6481,7 @@ func (q *sqlQuerier) GetProvisionerJobsByIDsWithQueuePosition(ctx context.Contex } const getProvisionerJobsByOrganizationAndStatusWithQueuePositionAndProvisioner = `-- name: GetProvisionerJobsByOrganizationAndStatusWithQueuePositionAndProvisioner :many + WITH pending_jobs AS ( SELECT id, created_at @@ -6569,6 +6601,7 @@ type GetProvisionerJobsByOrganizationAndStatusWithQueuePositionAndProvisionerRow WorkspaceName string `db:"workspace_name" json:"workspace_name"` } +// Preserve original ID order from upstream func (q *sqlQuerier) GetProvisionerJobsByOrganizationAndStatusWithQueuePositionAndProvisioner(ctx context.Context, arg GetProvisionerJobsByOrganizationAndStatusWithQueuePositionAndProvisionerParams) ([]GetProvisionerJobsByOrganizationAndStatusWithQueuePositionAndProvisionerRow, error) { rows, err := q.db.QueryContext(ctx, getProvisionerJobsByOrganizationAndStatusWithQueuePositionAndProvisioner, arg.OrganizationID, diff --git a/coderd/database/queries/provisionerjobs.sql b/coderd/database/queries/provisionerjobs.sql index 592b228af2806..5ababf8726d90 100644 --- a/coderd/database/queries/provisionerjobs.sql +++ b/coderd/database/queries/provisionerjobs.sql @@ -50,42 +50,67 @@ WHERE id = ANY(@ids :: uuid [ ]); -- name: GetProvisionerJobsByIDsWithQueuePosition :many -WITH pending_jobs AS ( - SELECT - id, created_at - FROM - provisioner_jobs - WHERE - started_at IS NULL - AND - canceled_at IS NULL - AND - completed_at IS NULL - AND - error IS NULL +WITH filtered_provisioner_jobs AS ( + -- Step 1: Filter provisioner_jobs and assign an order from upstream system + SELECT + *, + ROW_NUMBER() OVER () AS ordinality -- Track original order + FROM + provisioner_jobs + WHERE + id = ANY(@ids :: uuid [ ]) -- Apply filter early to reduce dataset size before expensive JOINs ), -queue_position AS ( - SELECT - id, - ROW_NUMBER() OVER (ORDER BY created_at ASC) AS queue_position - FROM - pending_jobs +pending_jobs AS ( + -- Step 2: Extract only pending jobs from the already filtered dataset + SELECT * + FROM filtered_provisioner_jobs + WHERE started_at IS NULL + AND canceled_at IS NULL + AND completed_at IS NULL + AND error IS NULL ), -queue_size AS ( - SELECT COUNT(*) AS count FROM pending_jobs +ranked_jobs AS ( + -- Step 3: Rank only pending jobs based on provisioner availability + SELECT + pj.id, + pj.created_at, + ROW_NUMBER() OVER (PARTITION BY pd.id ORDER BY pj.created_at ASC) AS queue_position, + COUNT(*) OVER (PARTITION BY pd.id) AS queue_size + FROM + pending_jobs pj + INNER JOIN provisioner_daemons pd + ON provisioner_tagset_contains(pd.tags, pj.tags) -- Join only on the small filtered pending set +), +final_jobs AS ( + -- Step 4: Compute best queue position and max queue size per job + SELECT + fpj.id, + fpj.created_at, + COALESCE(MIN(rj.queue_position), 0) :: BIGINT AS queue_position, -- Best queue position across provisioners + COALESCE(MAX(rj.queue_size), 0) :: BIGINT AS queue_size, -- Max queue size across provisioners + fpj.ordinality -- Preserve original order + FROM + filtered_provisioner_jobs fpj -- Use the pre-filtered dataset instead of full provisioner_jobs + LEFT JOIN ranked_jobs rj + ON fpj.id = rj.id -- Ensure we only keep jobs that have a ranking + GROUP BY + fpj.id, fpj.created_at, fpj.ordinality -- Include `ordinality` in GROUP BY ) +-- Step 5: Final SELECT with INNER JOIN provisioner_jobs SELECT + fj.id, + fj.created_at, sqlc.embed(pj), - COALESCE(qp.queue_position, 0) AS queue_position, - COALESCE(qs.count, 0) AS queue_size + fj.queue_position, + fj.queue_size, + fj.ordinality FROM - provisioner_jobs pj -LEFT JOIN - queue_position qp ON qp.id = pj.id -LEFT JOIN - queue_size qs ON TRUE -WHERE - pj.id = ANY(@ids :: uuid [ ]); + final_jobs fj + INNER JOIN provisioner_jobs pj + ON fj.id = pj.id -- Ensure we retrieve full details from `provisioner_jobs`. + -- JOIN with pj is required for sqlc.embed(pj) to compile successfully. +ORDER BY + fj.ordinality; -- Preserve original ID order from upstream -- name: GetProvisionerJobsByOrganizationAndStatusWithQueuePositionAndProvisioner :many WITH pending_jobs AS ( From 4257ca02569b138d61875465df27ae100701ccfc Mon Sep 17 00:00:00 2001 From: evgeniy-scherbina Date: Tue, 25 Feb 2025 11:13:31 -0500 Subject: [PATCH 03/18] test(coderd/database): skip tests when PostgreSQL is unavailable --- coderd/database/dbtestutil/postgres_test.go | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/coderd/database/dbtestutil/postgres_test.go b/coderd/database/dbtestutil/postgres_test.go index 49e791a4c735e..f1b9336d57b37 100644 --- a/coderd/database/dbtestutil/postgres_test.go +++ b/coderd/database/dbtestutil/postgres_test.go @@ -19,6 +19,9 @@ func TestMain(m *testing.M) { func TestOpen(t *testing.T) { t.Parallel() + if !dbtestutil.WillUsePostgres() { + t.Skip("this test requires postgres") + } connect, err := dbtestutil.Open(t) require.NoError(t, err) @@ -33,6 +36,9 @@ func TestOpen(t *testing.T) { func TestOpen_InvalidDBFrom(t *testing.T) { t.Parallel() + if !dbtestutil.WillUsePostgres() { + t.Skip("this test requires postgres") + } _, err := dbtestutil.Open(t, dbtestutil.WithDBFrom("__invalid__")) require.Error(t, err) @@ -42,6 +48,9 @@ func TestOpen_InvalidDBFrom(t *testing.T) { func TestOpen_ValidDBFrom(t *testing.T) { t.Parallel() + if !dbtestutil.WillUsePostgres() { + t.Skip("this test requires postgres") + } // first check if we can create a new template db dsn, err := dbtestutil.Open(t, dbtestutil.WithDBFrom("")) From 30f007c364e019cfa01a6bee02b67c9bb42f627f Mon Sep 17 00:00:00 2001 From: evgeniy-scherbina Date: Tue, 25 Feb 2025 13:02:59 -0500 Subject: [PATCH 04/18] test(coderd/database): create default provisioner in tests --- coderd/database/querier_test.go | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/coderd/database/querier_test.go b/coderd/database/querier_test.go index e3b68653581f7..1305673d3baea 100644 --- a/coderd/database/querier_test.go +++ b/coderd/database/querier_test.go @@ -1256,6 +1256,15 @@ func TestQueuePosition(t *testing.T) { time.Sleep(time.Millisecond) } + // Create default provisioner daemon: + dbgen.ProvisionerDaemon(t, db, database.ProvisionerDaemon{ + Name: "default_provisioner", + Provisioners: []database.ProvisionerType{database.ProvisionerTypeEcho}, + // Ensure the `tags` field is NOT NULL for the default provisioner; + // otherwise, it won't be able to pick up any jobs. + Tags: database.StringMap{"a": "1"}, + }) + queued, err := db.GetProvisionerJobsByIDsWithQueuePosition(ctx, jobIDs) require.NoError(t, err) require.Len(t, queued, jobCount) From acb93ac99771bd7b04fe30c35cddecbbf248f184 Mon Sep 17 00:00:00 2001 From: evgeniy-scherbina Date: Wed, 26 Feb 2025 14:33:19 -0500 Subject: [PATCH 05/18] fix(coderd/database): correctly calculate pending and ranked jobs --- coderd/database/querier.go | 2 - coderd/database/querier_test.go | 122 ++++++++++++++++++-- coderd/database/queries.sql.go | 29 ++--- coderd/database/queries/provisionerjobs.sql | 23 ++-- 4 files changed, 131 insertions(+), 45 deletions(-) diff --git a/coderd/database/querier.go b/coderd/database/querier.go index 37ce0bbfbd96d..42b88d855e4c3 100644 --- a/coderd/database/querier.go +++ b/coderd/database/querier.go @@ -214,9 +214,7 @@ type sqlcQuerier interface { GetProvisionerJobByID(ctx context.Context, id uuid.UUID) (ProvisionerJob, error) GetProvisionerJobTimingsByJobID(ctx context.Context, jobID uuid.UUID) ([]ProvisionerJobTiming, error) GetProvisionerJobsByIDs(ctx context.Context, ids []uuid.UUID) ([]ProvisionerJob, error) - // Step 5: Final SELECT with INNER JOIN provisioner_jobs GetProvisionerJobsByIDsWithQueuePosition(ctx context.Context, ids []uuid.UUID) ([]GetProvisionerJobsByIDsWithQueuePositionRow, error) - // Preserve original ID order from upstream GetProvisionerJobsByOrganizationAndStatusWithQueuePositionAndProvisioner(ctx context.Context, arg GetProvisionerJobsByOrganizationAndStatusWithQueuePositionAndProvisionerParams) ([]GetProvisionerJobsByOrganizationAndStatusWithQueuePositionAndProvisionerRow, error) GetProvisionerJobsCreatedAfter(ctx context.Context, createdAt time.Time) ([]ProvisionerJob, error) GetProvisionerKeyByHashedSecret(ctx context.Context, hashedSecret []byte) (ProvisionerKey, error) diff --git a/coderd/database/querier_test.go b/coderd/database/querier_test.go index 1305673d3baea..fdca9daab8940 100644 --- a/coderd/database/querier_test.go +++ b/coderd/database/querier_test.go @@ -2180,6 +2180,11 @@ func TestGetProvisionerJobsByIDsWithQueuePosition(t *testing.T) { daemonTags []database.StringMap queueSizes []int64 queuePositions []int64 + // GetProvisionerJobsByIDsWithQueuePosition takes jobIDs as a parameter. + // If skipJobIDs is empty, all jobs are passed to the function; otherwise, the specified jobs are skipped. + // NOTE: Skipping job IDs means they will be excluded from the result, + // but this should not affect the queue position or queue size of other jobs. + skipJobIDs map[int]struct{} }{ { name: "test-case-1", @@ -2195,6 +2200,7 @@ func TestGetProvisionerJobsByIDsWithQueuePosition(t *testing.T) { queueSizes: []int64{2, 2, 0}, queuePositions: []int64{1, 1, 0}, }, + // Similar to the previous case, but includes an additional provisioner. { name: "test-case-2", jobTags: []database.StringMap{ @@ -2210,6 +2216,83 @@ func TestGetProvisionerJobsByIDsWithQueuePosition(t *testing.T) { queueSizes: []int64{3, 3, 3}, queuePositions: []int64{1, 1, 3}, }, + // Similar to the previous case, but skips job at index 0 + { + name: "test-case-3", + jobTags: []database.StringMap{ + {"a": "1", "b": "2"}, + {"a": "1"}, + {"a": "1", "c": "3"}, + }, + daemonTags: []database.StringMap{ + {"a": "1", "b": "2"}, + {"a": "1"}, + {"a": "1", "b": "2", "c": "3"}, + }, + queueSizes: []int64{3, 3}, + queuePositions: []int64{1, 3}, + skipJobIDs: map[int]struct{}{ + 0: {}, + }, + }, + // Skips job at index 1 + { + name: "test-case-4", + jobTags: []database.StringMap{ + {"a": "1", "b": "2"}, + {"a": "1"}, + {"a": "1", "c": "3"}, + }, + daemonTags: []database.StringMap{ + {"a": "1", "b": "2"}, + {"a": "1"}, + {"a": "1", "b": "2", "c": "3"}, + }, + queueSizes: []int64{3, 3}, + queuePositions: []int64{1, 3}, + skipJobIDs: map[int]struct{}{ + 1: {}, + }, + }, + // Skips job at index 2 + { + name: "test-case-5", + jobTags: []database.StringMap{ + {"a": "1", "b": "2"}, + {"a": "1"}, + {"a": "1", "c": "3"}, + }, + daemonTags: []database.StringMap{ + {"a": "1", "b": "2"}, + {"a": "1"}, + {"a": "1", "b": "2", "c": "3"}, + }, + queueSizes: []int64{3, 3}, + queuePositions: []int64{1, 1}, + skipJobIDs: map[int]struct{}{ + 2: {}, + }, + }, + // Skips jobs at indexes 0 and 2 + { + name: "test-case-6", + jobTags: []database.StringMap{ + {"a": "1", "b": "2"}, + {"a": "1"}, + {"a": "1", "c": "3"}, + }, + daemonTags: []database.StringMap{ + {"a": "1", "b": "2"}, + {"a": "1"}, + {"a": "1", "b": "2", "c": "3"}, + }, + queueSizes: []int64{3}, + queuePositions: []int64{1}, + skipJobIDs: map[int]struct{}{ + 0: {}, + 2: {}, + }, + }, } for _, tc := range testCases { @@ -2247,19 +2330,28 @@ func TestGetProvisionerJobsByIDsWithQueuePosition(t *testing.T) { require.Equal(t, database.ProvisionerJobStatusPending, job.JobStatus, "expected job %d to have status %s", idx, database.ProvisionerJobStatusPending) } - var jobIDs []uuid.UUID - for _, job := range allJobs { - jobIDs = append(jobIDs, job.ID) + filteredJobs := make([]database.ProvisionerJob, 0) + filteredJobIDs := make([]uuid.UUID, 0) + for idx, job := range allJobs { + if _, skip := tc.skipJobIDs[idx]; skip { + continue + } + + filteredJobs = append(filteredJobs, job) + filteredJobIDs = append(filteredJobIDs, job.ID) } // When: we fetch the jobs by their IDs - actualJobs, err := db.GetProvisionerJobsByIDsWithQueuePosition(ctx, jobIDs) + actualJobs, err := db.GetProvisionerJobsByIDsWithQueuePosition(ctx, filteredJobIDs) require.NoError(t, err) - require.Len(t, actualJobs, len(allJobs), "should return all jobs") + require.Len(t, actualJobs, len(filteredJobs), "should return all unskipped jobs") - // Then: the jobs should be returned in the correct order (by IDs in the input slice) + // Then: the jobs should be returned in the correct order (sorted by createdAt) + sort.Slice(filteredJobs, func(i, j int) bool { + return filteredJobs[i].CreatedAt.Before(filteredJobs[j].CreatedAt) + }) for idx, job := range actualJobs { - assert.EqualValues(t, allJobs[idx], job.ProvisionerJob) + assert.EqualValues(t, filteredJobs[idx], job.ProvisionerJob) } // Then: the queue size should be set correctly @@ -2395,7 +2487,10 @@ func TestGetProvisionerJobsByIDsWithQueuePosition_MixedStatuses(t *testing.T) { require.NoError(t, err) require.Len(t, actualJobs, len(allJobs), "should return all jobs") - // Then: the jobs should be returned in the correct order (by IDs in the input slice) + // Then: the jobs should be returned in the correct order (sorted by createdAt) + sort.Slice(allJobs, func(i, j int) bool { + return allJobs[i].CreatedAt.Before(allJobs[j].CreatedAt) + }) for idx, job := range actualJobs { assert.EqualValues(t, allJobs[idx], job.ProvisionerJob) } @@ -2405,14 +2500,14 @@ func TestGetProvisionerJobsByIDsWithQueuePosition_MixedStatuses(t *testing.T) { for _, job := range actualJobs { queueSizes = append(queueSizes, job.QueueSize) } - assert.EqualValues(t, []int64{2, 2, 0, 0, 0, 0, 0}, queueSizes, "expected queue positions to be set correctly") + assert.EqualValues(t, []int64{0, 0, 0, 0, 0, 2, 2}, queueSizes, "expected queue positions to be set correctly") // Then: the queue position should be set correctly: var queuePositions []int64 for _, job := range actualJobs { queuePositions = append(queuePositions, job.QueuePosition) } - assert.EqualValues(t, []int64{2, 1, 0, 0, 0, 0, 0}, queuePositions, "expected queue positions to be set correctly") + assert.EqualValues(t, []int64{0, 0, 0, 0, 0, 1, 2}, queuePositions, "expected queue positions to be set correctly") } func TestGetProvisionerJobsByIDsWithQueuePosition_OrderValidation(t *testing.T) { @@ -2489,7 +2584,10 @@ func TestGetProvisionerJobsByIDsWithQueuePosition_OrderValidation(t *testing.T) require.NoError(t, err) require.Len(t, actualJobs, len(allJobs), "should return all jobs") - // Then: the jobs should be returned in the correct order (by IDs in the input slice) + // Then: the jobs should be returned in the correct order (sorted by createdAt) + sort.Slice(allJobs, func(i, j int) bool { + return allJobs[i].CreatedAt.Before(allJobs[j].CreatedAt) + }) for idx, job := range actualJobs { assert.EqualValues(t, allJobs[idx], job.ProvisionerJob) assert.EqualValues(t, allJobs[idx].CreatedAt, job.ProvisionerJob.CreatedAt) @@ -2507,7 +2605,7 @@ func TestGetProvisionerJobsByIDsWithQueuePosition_OrderValidation(t *testing.T) for _, job := range actualJobs { queuePositions = append(queuePositions, job.QueuePosition) } - assert.EqualValues(t, []int64{3, 2, 1, 4, 5, 6}, queuePositions, "expected queue positions to be set correctly") + assert.EqualValues(t, []int64{1, 2, 3, 4, 5, 6}, queuePositions, "expected queue positions to be set correctly") } func TestGroupRemovalTrigger(t *testing.T) { diff --git a/coderd/database/queries.sql.go b/coderd/database/queries.sql.go index 1e35296fb45c1..d848ff90a09c5 100644 --- a/coderd/database/queries.sql.go +++ b/coderd/database/queries.sql.go @@ -6361,19 +6361,18 @@ func (q *sqlQuerier) GetProvisionerJobsByIDs(ctx context.Context, ids []uuid.UUI const getProvisionerJobsByIDsWithQueuePosition = `-- name: GetProvisionerJobsByIDsWithQueuePosition :many WITH filtered_provisioner_jobs AS ( - -- Step 1: Filter provisioner_jobs and assign an order from upstream system + -- Step 1: Filter provisioner_jobs SELECT - id, created_at, updated_at, started_at, canceled_at, completed_at, error, organization_id, initiator_id, provisioner, storage_method, type, input, worker_id, file_id, tags, error_code, trace_metadata, job_status, - ROW_NUMBER() OVER () AS ordinality -- Track original order + id, created_at, updated_at, started_at, canceled_at, completed_at, error, organization_id, initiator_id, provisioner, storage_method, type, input, worker_id, file_id, tags, error_code, trace_metadata, job_status FROM provisioner_jobs WHERE id = ANY($1 :: uuid [ ]) -- Apply filter early to reduce dataset size before expensive JOINs ), pending_jobs AS ( - -- Step 2: Extract only pending jobs from the already filtered dataset - SELECT id, created_at, updated_at, started_at, canceled_at, completed_at, error, organization_id, initiator_id, provisioner, storage_method, type, input, worker_id, file_id, tags, error_code, trace_metadata, job_status, ordinality - FROM filtered_provisioner_jobs + -- Step 2: Extract only pending jobs + SELECT id, created_at, updated_at, started_at, canceled_at, completed_at, error, organization_id, initiator_id, provisioner, storage_method, type, input, worker_id, file_id, tags, error_code, trace_metadata, job_status + FROM provisioner_jobs WHERE started_at IS NULL AND canceled_at IS NULL AND completed_at IS NULL @@ -6389,7 +6388,7 @@ ranked_jobs AS ( FROM pending_jobs pj INNER JOIN provisioner_daemons pd - ON provisioner_tagset_contains(pd.tags, pj.tags) -- Join only on the small filtered pending set + ON provisioner_tagset_contains(pd.tags, pj.tags) -- Join only on the small pending set ), final_jobs AS ( -- Step 4: Compute best queue position and max queue size per job @@ -6397,29 +6396,28 @@ final_jobs AS ( fpj.id, fpj.created_at, COALESCE(MIN(rj.queue_position), 0) :: BIGINT AS queue_position, -- Best queue position across provisioners - COALESCE(MAX(rj.queue_size), 0) :: BIGINT AS queue_size, -- Max queue size across provisioners - fpj.ordinality -- Preserve original order + COALESCE(MAX(rj.queue_size), 0) :: BIGINT AS queue_size -- Max queue size across provisioners FROM filtered_provisioner_jobs fpj -- Use the pre-filtered dataset instead of full provisioner_jobs LEFT JOIN ranked_jobs rj ON fpj.id = rj.id -- Ensure we only keep jobs that have a ranking GROUP BY - fpj.id, fpj.created_at, fpj.ordinality -- Include ` + "`" + `ordinality` + "`" + ` in GROUP BY + fpj.id, fpj.created_at ) SELECT + -- Step 5: Final SELECT with INNER JOIN provisioner_jobs fj.id, fj.created_at, pj.id, pj.created_at, pj.updated_at, pj.started_at, pj.canceled_at, pj.completed_at, pj.error, pj.organization_id, pj.initiator_id, pj.provisioner, pj.storage_method, pj.type, pj.input, pj.worker_id, pj.file_id, pj.tags, pj.error_code, pj.trace_metadata, pj.job_status, fj.queue_position, - fj.queue_size, - fj.ordinality + fj.queue_size FROM final_jobs fj INNER JOIN provisioner_jobs pj ON fj.id = pj.id -- Ensure we retrieve full details from ` + "`" + `provisioner_jobs` + "`" + `. -- JOIN with pj is required for sqlc.embed(pj) to compile successfully. ORDER BY - fj.ordinality + fj.created_at ` type GetProvisionerJobsByIDsWithQueuePositionRow struct { @@ -6428,10 +6426,8 @@ type GetProvisionerJobsByIDsWithQueuePositionRow struct { ProvisionerJob ProvisionerJob `db:"provisioner_job" json:"provisioner_job"` QueuePosition int64 `db:"queue_position" json:"queue_position"` QueueSize int64 `db:"queue_size" json:"queue_size"` - Ordinality int64 `db:"ordinality" json:"ordinality"` } -// Step 5: Final SELECT with INNER JOIN provisioner_jobs func (q *sqlQuerier) GetProvisionerJobsByIDsWithQueuePosition(ctx context.Context, ids []uuid.UUID) ([]GetProvisionerJobsByIDsWithQueuePositionRow, error) { rows, err := q.db.QueryContext(ctx, getProvisionerJobsByIDsWithQueuePosition, pq.Array(ids)) if err != nil { @@ -6465,7 +6461,6 @@ func (q *sqlQuerier) GetProvisionerJobsByIDsWithQueuePosition(ctx context.Contex &i.ProvisionerJob.JobStatus, &i.QueuePosition, &i.QueueSize, - &i.Ordinality, ); err != nil { return nil, err } @@ -6481,7 +6476,6 @@ func (q *sqlQuerier) GetProvisionerJobsByIDsWithQueuePosition(ctx context.Contex } const getProvisionerJobsByOrganizationAndStatusWithQueuePositionAndProvisioner = `-- name: GetProvisionerJobsByOrganizationAndStatusWithQueuePositionAndProvisioner :many - WITH pending_jobs AS ( SELECT id, created_at @@ -6601,7 +6595,6 @@ type GetProvisionerJobsByOrganizationAndStatusWithQueuePositionAndProvisionerRow WorkspaceName string `db:"workspace_name" json:"workspace_name"` } -// Preserve original ID order from upstream func (q *sqlQuerier) GetProvisionerJobsByOrganizationAndStatusWithQueuePositionAndProvisioner(ctx context.Context, arg GetProvisionerJobsByOrganizationAndStatusWithQueuePositionAndProvisionerParams) ([]GetProvisionerJobsByOrganizationAndStatusWithQueuePositionAndProvisionerRow, error) { rows, err := q.db.QueryContext(ctx, getProvisionerJobsByOrganizationAndStatusWithQueuePositionAndProvisioner, arg.OrganizationID, diff --git a/coderd/database/queries/provisionerjobs.sql b/coderd/database/queries/provisionerjobs.sql index 5ababf8726d90..ff7615f7e9dae 100644 --- a/coderd/database/queries/provisionerjobs.sql +++ b/coderd/database/queries/provisionerjobs.sql @@ -51,19 +51,18 @@ WHERE -- name: GetProvisionerJobsByIDsWithQueuePosition :many WITH filtered_provisioner_jobs AS ( - -- Step 1: Filter provisioner_jobs and assign an order from upstream system + -- Step 1: Filter provisioner_jobs SELECT - *, - ROW_NUMBER() OVER () AS ordinality -- Track original order + * FROM provisioner_jobs WHERE id = ANY(@ids :: uuid [ ]) -- Apply filter early to reduce dataset size before expensive JOINs ), pending_jobs AS ( - -- Step 2: Extract only pending jobs from the already filtered dataset + -- Step 2: Extract only pending jobs SELECT * - FROM filtered_provisioner_jobs + FROM provisioner_jobs WHERE started_at IS NULL AND canceled_at IS NULL AND completed_at IS NULL @@ -79,7 +78,7 @@ ranked_jobs AS ( FROM pending_jobs pj INNER JOIN provisioner_daemons pd - ON provisioner_tagset_contains(pd.tags, pj.tags) -- Join only on the small filtered pending set + ON provisioner_tagset_contains(pd.tags, pj.tags) -- Join only on the small pending set ), final_jobs AS ( -- Step 4: Compute best queue position and max queue size per job @@ -87,30 +86,28 @@ final_jobs AS ( fpj.id, fpj.created_at, COALESCE(MIN(rj.queue_position), 0) :: BIGINT AS queue_position, -- Best queue position across provisioners - COALESCE(MAX(rj.queue_size), 0) :: BIGINT AS queue_size, -- Max queue size across provisioners - fpj.ordinality -- Preserve original order + COALESCE(MAX(rj.queue_size), 0) :: BIGINT AS queue_size -- Max queue size across provisioners FROM filtered_provisioner_jobs fpj -- Use the pre-filtered dataset instead of full provisioner_jobs LEFT JOIN ranked_jobs rj ON fpj.id = rj.id -- Ensure we only keep jobs that have a ranking GROUP BY - fpj.id, fpj.created_at, fpj.ordinality -- Include `ordinality` in GROUP BY + fpj.id, fpj.created_at ) --- Step 5: Final SELECT with INNER JOIN provisioner_jobs SELECT + -- Step 5: Final SELECT with INNER JOIN provisioner_jobs fj.id, fj.created_at, sqlc.embed(pj), fj.queue_position, - fj.queue_size, - fj.ordinality + fj.queue_size FROM final_jobs fj INNER JOIN provisioner_jobs pj ON fj.id = pj.id -- Ensure we retrieve full details from `provisioner_jobs`. -- JOIN with pj is required for sqlc.embed(pj) to compile successfully. ORDER BY - fj.ordinality; -- Preserve original ID order from upstream + fj.created_at; -- name: GetProvisionerJobsByOrganizationAndStatusWithQueuePositionAndProvisioner :many WITH pending_jobs AS ( From 5aa0ffa2c758a878425e307655619aec5c327dbc Mon Sep 17 00:00:00 2001 From: evgeniy-scherbina Date: Wed, 26 Feb 2025 17:53:26 -0500 Subject: [PATCH 06/18] test(coderd/database): improve test coverage --- coderd/database/querier_test.go | 45 +++++++++++++++++++++++++++++++-- 1 file changed, 43 insertions(+), 2 deletions(-) diff --git a/coderd/database/querier_test.go b/coderd/database/querier_test.go index fdca9daab8940..58e91c885d5e9 100644 --- a/coderd/database/querier_test.go +++ b/coderd/database/querier_test.go @@ -2186,6 +2186,7 @@ func TestGetProvisionerJobsByIDsWithQueuePosition(t *testing.T) { // but this should not affect the queue position or queue size of other jobs. skipJobIDs map[int]struct{} }{ + // Baseline test case { name: "test-case-1", jobTags: []database.StringMap{ @@ -2200,7 +2201,7 @@ func TestGetProvisionerJobsByIDsWithQueuePosition(t *testing.T) { queueSizes: []int64{2, 2, 0}, queuePositions: []int64{1, 1, 0}, }, - // Similar to the previous case, but includes an additional provisioner. + // Includes an additional provisioner { name: "test-case-2", jobTags: []database.StringMap{ @@ -2216,7 +2217,7 @@ func TestGetProvisionerJobsByIDsWithQueuePosition(t *testing.T) { queueSizes: []int64{3, 3, 3}, queuePositions: []int64{1, 1, 3}, }, - // Similar to the previous case, but skips job at index 0 + // Skips job at index 0 { name: "test-case-3", jobTags: []database.StringMap{ @@ -2293,6 +2294,46 @@ func TestGetProvisionerJobsByIDsWithQueuePosition(t *testing.T) { 2: {}, }, }, + // Includes two additional jobs that any provisioner can execute. + { + name: "test-case-7", + jobTags: []database.StringMap{ + {}, + {}, + {"a": "1", "b": "2"}, + {"a": "1"}, + {"a": "1", "c": "3"}, + }, + daemonTags: []database.StringMap{ + {"a": "1", "b": "2"}, + {"a": "1"}, + {"a": "1", "b": "2", "c": "3"}, + }, + queueSizes: []int64{5, 5, 5, 5, 5}, + queuePositions: []int64{1, 2, 3, 3, 5}, + }, + // Includes two additional jobs that any provisioner can execute, but they are intentionally skipped. + { + name: "test-case-8", + jobTags: []database.StringMap{ + {}, + {}, + {"a": "1", "b": "2"}, + {"a": "1"}, + {"a": "1", "c": "3"}, + }, + daemonTags: []database.StringMap{ + {"a": "1", "b": "2"}, + {"a": "1"}, + {"a": "1", "b": "2", "c": "3"}, + }, + queueSizes: []int64{5, 5, 5}, + queuePositions: []int64{3, 3, 5}, + skipJobIDs: map[int]struct{}{ + 0: {}, + 1: {}, + }, + }, } for _, tc := range testCases { From c1f421dc990c096d4f586e5c5082e4666db0bc32 Mon Sep 17 00:00:00 2001 From: evgeniy-scherbina Date: Wed, 26 Feb 2025 18:37:55 -0500 Subject: [PATCH 07/18] refactor(coderd/database): use empty tag sets --- coderd/database/querier_test.go | 38 +++++++++++++++++---------------- 1 file changed, 20 insertions(+), 18 deletions(-) diff --git a/coderd/database/querier_test.go b/coderd/database/querier_test.go index 58e91c885d5e9..7dfd955176de5 100644 --- a/coderd/database/querier_test.go +++ b/coderd/database/querier_test.go @@ -1262,7 +1262,7 @@ func TestQueuePosition(t *testing.T) { Provisioners: []database.ProvisionerType{database.ProvisionerTypeEcho}, // Ensure the `tags` field is NOT NULL for the default provisioner; // otherwise, it won't be able to pick up any jobs. - Tags: database.StringMap{"a": "1"}, + Tags: database.StringMap{}, }) queued, err := db.GetProvisionerJobsByIDsWithQueuePosition(ctx, jobIDs) @@ -2421,7 +2421,6 @@ func TestGetProvisionerJobsByIDsWithQueuePosition_MixedStatuses(t *testing.T) { now := dbtime.Now() ctx := testutil.Context(t, testutil.WaitShort) - defaultTags := database.StringMap{"a": "1"} // Create the following provisioner jobs: allJobs := []database.ProvisionerJob{ // Pending. This will be the last in the queue because @@ -2432,7 +2431,9 @@ func TestGetProvisionerJobsByIDsWithQueuePosition_MixedStatuses(t *testing.T) { CanceledAt: sql.NullTime{}, CompletedAt: sql.NullTime{}, Error: sql.NullString{}, - Tags: defaultTags, + // Ensure the `tags` field is NOT NULL for both provisioner jobs and provisioner daemons; + // otherwise, provisioner daemons won't be able to pick up any jobs. + Tags: database.StringMap{}, }), // Another pending. This will come first in the queue @@ -2443,7 +2444,7 @@ func TestGetProvisionerJobsByIDsWithQueuePosition_MixedStatuses(t *testing.T) { CanceledAt: sql.NullTime{}, CompletedAt: sql.NullTime{}, Error: sql.NullString{}, - Tags: defaultTags, + Tags: database.StringMap{}, }), // Running @@ -2453,7 +2454,7 @@ func TestGetProvisionerJobsByIDsWithQueuePosition_MixedStatuses(t *testing.T) { CanceledAt: sql.NullTime{}, CompletedAt: sql.NullTime{}, Error: sql.NullString{}, - Tags: defaultTags, + Tags: database.StringMap{}, }), // Succeeded @@ -2463,7 +2464,7 @@ func TestGetProvisionerJobsByIDsWithQueuePosition_MixedStatuses(t *testing.T) { CanceledAt: sql.NullTime{}, CompletedAt: sql.NullTime{Valid: true, Time: now}, Error: sql.NullString{}, - Tags: defaultTags, + Tags: database.StringMap{}, }), // Canceling @@ -2473,7 +2474,7 @@ func TestGetProvisionerJobsByIDsWithQueuePosition_MixedStatuses(t *testing.T) { CanceledAt: sql.NullTime{Valid: true, Time: now}, CompletedAt: sql.NullTime{}, Error: sql.NullString{}, - Tags: defaultTags, + Tags: database.StringMap{}, }), // Canceled @@ -2483,7 +2484,7 @@ func TestGetProvisionerJobsByIDsWithQueuePosition_MixedStatuses(t *testing.T) { CanceledAt: sql.NullTime{Valid: true, Time: now}, CompletedAt: sql.NullTime{Valid: true, Time: now}, Error: sql.NullString{}, - Tags: defaultTags, + Tags: database.StringMap{}, }), // Failed @@ -2493,7 +2494,7 @@ func TestGetProvisionerJobsByIDsWithQueuePosition_MixedStatuses(t *testing.T) { CanceledAt: sql.NullTime{}, CompletedAt: sql.NullTime{}, Error: sql.NullString{String: "failed", Valid: true}, - Tags: defaultTags, + Tags: database.StringMap{}, }), } @@ -2501,7 +2502,7 @@ func TestGetProvisionerJobsByIDsWithQueuePosition_MixedStatuses(t *testing.T) { dbgen.ProvisionerDaemon(t, db, database.ProvisionerDaemon{ Name: "default_provisioner", Provisioners: []database.ProvisionerType{database.ProvisionerTypeEcho}, - Tags: defaultTags, + Tags: database.StringMap{}, }) // Assert invariant: the jobs are in the expected order @@ -2561,37 +2562,38 @@ func TestGetProvisionerJobsByIDsWithQueuePosition_OrderValidation(t *testing.T) now := dbtime.Now() ctx := testutil.Context(t, testutil.WaitShort) - defaultTags := database.StringMap{"a": "1"} // Create the following provisioner jobs: allJobs := []database.ProvisionerJob{ dbgen.ProvisionerJob(t, db, nil, database.ProvisionerJob{ CreatedAt: now.Add(-4 * time.Minute), - Tags: defaultTags, + // Ensure the `tags` field is NOT NULL for both provisioner jobs and provisioner daemons; + // otherwise, provisioner daemons won't be able to pick up any jobs. + Tags: database.StringMap{}, }), dbgen.ProvisionerJob(t, db, nil, database.ProvisionerJob{ CreatedAt: now.Add(-5 * time.Minute), - Tags: defaultTags, + Tags: database.StringMap{}, }), dbgen.ProvisionerJob(t, db, nil, database.ProvisionerJob{ CreatedAt: now.Add(-6 * time.Minute), - Tags: defaultTags, + Tags: database.StringMap{}, }), dbgen.ProvisionerJob(t, db, nil, database.ProvisionerJob{ CreatedAt: now.Add(-3 * time.Minute), - Tags: defaultTags, + Tags: database.StringMap{}, }), dbgen.ProvisionerJob(t, db, nil, database.ProvisionerJob{ CreatedAt: now.Add(-2 * time.Minute), - Tags: defaultTags, + Tags: database.StringMap{}, }), dbgen.ProvisionerJob(t, db, nil, database.ProvisionerJob{ CreatedAt: now.Add(-1 * time.Minute), - Tags: defaultTags, + Tags: database.StringMap{}, }), } @@ -2599,7 +2601,7 @@ func TestGetProvisionerJobsByIDsWithQueuePosition_OrderValidation(t *testing.T) dbgen.ProvisionerDaemon(t, db, database.ProvisionerDaemon{ Name: "default_provisioner", Provisioners: []database.ProvisionerType{database.ProvisionerTypeEcho}, - Tags: defaultTags, + Tags: database.StringMap{}, }) // Assert invariant: the jobs are in the expected order From e7693eeca6733ca404f0649870ba096926fe45f8 Mon Sep 17 00:00:00 2001 From: evgeniy-scherbina Date: Thu, 27 Feb 2025 09:12:23 -0500 Subject: [PATCH 08/18] test(coderd/database): improve test coverage --- coderd/database/querier_test.go | 54 +++++++++++++++++++++++++++++++++ 1 file changed, 54 insertions(+) diff --git a/coderd/database/querier_test.go b/coderd/database/querier_test.go index 7dfd955176de5..081435d218164 100644 --- a/coderd/database/querier_test.go +++ b/coderd/database/querier_test.go @@ -2334,6 +2334,60 @@ func TestGetProvisionerJobsByIDsWithQueuePosition(t *testing.T) { 1: {}, }, }, + // N jobs (1 job with 0 tags) & 0 provisioners exist + { + name: "test-case-9", + jobTags: []database.StringMap{ + {}, + {"a": "1"}, + {"b": "2"}, + }, + daemonTags: []database.StringMap{}, + queueSizes: []int64{0, 0, 0}, + queuePositions: []int64{0, 0, 0}, + }, + // N jobs (1 job with 0 tags) & N provisioners + { + name: "test-case-10", + jobTags: []database.StringMap{ + {}, + {"a": "1"}, + {"b": "2"}, + }, + daemonTags: []database.StringMap{ + {}, + {"a": "1"}, + {"b": "2"}, + }, + queueSizes: []int64{2, 2, 2}, + queuePositions: []int64{1, 2, 2}, + }, + // (N + 1) jobs (1 job with 0 tags) & N provisioners + // 1 job not matching any provisioner (first in the list) + { + name: "test-case-11", + jobTags: []database.StringMap{ + {"c": "3"}, + {}, + {"a": "1"}, + {"b": "2"}, + }, + daemonTags: []database.StringMap{ + {}, + {"a": "1"}, + {"b": "2"}, + }, + queueSizes: []int64{0, 2, 2, 2}, + queuePositions: []int64{0, 1, 2, 2}, + }, + // 0 jobs & 0 provisioners + { + name: "test-case-12", + jobTags: []database.StringMap{}, + daemonTags: []database.StringMap{}, + queueSizes: nil, // TODO(yevhenii): should it be empty array instead? + queuePositions: nil, + }, } for _, tc := range testCases { From a9ed7d27aee6509b4248f3c5564e2f27a70caee1 Mon Sep 17 00:00:00 2001 From: evgeniy-scherbina Date: Thu, 27 Feb 2025 10:38:24 -0500 Subject: [PATCH 09/18] test(coderd/database): wrap test cases in subtests --- coderd/database/querier_test.go | 128 ++++++++++++++++---------------- 1 file changed, 66 insertions(+), 62 deletions(-) diff --git a/coderd/database/querier_test.go b/coderd/database/querier_test.go index 081435d218164..d9ef814f0b175 100644 --- a/coderd/database/querier_test.go +++ b/coderd/database/querier_test.go @@ -2391,77 +2391,81 @@ func TestGetProvisionerJobsByIDsWithQueuePosition(t *testing.T) { } for _, tc := range testCases { - db, _ := dbtestutil.NewDB(t) + tc := tc // Capture loop variable to avoid data races + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + db, _ := dbtestutil.NewDB(t) + + // Create provisioner jobs based on provided tags: + allJobs := make([]database.ProvisionerJob, len(tc.jobTags)) + for idx, tags := range tc.jobTags { + // Make sure jobs are stored in correct order, first job should have the earliest createdAt timestamp. + // Example for 3 jobs: + // job_1 createdAt: now - 3 minutes + // job_2 createdAt: now - 2 minutes + // job_3 createdAt: now - 1 minute + timeOffsetInMinutes := len(tc.jobTags) - idx + timeOffset := time.Duration(timeOffsetInMinutes) * time.Minute + createdAt := now.Add(-timeOffset) + + allJobs[idx] = dbgen.ProvisionerJob(t, db, nil, database.ProvisionerJob{ + CreatedAt: createdAt, + Tags: tags, + }) + } - // Create provisioner jobs based on provided tags: - allJobs := make([]database.ProvisionerJob, len(tc.jobTags)) - for idx, tags := range tc.jobTags { - // Make sure jobs are stored in correct order, first job should have the earliest createdAt timestamp. - // Example for 3 jobs: - // job_1 createdAt: now - 3 minutes - // job_2 createdAt: now - 2 minutes - // job_3 createdAt: now - 1 minute - timeOffsetInMinutes := len(tc.jobTags) - idx - timeOffset := time.Duration(timeOffsetInMinutes) * time.Minute - createdAt := now.Add(-timeOffset) - - allJobs[idx] = dbgen.ProvisionerJob(t, db, nil, database.ProvisionerJob{ - CreatedAt: createdAt, - Tags: tags, - }) - } + // Create provisioner daemons based on provided tags: + for idx, tags := range tc.daemonTags { + dbgen.ProvisionerDaemon(t, db, database.ProvisionerDaemon{ + Name: fmt.Sprintf("prov_%v", idx), + Provisioners: []database.ProvisionerType{database.ProvisionerTypeEcho}, + Tags: tags, + }) + } - // Create provisioner daemons based on provided tags: - for idx, tags := range tc.daemonTags { - dbgen.ProvisionerDaemon(t, db, database.ProvisionerDaemon{ - Name: fmt.Sprintf("prov_%v", idx), - Provisioners: []database.ProvisionerType{database.ProvisionerTypeEcho}, - Tags: tags, - }) - } + // Assert invariant: the jobs are in pending status + for idx, job := range allJobs { + require.Equal(t, database.ProvisionerJobStatusPending, job.JobStatus, "expected job %d to have status %s", idx, database.ProvisionerJobStatusPending) + } - // Assert invariant: the jobs are in pending status - for idx, job := range allJobs { - require.Equal(t, database.ProvisionerJobStatusPending, job.JobStatus, "expected job %d to have status %s", idx, database.ProvisionerJobStatusPending) - } + filteredJobs := make([]database.ProvisionerJob, 0) + filteredJobIDs := make([]uuid.UUID, 0) + for idx, job := range allJobs { + if _, skip := tc.skipJobIDs[idx]; skip { + continue + } - filteredJobs := make([]database.ProvisionerJob, 0) - filteredJobIDs := make([]uuid.UUID, 0) - for idx, job := range allJobs { - if _, skip := tc.skipJobIDs[idx]; skip { - continue + filteredJobs = append(filteredJobs, job) + filteredJobIDs = append(filteredJobIDs, job.ID) } - filteredJobs = append(filteredJobs, job) - filteredJobIDs = append(filteredJobIDs, job.ID) - } - - // When: we fetch the jobs by their IDs - actualJobs, err := db.GetProvisionerJobsByIDsWithQueuePosition(ctx, filteredJobIDs) - require.NoError(t, err) - require.Len(t, actualJobs, len(filteredJobs), "should return all unskipped jobs") + // When: we fetch the jobs by their IDs + actualJobs, err := db.GetProvisionerJobsByIDsWithQueuePosition(ctx, filteredJobIDs) + require.NoError(t, err) + require.Len(t, actualJobs, len(filteredJobs), "should return all unskipped jobs") - // Then: the jobs should be returned in the correct order (sorted by createdAt) - sort.Slice(filteredJobs, func(i, j int) bool { - return filteredJobs[i].CreatedAt.Before(filteredJobs[j].CreatedAt) - }) - for idx, job := range actualJobs { - assert.EqualValues(t, filteredJobs[idx], job.ProvisionerJob) - } + // Then: the jobs should be returned in the correct order (sorted by createdAt) + sort.Slice(filteredJobs, func(i, j int) bool { + return filteredJobs[i].CreatedAt.Before(filteredJobs[j].CreatedAt) + }) + for idx, job := range actualJobs { + assert.EqualValues(t, filteredJobs[idx], job.ProvisionerJob) + } - // Then: the queue size should be set correctly - var queueSizes []int64 - for _, job := range actualJobs { - queueSizes = append(queueSizes, job.QueueSize) - } - assert.EqualValues(t, tc.queueSizes, queueSizes, "expected queue positions to be set correctly") + // Then: the queue size should be set correctly + var queueSizes []int64 + for _, job := range actualJobs { + queueSizes = append(queueSizes, job.QueueSize) + } + assert.EqualValues(t, tc.queueSizes, queueSizes, "expected queue positions to be set correctly") - // Then: the queue position should be set correctly: - var queuePositions []int64 - for _, job := range actualJobs { - queuePositions = append(queuePositions, job.QueuePosition) - } - assert.EqualValues(t, tc.queuePositions, queuePositions, "expected queue positions to be set correctly") + // Then: the queue position should be set correctly: + var queuePositions []int64 + for _, job := range actualJobs { + queuePositions = append(queuePositions, job.QueuePosition) + } + assert.EqualValues(t, tc.queuePositions, queuePositions, "expected queue positions to be set correctly") + }) } } From 574cdf645bf964fd783e9d2c959d89b7ce8f4d38 Mon Sep 17 00:00:00 2001 From: evgeniy-scherbina Date: Thu, 27 Feb 2025 10:47:34 -0500 Subject: [PATCH 10/18] refactor(coderd/database): minor refactoring in tests --- coderd/database/querier_test.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/coderd/database/querier_test.go b/coderd/database/querier_test.go index d9ef814f0b175..c3ab29034c410 100644 --- a/coderd/database/querier_test.go +++ b/coderd/database/querier_test.go @@ -2166,10 +2166,10 @@ func TestExpectOne(t *testing.T) { } func TestGetProvisionerJobsByIDsWithQueuePosition(t *testing.T) { - t.Parallel() if !dbtestutil.WillUsePostgres() { t.SkipNow() } + t.Parallel() now := dbtime.Now() ctx := testutil.Context(t, testutil.WaitShort) @@ -2470,10 +2470,10 @@ func TestGetProvisionerJobsByIDsWithQueuePosition(t *testing.T) { } func TestGetProvisionerJobsByIDsWithQueuePosition_MixedStatuses(t *testing.T) { - t.Parallel() if !dbtestutil.WillUsePostgres() { t.SkipNow() } + t.Parallel() db, _ := dbtestutil.NewDB(t) now := dbtime.Now() @@ -2611,10 +2611,10 @@ func TestGetProvisionerJobsByIDsWithQueuePosition_MixedStatuses(t *testing.T) { } func TestGetProvisionerJobsByIDsWithQueuePosition_OrderValidation(t *testing.T) { - t.Parallel() if !dbtestutil.WillUsePostgres() { t.SkipNow() } + t.Parallel() db, _ := dbtestutil.NewDB(t) now := dbtime.Now() From 8b3446eee4371b7a5c580e139282a911e39864cb Mon Sep 17 00:00:00 2001 From: evgeniy-scherbina Date: Thu, 27 Feb 2025 11:35:46 -0500 Subject: [PATCH 11/18] fix(coderd/database): linter --- coderd/database/querier_test.go | 6 +++--- coderd/database/queries.sql.go | 4 ++-- coderd/database/queries/provisionerjobs.sql | 4 ++-- 3 files changed, 7 insertions(+), 7 deletions(-) diff --git a/coderd/database/querier_test.go b/coderd/database/querier_test.go index c3ab29034c410..d9ef814f0b175 100644 --- a/coderd/database/querier_test.go +++ b/coderd/database/querier_test.go @@ -2166,10 +2166,10 @@ func TestExpectOne(t *testing.T) { } func TestGetProvisionerJobsByIDsWithQueuePosition(t *testing.T) { + t.Parallel() if !dbtestutil.WillUsePostgres() { t.SkipNow() } - t.Parallel() now := dbtime.Now() ctx := testutil.Context(t, testutil.WaitShort) @@ -2470,10 +2470,10 @@ func TestGetProvisionerJobsByIDsWithQueuePosition(t *testing.T) { } func TestGetProvisionerJobsByIDsWithQueuePosition_MixedStatuses(t *testing.T) { + t.Parallel() if !dbtestutil.WillUsePostgres() { t.SkipNow() } - t.Parallel() db, _ := dbtestutil.NewDB(t) now := dbtime.Now() @@ -2611,10 +2611,10 @@ func TestGetProvisionerJobsByIDsWithQueuePosition_MixedStatuses(t *testing.T) { } func TestGetProvisionerJobsByIDsWithQueuePosition_OrderValidation(t *testing.T) { + t.Parallel() if !dbtestutil.WillUsePostgres() { t.SkipNow() } - t.Parallel() db, _ := dbtestutil.NewDB(t) now := dbtime.Now() diff --git a/coderd/database/queries.sql.go b/coderd/database/queries.sql.go index d848ff90a09c5..4f5625a0782dc 100644 --- a/coderd/database/queries.sql.go +++ b/coderd/database/queries.sql.go @@ -6367,7 +6367,7 @@ WITH filtered_provisioner_jobs AS ( FROM provisioner_jobs WHERE - id = ANY($1 :: uuid [ ]) -- Apply filter early to reduce dataset size before expensive JOINs + id = ANY($1 :: uuid [ ]) -- Apply filter early to reduce dataset size before expensive JOIN ), pending_jobs AS ( -- Step 2: Extract only pending jobs @@ -6400,7 +6400,7 @@ final_jobs AS ( FROM filtered_provisioner_jobs fpj -- Use the pre-filtered dataset instead of full provisioner_jobs LEFT JOIN ranked_jobs rj - ON fpj.id = rj.id -- Ensure we only keep jobs that have a ranking + ON fpj.id = rj.id -- Join with the ranking jobs CTE to assign a rank to each specified provisioner job. GROUP BY fpj.id, fpj.created_at ) diff --git a/coderd/database/queries/provisionerjobs.sql b/coderd/database/queries/provisionerjobs.sql index ff7615f7e9dae..d6d3c1e6ac943 100644 --- a/coderd/database/queries/provisionerjobs.sql +++ b/coderd/database/queries/provisionerjobs.sql @@ -57,7 +57,7 @@ WITH filtered_provisioner_jobs AS ( FROM provisioner_jobs WHERE - id = ANY(@ids :: uuid [ ]) -- Apply filter early to reduce dataset size before expensive JOINs + id = ANY(@ids :: uuid [ ]) -- Apply filter early to reduce dataset size before expensive JOIN ), pending_jobs AS ( -- Step 2: Extract only pending jobs @@ -90,7 +90,7 @@ final_jobs AS ( FROM filtered_provisioner_jobs fpj -- Use the pre-filtered dataset instead of full provisioner_jobs LEFT JOIN ranked_jobs rj - ON fpj.id = rj.id -- Ensure we only keep jobs that have a ranking + ON fpj.id = rj.id -- Join with the ranking jobs CTE to assign a rank to each specified provisioner job. GROUP BY fpj.id, fpj.created_at ) From c4316fd82e6cca17dea153d3acd298d4692e9243 Mon Sep 17 00:00:00 2001 From: evgeniy-scherbina Date: Thu, 27 Feb 2025 12:40:55 -0500 Subject: [PATCH 12/18] refactor(coderd/database): use job_status generated column --- coderd/database/queries.sql.go | 5 +---- coderd/database/queries/provisionerjobs.sql | 5 +---- 2 files changed, 2 insertions(+), 8 deletions(-) diff --git a/coderd/database/queries.sql.go b/coderd/database/queries.sql.go index 4f5625a0782dc..b38cde2a09848 100644 --- a/coderd/database/queries.sql.go +++ b/coderd/database/queries.sql.go @@ -6373,10 +6373,7 @@ pending_jobs AS ( -- Step 2: Extract only pending jobs SELECT id, created_at, updated_at, started_at, canceled_at, completed_at, error, organization_id, initiator_id, provisioner, storage_method, type, input, worker_id, file_id, tags, error_code, trace_metadata, job_status FROM provisioner_jobs - WHERE started_at IS NULL - AND canceled_at IS NULL - AND completed_at IS NULL - AND error IS NULL + WHERE job_status = 'pending' ), ranked_jobs AS ( -- Step 3: Rank only pending jobs based on provisioner availability diff --git a/coderd/database/queries/provisionerjobs.sql b/coderd/database/queries/provisionerjobs.sql index d6d3c1e6ac943..16ea10a1e6293 100644 --- a/coderd/database/queries/provisionerjobs.sql +++ b/coderd/database/queries/provisionerjobs.sql @@ -63,10 +63,7 @@ pending_jobs AS ( -- Step 2: Extract only pending jobs SELECT * FROM provisioner_jobs - WHERE started_at IS NULL - AND canceled_at IS NULL - AND completed_at IS NULL - AND error IS NULL + WHERE job_status = 'pending' ), ranked_jobs AS ( -- Step 3: Rank only pending jobs based on provisioner availability From b4ea4db2c1cd04989e232c1498a27933e5279692 Mon Sep 17 00:00:00 2001 From: evgeniy-scherbina Date: Thu, 27 Feb 2025 16:05:20 -0500 Subject: [PATCH 13/18] perf: add index on provisioner_jobs table for status field --- .../migrations/000296_provisioner_jobs_status_idx.down.sql | 1 + .../migrations/000296_provisioner_jobs_status_idx.up.sql | 1 + 2 files changed, 2 insertions(+) create mode 100644 coderd/database/migrations/000296_provisioner_jobs_status_idx.down.sql create mode 100644 coderd/database/migrations/000296_provisioner_jobs_status_idx.up.sql diff --git a/coderd/database/migrations/000296_provisioner_jobs_status_idx.down.sql b/coderd/database/migrations/000296_provisioner_jobs_status_idx.down.sql new file mode 100644 index 0000000000000..e7e976e0e25f0 --- /dev/null +++ b/coderd/database/migrations/000296_provisioner_jobs_status_idx.down.sql @@ -0,0 +1 @@ +DROP INDEX idx_provisioner_jobs_status; diff --git a/coderd/database/migrations/000296_provisioner_jobs_status_idx.up.sql b/coderd/database/migrations/000296_provisioner_jobs_status_idx.up.sql new file mode 100644 index 0000000000000..8a1375232430e --- /dev/null +++ b/coderd/database/migrations/000296_provisioner_jobs_status_idx.up.sql @@ -0,0 +1 @@ +CREATE INDEX idx_provisioner_jobs_status ON provisioner_jobs USING btree (job_status); From 1f8e4e3781ad534005ab6342c45a8651a77fa41c Mon Sep 17 00:00:00 2001 From: evgeniy-scherbina Date: Fri, 28 Feb 2025 08:46:07 -0500 Subject: [PATCH 14/18] fix(coderd/database): increment migration number --- ...s_idx.down.sql => 000297_provisioner_jobs_status_idx.down.sql} | 0 ...tatus_idx.up.sql => 000297_provisioner_jobs_status_idx.up.sql} | 0 2 files changed, 0 insertions(+), 0 deletions(-) rename coderd/database/migrations/{000296_provisioner_jobs_status_idx.down.sql => 000297_provisioner_jobs_status_idx.down.sql} (100%) rename coderd/database/migrations/{000296_provisioner_jobs_status_idx.up.sql => 000297_provisioner_jobs_status_idx.up.sql} (100%) diff --git a/coderd/database/migrations/000296_provisioner_jobs_status_idx.down.sql b/coderd/database/migrations/000297_provisioner_jobs_status_idx.down.sql similarity index 100% rename from coderd/database/migrations/000296_provisioner_jobs_status_idx.down.sql rename to coderd/database/migrations/000297_provisioner_jobs_status_idx.down.sql diff --git a/coderd/database/migrations/000296_provisioner_jobs_status_idx.up.sql b/coderd/database/migrations/000297_provisioner_jobs_status_idx.up.sql similarity index 100% rename from coderd/database/migrations/000296_provisioner_jobs_status_idx.up.sql rename to coderd/database/migrations/000297_provisioner_jobs_status_idx.up.sql From f9c9711f77d05dac80154bf00930c87f6d8ceb5a Mon Sep 17 00:00:00 2001 From: evgeniy-scherbina Date: Fri, 28 Feb 2025 09:39:15 -0500 Subject: [PATCH 15/18] fix(coderd/database): update dump.sql --- coderd/database/dump.sql | 2 ++ 1 file changed, 2 insertions(+) diff --git a/coderd/database/dump.sql b/coderd/database/dump.sql index e05d3a06d31f5..c6f858d7cd1dd 100644 --- a/coderd/database/dump.sql +++ b/coderd/database/dump.sql @@ -2290,6 +2290,8 @@ CREATE UNIQUE INDEX idx_provisioner_daemons_org_name_owner_key ON provisioner_da COMMENT ON INDEX idx_provisioner_daemons_org_name_owner_key IS 'Allow unique provisioner daemon names by organization and user'; +CREATE INDEX idx_provisioner_jobs_status ON provisioner_jobs USING btree (job_status); + CREATE INDEX idx_tailnet_agents_coordinator ON tailnet_agents USING btree (coordinator_id); CREATE INDEX idx_tailnet_clients_coordinator ON tailnet_clients USING btree (coordinator_id); From 61a9f58e788d2d9e4d7737b01d24755cb3e43ab9 Mon Sep 17 00:00:00 2001 From: evgeniy-scherbina Date: Fri, 28 Feb 2025 10:02:22 -0500 Subject: [PATCH 16/18] perf: optimize get-queue-position sql query --- coderd/database/queries.sql.go | 11 +++++++---- coderd/database/queries/provisionerjobs.sql | 11 +++++++---- 2 files changed, 14 insertions(+), 8 deletions(-) diff --git a/coderd/database/queries.sql.go b/coderd/database/queries.sql.go index 0c860741dee9a..50a7c5bd909e2 100644 --- a/coderd/database/queries.sql.go +++ b/coderd/database/queries.sql.go @@ -6393,7 +6393,7 @@ const getProvisionerJobsByIDsWithQueuePosition = `-- name: GetProvisionerJobsByI WITH filtered_provisioner_jobs AS ( -- Step 1: Filter provisioner_jobs SELECT - id, created_at, updated_at, started_at, canceled_at, completed_at, error, organization_id, initiator_id, provisioner, storage_method, type, input, worker_id, file_id, tags, error_code, trace_metadata, job_status + id, created_at FROM provisioner_jobs WHERE @@ -6401,9 +6401,12 @@ WITH filtered_provisioner_jobs AS ( ), pending_jobs AS ( -- Step 2: Extract only pending jobs - SELECT id, created_at, updated_at, started_at, canceled_at, completed_at, error, organization_id, initiator_id, provisioner, storage_method, type, input, worker_id, file_id, tags, error_code, trace_metadata, job_status - FROM provisioner_jobs - WHERE job_status = 'pending' + SELECT + id, created_at, tags + FROM + provisioner_jobs + WHERE + job_status = 'pending' ), ranked_jobs AS ( -- Step 3: Rank only pending jobs based on provisioner availability diff --git a/coderd/database/queries/provisionerjobs.sql b/coderd/database/queries/provisionerjobs.sql index 16ea10a1e6293..2d544aedb9bd8 100644 --- a/coderd/database/queries/provisionerjobs.sql +++ b/coderd/database/queries/provisionerjobs.sql @@ -53,7 +53,7 @@ WHERE WITH filtered_provisioner_jobs AS ( -- Step 1: Filter provisioner_jobs SELECT - * + id, created_at FROM provisioner_jobs WHERE @@ -61,9 +61,12 @@ WITH filtered_provisioner_jobs AS ( ), pending_jobs AS ( -- Step 2: Extract only pending jobs - SELECT * - FROM provisioner_jobs - WHERE job_status = 'pending' + SELECT + id, created_at, tags + FROM + provisioner_jobs + WHERE + job_status = 'pending' ), ranked_jobs AS ( -- Step 3: Rank only pending jobs based on provisioner availability From 4f77f67a4c18228777f1e1f7e10ec9a91d64b071 Mon Sep 17 00:00:00 2001 From: evgeniy-scherbina Date: Fri, 28 Feb 2025 15:39:48 -0500 Subject: [PATCH 17/18] chore: update dbmem --- coderd/database/dbmem/dbmem.go | 118 +++++++++++++++++++++++++++++++- coderd/database/querier_test.go | 6 -- 2 files changed, 115 insertions(+), 9 deletions(-) diff --git a/coderd/database/dbmem/dbmem.go b/coderd/database/dbmem/dbmem.go index 6fbafa562d087..fe344e3dfd66c 100644 --- a/coderd/database/dbmem/dbmem.go +++ b/coderd/database/dbmem/dbmem.go @@ -1147,7 +1147,119 @@ func getOwnerFromTags(tags map[string]string) string { return "" } -func (q *FakeQuerier) getProvisionerJobsByIDsWithQueuePositionLocked(_ context.Context, ids []uuid.UUID) ([]database.GetProvisionerJobsByIDsWithQueuePositionRow, error) { +// provisionerTagsetContains checks if daemonTags contain all key-value pairs from jobTags +func provisionerTagsetContains(daemonTags, jobTags map[string]string) bool { + for jobKey, jobValue := range jobTags { + if daemonValue, exists := daemonTags[jobKey]; !exists || daemonValue != jobValue { + return false + } + } + return true +} + +// GetProvisionerJobsByIDsWithQueuePosition mimics the SQL logic in pure Go +func (q *FakeQuerier) getProvisionerJobsByIDsWithQueuePositionLockedTagBasedQueue(_ context.Context, jobIDs []uuid.UUID) ([]database.GetProvisionerJobsByIDsWithQueuePositionRow, error) { + // Step 1: Filter provisionerJobs based on jobIDs + filteredJobs := make(map[uuid.UUID]database.ProvisionerJob) + for _, job := range q.provisionerJobs { + for _, id := range jobIDs { + if job.ID == id { + filteredJobs[job.ID] = job + } + } + } + + // Step 2: Identify pending jobs + pendingJobs := make(map[uuid.UUID]database.ProvisionerJob) + for _, job := range q.provisionerJobs { + if job.JobStatus == "pending" { + pendingJobs[job.ID] = job + } + } + + // Step 3: Identify pending jobs that have a matching provisioner + matchedJobs := make(map[uuid.UUID]struct{}) + for _, job := range pendingJobs { + for _, daemon := range q.provisionerDaemons { + if provisionerTagsetContains(daemon.Tags, job.Tags) { + matchedJobs[job.ID] = struct{}{} + break + } + } + } + + // Step 4: Rank pending jobs per provisioner + jobRanks := make(map[uuid.UUID][]database.ProvisionerJob) + for _, job := range pendingJobs { + for _, daemon := range q.provisionerDaemons { + if provisionerTagsetContains(daemon.Tags, job.Tags) { + jobRanks[daemon.ID] = append(jobRanks[daemon.ID], job) + } + } + } + + // Sort jobs per provisioner by CreatedAt + for daemonID := range jobRanks { + sort.Slice(jobRanks[daemonID], func(i, j int) bool { + return jobRanks[daemonID][i].CreatedAt.Before(jobRanks[daemonID][j].CreatedAt) + }) + } + + // Step 5: Compute queue position & max queue size across all provisioners + jobQueueStats := make(map[uuid.UUID]database.GetProvisionerJobsByIDsWithQueuePositionRow) + for _, jobs := range jobRanks { + queueSize := int64(len(jobs)) // Queue size per provisioner + for i, job := range jobs { + queuePosition := int64(i + 1) + + // If the job already exists, update only if this queuePosition is better + if existing, exists := jobQueueStats[job.ID]; exists { + jobQueueStats[job.ID] = database.GetProvisionerJobsByIDsWithQueuePositionRow{ + ID: job.ID, + CreatedAt: job.CreatedAt, + ProvisionerJob: job, + QueuePosition: min(existing.QueuePosition, queuePosition), + QueueSize: max(existing.QueueSize, queueSize), // Take the maximum queue size across provisioners + } + } else { + jobQueueStats[job.ID] = database.GetProvisionerJobsByIDsWithQueuePositionRow{ + ID: job.ID, + CreatedAt: job.CreatedAt, + ProvisionerJob: job, + QueuePosition: queuePosition, + QueueSize: queueSize, + } + } + } + } + + // Step 6: Compute the final results with minimal checks + var results []database.GetProvisionerJobsByIDsWithQueuePositionRow + for _, job := range filteredJobs { + // If the job has a computed rank, use it + if rank, found := jobQueueStats[job.ID]; found { + results = append(results, rank) + } else { + // Otherwise, return (0,0) for non-pending jobs and unranked pending jobs + results = append(results, database.GetProvisionerJobsByIDsWithQueuePositionRow{ + ID: job.ID, + CreatedAt: job.CreatedAt, + ProvisionerJob: job, + QueuePosition: 0, + QueueSize: 0, + }) + } + } + + // Step 7: Sort results by CreatedAt + sort.Slice(results, func(i, j int) bool { + return results[i].CreatedAt.Before(results[j].CreatedAt) + }) + + return results, nil +} + +func (q *FakeQuerier) getProvisionerJobsByIDsWithQueuePositionLockedGlobalQueue(_ context.Context, ids []uuid.UUID) ([]database.GetProvisionerJobsByIDsWithQueuePositionRow, error) { // WITH pending_jobs AS ( // SELECT // id, created_at @@ -4149,7 +4261,7 @@ func (q *FakeQuerier) GetProvisionerJobsByIDsWithQueuePosition(ctx context.Conte if ids == nil { ids = []uuid.UUID{} } - return q.getProvisionerJobsByIDsWithQueuePositionLocked(ctx, ids) + return q.getProvisionerJobsByIDsWithQueuePositionLockedTagBasedQueue(ctx, ids) } func (q *FakeQuerier) GetProvisionerJobsByOrganizationAndStatusWithQueuePositionAndProvisioner(ctx context.Context, arg database.GetProvisionerJobsByOrganizationAndStatusWithQueuePositionAndProvisionerParams) ([]database.GetProvisionerJobsByOrganizationAndStatusWithQueuePositionAndProvisionerRow, error) { @@ -4218,7 +4330,7 @@ func (q *FakeQuerier) GetProvisionerJobsByOrganizationAndStatusWithQueuePosition LIMIT sqlc.narg('limit')::int; */ - rowsWithQueuePosition, err := q.getProvisionerJobsByIDsWithQueuePositionLocked(ctx, nil) + rowsWithQueuePosition, err := q.getProvisionerJobsByIDsWithQueuePositionLockedGlobalQueue(ctx, nil) if err != nil { return nil, err } diff --git a/coderd/database/querier_test.go b/coderd/database/querier_test.go index a73f3f3831d88..ecf9a59c0a393 100644 --- a/coderd/database/querier_test.go +++ b/coderd/database/querier_test.go @@ -2168,9 +2168,6 @@ func TestExpectOne(t *testing.T) { func TestGetProvisionerJobsByIDsWithQueuePosition(t *testing.T) { t.Parallel() - if !dbtestutil.WillUsePostgres() { - t.SkipNow() - } now := dbtime.Now() ctx := testutil.Context(t, testutil.WaitShort) @@ -2613,9 +2610,6 @@ func TestGetProvisionerJobsByIDsWithQueuePosition_MixedStatuses(t *testing.T) { func TestGetProvisionerJobsByIDsWithQueuePosition_OrderValidation(t *testing.T) { t.Parallel() - if !dbtestutil.WillUsePostgres() { - t.SkipNow() - } db, _ := dbtestutil.NewDB(t) now := dbtime.Now() From 6f4da846464915fa8899fb93307edbd6bbc73cf9 Mon Sep 17 00:00:00 2001 From: evgeniy-scherbina Date: Mon, 3 Mar 2025 08:56:48 -0500 Subject: [PATCH 18/18] fix(coderd/database): increment migration number --- ...s_idx.down.sql => 000298_provisioner_jobs_status_idx.down.sql} | 0 ...tatus_idx.up.sql => 000298_provisioner_jobs_status_idx.up.sql} | 0 2 files changed, 0 insertions(+), 0 deletions(-) rename coderd/database/migrations/{000297_provisioner_jobs_status_idx.down.sql => 000298_provisioner_jobs_status_idx.down.sql} (100%) rename coderd/database/migrations/{000297_provisioner_jobs_status_idx.up.sql => 000298_provisioner_jobs_status_idx.up.sql} (100%) diff --git a/coderd/database/migrations/000297_provisioner_jobs_status_idx.down.sql b/coderd/database/migrations/000298_provisioner_jobs_status_idx.down.sql similarity index 100% rename from coderd/database/migrations/000297_provisioner_jobs_status_idx.down.sql rename to coderd/database/migrations/000298_provisioner_jobs_status_idx.down.sql diff --git a/coderd/database/migrations/000297_provisioner_jobs_status_idx.up.sql b/coderd/database/migrations/000298_provisioner_jobs_status_idx.up.sql similarity index 100% rename from coderd/database/migrations/000297_provisioner_jobs_status_idx.up.sql rename to coderd/database/migrations/000298_provisioner_jobs_status_idx.up.sql