Skip to content

Commit b85ba58

Browse files
fix(coderd/database): consider tag sets when calculating queue position (#16685)
Relates to #15843 ## PR Contents - Reimplementation of the `GetProvisionerJobsByIDsWithQueuePosition` SQL query to **take into account** provisioner job tags and provisioner daemon tags. - Unit tests covering different **tag sets**, **job statuses**, and **job ordering** scenarios. ## Notes - The original row order is preserved by introducing the `ordinality` field. - Unnecessary rows are filtered as early as possible to ensure that expensive joins operate on a smaller dataset. - A "fake" join with `provisioner_jobs` is added at the end to ensure `sqlc.embed` compiles successfully. - **Backward compatibility is preserved**—only the SQL query has been updated, while the Go code remains unchanged.
1 parent 7637d39 commit b85ba58

File tree

7 files changed

+658
-67
lines changed

7 files changed

+658
-67
lines changed

coderd/database/dbmem/dbmem.go

Lines changed: 115 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1149,7 +1149,119 @@ func getOwnerFromTags(tags map[string]string) string {
11491149
return ""
11501150
}
11511151

1152-
func (q *FakeQuerier) getProvisionerJobsByIDsWithQueuePositionLocked(_ context.Context, ids []uuid.UUID) ([]database.GetProvisionerJobsByIDsWithQueuePositionRow, error) {
1152+
// provisionerTagsetContains checks if daemonTags contain all key-value pairs from jobTags
1153+
func provisionerTagsetContains(daemonTags, jobTags map[string]string) bool {
1154+
for jobKey, jobValue := range jobTags {
1155+
if daemonValue, exists := daemonTags[jobKey]; !exists || daemonValue != jobValue {
1156+
return false
1157+
}
1158+
}
1159+
return true
1160+
}
1161+
1162+
// GetProvisionerJobsByIDsWithQueuePosition mimics the SQL logic in pure Go
1163+
func (q *FakeQuerier) getProvisionerJobsByIDsWithQueuePositionLockedTagBasedQueue(_ context.Context, jobIDs []uuid.UUID) ([]database.GetProvisionerJobsByIDsWithQueuePositionRow, error) {
1164+
// Step 1: Filter provisionerJobs based on jobIDs
1165+
filteredJobs := make(map[uuid.UUID]database.ProvisionerJob)
1166+
for _, job := range q.provisionerJobs {
1167+
for _, id := range jobIDs {
1168+
if job.ID == id {
1169+
filteredJobs[job.ID] = job
1170+
}
1171+
}
1172+
}
1173+
1174+
// Step 2: Identify pending jobs
1175+
pendingJobs := make(map[uuid.UUID]database.ProvisionerJob)
1176+
for _, job := range q.provisionerJobs {
1177+
if job.JobStatus == "pending" {
1178+
pendingJobs[job.ID] = job
1179+
}
1180+
}
1181+
1182+
// Step 3: Identify pending jobs that have a matching provisioner
1183+
matchedJobs := make(map[uuid.UUID]struct{})
1184+
for _, job := range pendingJobs {
1185+
for _, daemon := range q.provisionerDaemons {
1186+
if provisionerTagsetContains(daemon.Tags, job.Tags) {
1187+
matchedJobs[job.ID] = struct{}{}
1188+
break
1189+
}
1190+
}
1191+
}
1192+
1193+
// Step 4: Rank pending jobs per provisioner
1194+
jobRanks := make(map[uuid.UUID][]database.ProvisionerJob)
1195+
for _, job := range pendingJobs {
1196+
for _, daemon := range q.provisionerDaemons {
1197+
if provisionerTagsetContains(daemon.Tags, job.Tags) {
1198+
jobRanks[daemon.ID] = append(jobRanks[daemon.ID], job)
1199+
}
1200+
}
1201+
}
1202+
1203+
// Sort jobs per provisioner by CreatedAt
1204+
for daemonID := range jobRanks {
1205+
sort.Slice(jobRanks[daemonID], func(i, j int) bool {
1206+
return jobRanks[daemonID][i].CreatedAt.Before(jobRanks[daemonID][j].CreatedAt)
1207+
})
1208+
}
1209+
1210+
// Step 5: Compute queue position & max queue size across all provisioners
1211+
jobQueueStats := make(map[uuid.UUID]database.GetProvisionerJobsByIDsWithQueuePositionRow)
1212+
for _, jobs := range jobRanks {
1213+
queueSize := int64(len(jobs)) // Queue size per provisioner
1214+
for i, job := range jobs {
1215+
queuePosition := int64(i + 1)
1216+
1217+
// If the job already exists, update only if this queuePosition is better
1218+
if existing, exists := jobQueueStats[job.ID]; exists {
1219+
jobQueueStats[job.ID] = database.GetProvisionerJobsByIDsWithQueuePositionRow{
1220+
ID: job.ID,
1221+
CreatedAt: job.CreatedAt,
1222+
ProvisionerJob: job,
1223+
QueuePosition: min(existing.QueuePosition, queuePosition),
1224+
QueueSize: max(existing.QueueSize, queueSize), // Take the maximum queue size across provisioners
1225+
}
1226+
} else {
1227+
jobQueueStats[job.ID] = database.GetProvisionerJobsByIDsWithQueuePositionRow{
1228+
ID: job.ID,
1229+
CreatedAt: job.CreatedAt,
1230+
ProvisionerJob: job,
1231+
QueuePosition: queuePosition,
1232+
QueueSize: queueSize,
1233+
}
1234+
}
1235+
}
1236+
}
1237+
1238+
// Step 6: Compute the final results with minimal checks
1239+
var results []database.GetProvisionerJobsByIDsWithQueuePositionRow
1240+
for _, job := range filteredJobs {
1241+
// If the job has a computed rank, use it
1242+
if rank, found := jobQueueStats[job.ID]; found {
1243+
results = append(results, rank)
1244+
} else {
1245+
// Otherwise, return (0,0) for non-pending jobs and unranked pending jobs
1246+
results = append(results, database.GetProvisionerJobsByIDsWithQueuePositionRow{
1247+
ID: job.ID,
1248+
CreatedAt: job.CreatedAt,
1249+
ProvisionerJob: job,
1250+
QueuePosition: 0,
1251+
QueueSize: 0,
1252+
})
1253+
}
1254+
}
1255+
1256+
// Step 7: Sort results by CreatedAt
1257+
sort.Slice(results, func(i, j int) bool {
1258+
return results[i].CreatedAt.Before(results[j].CreatedAt)
1259+
})
1260+
1261+
return results, nil
1262+
}
1263+
1264+
func (q *FakeQuerier) getProvisionerJobsByIDsWithQueuePositionLockedGlobalQueue(_ context.Context, ids []uuid.UUID) ([]database.GetProvisionerJobsByIDsWithQueuePositionRow, error) {
11531265
// WITH pending_jobs AS (
11541266
// SELECT
11551267
// id, created_at
@@ -4237,7 +4349,7 @@ func (q *FakeQuerier) GetProvisionerJobsByIDsWithQueuePosition(ctx context.Conte
42374349
if ids == nil {
42384350
ids = []uuid.UUID{}
42394351
}
4240-
return q.getProvisionerJobsByIDsWithQueuePositionLocked(ctx, ids)
4352+
return q.getProvisionerJobsByIDsWithQueuePositionLockedTagBasedQueue(ctx, ids)
42414353
}
42424354

42434355
func (q *FakeQuerier) GetProvisionerJobsByOrganizationAndStatusWithQueuePositionAndProvisioner(ctx context.Context, arg database.GetProvisionerJobsByOrganizationAndStatusWithQueuePositionAndProvisionerParams) ([]database.GetProvisionerJobsByOrganizationAndStatusWithQueuePositionAndProvisionerRow, error) {
@@ -4306,7 +4418,7 @@ func (q *FakeQuerier) GetProvisionerJobsByOrganizationAndStatusWithQueuePosition
43064418
LIMIT
43074419
sqlc.narg('limit')::int;
43084420
*/
4309-
rowsWithQueuePosition, err := q.getProvisionerJobsByIDsWithQueuePositionLocked(ctx, nil)
4421+
rowsWithQueuePosition, err := q.getProvisionerJobsByIDsWithQueuePositionLockedGlobalQueue(ctx, nil)
43104422
if err != nil {
43114423
return nil, err
43124424
}

coderd/database/dump.sql

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
DROP INDEX idx_provisioner_jobs_status;
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
CREATE INDEX idx_provisioner_jobs_status ON provisioner_jobs USING btree (job_status);

0 commit comments

Comments
 (0)