Skip to content

Commit 4f77f67

Browse files
chore: update dbmem
1 parent 61a9f58 commit 4f77f67

File tree

2 files changed

+115
-9
lines changed

2 files changed

+115
-9
lines changed

coderd/database/dbmem/dbmem.go

Lines changed: 115 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1147,7 +1147,119 @@ func getOwnerFromTags(tags map[string]string) string {
11471147
return ""
11481148
}
11491149

1150-
func (q *FakeQuerier) getProvisionerJobsByIDsWithQueuePositionLocked(_ context.Context, ids []uuid.UUID) ([]database.GetProvisionerJobsByIDsWithQueuePositionRow, error) {
1150+
// provisionerTagsetContains checks if daemonTags contain all key-value pairs from jobTags
1151+
func provisionerTagsetContains(daemonTags, jobTags map[string]string) bool {
1152+
for jobKey, jobValue := range jobTags {
1153+
if daemonValue, exists := daemonTags[jobKey]; !exists || daemonValue != jobValue {
1154+
return false
1155+
}
1156+
}
1157+
return true
1158+
}
1159+
1160+
// GetProvisionerJobsByIDsWithQueuePosition mimics the SQL logic in pure Go
1161+
func (q *FakeQuerier) getProvisionerJobsByIDsWithQueuePositionLockedTagBasedQueue(_ context.Context, jobIDs []uuid.UUID) ([]database.GetProvisionerJobsByIDsWithQueuePositionRow, error) {
1162+
// Step 1: Filter provisionerJobs based on jobIDs
1163+
filteredJobs := make(map[uuid.UUID]database.ProvisionerJob)
1164+
for _, job := range q.provisionerJobs {
1165+
for _, id := range jobIDs {
1166+
if job.ID == id {
1167+
filteredJobs[job.ID] = job
1168+
}
1169+
}
1170+
}
1171+
1172+
// Step 2: Identify pending jobs
1173+
pendingJobs := make(map[uuid.UUID]database.ProvisionerJob)
1174+
for _, job := range q.provisionerJobs {
1175+
if job.JobStatus == "pending" {
1176+
pendingJobs[job.ID] = job
1177+
}
1178+
}
1179+
1180+
// Step 3: Identify pending jobs that have a matching provisioner
1181+
matchedJobs := make(map[uuid.UUID]struct{})
1182+
for _, job := range pendingJobs {
1183+
for _, daemon := range q.provisionerDaemons {
1184+
if provisionerTagsetContains(daemon.Tags, job.Tags) {
1185+
matchedJobs[job.ID] = struct{}{}
1186+
break
1187+
}
1188+
}
1189+
}
1190+
1191+
// Step 4: Rank pending jobs per provisioner
1192+
jobRanks := make(map[uuid.UUID][]database.ProvisionerJob)
1193+
for _, job := range pendingJobs {
1194+
for _, daemon := range q.provisionerDaemons {
1195+
if provisionerTagsetContains(daemon.Tags, job.Tags) {
1196+
jobRanks[daemon.ID] = append(jobRanks[daemon.ID], job)
1197+
}
1198+
}
1199+
}
1200+
1201+
// Sort jobs per provisioner by CreatedAt
1202+
for daemonID := range jobRanks {
1203+
sort.Slice(jobRanks[daemonID], func(i, j int) bool {
1204+
return jobRanks[daemonID][i].CreatedAt.Before(jobRanks[daemonID][j].CreatedAt)
1205+
})
1206+
}
1207+
1208+
// Step 5: Compute queue position & max queue size across all provisioners
1209+
jobQueueStats := make(map[uuid.UUID]database.GetProvisionerJobsByIDsWithQueuePositionRow)
1210+
for _, jobs := range jobRanks {
1211+
queueSize := int64(len(jobs)) // Queue size per provisioner
1212+
for i, job := range jobs {
1213+
queuePosition := int64(i + 1)
1214+
1215+
// If the job already exists, update only if this queuePosition is better
1216+
if existing, exists := jobQueueStats[job.ID]; exists {
1217+
jobQueueStats[job.ID] = database.GetProvisionerJobsByIDsWithQueuePositionRow{
1218+
ID: job.ID,
1219+
CreatedAt: job.CreatedAt,
1220+
ProvisionerJob: job,
1221+
QueuePosition: min(existing.QueuePosition, queuePosition),
1222+
QueueSize: max(existing.QueueSize, queueSize), // Take the maximum queue size across provisioners
1223+
}
1224+
} else {
1225+
jobQueueStats[job.ID] = database.GetProvisionerJobsByIDsWithQueuePositionRow{
1226+
ID: job.ID,
1227+
CreatedAt: job.CreatedAt,
1228+
ProvisionerJob: job,
1229+
QueuePosition: queuePosition,
1230+
QueueSize: queueSize,
1231+
}
1232+
}
1233+
}
1234+
}
1235+
1236+
// Step 6: Compute the final results with minimal checks
1237+
var results []database.GetProvisionerJobsByIDsWithQueuePositionRow
1238+
for _, job := range filteredJobs {
1239+
// If the job has a computed rank, use it
1240+
if rank, found := jobQueueStats[job.ID]; found {
1241+
results = append(results, rank)
1242+
} else {
1243+
// Otherwise, return (0,0) for non-pending jobs and unranked pending jobs
1244+
results = append(results, database.GetProvisionerJobsByIDsWithQueuePositionRow{
1245+
ID: job.ID,
1246+
CreatedAt: job.CreatedAt,
1247+
ProvisionerJob: job,
1248+
QueuePosition: 0,
1249+
QueueSize: 0,
1250+
})
1251+
}
1252+
}
1253+
1254+
// Step 7: Sort results by CreatedAt
1255+
sort.Slice(results, func(i, j int) bool {
1256+
return results[i].CreatedAt.Before(results[j].CreatedAt)
1257+
})
1258+
1259+
return results, nil
1260+
}
1261+
1262+
func (q *FakeQuerier) getProvisionerJobsByIDsWithQueuePositionLockedGlobalQueue(_ context.Context, ids []uuid.UUID) ([]database.GetProvisionerJobsByIDsWithQueuePositionRow, error) {
11511263
// WITH pending_jobs AS (
11521264
// SELECT
11531265
// id, created_at
@@ -4149,7 +4261,7 @@ func (q *FakeQuerier) GetProvisionerJobsByIDsWithQueuePosition(ctx context.Conte
41494261
if ids == nil {
41504262
ids = []uuid.UUID{}
41514263
}
4152-
return q.getProvisionerJobsByIDsWithQueuePositionLocked(ctx, ids)
4264+
return q.getProvisionerJobsByIDsWithQueuePositionLockedTagBasedQueue(ctx, ids)
41534265
}
41544266

41554267
func (q *FakeQuerier) GetProvisionerJobsByOrganizationAndStatusWithQueuePositionAndProvisioner(ctx context.Context, arg database.GetProvisionerJobsByOrganizationAndStatusWithQueuePositionAndProvisionerParams) ([]database.GetProvisionerJobsByOrganizationAndStatusWithQueuePositionAndProvisionerRow, error) {
@@ -4218,7 +4330,7 @@ func (q *FakeQuerier) GetProvisionerJobsByOrganizationAndStatusWithQueuePosition
42184330
LIMIT
42194331
sqlc.narg('limit')::int;
42204332
*/
4221-
rowsWithQueuePosition, err := q.getProvisionerJobsByIDsWithQueuePositionLocked(ctx, nil)
4333+
rowsWithQueuePosition, err := q.getProvisionerJobsByIDsWithQueuePositionLockedGlobalQueue(ctx, nil)
42224334
if err != nil {
42234335
return nil, err
42244336
}

coderd/database/querier_test.go

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2168,9 +2168,6 @@ func TestExpectOne(t *testing.T) {
21682168

21692169
func TestGetProvisionerJobsByIDsWithQueuePosition(t *testing.T) {
21702170
t.Parallel()
2171-
if !dbtestutil.WillUsePostgres() {
2172-
t.SkipNow()
2173-
}
21742171

21752172
now := dbtime.Now()
21762173
ctx := testutil.Context(t, testutil.WaitShort)
@@ -2613,9 +2610,6 @@ func TestGetProvisionerJobsByIDsWithQueuePosition_MixedStatuses(t *testing.T) {
26132610

26142611
func TestGetProvisionerJobsByIDsWithQueuePosition_OrderValidation(t *testing.T) {
26152612
t.Parallel()
2616-
if !dbtestutil.WillUsePostgres() {
2617-
t.SkipNow()
2618-
}
26192613

26202614
db, _ := dbtestutil.NewDB(t)
26212615
now := dbtime.Now()

0 commit comments

Comments
 (0)