Skip to content

feat: add metrics to PGPubsub #11971

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Feb 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion cli/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -673,10 +673,14 @@ func (r *RootCmd) Server(newAPI func(context.Context, *coderd.Options) (*coderd.
}()

options.Database = database.New(sqlDB)
options.Pubsub, err = pubsub.New(ctx, logger.Named("pubsub"), sqlDB, dbURL)
ps, err := pubsub.New(ctx, logger.Named("pubsub"), sqlDB, dbURL)
if err != nil {
return xerrors.Errorf("create pubsub: %w", err)
}
options.Pubsub = ps
if options.DeploymentValues.Prometheus.Enable {
options.PrometheusRegistry.MustRegister(ps)
}
defer options.Pubsub.Close()
}

Expand Down
192 changes: 166 additions & 26 deletions coderd/database/pubsub/pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

"github.com/google/uuid"
"github.com/lib/pq"
"github.com/prometheus/client_golang/prometheus"
"golang.org/x/xerrors"

"cdr.dev/slog"
Expand Down Expand Up @@ -162,8 +163,8 @@ func (q *msgQueue) dropped() {
q.cond.Broadcast()
}

// Pubsub implementation using PostgreSQL.
type pgPubsub struct {
// PGPubsub is a pubsub implementation using PostgreSQL.
type PGPubsub struct {
ctx context.Context
cancel context.CancelFunc
logger slog.Logger
Expand All @@ -174,29 +175,40 @@ type pgPubsub struct {
queues map[string]map[uuid.UUID]*msgQueue
closedListener bool
closeListenerErr error

publishesTotal *prometheus.CounterVec
subscribesTotal *prometheus.CounterVec
messagesTotal *prometheus.CounterVec
publishedBytesTotal prometheus.Counter
receivedBytesTotal prometheus.Counter
disconnectionsTotal prometheus.Counter
connected prometheus.Gauge
}

// BufferSize is the maximum number of unhandled messages we will buffer
// for a subscriber before dropping messages.
const BufferSize = 2048

// Subscribe calls the listener when an event matching the name is received.
func (p *pgPubsub) Subscribe(event string, listener Listener) (cancel func(), err error) {
func (p *PGPubsub) Subscribe(event string, listener Listener) (cancel func(), err error) {
return p.subscribeQueue(event, newMsgQueue(p.ctx, listener, nil))
}

func (p *pgPubsub) SubscribeWithErr(event string, listener ListenerWithErr) (cancel func(), err error) {
func (p *PGPubsub) SubscribeWithErr(event string, listener ListenerWithErr) (cancel func(), err error) {
return p.subscribeQueue(event, newMsgQueue(p.ctx, nil, listener))
}

func (p *pgPubsub) subscribeQueue(event string, newQ *msgQueue) (cancel func(), err error) {
func (p *PGPubsub) subscribeQueue(event string, newQ *msgQueue) (cancel func(), err error) {
p.mut.Lock()
defer p.mut.Unlock()
defer func() {
if err != nil {
// if we hit an error, we need to close the queue so we don't
// leak its goroutine.
newQ.close()
p.subscribesTotal.WithLabelValues("false").Inc()
} else {
p.subscribesTotal.WithLabelValues("true").Inc()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about including the event as well? Might be useful to see if there's an abnormal amount of subs/pubs/data going for a specific event.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's a bad idea because some events include the UUID of the thing, e.g. workspace, which would make the set of label values unbounded, which is a cardinal sin in metrics.

Not for this PR, but if we decide we really want that information, we could standardize event channel names to be a struct with a static root and then zero or more specific IDs, and then we could label the publishes, subscribes and messages with the static root.

}
}()

Expand Down Expand Up @@ -239,20 +251,23 @@ func (p *pgPubsub) subscribeQueue(event string, newQ *msgQueue) (cancel func(),
}, nil
}

func (p *pgPubsub) Publish(event string, message []byte) error {
func (p *PGPubsub) Publish(event string, message []byte) error {
p.logger.Debug(p.ctx, "publish", slog.F("event", event), slog.F("message_len", len(message)))
// This is safe because we are calling pq.QuoteLiteral. pg_notify doesn't
// support the first parameter being a prepared statement.
//nolint:gosec
_, err := p.db.ExecContext(p.ctx, `select pg_notify(`+pq.QuoteLiteral(event)+`, $1)`, message)
if err != nil {
p.publishesTotal.WithLabelValues("false").Inc()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does false label represent error here? Is that common prom metric practice? (Just checking since I'm not that experienced adding prom metrics.)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The label name is success with values true or false. Just trying to keep our labeling consistent within the product and that's how we do it at

}, []string{"success"}),

return xerrors.Errorf("exec pg_notify: %w", err)
}
p.publishesTotal.WithLabelValues("true").Inc()
p.publishedBytesTotal.Add(float64(len(message)))
return nil
}

// Close closes the pubsub instance.
func (p *pgPubsub) Close() error {
func (p *PGPubsub) Close() error {
p.logger.Info(p.ctx, "pubsub is closing")
p.cancel()
err := p.closeListener()
Expand All @@ -262,7 +277,7 @@ func (p *pgPubsub) Close() error {
}

// closeListener closes the pgListener, unless it has already been closed.
func (p *pgPubsub) closeListener() error {
func (p *PGPubsub) closeListener() error {
p.mut.Lock()
defer p.mut.Unlock()
if p.closedListener {
Expand All @@ -274,7 +289,7 @@ func (p *pgPubsub) closeListener() error {
}

// listen begins receiving messages on the pq listener.
func (p *pgPubsub) listen() {
func (p *PGPubsub) listen() {
defer func() {
p.logger.Info(p.ctx, "pubsub listen stopped receiving notify")
cErr := p.closeListener()
Expand Down Expand Up @@ -307,7 +322,14 @@ func (p *pgPubsub) listen() {
}
}

func (p *pgPubsub) listenReceive(notif *pq.Notification) {
func (p *PGPubsub) listenReceive(notif *pq.Notification) {
sizeLabel := messageSizeNormal
if len(notif.Extra) >= colossalThreshold {
sizeLabel = messageSizeColossal
}
p.messagesTotal.WithLabelValues(sizeLabel).Inc()
p.receivedBytesTotal.Add(float64(len(notif.Extra)))

p.mut.Lock()
defer p.mut.Unlock()
queues, ok := p.queues[notif.Channel]
Expand All @@ -320,7 +342,7 @@ func (p *pgPubsub) listenReceive(notif *pq.Notification) {
}
}

func (p *pgPubsub) recordReconnect() {
func (p *PGPubsub) recordReconnect() {
p.mut.Lock()
defer p.mut.Unlock()
for _, listeners := range p.queues {
Expand All @@ -330,20 +352,23 @@ func (p *pgPubsub) recordReconnect() {
}
}

// New creates a new Pubsub implementation using a PostgreSQL connection.
func New(ctx context.Context, logger slog.Logger, database *sql.DB, connectURL string) (Pubsub, error) {
func (p *PGPubsub) startListener(ctx context.Context, connectURL string) error {
p.connected.Set(0)
// Creates a new listener using pq.
errCh := make(chan error)
listener := pq.NewListener(connectURL, time.Second, time.Minute, func(t pq.ListenerEventType, err error) {
p.pgListener = pq.NewListener(connectURL, time.Second, time.Minute, func(t pq.ListenerEventType, err error) {
switch t {
case pq.ListenerEventConnected:
logger.Info(ctx, "pubsub connected to postgres")
p.logger.Info(ctx, "pubsub connected to postgres")
p.connected.Set(1.0)
case pq.ListenerEventDisconnected:
logger.Error(ctx, "pubsub disconnected from postgres", slog.Error(err))
p.logger.Error(ctx, "pubsub disconnected from postgres", slog.Error(err))
p.connected.Set(0)
case pq.ListenerEventReconnected:
logger.Info(ctx, "pubsub reconnected to postgres")
p.logger.Info(ctx, "pubsub reconnected to postgres")
p.connected.Set(1)
case pq.ListenerEventConnectionAttemptFailed:
logger.Error(ctx, "pubsub failed to connect to postgres", slog.Error(err))
p.logger.Error(ctx, "pubsub failed to connect to postgres", slog.Error(err))
}
// This callback gets events whenever the connection state changes.
// Don't send if the errChannel has already been closed.
Expand All @@ -358,26 +383,141 @@ func New(ctx context.Context, logger slog.Logger, database *sql.DB, connectURL s
select {
case err := <-errCh:
if err != nil {
_ = listener.Close()
return nil, xerrors.Errorf("create pq listener: %w", err)
_ = p.pgListener.Close()
return xerrors.Errorf("create pq listener: %w", err)
}
case <-ctx.Done():
_ = listener.Close()
return nil, ctx.Err()
_ = p.pgListener.Close()
return ctx.Err()
}
return nil
}

// these are the metrics we compute implicitly from our existing data structures
var (
currentSubscribersDesc = prometheus.NewDesc(
"coder_pubsub_current_subscribers",
"The current number of active pubsub subscribers",
nil, nil,
)
currentEventsDesc = prometheus.NewDesc(
"coder_pubsub_current_events",
"The current number of pubsub event channels listened for",
nil, nil,
)
)

// We'll track messages as size "normal" and "colossal", where the
// latter are messages larger than 7600 bytes, or 95% of the postgres
// notify limit. If we see a lot of colossal packets that's an indication that
// we might be trying to send too much data over the pubsub and are in danger of
// failing to publish.
const (
colossalThreshold = 7600
messageSizeNormal = "normal"
messageSizeColossal = "colossal"
)

// Describe implements, along with Collect, the prometheus.Collector interface
// for metrics.
func (p *PGPubsub) Describe(descs chan<- *prometheus.Desc) {
// explicit metrics
p.publishesTotal.Describe(descs)
p.subscribesTotal.Describe(descs)
p.messagesTotal.Describe(descs)
p.publishedBytesTotal.Describe(descs)
p.receivedBytesTotal.Describe(descs)
p.disconnectionsTotal.Describe(descs)
p.connected.Describe(descs)

// implicit metrics
descs <- currentSubscribersDesc
descs <- currentEventsDesc
}

// Collect implements, along with Describe, the prometheus.Collector interface
// for metrics
func (p *PGPubsub) Collect(metrics chan<- prometheus.Metric) {
// explicit metrics
p.publishesTotal.Collect(metrics)
p.subscribesTotal.Collect(metrics)
p.messagesTotal.Collect(metrics)
p.publishedBytesTotal.Collect(metrics)
p.receivedBytesTotal.Collect(metrics)
p.disconnectionsTotal.Collect(metrics)
p.connected.Collect(metrics)

// implicit metrics
p.mut.Lock()
events := len(p.queues)
subs := 0
for _, subscriberMap := range p.queues {
subs += len(subscriberMap)
}
p.mut.Unlock()
metrics <- prometheus.MustNewConstMetric(currentSubscribersDesc, prometheus.GaugeValue, float64(subs))
metrics <- prometheus.MustNewConstMetric(currentEventsDesc, prometheus.GaugeValue, float64(events))
}

// New creates a new Pubsub implementation using a PostgreSQL connection.
func New(startCtx context.Context, logger slog.Logger, database *sql.DB, connectURL string) (*PGPubsub, error) {
// Start a new context that will be canceled when the pubsub is closed.
ctx, cancel := context.WithCancel(context.Background())
pgPubsub := &pgPubsub{
p := &PGPubsub{
ctx: ctx,
cancel: cancel,
logger: logger,
listenDone: make(chan struct{}),
db: database,
pgListener: listener,
queues: make(map[string]map[uuid.UUID]*msgQueue),

publishesTotal: prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "coder",
Subsystem: "pubsub",
Name: "publishes_total",
Help: "Total number of calls to Publish",
}, []string{"success"}),
subscribesTotal: prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "coder",
Subsystem: "pubsub",
Name: "subscribes_total",
Help: "Total number of calls to Subscribe/SubscribeWithErr",
}, []string{"success"}),
messagesTotal: prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "coder",
Subsystem: "pubsub",
Name: "messages_total",
Help: "Total number of messages received from postgres",
}, []string{"size"}),
publishedBytesTotal: prometheus.NewCounter(prometheus.CounterOpts{
Namespace: "coder",
Subsystem: "pubsub",
Name: "published_bytes_total",
Help: "Total number of bytes successfully published across all publishes",
}),
receivedBytesTotal: prometheus.NewCounter(prometheus.CounterOpts{
Namespace: "coder",
Subsystem: "pubsub",
Name: "received_bytes_total",
Help: "Total number of bytes received across all messages",
}),
disconnectionsTotal: prometheus.NewCounter(prometheus.CounterOpts{
Namespace: "coder",
Subsystem: "pubsub",
Name: "disconnections_total",
Help: "Total number of times we disconnected unexpectedly from postgres",
}),
connected: prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: "coder",
Subsystem: "pubsub",
Name: "connected",
Help: "Whether we are connected (1) or not connected (0) to postgres",
}),
}
if err := p.startListener(startCtx, connectURL); err != nil {
return nil, err
}
go pgPubsub.listen()
go p.listen()
logger.Info(ctx, "pubsub has started")
return pgPubsub, nil
return p, nil
}
Loading