@@ -1147,94 +1147,116 @@ func getOwnerFromTags(tags map[string]string) string {
1147
1147
return ""
1148
1148
}
1149
1149
1150
- func (q * FakeQuerier ) getProvisionerJobsByIDsWithQueuePositionLocked (_ context.Context , ids []uuid.UUID ) ([]database.GetProvisionerJobsByIDsWithQueuePositionRow , error ) {
1151
- // WITH pending_jobs AS (
1152
- // SELECT
1153
- // id, created_at
1154
- // FROM
1155
- // provisioner_jobs
1156
- // WHERE
1157
- // started_at IS NULL
1158
- // AND
1159
- // canceled_at IS NULL
1160
- // AND
1161
- // completed_at IS NULL
1162
- // AND
1163
- // error IS NULL
1164
- // ),
1165
- type pendingJobRow struct {
1166
- ID uuid.UUID
1167
- CreatedAt time.Time
1168
- }
1169
- pendingJobs := make ([]pendingJobRow , 0 )
1170
- for _ , job := range q .provisionerJobs {
1171
- if job .StartedAt .Valid ||
1172
- job .CanceledAt .Valid ||
1173
- job .CompletedAt .Valid ||
1174
- job .Error .Valid {
1175
- continue
1150
+ // provisionerTagsetContains checks if daemonTags contain all key-value pairs from jobTags
1151
+ func provisionerTagsetContains (daemonTags , jobTags map [string ]string ) bool {
1152
+ for jobKey , jobValue := range jobTags {
1153
+ if daemonValue , exists := daemonTags [jobKey ]; ! exists || daemonValue != jobValue {
1154
+ return false
1176
1155
}
1177
- pendingJobs = append (pendingJobs , pendingJobRow {
1178
- ID : job .ID ,
1179
- CreatedAt : job .CreatedAt ,
1180
- })
1181
1156
}
1157
+ return true
1158
+ }
1182
1159
1183
- // queue_position AS (
1184
- // SELECT
1185
- // id,
1186
- // ROW_NUMBER() OVER (ORDER BY created_at ASC) AS queue_position
1187
- // FROM
1188
- // pending_jobs
1189
- // ),
1190
- slices . SortFunc ( pendingJobs , func ( a , b pendingJobRow ) int {
1191
- c := a . CreatedAt . Compare ( b . CreatedAt )
1192
- return c
1193
- })
1160
+ // GetProvisionerJobsByIDsWithQueuePosition mimics the SQL logic in pure Go
1161
+ func ( q * FakeQuerier ) getProvisionerJobsByIDsWithQueuePositionLocked ( _ context. Context , jobIDs []uuid. UUID ) ([]database. GetProvisionerJobsByIDsWithQueuePositionRow , error ) {
1162
+ // Step 1: Filter provisionerJobs based on jobIDs
1163
+ filteredJobs := make ( map [uuid. UUID ]database. ProvisionerJob )
1164
+ for _ , job := range q . provisionerJobs {
1165
+ for _ , id := range jobIDs {
1166
+ if job . ID == id {
1167
+ filteredJobs [ job . ID ] = job
1168
+ }
1169
+ }
1170
+ }
1194
1171
1195
- queuePosition := make (map [uuid.UUID ]int64 )
1196
- for idx , pj := range pendingJobs {
1197
- queuePosition [pj .ID ] = int64 (idx + 1 )
1198
- }
1199
-
1200
- // queue_size AS (
1201
- // SELECT COUNT(*) AS count FROM pending_jobs
1202
- // ),
1203
- queueSize := len (pendingJobs )
1204
-
1205
- // SELECT
1206
- // sqlc.embed(pj),
1207
- // COALESCE(qp.queue_position, 0) AS queue_position,
1208
- // COALESCE(qs.count, 0) AS queue_size
1209
- // FROM
1210
- // provisioner_jobs pj
1211
- // LEFT JOIN
1212
- // queue_position qp ON pj.id = qp.id
1213
- // LEFT JOIN
1214
- // queue_size qs ON TRUE
1215
- // WHERE
1216
- // pj.id IN (...)
1217
- jobs := make ([]database.GetProvisionerJobsByIDsWithQueuePositionRow , 0 )
1172
+ // Step 2: Identify pending jobs
1173
+ pendingJobs := make (map [uuid.UUID ]database.ProvisionerJob )
1218
1174
for _ , job := range q .provisionerJobs {
1219
- if ids != nil && ! slices . Contains ( ids , job . ID ) {
1220
- continue
1175
+ if job . JobStatus == "pending" {
1176
+ pendingJobs [ job . ID ] = job
1221
1177
}
1222
- // clone the Tags before appending, since maps are reference types and
1223
- // we don't want the caller to be able to mutate the map we have inside
1224
- // dbmem!
1225
- job .Tags = maps .Clone (job .Tags )
1226
- job := database.GetProvisionerJobsByIDsWithQueuePositionRow {
1227
- // sqlc.embed(pj),
1228
- ProvisionerJob : job ,
1229
- // COALESCE(qp.queue_position, 0) AS queue_position,
1230
- QueuePosition : queuePosition [job .ID ],
1231
- // COALESCE(qs.count, 0) AS queue_size
1232
- QueueSize : int64 (queueSize ),
1178
+ }
1179
+
1180
+ // Step 3: Identify pending jobs that have a matching provisioner
1181
+ matchedJobs := make (map [uuid.UUID ]struct {})
1182
+ for _ , job := range pendingJobs {
1183
+ for _ , daemon := range q .provisionerDaemons {
1184
+ if provisionerTagsetContains (daemon .Tags , job .Tags ) {
1185
+ matchedJobs [job .ID ] = struct {}{}
1186
+ break
1187
+ }
1233
1188
}
1234
- jobs = append (jobs , job )
1235
1189
}
1236
1190
1237
- return jobs , nil
1191
+ // Step 4: Rank pending jobs per provisioner
1192
+ jobRanks := make (map [uuid.UUID ][]database.ProvisionerJob )
1193
+ for _ , job := range pendingJobs {
1194
+ for _ , daemon := range q .provisionerDaemons {
1195
+ if provisionerTagsetContains (daemon .Tags , job .Tags ) {
1196
+ jobRanks [daemon .ID ] = append (jobRanks [daemon .ID ], job )
1197
+ }
1198
+ }
1199
+ }
1200
+
1201
+ // Sort jobs per provisioner by CreatedAt
1202
+ for daemonID := range jobRanks {
1203
+ sort .Slice (jobRanks [daemonID ], func (i , j int ) bool {
1204
+ return jobRanks [daemonID ][i ].CreatedAt .Before (jobRanks [daemonID ][j ].CreatedAt )
1205
+ })
1206
+ }
1207
+
1208
+ // Step 5: Compute queue position & max queue size across all provisioners
1209
+ jobQueueStats := make (map [uuid.UUID ]database.GetProvisionerJobsByIDsWithQueuePositionRow )
1210
+ for _ , jobs := range jobRanks {
1211
+ queueSize := int64 (len (jobs )) // Queue size per provisioner
1212
+ for i , job := range jobs {
1213
+ queuePosition := int64 (i + 1 )
1214
+
1215
+ // If the job already exists, update only if this queuePosition is better
1216
+ if existing , exists := jobQueueStats [job .ID ]; exists {
1217
+ jobQueueStats [job .ID ] = database.GetProvisionerJobsByIDsWithQueuePositionRow {
1218
+ ID : job .ID ,
1219
+ CreatedAt : job .CreatedAt ,
1220
+ ProvisionerJob : job ,
1221
+ QueuePosition : min (existing .QueuePosition , queuePosition ),
1222
+ QueueSize : max (existing .QueueSize , queueSize ), // Take the maximum queue size across provisioners
1223
+ }
1224
+ } else {
1225
+ jobQueueStats [job .ID ] = database.GetProvisionerJobsByIDsWithQueuePositionRow {
1226
+ ID : job .ID ,
1227
+ CreatedAt : job .CreatedAt ,
1228
+ ProvisionerJob : job ,
1229
+ QueuePosition : queuePosition ,
1230
+ QueueSize : queueSize ,
1231
+ }
1232
+ }
1233
+ }
1234
+ }
1235
+
1236
+ // Step 6: Compute the final results with minimal checks
1237
+ var results []database.GetProvisionerJobsByIDsWithQueuePositionRow
1238
+ for _ , job := range filteredJobs {
1239
+ // If the job has a computed rank, use it
1240
+ if rank , found := jobQueueStats [job .ID ]; found {
1241
+ results = append (results , rank )
1242
+ } else {
1243
+ // Otherwise, return (0,0) for non-pending jobs and unranked pending jobs
1244
+ results = append (results , database.GetProvisionerJobsByIDsWithQueuePositionRow {
1245
+ ID : job .ID ,
1246
+ CreatedAt : job .CreatedAt ,
1247
+ ProvisionerJob : job ,
1248
+ QueuePosition : 0 ,
1249
+ QueueSize : 0 ,
1250
+ })
1251
+ }
1252
+ }
1253
+
1254
+ // Step 7: Sort results by CreatedAt
1255
+ sort .Slice (results , func (i , j int ) bool {
1256
+ return results [i ].CreatedAt .Before (results [j ].CreatedAt )
1257
+ })
1258
+
1259
+ return results , nil
1238
1260
}
1239
1261
1240
1262
func (* FakeQuerier ) AcquireLock (_ context.Context , _ int64 ) error {
0 commit comments