feat: add provisionerd prometheus metrics #4909

Merged
merged 5 commits on Nov 5, 2022
21 changes: 15 additions & 6 deletions cli/server.go
@@ -29,6 +29,7 @@ import (
"github.com/google/go-github/v43/github"
"github.com/google/uuid"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/collectors"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/spf13/afero"
"github.com/spf13/cobra"
@@ -358,6 +359,7 @@ func Server(vip *viper.Viper, newAPI func(context.Context, *coderd.Options) (*co
AgentStatsRefreshInterval: cfg.AgentStatRefreshInterval.Value,
Experimental: ExperimentalEnabled(cmd),
DeploymentConfig: cfg,
PrometheusRegistry: prometheus.NewRegistry(),
}
if tlsConfig != nil {
options.TLSCertificates = tlsConfig.Certificates
@@ -505,21 +507,25 @@ func Server(vip *viper.Viper, newAPI func(context.Context, *coderd.Options) (*co
defer serveHandler(ctx, logger, nil, cfg.Pprof.Address.Value, "pprof")()
}
if cfg.Prometheus.Enable.Value {
options.PrometheusRegisterer = prometheus.DefaultRegisterer
closeUsersFunc, err := prometheusmetrics.ActiveUsers(ctx, options.PrometheusRegisterer, options.Database, 0)
options.PrometheusRegistry.MustRegister(collectors.NewGoCollector())
options.PrometheusRegistry.MustRegister(collectors.NewProcessCollector(collectors.ProcessCollectorOpts{}))

closeUsersFunc, err := prometheusmetrics.ActiveUsers(ctx, options.PrometheusRegistry, options.Database, 0)
if err != nil {
return xerrors.Errorf("register active users prometheus metric: %w", err)
}
defer closeUsersFunc()

closeWorkspacesFunc, err := prometheusmetrics.Workspaces(ctx, options.PrometheusRegisterer, options.Database, 0)
closeWorkspacesFunc, err := prometheusmetrics.Workspaces(ctx, options.PrometheusRegistry, options.Database, 0)
if err != nil {
return xerrors.Errorf("register workspaces prometheus metric: %w", err)
}
defer closeWorkspacesFunc()

//nolint:revive
defer serveHandler(ctx, logger, promhttp.Handler(), cfg.Prometheus.Address.Value, "prometheus")()
defer serveHandler(ctx, logger, promhttp.InstrumentMetricHandler(
options.PrometheusRegistry, promhttp.HandlerFor(options.PrometheusRegistry, promhttp.HandlerOpts{}),
), cfg.Prometheus.Address.Value, "prometheus")()
}

// We use a separate coderAPICloser so the Enterprise API
@@ -555,8 +561,9 @@ func Server(vip *viper.Viper, newAPI func(context.Context, *coderd.Options) (*co
_ = daemon.Close()
}
}()
provisionerdMetrics := provisionerd.NewMetrics(options.PrometheusRegistry)
for i := 0; i < cfg.ProvisionerDaemons.Value; i++ {
daemon, err := newProvisionerDaemon(ctx, coderAPI, logger, cfg.CacheDirectory.Value, errCh, false)
daemon, err := newProvisionerDaemon(ctx, coderAPI, provisionerdMetrics, logger, cfg.CacheDirectory.Value, errCh, false)
if err != nil {
return xerrors.Errorf("create provisioner daemon: %w", err)
}
@@ -823,6 +830,7 @@ func shutdownWithTimeout(shutdown func(context.Context) error, timeout time.Dura
func newProvisionerDaemon(
ctx context.Context,
coderAPI *coderd.API,
metrics provisionerd.Metrics,
logger slog.Logger,
cacheDir string,
errCh chan error,
@@ -899,7 +907,8 @@ func newProvisionerDaemon(
UpdateInterval: 500 * time.Millisecond,
Provisioners: provisioners,
WorkDirectory: tempDir,
Tracer: coderAPI.TracerProvider,
TracerProvider: coderAPI.TracerProvider,
Metrics: &metrics,
}), nil
}

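For reference, a minimal standalone sketch of the serving pattern cli/server.go adopts with this change: one shared *prometheus.Registry receives the Go runtime collector, the process collector, and the application metrics, and promhttp.InstrumentMetricHandler wraps promhttp.HandlerFor so the /metrics endpoint is itself instrumented against that same registry. The address and package layout below are illustrative, not taken from this PR.

```go
package main

import (
	"net/http"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/collectors"
	"github.com/prometheus/client_golang/prometheus/promhttp"
)

func main() {
	// One registry shared by every metric source, mirroring options.PrometheusRegistry.
	reg := prometheus.NewRegistry()
	reg.MustRegister(collectors.NewGoCollector())
	reg.MustRegister(collectors.NewProcessCollector(collectors.ProcessCollectorOpts{}))

	// InstrumentMetricHandler registers promhttp_* metrics about /metrics itself on reg,
	// while HandlerFor serves everything gathered from reg.
	mux := http.NewServeMux()
	mux.Handle("/metrics", promhttp.InstrumentMetricHandler(
		reg, promhttp.HandlerFor(reg, promhttp.HandlerOpts{}),
	))

	// Illustrative address; the real server listens on cfg.Prometheus.Address.Value.
	_ = http.ListenAndServe("127.0.0.1:2112", mux)
}
```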
8 changes: 4 additions & 4 deletions coderd/coderd.go
@@ -78,7 +78,7 @@ type Options struct {
GoogleTokenValidator *idtoken.Validator
GithubOAuth2Config *GithubOAuth2Config
OIDCConfig *OIDCConfig
PrometheusRegisterer prometheus.Registerer
PrometheusRegistry *prometheus.Registry
SecureAuthCookie bool
SSHKeygenAlgorithm gitsshkey.Algorithm
Telemetry telemetry.Reporter
@@ -132,8 +132,8 @@ func New(options *Options) *API {
if options.Authorizer == nil {
options.Authorizer = rbac.NewAuthorizer()
}
if options.PrometheusRegisterer == nil {
options.PrometheusRegisterer = prometheus.NewRegistry()
if options.PrometheusRegistry == nil {
options.PrometheusRegistry = prometheus.NewRegistry()
}
if options.TailnetCoordinator == nil {
options.TailnetCoordinator = tailnet.NewCoordinator()
@@ -204,7 +204,7 @@ func New(options *Options) *API {
httpmw.Recover(api.Logger),
httpmw.ExtractRealIP(api.RealIPConfig),
httpmw.Logger(api.Logger),
httpmw.Prometheus(options.PrometheusRegisterer),
httpmw.Prometheus(options.PrometheusRegistry),
// handleSubdomainApplications checks if the first subdomain is a valid
// app URL. If it is, it will serve that application.
api.handleSubdomainApplications(
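A note on why the option type changes from the prometheus.Registerer interface to the concrete *prometheus.Registry: the server now needs the same value both to register collectors and to be gathered by the /metrics handler, and only the concrete registry satisfies both interfaces. A tiny sketch of that property; the package name is hypothetical:

```go
package metricsdemo // hypothetical package, illustration only

import "github.com/prometheus/client_golang/prometheus"

// The concrete registry can both accept registrations (httpmw.Prometheus,
// ActiveUsers, Workspaces, provisionerd.NewMetrics) and be gathered by
// promhttp.HandlerFor, which a bare prometheus.Registerer cannot.
var reg = prometheus.NewRegistry()

var (
	_ prometheus.Registerer = reg
	_ prometheus.Gatherer   = reg
)
```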
2 changes: 2 additions & 0 deletions coderd/prometheusmetrics/prometheusmetrics.go
@@ -38,6 +38,7 @@ func ActiveUsers(ctx context.Context, registerer prometheus.Registerer, db datab
return
case <-ticker.C:
}

apiKeys, err := db.GetAPIKeysLastUsedAfter(ctx, database.Now().Add(-1*time.Hour))
if err != nil {
continue
@@ -82,6 +83,7 @@ func Workspaces(ctx context.Context, registerer prometheus.Registerer, db databa
return
case <-ticker.C:
}

builds, err := db.GetLatestWorkspaceBuilds(ctx)
if err != nil {
continue
57 changes: 51 additions & 6 deletions provisionerd/provisionerd.go
@@ -11,6 +11,8 @@ import (
"time"

"github.com/hashicorp/yamux"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/spf13/afero"
"go.opentelemetry.io/otel/attribute"
semconv "go.opentelemetry.io/otel/semconv/v1.11.0"
@@ -41,9 +43,10 @@ type Provisioners map[string]sdkproto.DRPCProvisionerClient

// Options provides customizations to the behavior of a provisioner daemon.
type Options struct {
Filesystem afero.Fs
Logger slog.Logger
Tracer trace.TracerProvider
Filesystem afero.Fs
Logger slog.Logger
TracerProvider trace.TracerProvider
Metrics *Metrics

ForceCancelInterval time.Duration
UpdateInterval time.Duration
@@ -66,14 +69,19 @@ func New(clientDialer Dialer, opts *Options) *Server {
if opts.Filesystem == nil {
opts.Filesystem = afero.NewOsFs()
}
if opts.Tracer == nil {
opts.Tracer = trace.NewNoopTracerProvider()
if opts.TracerProvider == nil {
opts.TracerProvider = trace.NewNoopTracerProvider()
}
if opts.Metrics == nil {
reg := prometheus.NewRegistry()
mets := NewMetrics(reg)
opts.Metrics = &mets
}

ctx, ctxCancel := context.WithCancel(context.Background())
daemon := &Server{
opts: opts,
tracer: opts.Tracer.Tracer(tracing.TracerName),
tracer: opts.TracerProvider.Tracer(tracing.TracerName),

clientDialer: clientDialer,

@@ -103,6 +111,42 @@ type Server struct {
activeJob *runner.Runner
}

type Metrics struct {
Runner runner.Metrics
}

func NewMetrics(reg prometheus.Registerer) Metrics {
auto := promauto.With(reg)
durationToFloatMs := func(d time.Duration) float64 {
return float64(d.Milliseconds())
}

return Metrics{
Runner: runner.Metrics{
ConcurrentJobs: auto.NewGaugeVec(prometheus.GaugeOpts{
Namespace: "coderd",
Subsystem: "provisionerd",
Name: "jobs_current",
}, []string{"provisioner"}),
JobTimings: auto.NewHistogramVec(prometheus.HistogramOpts{
Namespace: "coderd",
Subsystem: "provisionerd",
Name: "job_timings_ms",
Buckets: []float64{
durationToFloatMs(1 * time.Second),
durationToFloatMs(10 * time.Second),
durationToFloatMs(30 * time.Second),
durationToFloatMs(1 * time.Minute),
durationToFloatMs(5 * time.Minute),
durationToFloatMs(10 * time.Minute),
durationToFloatMs(30 * time.Minute),
durationToFloatMs(1 * time.Hour),
},
}, []string{"provisioner", "status"}),
},
}
}

// Connect establishes a connection to coderd.
func (p *Server) connect(ctx context.Context) {
// An exponential back-off occurs when the connection is failing to dial.
@@ -282,6 +326,7 @@ func (p *Server) acquireJob(ctx context.Context) {
p.opts.UpdateInterval,
p.opts.ForceCancelInterval,
p.tracer,
p.opts.Metrics.Runner,
)

go p.activeJob.Run()
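For callers outside cli/server.go, wiring the new option would presumably look like the sketch below. The helper name and the dialer, logger, and provisioners arguments are assumptions; NewMetrics, Options.Metrics, and the other field names come from this diff. When Metrics is nil, New falls back to a throwaway private registry, so passing a shared registry is what makes the series show up on the deployment's Prometheus endpoint.

```go
package example // hypothetical package, illustration only

import (
	"cdr.dev/slog"
	"github.com/prometheus/client_golang/prometheus"

	"github.com/coder/coder/provisionerd"
)

// startDaemon registers the provisionerd metrics on the shared registry and
// hands them to the daemon, mirroring the shape of the new cli/server.go code.
// Other Options fields (Filesystem, WorkDirectory, update/cancel intervals)
// are omitted here for brevity.
func startDaemon(
	reg *prometheus.Registry,
	dialer provisionerd.Dialer,
	logger slog.Logger,
	provisioners provisionerd.Provisioners,
) *provisionerd.Server {
	metrics := provisionerd.NewMetrics(reg) // registers jobs_current and job_timings_ms on reg

	return provisionerd.New(dialer, &provisionerd.Options{
		Logger:       logger,
		Metrics:      &metrics,
		Provisioners: provisioners,
	})
}
```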
23 changes: 23 additions & 0 deletions provisionerd/runner/runner.go
@@ -16,6 +16,7 @@ import (
"time"

"github.com/google/uuid"
"github.com/prometheus/client_golang/prometheus"
"github.com/spf13/afero"
"go.opentelemetry.io/otel/codes"
semconv "go.opentelemetry.io/otel/semconv/v1.11.0"
@@ -34,6 +35,7 @@ const (

type Runner struct {
tracer trace.Tracer
metrics Metrics
job *proto.AcquiredJob
sender JobUpdater
logger slog.Logger
@@ -65,6 +67,12 @@ type Runner struct {
okToSend bool
}

type Metrics struct {
ConcurrentJobs *prometheus.GaugeVec
// JobTimings also counts the total number of jobs.
JobTimings *prometheus.HistogramVec
}

type JobUpdater interface {
UpdateJob(ctx context.Context, in *proto.UpdateJobRequest) (*proto.UpdateJobResponse, error)
FailJob(ctx context.Context, in *proto.FailedJob) error
@@ -82,6 +90,7 @@ func NewRunner(
updateInterval time.Duration,
forceCancelInterval time.Duration,
tracer trace.Tracer,
metrics Metrics,
) *Runner {
m := new(sync.Mutex)

@@ -91,6 +100,7 @@

return &Runner{
tracer: tracer,
metrics: metrics,
job: job,
sender: updater,
logger: logger.With(slog.F("job_id", job.JobId)),
@@ -120,9 +130,22 @@ func NewRunner(
// that goroutine on the context passed into Fail(), and it marks okToSend false to signal us here
// that this function should not also send a terminal message.
func (r *Runner) Run() {
start := time.Now()
ctx, span := r.startTrace(r.notStopped, tracing.FuncName())
defer span.End()

concurrentGauge := r.metrics.ConcurrentJobs.WithLabelValues(r.job.Provisioner)
concurrentGauge.Inc()
defer func() {
status := "success"
if r.failedJob != nil {
status = "failed"
}

concurrentGauge.Dec()
r.metrics.JobTimings.WithLabelValues(r.job.Provisioner, status).Observe(float64(time.Since(start).Milliseconds()))
}()

r.mutex.Lock()
defer r.mutex.Unlock()
defer r.stop()
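To make the resulting series concrete, here is a small sketch (not part of the PR) that drives the new runner metrics the same way Run does around one finished job and then prints the gathered metric family names, coderd_provisionerd_job_timings_ms and coderd_provisionerd_jobs_current. The provisioner name and timing are made up; the import path for provisionerd is assumed.

```go
package main

import (
	"fmt"
	"time"

	"github.com/prometheus/client_golang/prometheus"

	"github.com/coder/coder/provisionerd"
)

func main() {
	reg := prometheus.NewRegistry()
	metrics := provisionerd.NewMetrics(reg)

	// Simulate what Runner.Run does around a successful two-second terraform job:
	// bump the per-provisioner gauge for the duration, then record one histogram
	// observation labeled by provisioner and final status.
	gauge := metrics.Runner.ConcurrentJobs.WithLabelValues("terraform")
	gauge.Inc()
	metrics.Runner.JobTimings.WithLabelValues("terraform", "success").
		Observe(float64((2 * time.Second).Milliseconds()))
	gauge.Dec()

	families, err := reg.Gather()
	if err != nil {
		panic(err)
	}
	for _, mf := range families {
		fmt.Println(mf.GetName())
	}
}
```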