diff --git a/coderd/database/dbmem/dbmem.go b/coderd/database/dbmem/dbmem.go index 125cca81e184f..97576c09d6168 100644 --- a/coderd/database/dbmem/dbmem.go +++ b/coderd/database/dbmem/dbmem.go @@ -1149,7 +1149,119 @@ func getOwnerFromTags(tags map[string]string) string { return "" } -func (q *FakeQuerier) getProvisionerJobsByIDsWithQueuePositionLocked(_ context.Context, ids []uuid.UUID) ([]database.GetProvisionerJobsByIDsWithQueuePositionRow, error) { +// provisionerTagsetContains checks if daemonTags contain all key-value pairs from jobTags +func provisionerTagsetContains(daemonTags, jobTags map[string]string) bool { + for jobKey, jobValue := range jobTags { + if daemonValue, exists := daemonTags[jobKey]; !exists || daemonValue != jobValue { + return false + } + } + return true +} + +// getProvisionerJobsByIDsWithQueuePositionLockedTagBasedQueue mimics the SQL logic of GetProvisionerJobsByIDsWithQueuePosition in pure Go +func (q *FakeQuerier) getProvisionerJobsByIDsWithQueuePositionLockedTagBasedQueue(_ context.Context, jobIDs []uuid.UUID) ([]database.GetProvisionerJobsByIDsWithQueuePositionRow, error) { + // Step 1: Filter provisionerJobs based on jobIDs + filteredJobs := make(map[uuid.UUID]database.ProvisionerJob) + for _, job := range q.provisionerJobs { + for _, id := range jobIDs { + if job.ID == id { + filteredJobs[job.ID] = job + } + } + } + + // Step 2: Identify pending jobs + pendingJobs := make(map[uuid.UUID]database.ProvisionerJob) + for _, job := range q.provisionerJobs { + if job.JobStatus == "pending" { + pendingJobs[job.ID] = job + } + } + + // Step 3: Identify pending jobs that have a matching provisioner (NOTE(review): matchedJobs is never read after this step) + matchedJobs := make(map[uuid.UUID]struct{}) + for _, job := range pendingJobs { + for _, daemon := range q.provisionerDaemons { + if provisionerTagsetContains(daemon.Tags, job.Tags) { + matchedJobs[job.ID] = struct{}{} + break + } + } + } + + // Step 4: Rank pending jobs per provisioner + jobRanks := make(map[uuid.UUID][]database.ProvisionerJob) + for _, job := range pendingJobs { + for _, daemon := range
q.provisionerDaemons { + if provisionerTagsetContains(daemon.Tags, job.Tags) { + jobRanks[daemon.ID] = append(jobRanks[daemon.ID], job) + } + } + } + + // Sort jobs per provisioner by CreatedAt + for daemonID := range jobRanks { + sort.Slice(jobRanks[daemonID], func(i, j int) bool { + return jobRanks[daemonID][i].CreatedAt.Before(jobRanks[daemonID][j].CreatedAt) + }) + } + + // Step 5: Compute queue position & max queue size across all provisioners + jobQueueStats := make(map[uuid.UUID]database.GetProvisionerJobsByIDsWithQueuePositionRow) + for _, jobs := range jobRanks { + queueSize := int64(len(jobs)) // Queue size per provisioner + for i, job := range jobs { + queuePosition := int64(i + 1) + + // If the job already exists, update only if this queuePosition is better + if existing, exists := jobQueueStats[job.ID]; exists { + jobQueueStats[job.ID] = database.GetProvisionerJobsByIDsWithQueuePositionRow{ + ID: job.ID, + CreatedAt: job.CreatedAt, + ProvisionerJob: job, + QueuePosition: min(existing.QueuePosition, queuePosition), + QueueSize: max(existing.QueueSize, queueSize), // Take the maximum queue size across provisioners + } + } else { + jobQueueStats[job.ID] = database.GetProvisionerJobsByIDsWithQueuePositionRow{ + ID: job.ID, + CreatedAt: job.CreatedAt, + ProvisionerJob: job, + QueuePosition: queuePosition, + QueueSize: queueSize, + } + } + } + } + + // Step 6: Compute the final results with minimal checks + var results []database.GetProvisionerJobsByIDsWithQueuePositionRow + for _, job := range filteredJobs { + // If the job has a computed rank, use it + if rank, found := jobQueueStats[job.ID]; found { + results = append(results, rank) + } else { + // Otherwise, return (0,0) for non-pending jobs and unranked pending jobs + results = append(results, database.GetProvisionerJobsByIDsWithQueuePositionRow{ + ID: job.ID, + CreatedAt: job.CreatedAt, + ProvisionerJob: job, + QueuePosition: 0, + QueueSize: 0, + }) + } + } + + // Step 7: Sort results by CreatedAt + 
sort.Slice(results, func(i, j int) bool { + return results[i].CreatedAt.Before(results[j].CreatedAt) + }) + + return results, nil +} + +func (q *FakeQuerier) getProvisionerJobsByIDsWithQueuePositionLockedGlobalQueue(_ context.Context, ids []uuid.UUID) ([]database.GetProvisionerJobsByIDsWithQueuePositionRow, error) { // WITH pending_jobs AS ( // SELECT // id, created_at @@ -4237,7 +4349,7 @@ func (q *FakeQuerier) GetProvisionerJobsByIDsWithQueuePosition(ctx context.Conte if ids == nil { ids = []uuid.UUID{} } - return q.getProvisionerJobsByIDsWithQueuePositionLocked(ctx, ids) + return q.getProvisionerJobsByIDsWithQueuePositionLockedTagBasedQueue(ctx, ids) } func (q *FakeQuerier) GetProvisionerJobsByOrganizationAndStatusWithQueuePositionAndProvisioner(ctx context.Context, arg database.GetProvisionerJobsByOrganizationAndStatusWithQueuePositionAndProvisionerParams) ([]database.GetProvisionerJobsByOrganizationAndStatusWithQueuePositionAndProvisionerRow, error) { @@ -4306,7 +4418,7 @@ func (q *FakeQuerier) GetProvisionerJobsByOrganizationAndStatusWithQueuePosition LIMIT sqlc.narg('limit')::int; */ - rowsWithQueuePosition, err := q.getProvisionerJobsByIDsWithQueuePositionLocked(ctx, nil) + rowsWithQueuePosition, err := q.getProvisionerJobsByIDsWithQueuePositionLockedGlobalQueue(ctx, nil) if err != nil { return nil, err } diff --git a/coderd/database/dump.sql b/coderd/database/dump.sql index c35a30ae2d866..e206b3ea7c136 100644 --- a/coderd/database/dump.sql +++ b/coderd/database/dump.sql @@ -2316,6 +2316,8 @@ CREATE UNIQUE INDEX idx_provisioner_daemons_org_name_owner_key ON provisioner_da COMMENT ON INDEX idx_provisioner_daemons_org_name_owner_key IS 'Allow unique provisioner daemon names by organization and user'; +CREATE INDEX idx_provisioner_jobs_status ON provisioner_jobs USING btree (job_status); + CREATE INDEX idx_tailnet_agents_coordinator ON tailnet_agents USING btree (coordinator_id); CREATE INDEX idx_tailnet_clients_coordinator ON tailnet_clients USING btree 
(coordinator_id); diff --git a/coderd/database/migrations/000298_provisioner_jobs_status_idx.down.sql b/coderd/database/migrations/000298_provisioner_jobs_status_idx.down.sql new file mode 100644 index 0000000000000..e7e976e0e25f0 --- /dev/null +++ b/coderd/database/migrations/000298_provisioner_jobs_status_idx.down.sql @@ -0,0 +1 @@ +DROP INDEX idx_provisioner_jobs_status; diff --git a/coderd/database/migrations/000298_provisioner_jobs_status_idx.up.sql b/coderd/database/migrations/000298_provisioner_jobs_status_idx.up.sql new file mode 100644 index 0000000000000..8a1375232430e --- /dev/null +++ b/coderd/database/migrations/000298_provisioner_jobs_status_idx.up.sql @@ -0,0 +1 @@ +CREATE INDEX idx_provisioner_jobs_status ON provisioner_jobs USING btree (job_status); diff --git a/coderd/database/querier_test.go b/coderd/database/querier_test.go index 5d3e65bb518df..ecf9a59c0a393 100644 --- a/coderd/database/querier_test.go +++ b/coderd/database/querier_test.go @@ -1257,6 +1257,15 @@ func TestQueuePosition(t *testing.T) { time.Sleep(time.Millisecond) } + // Create default provisioner daemon: + dbgen.ProvisionerDaemon(t, db, database.ProvisionerDaemon{ + Name: "default_provisioner", + Provisioners: []database.ProvisionerType{database.ProvisionerTypeEcho}, + // Ensure the `tags` field is NOT NULL for the default provisioner; + // otherwise, it won't be able to pick up any jobs. 
+ Tags: database.StringMap{}, + }) + queued, err := db.GetProvisionerJobsByIDsWithQueuePosition(ctx, jobIDs) require.NoError(t, err) require.Len(t, queued, jobCount) @@ -2159,6 +2168,307 @@ func TestExpectOne(t *testing.T) { func TestGetProvisionerJobsByIDsWithQueuePosition(t *testing.T) { t.Parallel() + + now := dbtime.Now() + ctx := testutil.Context(t, testutil.WaitShort) + + testCases := []struct { + name string + jobTags []database.StringMap + daemonTags []database.StringMap + queueSizes []int64 + queuePositions []int64 + // GetProvisionerJobsByIDsWithQueuePosition takes jobIDs as a parameter. + // If skipJobIDs is empty, all jobs are passed to the function; otherwise, the specified jobs are skipped. + // NOTE: Skipping job IDs means they will be excluded from the result, + // but this should not affect the queue position or queue size of other jobs. + skipJobIDs map[int]struct{} + }{ + // Baseline test case + { + name: "test-case-1", + jobTags: []database.StringMap{ + {"a": "1", "b": "2"}, + {"a": "1"}, + {"a": "1", "c": "3"}, + }, + daemonTags: []database.StringMap{ + {"a": "1", "b": "2"}, + {"a": "1"}, + }, + queueSizes: []int64{2, 2, 0}, + queuePositions: []int64{1, 1, 0}, + }, + // Includes an additional provisioner + { + name: "test-case-2", + jobTags: []database.StringMap{ + {"a": "1", "b": "2"}, + {"a": "1"}, + {"a": "1", "c": "3"}, + }, + daemonTags: []database.StringMap{ + {"a": "1", "b": "2"}, + {"a": "1"}, + {"a": "1", "b": "2", "c": "3"}, + }, + queueSizes: []int64{3, 3, 3}, + queuePositions: []int64{1, 1, 3}, + }, + // Skips job at index 0 + { + name: "test-case-3", + jobTags: []database.StringMap{ + {"a": "1", "b": "2"}, + {"a": "1"}, + {"a": "1", "c": "3"}, + }, + daemonTags: []database.StringMap{ + {"a": "1", "b": "2"}, + {"a": "1"}, + {"a": "1", "b": "2", "c": "3"}, + }, + queueSizes: []int64{3, 3}, + queuePositions: []int64{1, 3}, + skipJobIDs: map[int]struct{}{ + 0: {}, + }, + }, + // Skips job at index 1 + { + name: "test-case-4", + 
jobTags: []database.StringMap{ + {"a": "1", "b": "2"}, + {"a": "1"}, + {"a": "1", "c": "3"}, + }, + daemonTags: []database.StringMap{ + {"a": "1", "b": "2"}, + {"a": "1"}, + {"a": "1", "b": "2", "c": "3"}, + }, + queueSizes: []int64{3, 3}, + queuePositions: []int64{1, 3}, + skipJobIDs: map[int]struct{}{ + 1: {}, + }, + }, + // Skips job at index 2 + { + name: "test-case-5", + jobTags: []database.StringMap{ + {"a": "1", "b": "2"}, + {"a": "1"}, + {"a": "1", "c": "3"}, + }, + daemonTags: []database.StringMap{ + {"a": "1", "b": "2"}, + {"a": "1"}, + {"a": "1", "b": "2", "c": "3"}, + }, + queueSizes: []int64{3, 3}, + queuePositions: []int64{1, 1}, + skipJobIDs: map[int]struct{}{ + 2: {}, + }, + }, + // Skips jobs at indexes 0 and 2 + { + name: "test-case-6", + jobTags: []database.StringMap{ + {"a": "1", "b": "2"}, + {"a": "1"}, + {"a": "1", "c": "3"}, + }, + daemonTags: []database.StringMap{ + {"a": "1", "b": "2"}, + {"a": "1"}, + {"a": "1", "b": "2", "c": "3"}, + }, + queueSizes: []int64{3}, + queuePositions: []int64{1}, + skipJobIDs: map[int]struct{}{ + 0: {}, + 2: {}, + }, + }, + // Includes two additional jobs that any provisioner can execute. + { + name: "test-case-7", + jobTags: []database.StringMap{ + {}, + {}, + {"a": "1", "b": "2"}, + {"a": "1"}, + {"a": "1", "c": "3"}, + }, + daemonTags: []database.StringMap{ + {"a": "1", "b": "2"}, + {"a": "1"}, + {"a": "1", "b": "2", "c": "3"}, + }, + queueSizes: []int64{5, 5, 5, 5, 5}, + queuePositions: []int64{1, 2, 3, 3, 5}, + }, + // Includes two additional jobs that any provisioner can execute, but they are intentionally skipped. 
+ { + name: "test-case-8", + jobTags: []database.StringMap{ + {}, + {}, + {"a": "1", "b": "2"}, + {"a": "1"}, + {"a": "1", "c": "3"}, + }, + daemonTags: []database.StringMap{ + {"a": "1", "b": "2"}, + {"a": "1"}, + {"a": "1", "b": "2", "c": "3"}, + }, + queueSizes: []int64{5, 5, 5}, + queuePositions: []int64{3, 3, 5}, + skipJobIDs: map[int]struct{}{ + 0: {}, + 1: {}, + }, + }, + // N jobs (1 job with 0 tags) & 0 provisioners exist + { + name: "test-case-9", + jobTags: []database.StringMap{ + {}, + {"a": "1"}, + {"b": "2"}, + }, + daemonTags: []database.StringMap{}, + queueSizes: []int64{0, 0, 0}, + queuePositions: []int64{0, 0, 0}, + }, + // N jobs (1 job with 0 tags) & N provisioners + { + name: "test-case-10", + jobTags: []database.StringMap{ + {}, + {"a": "1"}, + {"b": "2"}, + }, + daemonTags: []database.StringMap{ + {}, + {"a": "1"}, + {"b": "2"}, + }, + queueSizes: []int64{2, 2, 2}, + queuePositions: []int64{1, 2, 2}, + }, + // (N + 1) jobs (1 job with 0 tags) & N provisioners + // 1 job not matching any provisioner (first in the list) + { + name: "test-case-11", + jobTags: []database.StringMap{ + {"c": "3"}, + {}, + {"a": "1"}, + {"b": "2"}, + }, + daemonTags: []database.StringMap{ + {}, + {"a": "1"}, + {"b": "2"}, + }, + queueSizes: []int64{0, 2, 2, 2}, + queuePositions: []int64{0, 1, 2, 2}, + }, + // 0 jobs & 0 provisioners + { + name: "test-case-12", + jobTags: []database.StringMap{}, + daemonTags: []database.StringMap{}, + queueSizes: nil, // TODO(yevhenii): should it be empty array instead? + queuePositions: nil, + }, + } + + for _, tc := range testCases { + tc := tc // Capture loop variable to avoid data races + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + db, _ := dbtestutil.NewDB(t) + + // Create provisioner jobs based on provided tags: + allJobs := make([]database.ProvisionerJob, len(tc.jobTags)) + for idx, tags := range tc.jobTags { + // Make sure jobs are stored in correct order, first job should have the earliest createdAt timestamp. 
+ // Example for 3 jobs: + // job_1 createdAt: now - 3 minutes + // job_2 createdAt: now - 2 minutes + // job_3 createdAt: now - 1 minute + timeOffsetInMinutes := len(tc.jobTags) - idx + timeOffset := time.Duration(timeOffsetInMinutes) * time.Minute + createdAt := now.Add(-timeOffset) + + allJobs[idx] = dbgen.ProvisionerJob(t, db, nil, database.ProvisionerJob{ + CreatedAt: createdAt, + Tags: tags, + }) + } + + // Create provisioner daemons based on provided tags: + for idx, tags := range tc.daemonTags { + dbgen.ProvisionerDaemon(t, db, database.ProvisionerDaemon{ + Name: fmt.Sprintf("prov_%v", idx), + Provisioners: []database.ProvisionerType{database.ProvisionerTypeEcho}, + Tags: tags, + }) + } + + // Assert invariant: the jobs are in pending status + for idx, job := range allJobs { + require.Equal(t, database.ProvisionerJobStatusPending, job.JobStatus, "expected job %d to have status %s", idx, database.ProvisionerJobStatusPending) + } + + filteredJobs := make([]database.ProvisionerJob, 0) + filteredJobIDs := make([]uuid.UUID, 0) + for idx, job := range allJobs { + if _, skip := tc.skipJobIDs[idx]; skip { + continue + } + + filteredJobs = append(filteredJobs, job) + filteredJobIDs = append(filteredJobIDs, job.ID) + } + + // When: we fetch the jobs by their IDs + actualJobs, err := db.GetProvisionerJobsByIDsWithQueuePosition(ctx, filteredJobIDs) + require.NoError(t, err) + require.Len(t, actualJobs, len(filteredJobs), "should return all unskipped jobs") + + // Then: the jobs should be returned in the correct order (sorted by createdAt) + sort.Slice(filteredJobs, func(i, j int) bool { + return filteredJobs[i].CreatedAt.Before(filteredJobs[j].CreatedAt) + }) + for idx, job := range actualJobs { + assert.EqualValues(t, filteredJobs[idx], job.ProvisionerJob) + } + + // Then: the queue size should be set correctly + var queueSizes []int64 + for _, job := range actualJobs { + queueSizes = append(queueSizes, job.QueueSize) + } + assert.EqualValues(t, tc.queueSizes, 
queueSizes, "expected queue sizes to be set correctly") + + // Then: the queue position should be set correctly: + var queuePositions []int64 + for _, job := range actualJobs { + queuePositions = append(queuePositions, job.QueuePosition) + } + assert.EqualValues(t, tc.queuePositions, queuePositions, "expected queue positions to be set correctly") + }) + } +} + +func TestGetProvisionerJobsByIDsWithQueuePosition_MixedStatuses(t *testing.T) { + t.Parallel() if !dbtestutil.WillUsePostgres() { t.SkipNow() } @@ -2167,7 +2477,7 @@ func TestGetProvisionerJobsByIDsWithQueuePosition(t *testing.T) { now := dbtime.Now() ctx := testutil.Context(t, testutil.WaitShort) - // Given the following provisioner jobs: + // Create the following provisioner jobs: allJobs := []database.ProvisionerJob{ // Pending. This will be the last in the queue because // it was created most recently. @@ -2177,6 +2487,9 @@ func TestGetProvisionerJobsByIDsWithQueuePosition(t *testing.T) { CanceledAt: sql.NullTime{}, CompletedAt: sql.NullTime{}, Error: sql.NullString{}, + // Ensure the `tags` field is NOT NULL for both provisioner jobs and provisioner daemons; + // otherwise, provisioner daemons won't be able to pick up any jobs. + Tags: database.StringMap{}, }), // Another pending.
This will come first in the queue @@ -2187,6 +2500,7 @@ func TestGetProvisionerJobsByIDsWithQueuePosition(t *testing.T) { CanceledAt: sql.NullTime{}, CompletedAt: sql.NullTime{}, Error: sql.NullString{}, + Tags: database.StringMap{}, }), // Running @@ -2196,6 +2510,7 @@ func TestGetProvisionerJobsByIDsWithQueuePosition(t *testing.T) { CanceledAt: sql.NullTime{}, CompletedAt: sql.NullTime{}, Error: sql.NullString{}, + Tags: database.StringMap{}, }), // Succeeded @@ -2205,6 +2520,7 @@ func TestGetProvisionerJobsByIDsWithQueuePosition(t *testing.T) { CanceledAt: sql.NullTime{}, CompletedAt: sql.NullTime{Valid: true, Time: now}, Error: sql.NullString{}, + Tags: database.StringMap{}, }), // Canceling @@ -2214,6 +2530,7 @@ func TestGetProvisionerJobsByIDsWithQueuePosition(t *testing.T) { CanceledAt: sql.NullTime{Valid: true, Time: now}, CompletedAt: sql.NullTime{}, Error: sql.NullString{}, + Tags: database.StringMap{}, }), // Canceled @@ -2223,6 +2540,7 @@ func TestGetProvisionerJobsByIDsWithQueuePosition(t *testing.T) { CanceledAt: sql.NullTime{Valid: true, Time: now}, CompletedAt: sql.NullTime{Valid: true, Time: now}, Error: sql.NullString{}, + Tags: database.StringMap{}, }), // Failed @@ -2232,9 +2550,17 @@ func TestGetProvisionerJobsByIDsWithQueuePosition(t *testing.T) { CanceledAt: sql.NullTime{}, CompletedAt: sql.NullTime{}, Error: sql.NullString{String: "failed", Valid: true}, + Tags: database.StringMap{}, }), } + // Create default provisioner daemon: + dbgen.ProvisionerDaemon(t, db, database.ProvisionerDaemon{ + Name: "default_provisioner", + Provisioners: []database.ProvisionerType{database.ProvisionerTypeEcho}, + Tags: database.StringMap{}, + }) + // Assert invariant: the jobs are in the expected order require.Len(t, allJobs, 7, "expected 7 jobs") for idx, status := range []database.ProvisionerJobStatus{ @@ -2259,22 +2585,123 @@ func TestGetProvisionerJobsByIDsWithQueuePosition(t *testing.T) { require.NoError(t, err) require.Len(t, actualJobs, len(allJobs), 
"should return all jobs") - // Then: the jobs should be returned in the correct order (by IDs in the input slice) + // Then: the jobs should be returned in the correct order (sorted by createdAt) + sort.Slice(allJobs, func(i, j int) bool { + return allJobs[i].CreatedAt.Before(allJobs[j].CreatedAt) + }) + for idx, job := range actualJobs { + assert.EqualValues(t, allJobs[idx], job.ProvisionerJob) + } + + // Then: the queue size should be set correctly + var queueSizes []int64 + for _, job := range actualJobs { + queueSizes = append(queueSizes, job.QueueSize) + } + assert.EqualValues(t, []int64{0, 0, 0, 0, 0, 2, 2}, queueSizes, "expected queue sizes to be set correctly") + + // Then: the queue position should be set correctly: + var queuePositions []int64 + for _, job := range actualJobs { + queuePositions = append(queuePositions, job.QueuePosition) + } + assert.EqualValues(t, []int64{0, 0, 0, 0, 0, 1, 2}, queuePositions, "expected queue positions to be set correctly") +} + +func TestGetProvisionerJobsByIDsWithQueuePosition_OrderValidation(t *testing.T) { + t.Parallel() + + db, _ := dbtestutil.NewDB(t) + now := dbtime.Now() + ctx := testutil.Context(t, testutil.WaitShort) + + // Create the following provisioner jobs: + allJobs := []database.ProvisionerJob{ + dbgen.ProvisionerJob(t, db, nil, database.ProvisionerJob{ + CreatedAt: now.Add(-4 * time.Minute), + // Ensure the `tags` field is NOT NULL for both provisioner jobs and provisioner daemons; + // otherwise, provisioner daemons won't be able to pick up any jobs.
+ Tags: database.StringMap{}, + }), + + dbgen.ProvisionerJob(t, db, nil, database.ProvisionerJob{ + CreatedAt: now.Add(-5 * time.Minute), + Tags: database.StringMap{}, + }), + + dbgen.ProvisionerJob(t, db, nil, database.ProvisionerJob{ + CreatedAt: now.Add(-6 * time.Minute), + Tags: database.StringMap{}, + }), + + dbgen.ProvisionerJob(t, db, nil, database.ProvisionerJob{ + CreatedAt: now.Add(-3 * time.Minute), + Tags: database.StringMap{}, + }), + + dbgen.ProvisionerJob(t, db, nil, database.ProvisionerJob{ + CreatedAt: now.Add(-2 * time.Minute), + Tags: database.StringMap{}, + }), + + dbgen.ProvisionerJob(t, db, nil, database.ProvisionerJob{ + CreatedAt: now.Add(-1 * time.Minute), + Tags: database.StringMap{}, + }), + } + + // Create default provisioner daemon: + dbgen.ProvisionerDaemon(t, db, database.ProvisionerDaemon{ + Name: "default_provisioner", + Provisioners: []database.ProvisionerType{database.ProvisionerTypeEcho}, + Tags: database.StringMap{}, + }) + + // Assert invariant: the jobs are in the expected order + require.Len(t, allJobs, 6, "expected 6 jobs") + for idx, status := range []database.ProvisionerJobStatus{ + database.ProvisionerJobStatusPending, + database.ProvisionerJobStatusPending, + database.ProvisionerJobStatusPending, + database.ProvisionerJobStatusPending, + database.ProvisionerJobStatusPending, + database.ProvisionerJobStatusPending, + } { + require.Equal(t, status, allJobs[idx].JobStatus, "expected job %d to have status %s", idx, status) + } + + var jobIDs []uuid.UUID + for _, job := range allJobs { + jobIDs = append(jobIDs, job.ID) + } + + // When: we fetch the jobs by their IDs + actualJobs, err := db.GetProvisionerJobsByIDsWithQueuePosition(ctx, jobIDs) + require.NoError(t, err) + require.Len(t, actualJobs, len(allJobs), "should return all jobs") + + // Then: the jobs should be returned in the correct order (sorted by createdAt) + sort.Slice(allJobs, func(i, j int) bool { + return allJobs[i].CreatedAt.Before(allJobs[j].CreatedAt) + })
for idx, job := range actualJobs { assert.EqualValues(t, allJobs[idx], job.ProvisionerJob) + assert.EqualValues(t, allJobs[idx].CreatedAt, job.ProvisionerJob.CreatedAt) } // Then: the queue size should be set correctly + var queueSizes []int64 for _, job := range actualJobs { - assert.EqualValues(t, job.QueueSize, 2, "should have queue size 2") + queueSizes = append(queueSizes, job.QueueSize) } + assert.EqualValues(t, []int64{6, 6, 6, 6, 6, 6}, queueSizes, "expected queue sizes to be set correctly") // Then: the queue position should be set correctly: var queuePositions []int64 for _, job := range actualJobs { queuePositions = append(queuePositions, job.QueuePosition) } - assert.EqualValues(t, []int64{2, 1, 0, 0, 0, 0, 0}, queuePositions, "expected queue positions to be set correctly") + assert.EqualValues(t, []int64{1, 2, 3, 4, 5, 6}, queuePositions, "expected queue positions to be set correctly") } func TestGroupRemovalTrigger(t *testing.T) { diff --git a/coderd/database/queries.sql.go b/coderd/database/queries.sql.go index 0891bc8c9fcc6..a8421e62d8245 100644 --- a/coderd/database/queries.sql.go +++ b/coderd/database/queries.sql.go @@ -6627,45 +6627,69 @@ func (q *sqlQuerier) GetProvisionerJobsByIDs(ctx context.Context, ids []uuid.UUI } const getProvisionerJobsByIDsWithQueuePosition = `-- name: GetProvisionerJobsByIDsWithQueuePosition :many -WITH pending_jobs AS ( - SELECT - id, created_at - FROM - provisioner_jobs - WHERE - started_at IS NULL - AND - canceled_at IS NULL - AND - completed_at IS NULL - AND - error IS NULL +WITH filtered_provisioner_jobs AS ( + -- Step 1: Filter provisioner_jobs + SELECT + id, created_at + FROM + provisioner_jobs + WHERE + id = ANY($1 :: uuid [ ]) -- Apply filter early to reduce dataset size before expensive JOIN ), -queue_position AS ( - SELECT - id, - ROW_NUMBER() OVER (ORDER BY created_at ASC) AS queue_position - FROM - pending_jobs +pending_jobs AS ( + -- Step 2: Extract only pending jobs + SELECT + id, created_at, tags +
FROM + provisioner_jobs + WHERE + job_status = 'pending' ), -queue_size AS ( - SELECT COUNT(*) AS count FROM pending_jobs +ranked_jobs AS ( + -- Step 3: Rank only pending jobs based on provisioner availability + SELECT + pj.id, + pj.created_at, + ROW_NUMBER() OVER (PARTITION BY pd.id ORDER BY pj.created_at ASC) AS queue_position, + COUNT(*) OVER (PARTITION BY pd.id) AS queue_size + FROM + pending_jobs pj + INNER JOIN provisioner_daemons pd + ON provisioner_tagset_contains(pd.tags, pj.tags) -- Join only on the small pending set +), +final_jobs AS ( + -- Step 4: Compute best queue position and max queue size per job + SELECT + fpj.id, + fpj.created_at, + COALESCE(MIN(rj.queue_position), 0) :: BIGINT AS queue_position, -- Best queue position across provisioners + COALESCE(MAX(rj.queue_size), 0) :: BIGINT AS queue_size -- Max queue size across provisioners + FROM + filtered_provisioner_jobs fpj -- Use the pre-filtered dataset instead of full provisioner_jobs + LEFT JOIN ranked_jobs rj + ON fpj.id = rj.id -- Join with the ranking jobs CTE to assign a rank to each specified provisioner job. + GROUP BY + fpj.id, fpj.created_at ) SELECT + -- Step 5: Final SELECT with INNER JOIN provisioner_jobs + fj.id, + fj.created_at, pj.id, pj.created_at, pj.updated_at, pj.started_at, pj.canceled_at, pj.completed_at, pj.error, pj.organization_id, pj.initiator_id, pj.provisioner, pj.storage_method, pj.type, pj.input, pj.worker_id, pj.file_id, pj.tags, pj.error_code, pj.trace_metadata, pj.job_status, - COALESCE(qp.queue_position, 0) AS queue_position, - COALESCE(qs.count, 0) AS queue_size + fj.queue_position, + fj.queue_size FROM - provisioner_jobs pj -LEFT JOIN - queue_position qp ON qp.id = pj.id -LEFT JOIN - queue_size qs ON TRUE -WHERE - pj.id = ANY($1 :: uuid [ ]) + final_jobs fj + INNER JOIN provisioner_jobs pj + ON fj.id = pj.id -- Ensure we retrieve full details from ` + "`" + `provisioner_jobs` + "`" + `. + -- JOIN with pj is required for sqlc.embed(pj) to compile successfully. 
+ORDER BY + fj.created_at ` type GetProvisionerJobsByIDsWithQueuePositionRow struct { + ID uuid.UUID `db:"id" json:"id"` + CreatedAt time.Time `db:"created_at" json:"created_at"` ProvisionerJob ProvisionerJob `db:"provisioner_job" json:"provisioner_job"` QueuePosition int64 `db:"queue_position" json:"queue_position"` QueueSize int64 `db:"queue_size" json:"queue_size"` @@ -6681,6 +6705,8 @@ func (q *sqlQuerier) GetProvisionerJobsByIDsWithQueuePosition(ctx context.Contex for rows.Next() { var i GetProvisionerJobsByIDsWithQueuePositionRow if err := rows.Scan( + &i.ID, + &i.CreatedAt, &i.ProvisionerJob.ID, &i.ProvisionerJob.CreatedAt, &i.ProvisionerJob.UpdatedAt, diff --git a/coderd/database/queries/provisionerjobs.sql b/coderd/database/queries/provisionerjobs.sql index 592b228af2806..2d544aedb9bd8 100644 --- a/coderd/database/queries/provisionerjobs.sql +++ b/coderd/database/queries/provisionerjobs.sql @@ -50,42 +50,64 @@ WHERE id = ANY(@ids :: uuid [ ]); -- name: GetProvisionerJobsByIDsWithQueuePosition :many -WITH pending_jobs AS ( - SELECT - id, created_at - FROM - provisioner_jobs - WHERE - started_at IS NULL - AND - canceled_at IS NULL - AND - completed_at IS NULL - AND - error IS NULL +WITH filtered_provisioner_jobs AS ( + -- Step 1: Filter provisioner_jobs + SELECT + id, created_at + FROM + provisioner_jobs + WHERE + id = ANY(@ids :: uuid [ ]) -- Apply filter early to reduce dataset size before expensive JOIN ), -queue_position AS ( - SELECT - id, - ROW_NUMBER() OVER (ORDER BY created_at ASC) AS queue_position - FROM - pending_jobs +pending_jobs AS ( + -- Step 2: Extract only pending jobs + SELECT + id, created_at, tags + FROM + provisioner_jobs + WHERE + job_status = 'pending' ), -queue_size AS ( - SELECT COUNT(*) AS count FROM pending_jobs +ranked_jobs AS ( + -- Step 3: Rank only pending jobs based on provisioner availability + SELECT + pj.id, + pj.created_at, + ROW_NUMBER() OVER (PARTITION BY pd.id ORDER BY pj.created_at ASC) AS queue_position, + COUNT(*) 
OVER (PARTITION BY pd.id) AS queue_size + FROM + pending_jobs pj + INNER JOIN provisioner_daemons pd + ON provisioner_tagset_contains(pd.tags, pj.tags) -- Join only on the small pending set +), +final_jobs AS ( + -- Step 4: Compute best queue position and max queue size per job + SELECT + fpj.id, + fpj.created_at, + COALESCE(MIN(rj.queue_position), 0) :: BIGINT AS queue_position, -- Best queue position across provisioners + COALESCE(MAX(rj.queue_size), 0) :: BIGINT AS queue_size -- Max queue size across provisioners + FROM + filtered_provisioner_jobs fpj -- Use the pre-filtered dataset instead of full provisioner_jobs + LEFT JOIN ranked_jobs rj + ON fpj.id = rj.id -- Join with the ranking jobs CTE to assign a rank to each specified provisioner job. + GROUP BY + fpj.id, fpj.created_at ) SELECT + -- Step 5: Final SELECT with INNER JOIN provisioner_jobs + fj.id, + fj.created_at, sqlc.embed(pj), - COALESCE(qp.queue_position, 0) AS queue_position, - COALESCE(qs.count, 0) AS queue_size + fj.queue_position, + fj.queue_size FROM - provisioner_jobs pj -LEFT JOIN - queue_position qp ON qp.id = pj.id -LEFT JOIN - queue_size qs ON TRUE -WHERE - pj.id = ANY(@ids :: uuid [ ]); + final_jobs fj + INNER JOIN provisioner_jobs pj + ON fj.id = pj.id -- Ensure we retrieve full details from `provisioner_jobs`. + -- JOIN with pj is required for sqlc.embed(pj) to compile successfully. +ORDER BY + fj.created_at; -- name: GetProvisionerJobsByOrganizationAndStatusWithQueuePositionAndProvisioner :many WITH pending_jobs AS (