From d5b4c50b2acdcff7bd0d8d7609a9bf89aa48c430 Mon Sep 17 00:00:00 2001 From: Kyle Carberry Date: Thu, 10 Nov 2022 21:54:04 +0000 Subject: [PATCH] fix: Debounce AcquireJob when no jobs are available This prevents constant database spam at scale by limiting idle AcquireJob polling to roughly one query per second (about 60 queries/min) per coderd instance. --- cli/server.go | 13 ++++++--- coderd/coderdtest/coderdtest.go | 9 ++++-- coderd/provisionerdaemons.go | 18 ++++++------ .../provisionerdserver/provisionerdserver.go | 22 +++++++++++++++ .../provisionerdserver_test.go | 28 +++++++++++++++++++ 5 files changed, 75 insertions(+), 15 deletions(-) diff --git a/cli/server.go b/cli/server.go index bc4ccad533d7f..0a531316029b5 100644 --- a/cli/server.go +++ b/cli/server.go @@ -68,8 +68,9 @@ import ( "github.com/coder/coder/provisioner/echo" "github.com/coder/coder/provisioner/terraform" "github.com/coder/coder/provisionerd" + "github.com/coder/coder/provisionerd/proto" "github.com/coder/coder/provisionersdk" - "github.com/coder/coder/provisionersdk/proto" + sdkproto "github.com/coder/coder/provisionersdk/proto" "github.com/coder/coder/tailnet" ) @@ -899,7 +900,7 @@ func newProvisionerDaemon( } provisioners := provisionerd.Provisioners{ - string(database.ProvisionerTypeTerraform): proto.NewDRPCProvisionerClient(provisionersdk.Conn(terraformClient)), + string(database.ProvisionerTypeTerraform): sdkproto.NewDRPCProvisionerClient(provisionersdk.Conn(terraformClient)), } // include echo provisioner when in dev mode if dev { @@ -920,9 +921,13 @@ func newProvisionerDaemon( } } }() - provisioners[string(database.ProvisionerTypeEcho)] = proto.NewDRPCProvisionerClient(provisionersdk.Conn(echoClient)) + provisioners[string(database.ProvisionerTypeEcho)] = sdkproto.NewDRPCProvisionerClient(provisionersdk.Conn(echoClient)) } - return provisionerd.New(coderAPI.ListenProvisionerDaemon, &provisionerd.Options{ + return provisionerd.New(func(ctx context.Context) (proto.DRPCProvisionerDaemonClient, error) { + // This debounces calls to listen every 
second. Read the comment + // in provisionerdserver.go to learn more! + return coderAPI.ListenProvisionerDaemon(ctx, time.Second) + }, &provisionerd.Options{ Logger: logger, PollInterval: 500 * time.Millisecond, UpdateInterval: 500 * time.Millisecond, diff --git a/coderd/coderdtest/coderdtest.go b/coderd/coderdtest/coderdtest.go index 0fe4b5a340d2e..e57e396fde0e0 100644 --- a/coderd/coderdtest/coderdtest.go +++ b/coderd/coderdtest/coderdtest.go @@ -69,8 +69,9 @@ import ( "github.com/coder/coder/cryptorand" "github.com/coder/coder/provisioner/echo" "github.com/coder/coder/provisionerd" + "github.com/coder/coder/provisionerd/proto" "github.com/coder/coder/provisionersdk" - "github.com/coder/coder/provisionersdk/proto" + sdkproto "github.com/coder/coder/provisionersdk/proto" "github.com/coder/coder/tailnet" "github.com/coder/coder/testutil" ) @@ -325,14 +326,16 @@ func NewProvisionerDaemon(t *testing.T, coderAPI *coderd.API) io.Closer { assert.NoError(t, err) }() - closer := provisionerd.New(coderAPI.ListenProvisionerDaemon, &provisionerd.Options{ + closer := provisionerd.New(func(ctx context.Context) (proto.DRPCProvisionerDaemonClient, error) { + return coderAPI.ListenProvisionerDaemon(ctx, 0) + }, &provisionerd.Options{ Filesystem: fs, Logger: slogtest.Make(t, nil).Named("provisionerd").Leveled(slog.LevelDebug), PollInterval: 50 * time.Millisecond, UpdateInterval: 250 * time.Millisecond, ForceCancelInterval: time.Second, Provisioners: provisionerd.Provisioners{ - string(database.ProvisionerTypeEcho): proto.NewDRPCProvisionerClient(provisionersdk.Conn(echoClient)), + string(database.ProvisionerTypeEcho): sdkproto.NewDRPCProvisionerClient(provisionersdk.Conn(echoClient)), }, WorkDirectory: t.TempDir(), }) diff --git a/coderd/provisionerdaemons.go b/coderd/provisionerdaemons.go index 1336e84a6fb0c..cc99eefc779e1 100644 --- a/coderd/provisionerdaemons.go +++ b/coderd/provisionerdaemons.go @@ -7,6 +7,7 @@ import ( "fmt" "io" "net/http" + "time" "github.com/google/uuid" 
"github.com/moby/moby/pkg/namesgenerator" @@ -55,7 +56,7 @@ func (api *API) provisionerDaemons(rw http.ResponseWriter, r *http.Request) { // ListenProvisionerDaemon is an in-memory connection to a provisionerd. Useful when starting coderd and provisionerd // in the same process. -func (api *API) ListenProvisionerDaemon(ctx context.Context) (client proto.DRPCProvisionerDaemonClient, err error) { +func (api *API) ListenProvisionerDaemon(ctx context.Context, acquireJobDebounce time.Duration) (client proto.DRPCProvisionerDaemonClient, err error) { clientSession, serverSession := provisionersdk.TransportPipe() defer func() { if err != nil { @@ -77,13 +78,14 @@ func (api *API) ListenProvisionerDaemon(ctx context.Context) (client proto.DRPCP mux := drpcmux.New() err = proto.DRPCRegisterProvisionerDaemon(mux, &provisionerdserver.Server{ - AccessURL: api.AccessURL, - ID: daemon.ID, - Database: api.Database, - Pubsub: api.Pubsub, - Provisioners: daemon.Provisioners, - Telemetry: api.Telemetry, - Logger: api.Logger.Named(fmt.Sprintf("provisionerd-%s", daemon.Name)), + AccessURL: api.AccessURL, + ID: daemon.ID, + Database: api.Database, + Pubsub: api.Pubsub, + Provisioners: daemon.Provisioners, + Telemetry: api.Telemetry, + Logger: api.Logger.Named(fmt.Sprintf("provisionerd-%s", daemon.Name)), + AcquireJobDebounce: acquireJobDebounce, }) if err != nil { return nil, err diff --git a/coderd/provisionerdserver/provisionerdserver.go b/coderd/provisionerdserver/provisionerdserver.go index ab47fc42f6047..901e19e2a67c5 100644 --- a/coderd/provisionerdserver/provisionerdserver.go +++ b/coderd/provisionerdserver/provisionerdserver.go @@ -8,6 +8,7 @@ import ( "fmt" "net/url" "reflect" + "sync" "time" "github.com/google/uuid" @@ -27,6 +28,11 @@ import ( sdkproto "github.com/coder/coder/provisionersdk/proto" ) +var ( + lastAcquire time.Time + lastAcquireMutex sync.RWMutex +) + type Server struct { AccessURL *url.URL ID uuid.UUID @@ -35,10 +41,23 @@ type Server struct { Database 
database.Store Pubsub database.Pubsub Telemetry telemetry.Reporter + + AcquireJobDebounce time.Duration } // AcquireJob queries the database to lock a job. func (server *Server) AcquireJob(ctx context.Context, _ *proto.Empty) (*proto.AcquiredJob, error) { + // This prevents loads of provisioner daemons from consistently + // querying the database when no jobs are available. + // + // The debounce only occurs when no job is returned, so if loads of + // jobs are added at once, they will start after at most this duration. + lastAcquireMutex.RLock() + if !lastAcquire.IsZero() && time.Since(lastAcquire) < server.AcquireJobDebounce { + lastAcquireMutex.RUnlock() + return &proto.AcquiredJob{}, nil + } + lastAcquireMutex.RUnlock() // This marks the job as locked in the database. job, err := server.Database.AcquireProvisionerJob(ctx, database.AcquireProvisionerJobParams{ StartedAt: sql.NullTime{ @@ -54,6 +73,9 @@ func (server *Server) AcquireJob(ctx context.Context, _ *proto.Empty) (*proto.Ac if errors.Is(err, sql.ErrNoRows) { // The provisioner daemon assumes no jobs are available if // an empty struct is returned. 
+ lastAcquireMutex.Lock() + lastAcquire = time.Now() + lastAcquireMutex.Unlock() return &proto.AcquiredJob{}, nil } if err != nil { diff --git a/coderd/provisionerdserver/provisionerdserver_test.go b/coderd/provisionerdserver/provisionerdserver_test.go index dff118a8150b8..d8046f5a64e01 100644 --- a/coderd/provisionerdserver/provisionerdserver_test.go +++ b/coderd/provisionerdserver/provisionerdserver_test.go @@ -6,6 +6,7 @@ import ( "encoding/json" "net/url" "testing" + "time" "github.com/google/uuid" "github.com/stretchr/testify/require" @@ -22,6 +23,33 @@ import ( func TestAcquireJob(t *testing.T) { t.Parallel() + t.Run("Debounce", func(t *testing.T) { + t.Parallel() + db := databasefake.New() + pubsub := database.NewPubsubInMemory() + srv := &provisionerdserver.Server{ + ID: uuid.New(), + Logger: slogtest.Make(t, nil), + AccessURL: &url.URL{}, + Provisioners: []database.ProvisionerType{database.ProvisionerTypeEcho}, + Database: db, + Pubsub: pubsub, + Telemetry: telemetry.NewNoop(), + AcquireJobDebounce: time.Hour, + } + job, err := srv.AcquireJob(context.Background(), nil) + require.NoError(t, err) + require.Equal(t, &proto.AcquiredJob{}, job) + _, err = srv.Database.InsertProvisionerJob(context.Background(), database.InsertProvisionerJobParams{ + ID: uuid.New(), + InitiatorID: uuid.New(), + Provisioner: database.ProvisionerTypeEcho, + }) + require.NoError(t, err) + job, err = srv.AcquireJob(context.Background(), nil) + require.NoError(t, err) + require.Equal(t, &proto.AcquiredJob{}, job) + }) t.Run("NoJobs", func(t *testing.T) { t.Parallel() srv := setup(t)