From c237534f316bfb8560f2aff07efd200a6b45d85d Mon Sep 17 00:00:00 2001 From: Cian Johnston Date: Wed, 11 Dec 2024 19:22:15 +0000 Subject: [PATCH 1/3] fix(coderd/database): exclude canceled jobs in queue position for GetProvisionerJobsByIDsWithQueuePosition --- coderd/database/dbmem/dbmem.go | 105 +++++++++++++---- coderd/database/querier_test.go | 121 ++++++++++++++++++++ coderd/database/queries.sql.go | 12 +- coderd/database/queries/provisionerjobs.sql | 12 +- 4 files changed, 220 insertions(+), 30 deletions(-) diff --git a/coderd/database/dbmem/dbmem.go b/coderd/database/dbmem/dbmem.go index d37b1fc029aa7..1aa98ade6f15d 100644 --- a/coderd/database/dbmem/dbmem.go +++ b/coderd/database/dbmem/dbmem.go @@ -3804,35 +3804,92 @@ func (q *FakeQuerier) GetProvisionerJobsByIDsWithQueuePosition(_ context.Context q.mutex.RLock() defer q.mutex.RUnlock() - jobs := make([]database.GetProvisionerJobsByIDsWithQueuePositionRow, 0) - queuePosition := int64(1) + // WITH pending_jobs AS ( + // SELECT + // id, created_at + // FROM + // provisioner_jobs + // WHERE + // started_at IS NULL + // AND + // canceled_at IS NULL + // AND + // completed_at IS NULL + // AND + // error IS NULL + // ), + type pendingJobRow struct { + ID uuid.UUID + CreatedAt time.Time + } + pendingJobs := make([]pendingJobRow, 0) for _, job := range q.provisionerJobs { - for _, id := range ids { - if id == job.ID { - // clone the Tags before appending, since maps are reference types and - // we don't want the caller to be able to mutate the map we have inside - // dbmem! - job.Tags = maps.Clone(job.Tags) - job := database.GetProvisionerJobsByIDsWithQueuePositionRow{ - ProvisionerJob: job, - } - if !job.ProvisionerJob.StartedAt.Valid { - job.QueuePosition = queuePosition - } - jobs = append(jobs, job) - break - } - } - if !job.StartedAt.Valid { - queuePosition++ + if job.StartedAt.Valid || + job.CanceledAt.Valid || + job.CompletedAt.Valid || + job.Error.Valid { + continue } + pendingJobs = append(pendingJobs, pendingJobRow{ + ID: job.ID, + CreatedAt: job.CreatedAt, + }) } - for _, job := range jobs { - if !job.ProvisionerJob.StartedAt.Valid { - // Set it to the max position! - job.QueueSize = queuePosition + + // queue_position AS ( + // SELECT + // id, + // ROW_NUMBER() OVER (ORDER BY created_at ASC) AS queue_position + // FROM + // pending_jobs + // ), + slices.SortFunc(pendingJobs, func(a, b pendingJobRow) int { + c := a.CreatedAt.Compare(b.CreatedAt) + return c + }) + + queuePosition := make(map[uuid.UUID]int64) + for idx, pj := range pendingJobs { + queuePosition[pj.ID] = int64(idx + 1) + } + + // queue_size AS ( + // SELECT COUNT(*) AS count FROM pending_jobs + // ), + queueSize := len(pendingJobs) + + // SELECT + // sqlc.embed(pj), + // COALESCE(qp.queue_position, 0) AS queue_position, + // COALESCE(qs.count, 0) AS queue_size + // FROM + // provisioner_jobs pj + // LEFT JOIN + // queue_position qp ON pj.id = qp.id + // LEFT JOIN + // queue_size qs ON TRUE + // WHERE + // pj.id IN (...) + jobs := make([]database.GetProvisionerJobsByIDsWithQueuePositionRow, 0) + for _, job := range q.provisionerJobs { + if !slices.Contains(ids, job.ID) { + continue + } + // clone the Tags before appending, since maps are reference types and + // we don't want the caller to be able to mutate the map we have inside + // dbmem! + job.Tags = maps.Clone(job.Tags) + job := database.GetProvisionerJobsByIDsWithQueuePositionRow{ + // sqlc.embed(pj), + ProvisionerJob: job, + // COALESCE(qp.queue_position, 0) AS queue_position, + QueuePosition: queuePosition[job.ID], + // COALESCE(qs.count, 0) AS queue_size + QueueSize: int64(queueSize), } + jobs = append(jobs, job) } + return jobs, nil } diff --git a/coderd/database/querier_test.go b/coderd/database/querier_test.go index 8fb12a5acf923..28d7108ae31ad 100644 --- a/coderd/database/querier_test.go +++ b/coderd/database/querier_test.go @@ -13,6 +13,7 @@ import ( "github.com/google/uuid" "github.com/prometheus/client_golang/prometheus" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "cdr.dev/slog/sloggers/slogtest" @@ -2037,6 +2038,126 @@ func TestExpectOne(t *testing.T) { }) } +func TestGetProvisionerJobsByIDsWithQueuePosition(t *testing.T) { + t.Parallel() + if !dbtestutil.WillUsePostgres() { + t.SkipNow() + } + + db, _ := dbtestutil.NewDB(t) + now := dbtime.Now() + ctx := testutil.Context(t, testutil.WaitShort) + + // Given the following provisioner jobs: + allJobs := []database.ProvisionerJob{ + // Pending. This will be the last in the queue because + // it was created most recently. + dbgen.ProvisionerJob(t, db, nil, database.ProvisionerJob{ + CreatedAt: now.Add(-time.Minute), + StartedAt: sql.NullTime{}, + CanceledAt: sql.NullTime{}, + CompletedAt: sql.NullTime{}, + Error: sql.NullString{}, + }), + + // Another pending. This will come first in the queue + // because it was created before the previous job. + dbgen.ProvisionerJob(t, db, nil, database.ProvisionerJob{ + CreatedAt: now.Add(-2 * time.Minute), + StartedAt: sql.NullTime{}, + CanceledAt: sql.NullTime{}, + CompletedAt: sql.NullTime{}, + Error: sql.NullString{}, + }), + + // Running + dbgen.ProvisionerJob(t, db, nil, database.ProvisionerJob{ + CreatedAt: now.Add(-3 * time.Minute), + StartedAt: sql.NullTime{Valid: true, Time: now}, + CanceledAt: sql.NullTime{}, + CompletedAt: sql.NullTime{}, + Error: sql.NullString{}, + }), + + // Succeeded + dbgen.ProvisionerJob(t, db, nil, database.ProvisionerJob{ + CreatedAt: now.Add(-4 * time.Minute), + StartedAt: sql.NullTime{Valid: true, Time: now}, + CanceledAt: sql.NullTime{}, + CompletedAt: sql.NullTime{Valid: true, Time: now}, + Error: sql.NullString{}, + }), + + // Canceling + dbgen.ProvisionerJob(t, db, nil, database.ProvisionerJob{ + CreatedAt: now.Add(-5 * time.Minute), + StartedAt: sql.NullTime{}, + CanceledAt: sql.NullTime{Valid: true, Time: now}, + CompletedAt: sql.NullTime{}, + Error: sql.NullString{}, + }), + + // Canceled + dbgen.ProvisionerJob(t, db, nil, database.ProvisionerJob{ + CreatedAt: now.Add(-6 * time.Minute), + StartedAt: sql.NullTime{}, + CanceledAt: sql.NullTime{Valid: true, Time: now}, + CompletedAt: sql.NullTime{Valid: true, Time: now}, + Error: sql.NullString{}, + }), + + // Failed + dbgen.ProvisionerJob(t, db, nil, database.ProvisionerJob{ + CreatedAt: now.Add(-7 * time.Minute), + StartedAt: sql.NullTime{}, + CanceledAt: sql.NullTime{}, + CompletedAt: sql.NullTime{}, + Error: sql.NullString{String: "failed", Valid: true}, + }), + } + + // Assert invariant: the jobs are in the expected order + require.Len(t, allJobs, 7, "expected 7 jobs") + for idx, status := range []database.ProvisionerJobStatus{ + database.ProvisionerJobStatusPending, + database.ProvisionerJobStatusPending, + database.ProvisionerJobStatusRunning, + database.ProvisionerJobStatusSucceeded, + database.ProvisionerJobStatusCanceling, + database.ProvisionerJobStatusCanceled, + database.ProvisionerJobStatusFailed, + } { + require.Equal(t, status, allJobs[idx].JobStatus, "expected job %d to have status %s", idx, status) + } + + var jobIDs []uuid.UUID + for _, job := range allJobs { + jobIDs = append(jobIDs, job.ID) + } + + // When: we fetch the jobs by their IDs + actualJobs, err := db.GetProvisionerJobsByIDsWithQueuePosition(ctx, jobIDs) + require.NoError(t, err) + require.Len(t, actualJobs, len(allJobs), "should return all jobs") + + // Then: the jobs should be returned in the correct order (by IDs in the input slice) + for idx, job := range actualJobs { + assert.EqualValues(t, allJobs[idx], job.ProvisionerJob) + } + + // Then: the queue size should be set correctly + for _, job := range actualJobs { + assert.EqualValues(t, job.QueueSize, 2, "should have queue size 2") + } + + // Then: the queue position should be set correctly: + var queuePositions []int64 + for _, job := range actualJobs { + queuePositions = append(queuePositions, job.QueuePosition) + } + assert.EqualValues(t, []int64{2, 1, 0, 0, 0, 0, 0}, queuePositions, "expected queue positions to be set correctly") +} + func TestGroupRemovalTrigger(t *testing.T) { t.Parallel() diff --git a/coderd/database/queries.sql.go b/coderd/database/queries.sql.go index 4915c61025ea4..8bca63a7c0527 100644 --- a/coderd/database/queries.sql.go +++ b/coderd/database/queries.sql.go @@ -5865,23 +5865,29 @@ func (q *sqlQuerier) GetProvisionerJobsByIDs(ctx context.Context, ids []uuid.UUI } const getProvisionerJobsByIDsWithQueuePosition = `-- name: GetProvisionerJobsByIDsWithQueuePosition :many -WITH unstarted_jobs AS ( +WITH pending_jobs AS ( SELECT id, created_at FROM provisioner_jobs WHERE started_at IS NULL + AND + canceled_at IS NULL + AND + completed_at IS NULL + AND + error IS NULL ), queue_position AS ( SELECT id, ROW_NUMBER() OVER (ORDER BY created_at ASC) AS queue_position FROM - unstarted_jobs + pending_jobs ), queue_size AS ( - SELECT COUNT(*) as count FROM unstarted_jobs + SELECT COUNT(*) AS count FROM pending_jobs ) SELECT pj.id, pj.created_at, pj.updated_at, pj.started_at, pj.canceled_at, pj.completed_at, pj.error, pj.organization_id, pj.initiator_id, pj.provisioner, pj.storage_method, pj.type, pj.input, pj.worker_id, pj.file_id, pj.tags, pj.error_code, pj.trace_metadata, pj.job_status, diff --git a/coderd/database/queries/provisionerjobs.sql b/coderd/database/queries/provisionerjobs.sql index 95e8a88b84e6d..f7ef4eba614c9 100644 --- a/coderd/database/queries/provisionerjobs.sql +++ b/coderd/database/queries/provisionerjobs.sql @@ -50,23 +50,29 @@ WHERE id = ANY(@ids :: uuid [ ]); -- name: GetProvisionerJobsByIDsWithQueuePosition :many -WITH unstarted_jobs AS ( +WITH pending_jobs AS ( SELECT id, created_at FROM provisioner_jobs WHERE started_at IS NULL + AND + canceled_at IS NULL + AND + completed_at IS NULL + AND + error IS NULL ), queue_position AS ( SELECT id, ROW_NUMBER() OVER (ORDER BY created_at ASC) AS queue_position FROM - unstarted_jobs + pending_jobs ), queue_size AS ( - SELECT COUNT(*) as count FROM unstarted_jobs + SELECT COUNT(*) AS count FROM pending_jobs ) SELECT sqlc.embed(pj), From 964ffcfa45863a613a1cd33c314de99f98ddae83 Mon Sep 17 00:00:00 2001 From: Cian Johnston Date: Thu, 12 Dec 2024 08:44:57 +0000 Subject: [PATCH 2/3] Update coderd/database/queries/provisionerjobs.sql Co-authored-by: Mathias Fredriksson --- coderd/database/queries/provisionerjobs.sql | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/coderd/database/queries/provisionerjobs.sql b/coderd/database/queries/provisionerjobs.sql index f7ef4eba614c9..ac246d4e2ef68 100644 --- a/coderd/database/queries/provisionerjobs.sql +++ b/coderd/database/queries/provisionerjobs.sql @@ -69,7 +69,7 @@ queue_position AS ( id, ROW_NUMBER() OVER (ORDER BY created_at ASC) AS queue_position FROM - pending_jobs + pending_jobs ), queue_size AS ( SELECT COUNT(*) AS count FROM pending_jobs From 9317815da231529b47f622a13ead6ccc5e6f63e0 Mon Sep 17 00:00:00 2001 From: Cian Johnston Date: Thu, 12 Dec 2024 09:05:23 +0000 Subject: [PATCH 3/3] make gen --- coderd/database/queries.sql.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/coderd/database/queries.sql.go b/coderd/database/queries.sql.go index 8bca63a7c0527..fa448d35f0b8e 100644 --- a/coderd/database/queries.sql.go +++ b/coderd/database/queries.sql.go @@ -5884,7 +5884,7 @@ queue_position AS ( id, ROW_NUMBER() OVER (ORDER BY created_at ASC) AS queue_position FROM - pending_jobs + pending_jobs ), queue_size AS ( SELECT COUNT(*) AS count FROM pending_jobs