Skip to content

Commit d5b4c50

Browse files
committed
fix: Debounce AcquireJob when no jobs are available
This prevents constant database spam at scale to a maximum of 60 queries/s per coderd instance.
1 parent 2042b57 commit d5b4c50

File tree

5 files changed

+75
-15
lines changed

5 files changed

+75
-15
lines changed

cli/server.go

+9-4
Original file line numberDiff line numberDiff line change
@@ -68,8 +68,9 @@ import (
6868
"github.com/coder/coder/provisioner/echo"
6969
"github.com/coder/coder/provisioner/terraform"
7070
"github.com/coder/coder/provisionerd"
71+
"github.com/coder/coder/provisionerd/proto"
7172
"github.com/coder/coder/provisionersdk"
72-
"github.com/coder/coder/provisionersdk/proto"
73+
sdkproto "github.com/coder/coder/provisionersdk/proto"
7374
"github.com/coder/coder/tailnet"
7475
)
7576

@@ -899,7 +900,7 @@ func newProvisionerDaemon(
899900
}
900901

901902
provisioners := provisionerd.Provisioners{
902-
string(database.ProvisionerTypeTerraform): proto.NewDRPCProvisionerClient(provisionersdk.Conn(terraformClient)),
903+
string(database.ProvisionerTypeTerraform): sdkproto.NewDRPCProvisionerClient(provisionersdk.Conn(terraformClient)),
903904
}
904905
// include echo provisioner when in dev mode
905906
if dev {
@@ -920,9 +921,13 @@ func newProvisionerDaemon(
920921
}
921922
}
922923
}()
923-
provisioners[string(database.ProvisionerTypeEcho)] = proto.NewDRPCProvisionerClient(provisionersdk.Conn(echoClient))
924+
provisioners[string(database.ProvisionerTypeEcho)] = sdkproto.NewDRPCProvisionerClient(provisionersdk.Conn(echoClient))
924925
}
925-
return provisionerd.New(coderAPI.ListenProvisionerDaemon, &provisionerd.Options{
926+
return provisionerd.New(func(ctx context.Context) (proto.DRPCProvisionerDaemonClient, error) {
927+
// This debounces calls to listen every second. Read the comment
928+
// in provisionerdserver.go to learn more!
929+
return coderAPI.ListenProvisionerDaemon(ctx, time.Second)
930+
}, &provisionerd.Options{
926931
Logger: logger,
927932
PollInterval: 500 * time.Millisecond,
928933
UpdateInterval: 500 * time.Millisecond,

coderd/coderdtest/coderdtest.go

+6-3
Original file line numberDiff line numberDiff line change
@@ -69,8 +69,9 @@ import (
6969
"github.com/coder/coder/cryptorand"
7070
"github.com/coder/coder/provisioner/echo"
7171
"github.com/coder/coder/provisionerd"
72+
"github.com/coder/coder/provisionerd/proto"
7273
"github.com/coder/coder/provisionersdk"
73-
"github.com/coder/coder/provisionersdk/proto"
74+
sdkproto "github.com/coder/coder/provisionersdk/proto"
7475
"github.com/coder/coder/tailnet"
7576
"github.com/coder/coder/testutil"
7677
)
@@ -325,14 +326,16 @@ func NewProvisionerDaemon(t *testing.T, coderAPI *coderd.API) io.Closer {
325326
assert.NoError(t, err)
326327
}()
327328

328-
closer := provisionerd.New(coderAPI.ListenProvisionerDaemon, &provisionerd.Options{
329+
closer := provisionerd.New(func(ctx context.Context) (proto.DRPCProvisionerDaemonClient, error) {
330+
return coderAPI.ListenProvisionerDaemon(ctx, 0)
331+
}, &provisionerd.Options{
329332
Filesystem: fs,
330333
Logger: slogtest.Make(t, nil).Named("provisionerd").Leveled(slog.LevelDebug),
331334
PollInterval: 50 * time.Millisecond,
332335
UpdateInterval: 250 * time.Millisecond,
333336
ForceCancelInterval: time.Second,
334337
Provisioners: provisionerd.Provisioners{
335-
string(database.ProvisionerTypeEcho): proto.NewDRPCProvisionerClient(provisionersdk.Conn(echoClient)),
338+
string(database.ProvisionerTypeEcho): sdkproto.NewDRPCProvisionerClient(provisionersdk.Conn(echoClient)),
336339
},
337340
WorkDirectory: t.TempDir(),
338341
})

coderd/provisionerdaemons.go

+10-8
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"fmt"
88
"io"
99
"net/http"
10+
"time"
1011

1112
"github.com/google/uuid"
1213
"github.com/moby/moby/pkg/namesgenerator"
@@ -55,7 +56,7 @@ func (api *API) provisionerDaemons(rw http.ResponseWriter, r *http.Request) {
5556

5657
// ListenProvisionerDaemon is an in-memory connection to a provisionerd. Useful when starting coderd and provisionerd
5758
// in the same process.
58-
func (api *API) ListenProvisionerDaemon(ctx context.Context) (client proto.DRPCProvisionerDaemonClient, err error) {
59+
func (api *API) ListenProvisionerDaemon(ctx context.Context, acquireJobDebounce time.Duration) (client proto.DRPCProvisionerDaemonClient, err error) {
5960
clientSession, serverSession := provisionersdk.TransportPipe()
6061
defer func() {
6162
if err != nil {
@@ -77,13 +78,14 @@ func (api *API) ListenProvisionerDaemon(ctx context.Context) (client proto.DRPCP
7778

7879
mux := drpcmux.New()
7980
err = proto.DRPCRegisterProvisionerDaemon(mux, &provisionerdserver.Server{
80-
AccessURL: api.AccessURL,
81-
ID: daemon.ID,
82-
Database: api.Database,
83-
Pubsub: api.Pubsub,
84-
Provisioners: daemon.Provisioners,
85-
Telemetry: api.Telemetry,
86-
Logger: api.Logger.Named(fmt.Sprintf("provisionerd-%s", daemon.Name)),
81+
AccessURL: api.AccessURL,
82+
ID: daemon.ID,
83+
Database: api.Database,
84+
Pubsub: api.Pubsub,
85+
Provisioners: daemon.Provisioners,
86+
Telemetry: api.Telemetry,
87+
Logger: api.Logger.Named(fmt.Sprintf("provisionerd-%s", daemon.Name)),
88+
AcquireJobDebounce: acquireJobDebounce,
8789
})
8890
if err != nil {
8991
return nil, err

coderd/provisionerdserver/provisionerdserver.go

+22
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"fmt"
99
"net/url"
1010
"reflect"
11+
"sync"
1112
"time"
1213

1314
"github.com/google/uuid"
@@ -27,6 +28,11 @@ import (
2728
sdkproto "github.com/coder/coder/provisionersdk/proto"
2829
)
2930

31+
var (
32+
lastAcquire time.Time
33+
lastAcquireMutex sync.RWMutex
34+
)
35+
3036
type Server struct {
3137
AccessURL *url.URL
3238
ID uuid.UUID
@@ -35,10 +41,23 @@ type Server struct {
3541
Database database.Store
3642
Pubsub database.Pubsub
3743
Telemetry telemetry.Reporter
44+
45+
AcquireJobDebounce time.Duration
3846
}
3947

4048
// AcquireJob queries the database to lock a job.
4149
func (server *Server) AcquireJob(ctx context.Context, _ *proto.Empty) (*proto.AcquiredJob, error) {
50+
// This prevents loads of provisioner daemons from consistently
51+
// querying the database when no jobs are available.
52+
//
53+
// The debounce only occurs when no job is returned, so if loads of
54+
// jobs are added at once, they will start after at most this duration.
55+
lastAcquireMutex.RLock()
56+
if !lastAcquire.IsZero() && time.Since(lastAcquire) < server.AcquireJobDebounce {
57+
lastAcquireMutex.RUnlock()
58+
return &proto.AcquiredJob{}, nil
59+
}
60+
lastAcquireMutex.RUnlock()
4261
// This marks the job as locked in the database.
4362
job, err := server.Database.AcquireProvisionerJob(ctx, database.AcquireProvisionerJobParams{
4463
StartedAt: sql.NullTime{
@@ -54,6 +73,9 @@ func (server *Server) AcquireJob(ctx context.Context, _ *proto.Empty) (*proto.Ac
5473
if errors.Is(err, sql.ErrNoRows) {
5574
// The provisioner daemon assumes no jobs are available if
5675
// an empty struct is returned.
76+
lastAcquireMutex.Lock()
77+
lastAcquire = time.Now()
78+
lastAcquireMutex.Unlock()
5779
return &proto.AcquiredJob{}, nil
5880
}
5981
if err != nil {

coderd/provisionerdserver/provisionerdserver_test.go

+28
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"encoding/json"
77
"net/url"
88
"testing"
9+
"time"
910

1011
"github.com/google/uuid"
1112
"github.com/stretchr/testify/require"
@@ -22,6 +23,33 @@ import (
2223

2324
func TestAcquireJob(t *testing.T) {
2425
t.Parallel()
26+
t.Run("Debounce", func(t *testing.T) {
27+
t.Parallel()
28+
db := databasefake.New()
29+
pubsub := database.NewPubsubInMemory()
30+
srv := &provisionerdserver.Server{
31+
ID: uuid.New(),
32+
Logger: slogtest.Make(t, nil),
33+
AccessURL: &url.URL{},
34+
Provisioners: []database.ProvisionerType{database.ProvisionerTypeEcho},
35+
Database: db,
36+
Pubsub: pubsub,
37+
Telemetry: telemetry.NewNoop(),
38+
AcquireJobDebounce: time.Hour,
39+
}
40+
job, err := srv.AcquireJob(context.Background(), nil)
41+
require.NoError(t, err)
42+
require.Equal(t, &proto.AcquiredJob{}, job)
43+
_, err = srv.Database.InsertProvisionerJob(context.Background(), database.InsertProvisionerJobParams{
44+
ID: uuid.New(),
45+
InitiatorID: uuid.New(),
46+
Provisioner: database.ProvisionerTypeEcho,
47+
})
48+
require.NoError(t, err)
49+
job, err = srv.AcquireJob(context.Background(), nil)
50+
require.NoError(t, err)
51+
require.Equal(t, &proto.AcquiredJob{}, job)
52+
})
2553
t.Run("NoJobs", func(t *testing.T) {
2654
t.Parallel()
2755
srv := setup(t)

0 commit comments

Comments
 (0)