Skip to content

fix: optimize queue position sql query #17974

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
May 28, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion coderd/database/dbauthz/dbauthz.go
Original file line number Diff line number Diff line change
Expand Up @@ -2341,7 +2341,7 @@ func (q *querier) GetProvisionerJobsByIDs(ctx context.Context, ids []uuid.UUID)
return provisionerJobs, nil
}

func (q *querier) GetProvisionerJobsByIDsWithQueuePosition(ctx context.Context, ids []uuid.UUID) ([]database.GetProvisionerJobsByIDsWithQueuePositionRow, error) {
func (q *querier) GetProvisionerJobsByIDsWithQueuePosition(ctx context.Context, ids database.GetProvisionerJobsByIDsWithQueuePositionParams) ([]database.GetProvisionerJobsByIDsWithQueuePositionRow, error) {
// TODO: Remove this once we have a proper rbac check for provisioner jobs.
// Details in https://github.com/coder/coder/issues/16160
return q.db.GetProvisionerJobsByIDsWithQueuePosition(ctx, ids)
Expand Down
2 changes: 1 addition & 1 deletion coderd/database/dbauthz/dbauthz_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4345,7 +4345,7 @@ func (s *MethodTestSuite) TestSystemFunctions() {
check.Args([]uuid.UUID{uuid.New()}).Asserts(rbac.ResourceSystem, policy.ActionRead)
}))
s.Run("GetProvisionerJobsByIDsWithQueuePosition", s.Subtest(func(db database.Store, check *expects) {
check.Args([]uuid.UUID{}).Asserts()
check.Args(database.GetProvisionerJobsByIDsWithQueuePositionParams{}).Asserts()
}))
s.Run("GetReplicaByID", s.Subtest(func(db database.Store, check *expects) {
check.Args(uuid.New()).Asserts(rbac.ResourceSystem, policy.ActionRead).Errors(sql.ErrNoRows)
Expand Down
8 changes: 4 additions & 4 deletions coderd/database/dbmem/dbmem.go
Original file line number Diff line number Diff line change
Expand Up @@ -4684,14 +4684,14 @@ func (q *FakeQuerier) GetProvisionerJobsByIDs(_ context.Context, ids []uuid.UUID
return jobs, nil
}

func (q *FakeQuerier) GetProvisionerJobsByIDsWithQueuePosition(ctx context.Context, ids []uuid.UUID) ([]database.GetProvisionerJobsByIDsWithQueuePositionRow, error) {
func (q *FakeQuerier) GetProvisionerJobsByIDsWithQueuePosition(ctx context.Context, arg database.GetProvisionerJobsByIDsWithQueuePositionParams) ([]database.GetProvisionerJobsByIDsWithQueuePositionRow, error) {
q.mutex.RLock()
defer q.mutex.RUnlock()

if ids == nil {
ids = []uuid.UUID{}
if arg.IDs == nil {
arg.IDs = []uuid.UUID{}
}
return q.getProvisionerJobsByIDsWithQueuePositionLockedTagBasedQueue(ctx, ids)
return q.getProvisionerJobsByIDsWithQueuePositionLockedTagBasedQueue(ctx, arg.IDs)
}

func (q *FakeQuerier) GetProvisionerJobsByOrganizationAndStatusWithQueuePositionAndProvisioner(ctx context.Context, arg database.GetProvisionerJobsByOrganizationAndStatusWithQueuePositionAndProvisionerParams) ([]database.GetProvisionerJobsByOrganizationAndStatusWithQueuePositionAndProvisionerRow, error) {
Expand Down
2 changes: 1 addition & 1 deletion coderd/database/dbmetrics/querymetrics.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 4 additions & 4 deletions coderd/database/dbmock/dbmock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion coderd/database/querier.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

27 changes: 21 additions & 6 deletions coderd/database/querier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (
"github.com/stretchr/testify/require"

"cdr.dev/slog/sloggers/slogtest"

"github.com/coder/coder/v2/coderd/coderdtest"
"github.com/coder/coder/v2/coderd/database"
"github.com/coder/coder/v2/coderd/database/db2sdk"
Expand All @@ -27,6 +26,7 @@ import (
"github.com/coder/coder/v2/coderd/database/migrations"
"github.com/coder/coder/v2/coderd/httpmw"
"github.com/coder/coder/v2/coderd/prebuilds"
"github.com/coder/coder/v2/coderd/provisionerdserver"
"github.com/coder/coder/v2/coderd/rbac"
"github.com/coder/coder/v2/coderd/rbac/policy"
"github.com/coder/coder/v2/provisionersdk"
Expand Down Expand Up @@ -1268,7 +1268,10 @@ func TestQueuePosition(t *testing.T) {
Tags: database.StringMap{},
})

queued, err := db.GetProvisionerJobsByIDsWithQueuePosition(ctx, jobIDs)
queued, err := db.GetProvisionerJobsByIDsWithQueuePosition(ctx, database.GetProvisionerJobsByIDsWithQueuePositionParams{
IDs: jobIDs,
StaleIntervalMS: provisionerdserver.StaleInterval.Milliseconds(),
})
require.NoError(t, err)
require.Len(t, queued, jobCount)
sort.Slice(queued, func(i, j int) bool {
Expand Down Expand Up @@ -1296,7 +1299,10 @@ func TestQueuePosition(t *testing.T) {
require.NoError(t, err)
require.Equal(t, jobs[0].ID, job.ID)

queued, err = db.GetProvisionerJobsByIDsWithQueuePosition(ctx, jobIDs)
queued, err = db.GetProvisionerJobsByIDsWithQueuePosition(ctx, database.GetProvisionerJobsByIDsWithQueuePositionParams{
IDs: jobIDs,
StaleIntervalMS: provisionerdserver.StaleInterval.Milliseconds(),
})
require.NoError(t, err)
require.Len(t, queued, jobCount)
sort.Slice(queued, func(i, j int) bool {
Expand Down Expand Up @@ -2550,7 +2556,10 @@ func TestGetProvisionerJobsByIDsWithQueuePosition(t *testing.T) {
}

// When: we fetch the jobs by their IDs
actualJobs, err := db.GetProvisionerJobsByIDsWithQueuePosition(ctx, filteredJobIDs)
actualJobs, err := db.GetProvisionerJobsByIDsWithQueuePosition(ctx, database.GetProvisionerJobsByIDsWithQueuePositionParams{
IDs: filteredJobIDs,
StaleIntervalMS: provisionerdserver.StaleInterval.Milliseconds(),
})
require.NoError(t, err)
require.Len(t, actualJobs, len(filteredJobs), "should return all unskipped jobs")

Expand Down Expand Up @@ -2693,7 +2702,10 @@ func TestGetProvisionerJobsByIDsWithQueuePosition_MixedStatuses(t *testing.T) {
}

// When: we fetch the jobs by their IDs
actualJobs, err := db.GetProvisionerJobsByIDsWithQueuePosition(ctx, jobIDs)
actualJobs, err := db.GetProvisionerJobsByIDsWithQueuePosition(ctx, database.GetProvisionerJobsByIDsWithQueuePositionParams{
IDs: jobIDs,
StaleIntervalMS: provisionerdserver.StaleInterval.Milliseconds(),
})
require.NoError(t, err)
require.Len(t, actualJobs, len(allJobs), "should return all jobs")

Expand Down Expand Up @@ -2788,7 +2800,10 @@ func TestGetProvisionerJobsByIDsWithQueuePosition_OrderValidation(t *testing.T)
}

// When: we fetch the jobs by their IDs
actualJobs, err := db.GetProvisionerJobsByIDsWithQueuePosition(ctx, jobIDs)
actualJobs, err := db.GetProvisionerJobsByIDsWithQueuePosition(ctx, database.GetProvisionerJobsByIDsWithQueuePositionParams{
IDs: jobIDs,
StaleIntervalMS: provisionerdserver.StaleInterval.Milliseconds(),
})
require.NoError(t, err)
require.Len(t, actualJobs, len(allJobs), "should return all jobs")

Expand Down
21 changes: 15 additions & 6 deletions coderd/database/queries.sql.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 8 additions & 4 deletions coderd/database/queries/provisionerjobs.sql
Original file line number Diff line number Diff line change
Expand Up @@ -80,17 +80,21 @@ pending_jobs AS (
WHERE
job_status = 'pending'
),
online_provisioner_daemons AS (
SELECT id, tags FROM provisioner_daemons pd
WHERE pd.last_seen_at IS NOT NULL AND pd.last_seen_at >= (NOW() - (@stale_interval_ms::bigint || ' ms')::interval)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should probably be a more descriptive param like provisioner_daemon_stale_interval_ms

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We use @stale_interval_ms everywhere - so I kept it for consistency.

),
ranked_jobs AS (
-- Step 3: Rank only pending jobs based on provisioner availability
SELECT
pj.id,
pj.created_at,
ROW_NUMBER() OVER (PARTITION BY pd.id ORDER BY pj.created_at ASC) AS queue_position,
COUNT(*) OVER (PARTITION BY pd.id) AS queue_size
ROW_NUMBER() OVER (PARTITION BY opd.id ORDER BY pj.created_at ASC) AS queue_position,
COUNT(*) OVER (PARTITION BY opd.id) AS queue_size
FROM
pending_jobs pj
INNER JOIN provisioner_daemons pd
ON provisioner_tagset_contains(pd.tags, pj.tags) -- Join only on the small pending set
INNER JOIN online_provisioner_daemons opd
ON provisioner_tagset_contains(opd.tags, pj.tags) -- Join only on the small pending set
),
final_jobs AS (
-- Step 4: Compute best queue position and max queue size per job
Expand Down
35 changes: 28 additions & 7 deletions coderd/templateversions.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,10 @@ func (api *API) templateVersion(rw http.ResponseWriter, r *http.Request) {
ctx := r.Context()
templateVersion := httpmw.TemplateVersionParam(r)

jobs, err := api.Database.GetProvisionerJobsByIDsWithQueuePosition(ctx, []uuid.UUID{templateVersion.JobID})
jobs, err := api.Database.GetProvisionerJobsByIDsWithQueuePosition(ctx, database.GetProvisionerJobsByIDsWithQueuePositionParams{
IDs: []uuid.UUID{templateVersion.JobID},
StaleIntervalMS: provisionerdserver.StaleInterval.Milliseconds(),
})
if err != nil || len(jobs) == 0 {
httpapi.Write(ctx, rw, http.StatusInternalServerError, codersdk.Response{
Message: "Internal error fetching provisioner job.",
Expand Down Expand Up @@ -182,7 +185,10 @@ func (api *API) patchTemplateVersion(rw http.ResponseWriter, r *http.Request) {
return
}

jobs, err := api.Database.GetProvisionerJobsByIDsWithQueuePosition(ctx, []uuid.UUID{templateVersion.JobID})
jobs, err := api.Database.GetProvisionerJobsByIDsWithQueuePosition(ctx, database.GetProvisionerJobsByIDsWithQueuePositionParams{
IDs: []uuid.UUID{templateVersion.JobID},
StaleIntervalMS: provisionerdserver.StaleInterval.Milliseconds(),
})
if err != nil || len(jobs) == 0 {
httpapi.Write(ctx, rw, http.StatusInternalServerError, codersdk.Response{
Message: "Internal error fetching provisioner job.",
Expand Down Expand Up @@ -733,7 +739,10 @@ func (api *API) fetchTemplateVersionDryRunJob(rw http.ResponseWriter, r *http.Re
return database.GetProvisionerJobsByIDsWithQueuePositionRow{}, false
}

jobs, err := api.Database.GetProvisionerJobsByIDsWithQueuePosition(ctx, []uuid.UUID{jobUUID})
jobs, err := api.Database.GetProvisionerJobsByIDsWithQueuePosition(ctx, database.GetProvisionerJobsByIDsWithQueuePositionParams{
IDs: []uuid.UUID{jobUUID},
StaleIntervalMS: provisionerdserver.StaleInterval.Milliseconds(),
})
if httpapi.Is404Error(err) {
httpapi.Write(ctx, rw, http.StatusNotFound, codersdk.Response{
Message: fmt.Sprintf("Provisioner job %q not found.", jobUUID),
Expand Down Expand Up @@ -865,7 +874,10 @@ func (api *API) templateVersionsByTemplate(rw http.ResponseWriter, r *http.Reque
for _, version := range versions {
jobIDs = append(jobIDs, version.JobID)
}
jobs, err := store.GetProvisionerJobsByIDsWithQueuePosition(ctx, jobIDs)
jobs, err := store.GetProvisionerJobsByIDsWithQueuePosition(ctx, database.GetProvisionerJobsByIDsWithQueuePositionParams{
IDs: jobIDs,
StaleIntervalMS: provisionerdserver.StaleInterval.Milliseconds(),
})
if err != nil {
httpapi.Write(ctx, rw, http.StatusInternalServerError, codersdk.Response{
Message: "Internal error fetching provisioner job.",
Expand Down Expand Up @@ -933,7 +945,10 @@ func (api *API) templateVersionByName(rw http.ResponseWriter, r *http.Request) {
})
return
}
jobs, err := api.Database.GetProvisionerJobsByIDsWithQueuePosition(ctx, []uuid.UUID{templateVersion.JobID})
jobs, err := api.Database.GetProvisionerJobsByIDsWithQueuePosition(ctx, database.GetProvisionerJobsByIDsWithQueuePositionParams{
IDs: []uuid.UUID{templateVersion.JobID},
StaleIntervalMS: provisionerdserver.StaleInterval.Milliseconds(),
})
if err != nil || len(jobs) == 0 {
httpapi.Write(ctx, rw, http.StatusInternalServerError, codersdk.Response{
Message: "Internal error fetching provisioner job.",
Expand Down Expand Up @@ -1013,7 +1028,10 @@ func (api *API) templateVersionByOrganizationTemplateAndName(rw http.ResponseWri
})
return
}
jobs, err := api.Database.GetProvisionerJobsByIDsWithQueuePosition(ctx, []uuid.UUID{templateVersion.JobID})
jobs, err := api.Database.GetProvisionerJobsByIDsWithQueuePosition(ctx, database.GetProvisionerJobsByIDsWithQueuePositionParams{
IDs: []uuid.UUID{templateVersion.JobID},
StaleIntervalMS: provisionerdserver.StaleInterval.Milliseconds(),
})
if err != nil || len(jobs) == 0 {
httpapi.Write(ctx, rw, http.StatusInternalServerError, codersdk.Response{
Message: "Internal error fetching provisioner job.",
Expand Down Expand Up @@ -1115,7 +1133,10 @@ func (api *API) previousTemplateVersionByOrganizationTemplateAndName(rw http.Res
return
}

jobs, err := api.Database.GetProvisionerJobsByIDsWithQueuePosition(ctx, []uuid.UUID{previousTemplateVersion.JobID})
jobs, err := api.Database.GetProvisionerJobsByIDsWithQueuePosition(ctx, database.GetProvisionerJobsByIDsWithQueuePositionParams{
IDs: []uuid.UUID{previousTemplateVersion.JobID},
StaleIntervalMS: provisionerdserver.StaleInterval.Milliseconds(),
})
if err != nil || len(jobs) == 0 {
httpapi.Write(ctx, rw, http.StatusInternalServerError, codersdk.Response{
Message: "Internal error fetching provisioner job.",
Expand Down
5 changes: 4 additions & 1 deletion coderd/workspacebuilds.go
Original file line number Diff line number Diff line change
Expand Up @@ -797,7 +797,10 @@ func (api *API) workspaceBuildsData(ctx context.Context, workspaceBuilds []datab
for _, build := range workspaceBuilds {
jobIDs = append(jobIDs, build.JobID)
}
jobs, err := api.Database.GetProvisionerJobsByIDsWithQueuePosition(ctx, jobIDs)
jobs, err := api.Database.GetProvisionerJobsByIDsWithQueuePosition(ctx, database.GetProvisionerJobsByIDsWithQueuePositionParams{
IDs: jobIDs,
StaleIntervalMS: provisionerdserver.StaleInterval.Milliseconds(),
})
if err != nil && !errors.Is(err, sql.ErrNoRows) {
return workspaceBuildsData{}, xerrors.Errorf("get provisioner jobs: %w", err)
}
Expand Down
Loading