Skip to content

fix(coderd/database): consider tag sets when calculating queue position #16685

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 20 commits into from
Mar 3, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
1e35c53
fix(coderd/database): remove linux build tags from db package
evgeniy-scherbina Feb 19, 2025
5c49221
fix(coderd/database): consider tag sets when calculating queue position
evgeniy-scherbina Feb 20, 2025
4257ca0
test(coderd/database): skip tests when PostgreSQL is unavailable
evgeniy-scherbina Feb 25, 2025
30f007c
test(coderd/database): create default provisioner in tests
evgeniy-scherbina Feb 25, 2025
acb93ac
fix(coderd/database): correctly calculate pending and ranked jobs
evgeniy-scherbina Feb 26, 2025
5aa0ffa
test(coderd/database): improve test coverage
evgeniy-scherbina Feb 26, 2025
c1f421d
refactor(coderd/database): use empty tag sets
evgeniy-scherbina Feb 26, 2025
e7693ee
test(coderd/database): improve test coverage
evgeniy-scherbina Feb 27, 2025
a9ed7d2
test(coderd/database): wrap test cases in subtests
evgeniy-scherbina Feb 27, 2025
574cdf6
refactor(coderd/database): minor refactoring in tests
evgeniy-scherbina Feb 27, 2025
8b3446e
fix(coderd/database): linter
evgeniy-scherbina Feb 27, 2025
c4316fd
refactor(coderd/database): use job_status generated column
evgeniy-scherbina Feb 27, 2025
b4ea4db
perf: add index on provisioner_jobs table for status field
evgeniy-scherbina Feb 27, 2025
675b3e9
Merge remote-tracking branch 'origin/main' into 15843-queue-position
evgeniy-scherbina Feb 28, 2025
1f8e4e3
fix(coderd/database): increment migration number
evgeniy-scherbina Feb 28, 2025
f9c9711
fix(coderd/database): update dump.sql
evgeniy-scherbina Feb 28, 2025
61a9f58
perf: optimize get-queue-position sql query
evgeniy-scherbina Feb 28, 2025
4f77f67
chore: update dbmem
evgeniy-scherbina Feb 28, 2025
359a9e0
Merge remote-tracking branch 'origin/main' into 15843-queue-position
evgeniy-scherbina Mar 3, 2025
6f4da84
fix(coderd/database): increment migration number
evgeniy-scherbina Mar 3, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
118 changes: 115 additions & 3 deletions coderd/database/dbmem/dbmem.go
Original file line number Diff line number Diff line change
Expand Up @@ -1149,7 +1149,119 @@ func getOwnerFromTags(tags map[string]string) string {
return ""
}

func (q *FakeQuerier) getProvisionerJobsByIDsWithQueuePositionLocked(_ context.Context, ids []uuid.UUID) ([]database.GetProvisionerJobsByIDsWithQueuePositionRow, error) {
// provisionerTagsetContains reports whether daemonTags is a superset of
// jobTags: every key/value pair present in jobTags must also be present,
// with an equal value, in daemonTags. An empty jobTags matches any daemon.
func provisionerTagsetContains(daemonTags, jobTags map[string]string) bool {
	for key, want := range jobTags {
		got, ok := daemonTags[key]
		if !ok || got != want {
			return false
		}
	}
	return true
}

// getProvisionerJobsByIDsWithQueuePositionLockedTagBasedQueue mimics the
// tag-aware queue-position SQL query in pure Go.
//
// A pending job is ranked only within the queues of provisioner daemons whose
// tag set contains the job's tags (see provisionerTagsetContains). A job may
// therefore appear in several daemons' queues: the reported QueuePosition is
// the best (lowest) position across those queues and QueueSize is the largest
// of those queues. Jobs that are not pending, or pending but matched by no
// daemon, report QueuePosition = 0 and QueueSize = 0. Results are sorted by
// CreatedAt.
func (q *FakeQuerier) getProvisionerJobsByIDsWithQueuePositionLockedTagBasedQueue(_ context.Context, jobIDs []uuid.UUID) ([]database.GetProvisionerJobsByIDsWithQueuePositionRow, error) {
	// Step 1: Filter provisionerJobs based on jobIDs.
	// A set lookup replaces the previous O(len(jobs)*len(jobIDs)) nested scan.
	wantedIDs := make(map[uuid.UUID]struct{}, len(jobIDs))
	for _, id := range jobIDs {
		wantedIDs[id] = struct{}{}
	}
	filteredJobs := make(map[uuid.UUID]database.ProvisionerJob)
	for _, job := range q.provisionerJobs {
		if _, ok := wantedIDs[job.ID]; ok {
			filteredJobs[job.ID] = job
		}
	}

	// Step 2: Identify all pending jobs — not just the requested ones, since
	// queue positions must be computed against the entire pending queue.
	pendingJobs := make(map[uuid.UUID]database.ProvisionerJob)
	for _, job := range q.provisionerJobs {
		if job.JobStatus == "pending" {
			pendingJobs[job.ID] = job
		}
	}

	// Step 3: Build each daemon's queue: every pending job whose tags are
	// contained in that daemon's tag set.
	// (The previous version also built a never-read `matchedJobs` set here;
	// that dead code has been removed.)
	jobRanks := make(map[uuid.UUID][]database.ProvisionerJob)
	for _, job := range pendingJobs {
		for _, daemon := range q.provisionerDaemons {
			if provisionerTagsetContains(daemon.Tags, job.Tags) {
				jobRanks[daemon.ID] = append(jobRanks[daemon.ID], job)
			}
		}
	}

	// Queue order within each daemon is FIFO by creation time.
	for daemonID := range jobRanks {
		sort.Slice(jobRanks[daemonID], func(i, j int) bool {
			return jobRanks[daemonID][i].CreatedAt.Before(jobRanks[daemonID][j].CreatedAt)
		})
	}

	// Step 4: Compute queue position & max queue size across all provisioners.
	jobQueueStats := make(map[uuid.UUID]database.GetProvisionerJobsByIDsWithQueuePositionRow)
	for _, jobs := range jobRanks {
		queueSize := int64(len(jobs)) // Queue size per provisioner.
		for i, job := range jobs {
			queuePosition := int64(i + 1)
			row := database.GetProvisionerJobsByIDsWithQueuePositionRow{
				ID:             job.ID,
				CreatedAt:      job.CreatedAt,
				ProvisionerJob: job,
				QueuePosition:  queuePosition,
				QueueSize:      queueSize,
			}
			// If the job was already ranked by another daemon, keep the best
			// (lowest) position and the maximum queue size seen so far.
			if existing, exists := jobQueueStats[job.ID]; exists {
				row.QueuePosition = min(existing.QueuePosition, queuePosition)
				row.QueueSize = max(existing.QueueSize, queueSize)
			}
			jobQueueStats[job.ID] = row
		}
	}

	// Step 5: Assemble the final rows for the requested jobs only.
	var results []database.GetProvisionerJobsByIDsWithQueuePositionRow
	for _, job := range filteredJobs {
		// If the job has a computed rank, use it.
		if rank, found := jobQueueStats[job.ID]; found {
			results = append(results, rank)
			continue
		}
		// Otherwise, return (0,0) for non-pending jobs and unranked pending jobs.
		results = append(results, database.GetProvisionerJobsByIDsWithQueuePositionRow{
			ID:             job.ID,
			CreatedAt:      job.CreatedAt,
			ProvisionerJob: job,
			QueuePosition:  0,
			QueueSize:      0,
		})
	}

	// Step 6: Sort results by CreatedAt.
	sort.Slice(results, func(i, j int) bool {
		return results[i].CreatedAt.Before(results[j].CreatedAt)
	})

	return results, nil
}

func (q *FakeQuerier) getProvisionerJobsByIDsWithQueuePositionLockedGlobalQueue(_ context.Context, ids []uuid.UUID) ([]database.GetProvisionerJobsByIDsWithQueuePositionRow, error) {
// WITH pending_jobs AS (
// SELECT
// id, created_at
Expand Down Expand Up @@ -4237,7 +4349,7 @@ func (q *FakeQuerier) GetProvisionerJobsByIDsWithQueuePosition(ctx context.Conte
if ids == nil {
ids = []uuid.UUID{}
}
return q.getProvisionerJobsByIDsWithQueuePositionLocked(ctx, ids)
return q.getProvisionerJobsByIDsWithQueuePositionLockedTagBasedQueue(ctx, ids)
}

func (q *FakeQuerier) GetProvisionerJobsByOrganizationAndStatusWithQueuePositionAndProvisioner(ctx context.Context, arg database.GetProvisionerJobsByOrganizationAndStatusWithQueuePositionAndProvisionerParams) ([]database.GetProvisionerJobsByOrganizationAndStatusWithQueuePositionAndProvisionerRow, error) {
Expand Down Expand Up @@ -4306,7 +4418,7 @@ func (q *FakeQuerier) GetProvisionerJobsByOrganizationAndStatusWithQueuePosition
LIMIT
sqlc.narg('limit')::int;
*/
rowsWithQueuePosition, err := q.getProvisionerJobsByIDsWithQueuePositionLocked(ctx, nil)
rowsWithQueuePosition, err := q.getProvisionerJobsByIDsWithQueuePositionLockedGlobalQueue(ctx, nil)
if err != nil {
return nil, err
}
Expand Down
2 changes: 2 additions & 0 deletions coderd/database/dump.sql

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
DROP INDEX idx_provisioner_jobs_status;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Obligatory reminder to check migration number before merge!

Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
CREATE INDEX idx_provisioner_jobs_status ON provisioner_jobs USING btree (job_status);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Review: it took me a minute to figure out if this is supported, but apparently it is! The docs on generated columns specify that only stored generated columns are supported, and these are always written to disk. TIL!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@johnstcn
yes, that's what I figured as well
Also I thought about using Hash Index instead of BTree, because so far we're only using = or != operations, but wasn't sure, maybe we'll need other operations in the future.

Copy link
Member

@johnstcn johnstcn Mar 3, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TBH this is probably fine, almost all of our other indexes are using BTree. If you were curious, I suppose you could validate with some EXPLAINs on a sample dataset.

Loading
Loading