Skip to content

feat: integrate Acquirer for provisioner jobs #9717

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 12 commits into from
Sep 19, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
code review improvements & fixes
Signed-off-by: Spike Curtis <spike@coder.com>
  • Loading branch information
spikecurtis committed Sep 13, 2023
commit 1ceac512331283ed449e3348f76dc6f12b2a017b
31 changes: 21 additions & 10 deletions coderd/provisionerdserver/acquirer.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,9 @@ import (
)

const (
EventJobPosted = "provisioner_job_posted"
dbMaxBackoff = 10 * time.Second
EventJobPosted = "provisioner_job_posted"
dbMaxBackoff = 10 * time.Second
// backupPollDuration is the period for the backup polling described in the Acquirer comment
backupPollDuration = 30 * time.Second
)

Expand Down Expand Up @@ -201,17 +202,17 @@ func (a *Acquirer) cancel(dk dKey, clearance chan<- struct{}) error {
defer a.mu.Unlock()
d, ok := a.q[dk]
if !ok {
// this is a code error, as something removed the dKey early, or cancel
// this is a code error, as something removed the domain early, or cancel
// was called twice.
err := xerrors.New("canceled non-existent job acquisition")
err := xerrors.New("cancel for domain that doesn't exist")
a.logger.Critical(a.ctx, "internal error", slog.Error(err))
return err
}
w, ok := d.acquirees[clearance]
if !ok {
// this is a code error, as something removed the dKey early, or cancel
// this is a code error, as something removed the acquiree early, or cancel
// was called twice.
err := xerrors.New("canceled non-existent job acquisition")
err := xerrors.New("cancel for an acquiree that doesn't exist")
a.logger.Critical(a.ctx, "internal error", slog.Error(err))
return err
}
Expand Down Expand Up @@ -245,17 +246,17 @@ func (a *Acquirer) done(dk dKey, clearance chan struct{}) error {
defer a.mu.Unlock()
d, ok := a.q[dk]
if !ok {
// this is a code error, as something removed the dKey early, or done
// this is a code error, as something removed the domain early, or done
// was called twice.
err := xerrors.New("done with non-existent job acquisition")
err := xerrors.New("done for a domain that doesn't exist")
a.logger.Critical(a.ctx, "internal error", slog.Error(err))
return err
}
w, ok := d.acquirees[clearance]
if !ok {
// this is a code error, as something removed the acquiree early, or done
// was called twice.
err := xerrors.New("canceled non-existent job acquisition")
err := xerrors.New("done for an acquiree that doesn't exist")
a.logger.Critical(a.ctx, "internal error", slog.Error(err))
return err
}
Expand Down Expand Up @@ -401,9 +402,19 @@ func (*Acquirer) clearOrPendLocked(d domain) {

type dKey string

// domainKey generates a canonical map key for the given provisioner types and
// tags. It uses the null byte (0x00) as a delimiter because it is an
// unprintable control character and won't show up in any "reasonable" set of
// string tags, even in non-Latin scripts. It is important that Tags are
// validated not to contain this control character prior to use.
func domainKey(pt []database.ProvisionerType, tags Tags) dKey {
// make a copy of pt before sorting, so that we don't mutate the original
// slice or underlying array.
pts := make([]database.ProvisionerType, len(pt))
copy(pts, pt)
slices.Sort(pts)
sb := strings.Builder{}
for _, t := range pt {
for _, t := range pts {
_, _ = sb.WriteString(string(t))
_ = sb.WriteByte(0x00)
}
Expand Down
41 changes: 17 additions & 24 deletions coderd/provisionerdserver/acquirer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,10 +54,8 @@ func TestAcquirer_Single(t *testing.T) {
}
acquiree := newTestAcquiree(t, workerID, pt, tags)
jobID := uuid.New()
go func() {
err := fs.sendCtx(ctx, database.ProvisionerJob{ID: jobID}, nil)
assert.NoError(t, err)
}()
err := fs.sendCtx(ctx, database.ProvisionerJob{ID: jobID}, nil)
require.NoError(t, err)
acquiree.startAcquire(ctx, uut)
job := acquiree.success(ctx)
require.Equal(t, jobID, job.ID)
Expand Down Expand Up @@ -89,14 +87,12 @@ func TestAcquirer_MultipleSameDomain(t *testing.T) {
acquirees = append(acquirees, a)
a.startAcquire(ctx, uut)
}
go func() {
for i := 0; i < 10; i++ {
jobID := uuid.New()
jobIDs[jobID] = true
err := fs.sendCtx(ctx, database.ProvisionerJob{ID: jobID}, nil)
assert.NoError(t, err)
}
}()
for i := 0; i < 10; i++ {
jobID := uuid.New()
jobIDs[jobID] = true
err := fs.sendCtx(ctx, database.ProvisionerJob{ID: jobID}, nil)
require.NoError(t, err)
}
gotJobIDs := make(map[uuid.UUID]bool)
for i := 0; i < 10; i++ {
j := acquirees[i].success(ctx)
Expand Down Expand Up @@ -129,12 +125,10 @@ func TestAcquirer_WaitsOnNoJobs(t *testing.T) {
}
acquiree := newTestAcquiree(t, workerID, pt, tags)
jobID := uuid.New()
go func() {
err := fs.sendCtx(ctx, database.ProvisionerJob{}, sql.ErrNoRows)
assert.NoError(t, err)
err = fs.sendCtx(ctx, database.ProvisionerJob{ID: jobID}, nil)
assert.NoError(t, err)
}()
err := fs.sendCtx(ctx, database.ProvisionerJob{}, sql.ErrNoRows)
require.NoError(t, err)
err = fs.sendCtx(ctx, database.ProvisionerJob{ID: jobID}, nil)
require.NoError(t, err)
acquiree.startAcquire(ctx, uut)
require.Eventually(t, func() bool {
fs.mu.Lock()
Expand Down Expand Up @@ -274,12 +268,10 @@ func TestAcquirer_BackupPoll(t *testing.T) {
}
acquiree := newTestAcquiree(t, workerID, pt, tags)
jobID := uuid.New()
go func() {
err := fs.sendCtx(ctx, database.ProvisionerJob{}, sql.ErrNoRows)
assert.NoError(t, err)
err = fs.sendCtx(ctx, database.ProvisionerJob{ID: jobID}, nil)
assert.NoError(t, err)
}()
err := fs.sendCtx(ctx, database.ProvisionerJob{}, sql.ErrNoRows)
require.NoError(t, err)
err = fs.sendCtx(ctx, database.ProvisionerJob{ID: jobID}, nil)
require.NoError(t, err)
acquiree.startAcquire(ctx, uut)
job := acquiree.success(ctx)
require.Equal(t, jobID, job.ID)
Expand Down Expand Up @@ -323,6 +315,7 @@ func TestAcquirer_UnblockOnCancel(t *testing.T) {
}

func postJob(t *testing.T, ps pubsub.Pubsub, pt database.ProvisionerType, tags provisionerdserver.Tags) {
t.Helper()
msg, err := json.Marshal(provisionerdserver.JobPosting{
ProvisionerType: pt,
Tags: tags,
Expand Down