@@ -1147,7 +1147,119 @@ func getOwnerFromTags(tags map[string]string) string {
1147
1147
return ""
1148
1148
}
1149
1149
1150
- func (q * FakeQuerier ) getProvisionerJobsByIDsWithQueuePositionLocked (_ context.Context , ids []uuid.UUID ) ([]database.GetProvisionerJobsByIDsWithQueuePositionRow , error ) {
1150
+ // provisionerTagsetContains checks if daemonTags contain all key-value pairs from jobTags
1151
+ func provisionerTagsetContains (daemonTags , jobTags map [string ]string ) bool {
1152
+ for jobKey , jobValue := range jobTags {
1153
+ if daemonValue , exists := daemonTags [jobKey ]; ! exists || daemonValue != jobValue {
1154
+ return false
1155
+ }
1156
+ }
1157
+ return true
1158
+ }
1159
+
1160
+ // GetProvisionerJobsByIDsWithQueuePosition mimics the SQL logic in pure Go
1161
+ func (q * FakeQuerier ) getProvisionerJobsByIDsWithQueuePositionLockedTagBasedQueue (_ context.Context , jobIDs []uuid.UUID ) ([]database.GetProvisionerJobsByIDsWithQueuePositionRow , error ) {
1162
+ // Step 1: Filter provisionerJobs based on jobIDs
1163
+ filteredJobs := make (map [uuid.UUID ]database.ProvisionerJob )
1164
+ for _ , job := range q .provisionerJobs {
1165
+ for _ , id := range jobIDs {
1166
+ if job .ID == id {
1167
+ filteredJobs [job .ID ] = job
1168
+ }
1169
+ }
1170
+ }
1171
+
1172
+ // Step 2: Identify pending jobs
1173
+ pendingJobs := make (map [uuid.UUID ]database.ProvisionerJob )
1174
+ for _ , job := range q .provisionerJobs {
1175
+ if job .JobStatus == "pending" {
1176
+ pendingJobs [job .ID ] = job
1177
+ }
1178
+ }
1179
+
1180
+ // Step 3: Identify pending jobs that have a matching provisioner
1181
+ matchedJobs := make (map [uuid.UUID ]struct {})
1182
+ for _ , job := range pendingJobs {
1183
+ for _ , daemon := range q .provisionerDaemons {
1184
+ if provisionerTagsetContains (daemon .Tags , job .Tags ) {
1185
+ matchedJobs [job .ID ] = struct {}{}
1186
+ break
1187
+ }
1188
+ }
1189
+ }
1190
+
1191
+ // Step 4: Rank pending jobs per provisioner
1192
+ jobRanks := make (map [uuid.UUID ][]database.ProvisionerJob )
1193
+ for _ , job := range pendingJobs {
1194
+ for _ , daemon := range q .provisionerDaemons {
1195
+ if provisionerTagsetContains (daemon .Tags , job .Tags ) {
1196
+ jobRanks [daemon .ID ] = append (jobRanks [daemon .ID ], job )
1197
+ }
1198
+ }
1199
+ }
1200
+
1201
+ // Sort jobs per provisioner by CreatedAt
1202
+ for daemonID := range jobRanks {
1203
+ sort .Slice (jobRanks [daemonID ], func (i , j int ) bool {
1204
+ return jobRanks [daemonID ][i ].CreatedAt .Before (jobRanks [daemonID ][j ].CreatedAt )
1205
+ })
1206
+ }
1207
+
1208
+ // Step 5: Compute queue position & max queue size across all provisioners
1209
+ jobQueueStats := make (map [uuid.UUID ]database.GetProvisionerJobsByIDsWithQueuePositionRow )
1210
+ for _ , jobs := range jobRanks {
1211
+ queueSize := int64 (len (jobs )) // Queue size per provisioner
1212
+ for i , job := range jobs {
1213
+ queuePosition := int64 (i + 1 )
1214
+
1215
+ // If the job already exists, update only if this queuePosition is better
1216
+ if existing , exists := jobQueueStats [job .ID ]; exists {
1217
+ jobQueueStats [job .ID ] = database.GetProvisionerJobsByIDsWithQueuePositionRow {
1218
+ ID : job .ID ,
1219
+ CreatedAt : job .CreatedAt ,
1220
+ ProvisionerJob : job ,
1221
+ QueuePosition : min (existing .QueuePosition , queuePosition ),
1222
+ QueueSize : max (existing .QueueSize , queueSize ), // Take the maximum queue size across provisioners
1223
+ }
1224
+ } else {
1225
+ jobQueueStats [job .ID ] = database.GetProvisionerJobsByIDsWithQueuePositionRow {
1226
+ ID : job .ID ,
1227
+ CreatedAt : job .CreatedAt ,
1228
+ ProvisionerJob : job ,
1229
+ QueuePosition : queuePosition ,
1230
+ QueueSize : queueSize ,
1231
+ }
1232
+ }
1233
+ }
1234
+ }
1235
+
1236
+ // Step 6: Compute the final results with minimal checks
1237
+ var results []database.GetProvisionerJobsByIDsWithQueuePositionRow
1238
+ for _ , job := range filteredJobs {
1239
+ // If the job has a computed rank, use it
1240
+ if rank , found := jobQueueStats [job .ID ]; found {
1241
+ results = append (results , rank )
1242
+ } else {
1243
+ // Otherwise, return (0,0) for non-pending jobs and unranked pending jobs
1244
+ results = append (results , database.GetProvisionerJobsByIDsWithQueuePositionRow {
1245
+ ID : job .ID ,
1246
+ CreatedAt : job .CreatedAt ,
1247
+ ProvisionerJob : job ,
1248
+ QueuePosition : 0 ,
1249
+ QueueSize : 0 ,
1250
+ })
1251
+ }
1252
+ }
1253
+
1254
+ // Step 7: Sort results by CreatedAt
1255
+ sort .Slice (results , func (i , j int ) bool {
1256
+ return results [i ].CreatedAt .Before (results [j ].CreatedAt )
1257
+ })
1258
+
1259
+ return results , nil
1260
+ }
1261
+
1262
+ func (q * FakeQuerier ) getProvisionerJobsByIDsWithQueuePositionLockedGlobalQueue (_ context.Context , ids []uuid.UUID ) ([]database.GetProvisionerJobsByIDsWithQueuePositionRow , error ) {
1151
1263
// WITH pending_jobs AS (
1152
1264
// SELECT
1153
1265
// id, created_at
@@ -4149,7 +4261,7 @@ func (q *FakeQuerier) GetProvisionerJobsByIDsWithQueuePosition(ctx context.Conte
4149
4261
if ids == nil {
4150
4262
ids = []uuid.UUID {}
4151
4263
}
4152
- return q .getProvisionerJobsByIDsWithQueuePositionLocked (ctx , ids )
4264
+ return q .getProvisionerJobsByIDsWithQueuePositionLockedTagBasedQueue (ctx , ids )
4153
4265
}
4154
4266
4155
4267
func (q * FakeQuerier ) GetProvisionerJobsByOrganizationAndStatusWithQueuePositionAndProvisioner (ctx context.Context , arg database.GetProvisionerJobsByOrganizationAndStatusWithQueuePositionAndProvisionerParams ) ([]database.GetProvisionerJobsByOrganizationAndStatusWithQueuePositionAndProvisionerRow , error ) {
@@ -4218,7 +4330,7 @@ func (q *FakeQuerier) GetProvisionerJobsByOrganizationAndStatusWithQueuePosition
4218
4330
LIMIT
4219
4331
sqlc.narg('limit')::int;
4220
4332
*/
4221
- rowsWithQueuePosition , err := q .getProvisionerJobsByIDsWithQueuePositionLocked (ctx , nil )
4333
+ rowsWithQueuePosition , err := q .getProvisionerJobsByIDsWithQueuePositionLockedGlobalQueue (ctx , nil )
4222
4334
if err != nil {
4223
4335
return nil , err
4224
4336
}
0 commit comments