diff --git a/coderd/database/dbmem/dbmem.go b/coderd/database/dbmem/dbmem.go index d37b1fc029aa7..1aa98ade6f15d 100644 --- a/coderd/database/dbmem/dbmem.go +++ b/coderd/database/dbmem/dbmem.go @@ -3804,35 +3804,92 @@ func (q *FakeQuerier) GetProvisionerJobsByIDsWithQueuePosition(_ context.Context q.mutex.RLock() defer q.mutex.RUnlock() - jobs := make([]database.GetProvisionerJobsByIDsWithQueuePositionRow, 0) - queuePosition := int64(1) + // WITH pending_jobs AS ( + // SELECT + // id, created_at + // FROM + // provisioner_jobs + // WHERE + // started_at IS NULL + // AND + // canceled_at IS NULL + // AND + // completed_at IS NULL + // AND + // error IS NULL + // ), + type pendingJobRow struct { + ID uuid.UUID + CreatedAt time.Time + } + pendingJobs := make([]pendingJobRow, 0) for _, job := range q.provisionerJobs { - for _, id := range ids { - if id == job.ID { - // clone the Tags before appending, since maps are reference types and - // we don't want the caller to be able to mutate the map we have inside - // dbmem! - job.Tags = maps.Clone(job.Tags) - job := database.GetProvisionerJobsByIDsWithQueuePositionRow{ - ProvisionerJob: job, - } - if !job.ProvisionerJob.StartedAt.Valid { - job.QueuePosition = queuePosition - } - jobs = append(jobs, job) - break - } - } - if !job.StartedAt.Valid { - queuePosition++ + if job.StartedAt.Valid || + job.CanceledAt.Valid || + job.CompletedAt.Valid || + job.Error.Valid { + continue } + pendingJobs = append(pendingJobs, pendingJobRow{ + ID: job.ID, + CreatedAt: job.CreatedAt, + }) } - for _, job := range jobs { - if !job.ProvisionerJob.StartedAt.Valid { - // Set it to the max position! - job.QueueSize = queuePosition + + // queue_position AS ( + // SELECT + // id, + // ROW_NUMBER() OVER (ORDER BY created_at ASC) AS queue_position + // FROM + // pending_jobs + // ), + slices.SortFunc(pendingJobs, func(a, b pendingJobRow) int { + c := a.CreatedAt.Compare(b.CreatedAt) + return c + }) + + queuePosition := make(map[uuid.UUID]int64) + for idx, pj := range pendingJobs { + queuePosition[pj.ID] = int64(idx + 1) + } + + // queue_size AS ( + // SELECT COUNT(*) AS count FROM pending_jobs + // ), + queueSize := len(pendingJobs) + + // SELECT + // sqlc.embed(pj), + // COALESCE(qp.queue_position, 0) AS queue_position, + // COALESCE(qs.count, 0) AS queue_size + // FROM + // provisioner_jobs pj + // LEFT JOIN + // queue_position qp ON pj.id = qp.id + // LEFT JOIN + // queue_size qs ON TRUE + // WHERE + // pj.id IN (...) + jobs := make([]database.GetProvisionerJobsByIDsWithQueuePositionRow, 0) + for _, job := range q.provisionerJobs { + if !slices.Contains(ids, job.ID) { + continue + } + // clone the Tags before appending, since maps are reference types and + // we don't want the caller to be able to mutate the map we have inside + // dbmem! + job.Tags = maps.Clone(job.Tags) + job := database.GetProvisionerJobsByIDsWithQueuePositionRow{ + // sqlc.embed(pj), + ProvisionerJob: job, + // COALESCE(qp.queue_position, 0) AS queue_position, + QueuePosition: queuePosition[job.ID], + // COALESCE(qs.count, 0) AS queue_size + QueueSize: int64(queueSize), } + jobs = append(jobs, job) } + return jobs, nil } diff --git a/coderd/database/querier_test.go b/coderd/database/querier_test.go index 8fb12a5acf923..28d7108ae31ad 100644 --- a/coderd/database/querier_test.go +++ b/coderd/database/querier_test.go @@ -13,6 +13,7 @@ import ( "github.com/google/uuid" "github.com/prometheus/client_golang/prometheus" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "cdr.dev/slog/sloggers/slogtest" @@ -2037,6 +2038,126 @@ func TestExpectOne(t *testing.T) { }) } +func TestGetProvisionerJobsByIDsWithQueuePosition(t *testing.T) { + t.Parallel() + if !dbtestutil.WillUsePostgres() { + t.SkipNow() + } + + db, _ := dbtestutil.NewDB(t) + now := dbtime.Now() + ctx := testutil.Context(t, testutil.WaitShort) + + // Given the following provisioner jobs: + allJobs := []database.ProvisionerJob{ + // Pending. This will be the last in the queue because + // it was created most recently. + dbgen.ProvisionerJob(t, db, nil, database.ProvisionerJob{ + CreatedAt: now.Add(-time.Minute), + StartedAt: sql.NullTime{}, + CanceledAt: sql.NullTime{}, + CompletedAt: sql.NullTime{}, + Error: sql.NullString{}, + }), + + // Another pending. This will come first in the queue + // because it was created before the previous job. + dbgen.ProvisionerJob(t, db, nil, database.ProvisionerJob{ + CreatedAt: now.Add(-2 * time.Minute), + StartedAt: sql.NullTime{}, + CanceledAt: sql.NullTime{}, + CompletedAt: sql.NullTime{}, + Error: sql.NullString{}, + }), + + // Running + dbgen.ProvisionerJob(t, db, nil, database.ProvisionerJob{ + CreatedAt: now.Add(-3 * time.Minute), + StartedAt: sql.NullTime{Valid: true, Time: now}, + CanceledAt: sql.NullTime{}, + CompletedAt: sql.NullTime{}, + Error: sql.NullString{}, + }), + + // Succeeded + dbgen.ProvisionerJob(t, db, nil, database.ProvisionerJob{ + CreatedAt: now.Add(-4 * time.Minute), + StartedAt: sql.NullTime{Valid: true, Time: now}, + CanceledAt: sql.NullTime{}, + CompletedAt: sql.NullTime{Valid: true, Time: now}, + Error: sql.NullString{}, + }), + + // Canceling + dbgen.ProvisionerJob(t, db, nil, database.ProvisionerJob{ + CreatedAt: now.Add(-5 * time.Minute), + StartedAt: sql.NullTime{}, + CanceledAt: sql.NullTime{Valid: true, Time: now}, + CompletedAt: sql.NullTime{}, + Error: sql.NullString{}, + }), + + // Canceled + dbgen.ProvisionerJob(t, db, nil, database.ProvisionerJob{ + CreatedAt: now.Add(-6 * time.Minute), + StartedAt: sql.NullTime{}, + CanceledAt: sql.NullTime{Valid: true, Time: now}, + CompletedAt: sql.NullTime{Valid: true, Time: now}, + Error: sql.NullString{}, + }), + + // Failed + dbgen.ProvisionerJob(t, db, nil, database.ProvisionerJob{ + CreatedAt: now.Add(-7 * time.Minute), + StartedAt: sql.NullTime{}, + CanceledAt: sql.NullTime{}, + CompletedAt: sql.NullTime{}, + Error: sql.NullString{String: "failed", Valid: true}, + }), + } + + // Assert invariant: the jobs are in the expected order + require.Len(t, allJobs, 7, "expected 7 jobs") + for idx, status := range []database.ProvisionerJobStatus{ + database.ProvisionerJobStatusPending, + database.ProvisionerJobStatusPending, + database.ProvisionerJobStatusRunning, + database.ProvisionerJobStatusSucceeded, + database.ProvisionerJobStatusCanceling, + database.ProvisionerJobStatusCanceled, + database.ProvisionerJobStatusFailed, + } { + require.Equal(t, status, allJobs[idx].JobStatus, "expected job %d to have status %s", idx, status) + } + + var jobIDs []uuid.UUID + for _, job := range allJobs { + jobIDs = append(jobIDs, job.ID) + } + + // When: we fetch the jobs by their IDs + actualJobs, err := db.GetProvisionerJobsByIDsWithQueuePosition(ctx, jobIDs) + require.NoError(t, err) + require.Len(t, actualJobs, len(allJobs), "should return all jobs") + + // Then: the jobs should be returned in the correct order (by IDs in the input slice) + for idx, job := range actualJobs { + assert.EqualValues(t, allJobs[idx], job.ProvisionerJob) + } + + // Then: the queue size should be set correctly + for _, job := range actualJobs { + assert.EqualValues(t, job.QueueSize, 2, "should have queue size 2") + } + + // Then: the queue position should be set correctly: + var queuePositions []int64 + for _, job := range actualJobs { + queuePositions = append(queuePositions, job.QueuePosition) + } + assert.EqualValues(t, []int64{2, 1, 0, 0, 0, 0, 0}, queuePositions, "expected queue positions to be set correctly") +} + func TestGroupRemovalTrigger(t *testing.T) { t.Parallel() diff --git a/coderd/database/queries.sql.go b/coderd/database/queries.sql.go index 4915c61025ea4..fa448d35f0b8e 100644 --- a/coderd/database/queries.sql.go +++ b/coderd/database/queries.sql.go @@ -5865,23 +5865,29 @@ func (q *sqlQuerier) GetProvisionerJobsByIDs(ctx context.Context, ids []uuid.UUI } const getProvisionerJobsByIDsWithQueuePosition = `-- name: GetProvisionerJobsByIDsWithQueuePosition :many -WITH unstarted_jobs AS ( +WITH pending_jobs AS ( SELECT id, created_at FROM provisioner_jobs WHERE started_at IS NULL + AND + canceled_at IS NULL + AND + completed_at IS NULL + AND + error IS NULL ), queue_position AS ( SELECT id, ROW_NUMBER() OVER (ORDER BY created_at ASC) AS queue_position FROM - unstarted_jobs + pending_jobs ), queue_size AS ( - SELECT COUNT(*) as count FROM unstarted_jobs + SELECT COUNT(*) AS count FROM pending_jobs ) SELECT pj.id, pj.created_at, pj.updated_at, pj.started_at, pj.canceled_at, pj.completed_at, pj.error, pj.organization_id, pj.initiator_id, pj.provisioner, pj.storage_method, pj.type, pj.input, pj.worker_id, pj.file_id, pj.tags, pj.error_code, pj.trace_metadata, pj.job_status, diff --git a/coderd/database/queries/provisionerjobs.sql b/coderd/database/queries/provisionerjobs.sql index 95e8a88b84e6d..ac246d4e2ef68 100644 --- a/coderd/database/queries/provisionerjobs.sql +++ b/coderd/database/queries/provisionerjobs.sql @@ -50,23 +50,29 @@ WHERE id = ANY(@ids :: uuid [ ]); -- name: GetProvisionerJobsByIDsWithQueuePosition :many -WITH unstarted_jobs AS ( +WITH pending_jobs AS ( SELECT id, created_at FROM provisioner_jobs WHERE started_at IS NULL + AND + canceled_at IS NULL + AND + completed_at IS NULL + AND + error IS NULL ), queue_position AS ( SELECT id, ROW_NUMBER() OVER (ORDER BY created_at ASC) AS queue_position FROM - unstarted_jobs + pending_jobs ), queue_size AS ( - SELECT COUNT(*) as count FROM unstarted_jobs + SELECT COUNT(*) AS count FROM pending_jobs ) SELECT sqlc.embed(pj),