Skip to content

Commit 3796e17

Browse files
committed
feat(coderd/database): update AcquireProvisionerJob query to allow specifying exact tag match behaviour
1 parent 91c3df7 commit 3796e17

File tree

9 files changed

+150
-32
lines changed

9 files changed

+150
-32
lines changed

coderd/database/dbfake/dbfake.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -192,8 +192,9 @@ func (b WorkspaceBuildBuilder) Do() WorkspaceResponse {
192192
UUID: uuid.New(),
193193
Valid: true,
194194
},
195-
Types: []database.ProvisionerType{database.ProvisionerTypeEcho},
196-
Tags: []byte(`{"scope": "organization"}`),
195+
Types: []database.ProvisionerType{database.ProvisionerTypeEcho},
196+
Tags: []byte(`{"scope": "organization", "owner": ""}`),
197+
ExactTagMatch: false,
197198
})
198199
require.NoError(b.t, err, "acquire starting job")
199200
if j.ID == job.ID {

coderd/database/dbgen/dbgen.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -417,10 +417,11 @@ func ProvisionerJob(t testing.TB, db database.Store, ps pubsub.Pubsub, orig data
417417
}
418418
if !orig.StartedAt.Time.IsZero() {
419419
job, err = db.AcquireProvisionerJob(genCtx, database.AcquireProvisionerJobParams{
420-
StartedAt: orig.StartedAt,
421-
Types: []database.ProvisionerType{database.ProvisionerTypeEcho},
422-
Tags: must(json.Marshal(orig.Tags)),
423-
WorkerID: uuid.NullUUID{},
420+
StartedAt: orig.StartedAt,
421+
Types: []database.ProvisionerType{database.ProvisionerTypeEcho},
422+
Tags: must(json.Marshal(orig.Tags)),
423+
WorkerID: uuid.NullUUID{},
424+
ExactTagMatch: false,
424425
})
425426
require.NoError(t, err)
426427
// There is no easy way to make sure we acquire the correct job.

coderd/database/dbmem/dbmem.go

Lines changed: 17 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -748,6 +748,19 @@ var deletedUserLinkError = &pq.Error{
748748
Routine: "exec_stmt_raise",
749749
}
750750

751+
func tagsEqual(m1, m2 map[string]string) bool {
752+
return tagsSubset(m1, m2) && tagsSubset(m2, m1)
753+
}
754+
755+
func tagsSubset(needle, haystack map[string]string) bool {
756+
for k := range needle {
757+
if needle[k] != haystack[k] {
758+
return false
759+
}
760+
}
761+
return true
762+
}
763+
751764
func (*FakeQuerier) AcquireLock(_ context.Context, _ int64) error {
752765
return xerrors.New("AcquireLock must only be called within a transaction")
753766
}
@@ -783,19 +796,11 @@ func (q *FakeQuerier) AcquireProvisionerJob(_ context.Context, arg database.Acqu
783796
}
784797
}
785798

786-
missing := false
787-
for key, value := range provisionerJob.Tags {
788-
provided, found := tags[key]
789-
if !found {
790-
missing = true
791-
break
792-
}
793-
if provided != value {
794-
missing = true
795-
break
796-
}
799+
matchFunc := tagsSubset
800+
if arg.ExactTagMatch {
801+
matchFunc = tagsEqual
797802
}
798-
if missing {
803+
if !matchFunc(tags, provisionerJob.Tags) {
799804
continue
800805
}
801806
provisionerJob.StartedAt = arg.StartedAt

coderd/database/queries.sql.go

Lines changed: 12 additions & 6 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

coderd/database/queries/provisionerjobs.sql

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,12 @@ WHERE
2121
nested.started_at IS NULL
2222
-- Ensure the caller has the correct provisioner.
2323
AND nested.provisioner = ANY(@types :: provisioner_type [ ])
24-
-- Ensure the caller satisfies all job tags.
25-
AND nested.tags <@ @tags :: jsonb
24+
-- Ensure the caller satisfies all job tags according to the operator
25+
AND CASE
26+
WHEN @exact_tag_match :: boolean THEN nested.tags = @tags :: jsonb
27+
ELSE
28+
nested.tags = @tags :: jsonb
29+
END
2630
ORDER BY
2731
nested.created_at
2832
FOR UPDATE

coderd/provisionerdserver/acquirer.go

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,8 @@ type Acquirer struct {
4949
mu sync.Mutex
5050
q map[dKey]domain
5151

52+
exactTagMatch bool
53+
5254
// testing only
5355
backupPollDuration time.Duration
5456
}
@@ -61,6 +63,12 @@ func TestingBackupPollDuration(dur time.Duration) AcquirerOption {
6163
}
6264
}
6365

66+
func WithExactTagMatch() AcquirerOption {
67+
return func(a *Acquirer) {
68+
a.exactTagMatch = true
69+
}
70+
}
71+
6472
// AcquirerStore is the subset of database.Store that the Acquirer needs
6573
type AcquirerStore interface {
6674
AcquireProvisionerJob(context.Context, database.AcquireProvisionerJobParams) (database.ProvisionerJob, error)
@@ -76,6 +84,7 @@ func NewAcquirer(ctx context.Context, logger slog.Logger, store AcquirerStore, p
7684
ps: ps,
7785
q: make(map[dKey]domain),
7886
backupPollDuration: backupPollDuration,
87+
exactTagMatch: false,
7988
}
8089
for _, opt := range opts {
8190
opt(a)
@@ -96,7 +105,8 @@ func (a *Acquirer) AcquireJob(
96105
logger := a.logger.With(
97106
slog.F("worker_id", worker),
98107
slog.F("provisioner_types", pt),
99-
slog.F("tags", tags))
108+
slog.F("tags", tags),
109+
slog.F("exact_tag_match", a.exactTagMatch))
100110
logger.Debug(ctx, "acquiring job")
101111
dk := domainKey(pt, tags)
102112
dbTags, err := tags.ToJSON()
@@ -128,8 +138,9 @@ func (a *Acquirer) AcquireJob(
128138
UUID: worker,
129139
Valid: true,
130140
},
131-
Types: pt,
132-
Tags: dbTags,
141+
Types: pt,
142+
Tags: dbTags,
143+
ExactTagMatch: a.exactTagMatch,
133144
})
134145
if xerrors.Is(err, sql.ErrNoRows) {
135146
logger.Debug(ctx, "no job available")

coderd/provisionerdserver/acquirer_test.go

Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"time"
1010

1111
"github.com/google/uuid"
12+
"github.com/sqlc-dev/pqtype"
1213
"github.com/stretchr/testify/assert"
1314
"github.com/stretchr/testify/require"
1415
"go.uber.org/goleak"
@@ -18,6 +19,8 @@ import (
1819
"cdr.dev/slog/sloggers/slogtest"
1920
"github.com/coder/coder/v2/coderd/database"
2021
"github.com/coder/coder/v2/coderd/database/dbmem"
22+
"github.com/coder/coder/v2/coderd/database/dbtestutil"
23+
"github.com/coder/coder/v2/coderd/database/dbtime"
2124
"github.com/coder/coder/v2/coderd/database/provisionerjobs"
2225
"github.com/coder/coder/v2/coderd/database/pubsub"
2326
"github.com/coder/coder/v2/coderd/provisionerdserver"
@@ -315,6 +318,91 @@ func TestAcquirer_UnblockOnCancel(t *testing.T) {
315318
require.Equal(t, jobID, job.ID)
316319
}
317320

321+
func TestAcquirer_ExactTagMatch(t *testing.T) {
322+
t.Parallel()
323+
if testing.Short() {
324+
t.Skip("skipping this test due to -short")
325+
}
326+
327+
for _, tt := range []struct {
328+
name string
329+
provisionerJobTags map[string]string
330+
acquireJobTags map[string]string
331+
expectAcquire bool
332+
}{
333+
{
334+
name: "match",
335+
provisionerJobTags: map[string]string{"scope": "organization", "owner": "", "foo": "bar"},
336+
acquireJobTags: map[string]string{"scope": "organization", "owner": "", "foo": "bar"},
337+
expectAcquire: true,
338+
},
339+
{
340+
name: "subset",
341+
provisionerJobTags: map[string]string{"scope": "organization", "owner": "", "foo": "bar"},
342+
acquireJobTags: map[string]string{"scope": "organization", "owner": ""},
343+
expectAcquire: false,
344+
},
345+
{
346+
name: "key mismatch",
347+
provisionerJobTags: map[string]string{"scope": "organization", "owner": "", "fop": "bar"},
348+
acquireJobTags: map[string]string{"scope": "organization", "owner": "", "foo": "bar"},
349+
expectAcquire: false,
350+
},
351+
{
352+
name: "value mismatch",
353+
provisionerJobTags: map[string]string{"scope": "organization", "owner": "", "foo": "baz"},
354+
acquireJobTags: map[string]string{"scope": "organization", "owner": "", "foo": "bar"},
355+
expectAcquire: false,
356+
},
357+
} {
358+
tt := tt
359+
t.Run(tt.name, func(t *testing.T) {
360+
t.Parallel()
361+
362+
ctx := testutil.Context(t, testutil.WaitShort)
363+
// NOTE: explicitly not using fake store for this test.
364+
db, ps := dbtestutil.NewDB(t)
365+
log := slogtest.Make(t, nil).Leveled(slog.LevelDebug)
366+
org, err := db.InsertOrganization(ctx, database.InsertOrganizationParams{
367+
ID: uuid.New(),
368+
Name: "test org",
369+
Description: "the organization of testing",
370+
CreatedAt: dbtime.Now(),
371+
UpdatedAt: dbtime.Now(),
372+
})
373+
require.NoError(t, err)
374+
pj, err := db.InsertProvisionerJob(ctx, database.InsertProvisionerJobParams{
375+
ID: uuid.New(),
376+
CreatedAt: dbtime.Now(),
377+
UpdatedAt: dbtime.Now(),
378+
OrganizationID: org.ID,
379+
InitiatorID: uuid.New(),
380+
Provisioner: database.ProvisionerTypeEcho,
381+
StorageMethod: database.ProvisionerStorageMethodFile,
382+
FileID: uuid.New(),
383+
Type: database.ProvisionerJobTypeWorkspaceBuild,
384+
Input: []byte("{}"),
385+
Tags: tt.provisionerJobTags,
386+
TraceMetadata: pqtype.NullRawMessage{},
387+
})
388+
require.NoError(t, err)
389+
ptypes := []database.ProvisionerType{database.ProvisionerTypeEcho}
390+
opts := []provisionerdserver.AcquirerOption{
391+
provisionerdserver.WithExactTagMatch(),
392+
}
393+
acq := provisionerdserver.NewAcquirer(ctx, log, db, ps, opts...)
394+
aj, err := acq.AcquireJob(ctx, uuid.New(), ptypes, tt.acquireJobTags)
395+
if tt.expectAcquire {
396+
require.NoError(t, err)
397+
require.Equal(t, pj.ID, aj.ID)
398+
} else {
399+
require.ErrorIs(t, err, context.DeadlineExceeded, "should have timed out")
400+
require.Empty(t, aj, "should not have acquired job")
401+
}
402+
})
403+
}
404+
}
405+
318406
func postJob(t *testing.T, ps pubsub.Pubsub, pt database.ProvisionerType, tags provisionerdserver.Tags) {
319407
t.Helper()
320408
msg, err := json.Marshal(provisionerjobs.JobPosting{

coderd/provisionerdserver/provisionerdserver_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -573,7 +573,8 @@ func TestUpdateJob(t *testing.T) {
573573
UUID: srvID,
574574
Valid: true,
575575
},
576-
Types: []database.ProvisionerType{database.ProvisionerTypeEcho},
576+
Types: []database.ProvisionerType{database.ProvisionerTypeEcho},
577+
ExactTagMatch: false,
577578
})
578579
require.NoError(t, err)
579580
return job.ID

enterprise/coderd/schedule/template_test.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -181,8 +181,9 @@ func TestTemplateUpdateBuildDeadlines(t *testing.T) {
181181
UUID: uuid.New(),
182182
Valid: true,
183183
},
184-
Types: []database.ProvisionerType{database.ProvisionerTypeEcho},
185-
Tags: json.RawMessage(fmt.Sprintf(`{%q: "yeah"}`, c.name)),
184+
Types: []database.ProvisionerType{database.ProvisionerTypeEcho},
185+
Tags: json.RawMessage(fmt.Sprintf(`{%q: "yeah"}`, c.name)),
186+
ExactTagMatch: false,
186187
})
187188
require.NoError(t, err)
188189
require.Equal(t, job.ID, acquiredJob.ID)

0 commit comments

Comments
 (0)