Skip to content

Commit 7078fc2

Browse files
fix: optimize queue position sql query
1 parent 3a6d5f5 commit 7078fc2

File tree

10 files changed

+88
-34
lines changed

10 files changed

+88
-34
lines changed

coderd/database/dbauthz/dbauthz.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2332,7 +2332,7 @@ func (q *querier) GetProvisionerJobsByIDs(ctx context.Context, ids []uuid.UUID)
23322332
return provisionerJobs, nil
23332333
}
23342334

2335-
func (q *querier) GetProvisionerJobsByIDsWithQueuePosition(ctx context.Context, ids []uuid.UUID) ([]database.GetProvisionerJobsByIDsWithQueuePositionRow, error) {
2335+
func (q *querier) GetProvisionerJobsByIDsWithQueuePosition(ctx context.Context, ids database.GetProvisionerJobsByIDsWithQueuePositionParams) ([]database.GetProvisionerJobsByIDsWithQueuePositionRow, error) {
23362336
// TODO: Remove this once we have a proper rbac check for provisioner jobs.
23372337
// Details in https://github.com/coder/coder/issues/16160
23382338
return q.db.GetProvisionerJobsByIDsWithQueuePosition(ctx, ids)

coderd/database/dbmem/dbmem.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4677,14 +4677,14 @@ func (q *FakeQuerier) GetProvisionerJobsByIDs(_ context.Context, ids []uuid.UUID
46774677
return jobs, nil
46784678
}
46794679

4680-
func (q *FakeQuerier) GetProvisionerJobsByIDsWithQueuePosition(ctx context.Context, ids []uuid.UUID) ([]database.GetProvisionerJobsByIDsWithQueuePositionRow, error) {
4680+
func (q *FakeQuerier) GetProvisionerJobsByIDsWithQueuePosition(ctx context.Context, arg database.GetProvisionerJobsByIDsWithQueuePositionParams) ([]database.GetProvisionerJobsByIDsWithQueuePositionRow, error) {
46814681
q.mutex.RLock()
46824682
defer q.mutex.RUnlock()
46834683

4684-
if ids == nil {
4685-
ids = []uuid.UUID{}
4684+
if arg.IDs == nil {
4685+
arg.IDs = []uuid.UUID{}
46864686
}
4687-
return q.getProvisionerJobsByIDsWithQueuePositionLockedTagBasedQueue(ctx, ids)
4687+
return q.getProvisionerJobsByIDsWithQueuePositionLockedTagBasedQueue(ctx, arg.IDs)
46884688
}
46894689

46904690
func (q *FakeQuerier) GetProvisionerJobsByOrganizationAndStatusWithQueuePositionAndProvisioner(ctx context.Context, arg database.GetProvisionerJobsByOrganizationAndStatusWithQueuePositionAndProvisionerParams) ([]database.GetProvisionerJobsByOrganizationAndStatusWithQueuePositionAndProvisionerRow, error) {

coderd/database/dbmetrics/querymetrics.go

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

coderd/database/dbmock/dbmock.go

Lines changed: 4 additions & 4 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

coderd/database/querier.go

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

coderd/database/querier_test.go

Lines changed: 22 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@ import (
99
"testing"
1010
"time"
1111

12+
"github.com/coder/coder/v2/coderd/provisionerdserver"
13+
1214
"github.com/google/uuid"
1315
"github.com/prometheus/client_golang/prometheus"
1416
"github.com/stretchr/testify/assert"
@@ -1268,7 +1270,10 @@ func TestQueuePosition(t *testing.T) {
12681270
Tags: database.StringMap{},
12691271
})
12701272

1271-
queued, err := db.GetProvisionerJobsByIDsWithQueuePosition(ctx, jobIDs)
1273+
queued, err := db.GetProvisionerJobsByIDsWithQueuePosition(ctx, database.GetProvisionerJobsByIDsWithQueuePositionParams{
1274+
IDs: jobIDs,
1275+
StaleIntervalMS: provisionerdserver.StaleInterval.Milliseconds(),
1276+
})
12721277
require.NoError(t, err)
12731278
require.Len(t, queued, jobCount)
12741279
sort.Slice(queued, func(i, j int) bool {
@@ -1296,7 +1301,10 @@ func TestQueuePosition(t *testing.T) {
12961301
require.NoError(t, err)
12971302
require.Equal(t, jobs[0].ID, job.ID)
12981303

1299-
queued, err = db.GetProvisionerJobsByIDsWithQueuePosition(ctx, jobIDs)
1304+
queued, err = db.GetProvisionerJobsByIDsWithQueuePosition(ctx, database.GetProvisionerJobsByIDsWithQueuePositionParams{
1305+
IDs: jobIDs,
1306+
StaleIntervalMS: provisionerdserver.StaleInterval.Milliseconds(),
1307+
})
13001308
require.NoError(t, err)
13011309
require.Len(t, queued, jobCount)
13021310
sort.Slice(queued, func(i, j int) bool {
@@ -2550,7 +2558,10 @@ func TestGetProvisionerJobsByIDsWithQueuePosition(t *testing.T) {
25502558
}
25512559

25522560
// When: we fetch the jobs by their IDs
2553-
actualJobs, err := db.GetProvisionerJobsByIDsWithQueuePosition(ctx, filteredJobIDs)
2561+
actualJobs, err := db.GetProvisionerJobsByIDsWithQueuePosition(ctx, database.GetProvisionerJobsByIDsWithQueuePositionParams{
2562+
IDs: filteredJobIDs,
2563+
StaleIntervalMS: provisionerdserver.StaleInterval.Milliseconds(),
2564+
})
25542565
require.NoError(t, err)
25552566
require.Len(t, actualJobs, len(filteredJobs), "should return all unskipped jobs")
25562567

@@ -2693,7 +2704,10 @@ func TestGetProvisionerJobsByIDsWithQueuePosition_MixedStatuses(t *testing.T) {
26932704
}
26942705

26952706
// When: we fetch the jobs by their IDs
2696-
actualJobs, err := db.GetProvisionerJobsByIDsWithQueuePosition(ctx, jobIDs)
2707+
actualJobs, err := db.GetProvisionerJobsByIDsWithQueuePosition(ctx, database.GetProvisionerJobsByIDsWithQueuePositionParams{
2708+
IDs: jobIDs,
2709+
StaleIntervalMS: provisionerdserver.StaleInterval.Milliseconds(),
2710+
})
26972711
require.NoError(t, err)
26982712
require.Len(t, actualJobs, len(allJobs), "should return all jobs")
26992713

@@ -2788,7 +2802,10 @@ func TestGetProvisionerJobsByIDsWithQueuePosition_OrderValidation(t *testing.T)
27882802
}
27892803

27902804
// When: we fetch the jobs by their IDs
2791-
actualJobs, err := db.GetProvisionerJobsByIDsWithQueuePosition(ctx, jobIDs)
2805+
actualJobs, err := db.GetProvisionerJobsByIDsWithQueuePosition(ctx, database.GetProvisionerJobsByIDsWithQueuePositionParams{
2806+
IDs: jobIDs,
2807+
StaleIntervalMS: provisionerdserver.StaleInterval.Milliseconds(),
2808+
})
27922809
require.NoError(t, err)
27932810
require.Len(t, actualJobs, len(allJobs), "should return all jobs")
27942811

coderd/database/queries.sql.go

Lines changed: 15 additions & 6 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

coderd/database/queries/provisionerjobs.sql

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -80,17 +80,21 @@ pending_jobs AS (
8080
WHERE
8181
job_status = 'pending'
8282
),
83+
online_provisioner_daemons AS (
84+
SELECT id, tags FROM provisioner_daemons pd
85+
WHERE pd.last_seen_at IS NOT NULL AND pd.last_seen_at >= (NOW() - (@stale_interval_ms::bigint || ' ms')::interval)
86+
),
8387
ranked_jobs AS (
8488
-- Step 3: Rank only pending jobs based on provisioner availability
8589
SELECT
8690
pj.id,
8791
pj.created_at,
88-
ROW_NUMBER() OVER (PARTITION BY pd.id ORDER BY pj.created_at ASC) AS queue_position,
89-
COUNT(*) OVER (PARTITION BY pd.id) AS queue_size
92+
ROW_NUMBER() OVER (PARTITION BY opd.id ORDER BY pj.created_at ASC) AS queue_position,
93+
COUNT(*) OVER (PARTITION BY opd.id) AS queue_size
9094
FROM
9195
pending_jobs pj
92-
INNER JOIN provisioner_daemons pd
93-
ON provisioner_tagset_contains(pd.tags, pj.tags) -- Join only on the small pending set
96+
INNER JOIN online_provisioner_daemons opd
97+
ON provisioner_tagset_contains(opd.tags, pj.tags) -- Join only on the small pending set
9498
),
9599
final_jobs AS (
96100
-- Step 4: Compute best queue position and max queue size per job

coderd/templateversions.go

Lines changed: 28 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,10 @@ func (api *API) templateVersion(rw http.ResponseWriter, r *http.Request) {
5353
ctx := r.Context()
5454
templateVersion := httpmw.TemplateVersionParam(r)
5555

56-
jobs, err := api.Database.GetProvisionerJobsByIDsWithQueuePosition(ctx, []uuid.UUID{templateVersion.JobID})
56+
jobs, err := api.Database.GetProvisionerJobsByIDsWithQueuePosition(ctx, database.GetProvisionerJobsByIDsWithQueuePositionParams{
57+
IDs: []uuid.UUID{templateVersion.JobID},
58+
StaleIntervalMS: provisionerdserver.StaleInterval.Milliseconds(),
59+
})
5760
if err != nil || len(jobs) == 0 {
5861
httpapi.Write(ctx, rw, http.StatusInternalServerError, codersdk.Response{
5962
Message: "Internal error fetching provisioner job.",
@@ -182,7 +185,10 @@ func (api *API) patchTemplateVersion(rw http.ResponseWriter, r *http.Request) {
182185
return
183186
}
184187

185-
jobs, err := api.Database.GetProvisionerJobsByIDsWithQueuePosition(ctx, []uuid.UUID{templateVersion.JobID})
188+
jobs, err := api.Database.GetProvisionerJobsByIDsWithQueuePosition(ctx, database.GetProvisionerJobsByIDsWithQueuePositionParams{
189+
IDs: []uuid.UUID{templateVersion.JobID},
190+
StaleIntervalMS: provisionerdserver.StaleInterval.Milliseconds(),
191+
})
186192
if err != nil || len(jobs) == 0 {
187193
httpapi.Write(ctx, rw, http.StatusInternalServerError, codersdk.Response{
188194
Message: "Internal error fetching provisioner job.",
@@ -733,7 +739,10 @@ func (api *API) fetchTemplateVersionDryRunJob(rw http.ResponseWriter, r *http.Re
733739
return database.GetProvisionerJobsByIDsWithQueuePositionRow{}, false
734740
}
735741

736-
jobs, err := api.Database.GetProvisionerJobsByIDsWithQueuePosition(ctx, []uuid.UUID{jobUUID})
742+
jobs, err := api.Database.GetProvisionerJobsByIDsWithQueuePosition(ctx, database.GetProvisionerJobsByIDsWithQueuePositionParams{
743+
IDs: []uuid.UUID{jobUUID},
744+
StaleIntervalMS: provisionerdserver.StaleInterval.Milliseconds(),
745+
})
737746
if httpapi.Is404Error(err) {
738747
httpapi.Write(ctx, rw, http.StatusNotFound, codersdk.Response{
739748
Message: fmt.Sprintf("Provisioner job %q not found.", jobUUID),
@@ -865,7 +874,10 @@ func (api *API) templateVersionsByTemplate(rw http.ResponseWriter, r *http.Reque
865874
for _, version := range versions {
866875
jobIDs = append(jobIDs, version.JobID)
867876
}
868-
jobs, err := store.GetProvisionerJobsByIDsWithQueuePosition(ctx, jobIDs)
877+
jobs, err := store.GetProvisionerJobsByIDsWithQueuePosition(ctx, database.GetProvisionerJobsByIDsWithQueuePositionParams{
878+
IDs: jobIDs,
879+
StaleIntervalMS: provisionerdserver.StaleInterval.Milliseconds(),
880+
})
869881
if err != nil {
870882
httpapi.Write(ctx, rw, http.StatusInternalServerError, codersdk.Response{
871883
Message: "Internal error fetching provisioner job.",
@@ -933,7 +945,10 @@ func (api *API) templateVersionByName(rw http.ResponseWriter, r *http.Request) {
933945
})
934946
return
935947
}
936-
jobs, err := api.Database.GetProvisionerJobsByIDsWithQueuePosition(ctx, []uuid.UUID{templateVersion.JobID})
948+
jobs, err := api.Database.GetProvisionerJobsByIDsWithQueuePosition(ctx, database.GetProvisionerJobsByIDsWithQueuePositionParams{
949+
IDs: []uuid.UUID{templateVersion.JobID},
950+
StaleIntervalMS: provisionerdserver.StaleInterval.Milliseconds(),
951+
})
937952
if err != nil || len(jobs) == 0 {
938953
httpapi.Write(ctx, rw, http.StatusInternalServerError, codersdk.Response{
939954
Message: "Internal error fetching provisioner job.",
@@ -1013,7 +1028,10 @@ func (api *API) templateVersionByOrganizationTemplateAndName(rw http.ResponseWri
10131028
})
10141029
return
10151030
}
1016-
jobs, err := api.Database.GetProvisionerJobsByIDsWithQueuePosition(ctx, []uuid.UUID{templateVersion.JobID})
1031+
jobs, err := api.Database.GetProvisionerJobsByIDsWithQueuePosition(ctx, database.GetProvisionerJobsByIDsWithQueuePositionParams{
1032+
IDs: []uuid.UUID{templateVersion.JobID},
1033+
StaleIntervalMS: provisionerdserver.StaleInterval.Milliseconds(),
1034+
})
10171035
if err != nil || len(jobs) == 0 {
10181036
httpapi.Write(ctx, rw, http.StatusInternalServerError, codersdk.Response{
10191037
Message: "Internal error fetching provisioner job.",
@@ -1115,7 +1133,10 @@ func (api *API) previousTemplateVersionByOrganizationTemplateAndName(rw http.Res
11151133
return
11161134
}
11171135

1118-
jobs, err := api.Database.GetProvisionerJobsByIDsWithQueuePosition(ctx, []uuid.UUID{previousTemplateVersion.JobID})
1136+
jobs, err := api.Database.GetProvisionerJobsByIDsWithQueuePosition(ctx, database.GetProvisionerJobsByIDsWithQueuePositionParams{
1137+
IDs: []uuid.UUID{previousTemplateVersion.JobID},
1138+
StaleIntervalMS: provisionerdserver.StaleInterval.Milliseconds(),
1139+
})
11191140
if err != nil || len(jobs) == 0 {
11201141
httpapi.Write(ctx, rw, http.StatusInternalServerError, codersdk.Response{
11211142
Message: "Internal error fetching provisioner job.",

coderd/workspacebuilds.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -797,7 +797,10 @@ func (api *API) workspaceBuildsData(ctx context.Context, workspaceBuilds []datab
797797
for _, build := range workspaceBuilds {
798798
jobIDs = append(jobIDs, build.JobID)
799799
}
800-
jobs, err := api.Database.GetProvisionerJobsByIDsWithQueuePosition(ctx, jobIDs)
800+
jobs, err := api.Database.GetProvisionerJobsByIDsWithQueuePosition(ctx, database.GetProvisionerJobsByIDsWithQueuePositionParams{
801+
IDs: jobIDs,
802+
StaleIntervalMS: provisionerdserver.StaleInterval.Milliseconds(),
803+
})
801804
if err != nil && !errors.Is(err, sql.ErrNoRows) {
802805
return workspaceBuildsData{}, xerrors.Errorf("get provisioner jobs: %w", err)
803806
}

0 commit comments

Comments
 (0)