-
Notifications
You must be signed in to change notification settings - Fork 889
feat: add metrics to PGPubsub #11971
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||
---|---|---|---|---|
|
@@ -9,6 +9,7 @@ import ( | |||
|
||||
"github.com/google/uuid" | ||||
"github.com/lib/pq" | ||||
"github.com/prometheus/client_golang/prometheus" | ||||
"golang.org/x/xerrors" | ||||
|
||||
"cdr.dev/slog" | ||||
|
@@ -162,8 +163,8 @@ func (q *msgQueue) dropped() { | |||
q.cond.Broadcast() | ||||
} | ||||
|
||||
// Pubsub implementation using PostgreSQL. | ||||
type pgPubsub struct { | ||||
// PGPubsub is a pubsub implementation using PostgreSQL. | ||||
type PGPubsub struct { | ||||
ctx context.Context | ||||
cancel context.CancelFunc | ||||
logger slog.Logger | ||||
|
@@ -174,29 +175,40 @@ type pgPubsub struct { | |||
queues map[string]map[uuid.UUID]*msgQueue | ||||
closedListener bool | ||||
closeListenerErr error | ||||
|
||||
publishesTotal *prometheus.CounterVec | ||||
subscribesTotal *prometheus.CounterVec | ||||
messagesTotal *prometheus.CounterVec | ||||
publishedBytesTotal prometheus.Counter | ||||
receivedBytesTotal prometheus.Counter | ||||
disconnectionsTotal prometheus.Counter | ||||
connected prometheus.Gauge | ||||
} | ||||
|
||||
// BufferSize is the maximum number of unhandled messages we will buffer | ||||
// for a subscriber before dropping messages. | ||||
const BufferSize = 2048 | ||||
|
||||
// Subscribe calls the listener when an event matching the name is received. | ||||
func (p *pgPubsub) Subscribe(event string, listener Listener) (cancel func(), err error) { | ||||
func (p *PGPubsub) Subscribe(event string, listener Listener) (cancel func(), err error) { | ||||
return p.subscribeQueue(event, newMsgQueue(p.ctx, listener, nil)) | ||||
} | ||||
|
||||
func (p *pgPubsub) SubscribeWithErr(event string, listener ListenerWithErr) (cancel func(), err error) { | ||||
func (p *PGPubsub) SubscribeWithErr(event string, listener ListenerWithErr) (cancel func(), err error) { | ||||
return p.subscribeQueue(event, newMsgQueue(p.ctx, nil, listener)) | ||||
} | ||||
|
||||
func (p *pgPubsub) subscribeQueue(event string, newQ *msgQueue) (cancel func(), err error) { | ||||
func (p *PGPubsub) subscribeQueue(event string, newQ *msgQueue) (cancel func(), err error) { | ||||
p.mut.Lock() | ||||
defer p.mut.Unlock() | ||||
defer func() { | ||||
if err != nil { | ||||
// if we hit an error, we need to close the queue so we don't | ||||
// leak its goroutine. | ||||
newQ.close() | ||||
p.subscribesTotal.WithLabelValues("false").Inc() | ||||
} else { | ||||
p.subscribesTotal.WithLabelValues("true").Inc() | ||||
} | ||||
}() | ||||
|
||||
|
@@ -239,20 +251,23 @@ func (p *pgPubsub) subscribeQueue(event string, newQ *msgQueue) (cancel func(), | |||
}, nil | ||||
} | ||||
|
||||
func (p *pgPubsub) Publish(event string, message []byte) error { | ||||
func (p *PGPubsub) Publish(event string, message []byte) error { | ||||
p.logger.Debug(p.ctx, "publish", slog.F("event", event), slog.F("message_len", len(message))) | ||||
// This is safe because we are calling pq.QuoteLiteral. pg_notify doesn't | ||||
// support the first parameter being a prepared statement. | ||||
//nolint:gosec | ||||
_, err := p.db.ExecContext(p.ctx, `select pg_notify(`+pq.QuoteLiteral(event)+`, $1)`, message) | ||||
if err != nil { | ||||
p.publishesTotal.WithLabelValues("false").Inc() | ||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Does false label represent error here? Is that common prom metric practice? (Just checking since I'm not that experienced adding prom metrics.) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The label name is coder/agent/agentscripts/agentscripts.go Line 66 in 3ace798
|
||||
return xerrors.Errorf("exec pg_notify: %w", err) | ||||
} | ||||
p.publishesTotal.WithLabelValues("true").Inc() | ||||
p.publishedBytesTotal.Add(float64(len(message))) | ||||
return nil | ||||
} | ||||
|
||||
// Close closes the pubsub instance. | ||||
func (p *pgPubsub) Close() error { | ||||
func (p *PGPubsub) Close() error { | ||||
p.logger.Info(p.ctx, "pubsub is closing") | ||||
p.cancel() | ||||
err := p.closeListener() | ||||
|
@@ -262,7 +277,7 @@ func (p *pgPubsub) Close() error { | |||
} | ||||
|
||||
// closeListener closes the pgListener, unless it has already been closed. | ||||
func (p *pgPubsub) closeListener() error { | ||||
func (p *PGPubsub) closeListener() error { | ||||
p.mut.Lock() | ||||
defer p.mut.Unlock() | ||||
if p.closedListener { | ||||
|
@@ -274,7 +289,7 @@ func (p *pgPubsub) closeListener() error { | |||
} | ||||
|
||||
// listen begins receiving messages on the pq listener. | ||||
func (p *pgPubsub) listen() { | ||||
func (p *PGPubsub) listen() { | ||||
defer func() { | ||||
p.logger.Info(p.ctx, "pubsub listen stopped receiving notify") | ||||
cErr := p.closeListener() | ||||
|
@@ -307,7 +322,14 @@ func (p *pgPubsub) listen() { | |||
} | ||||
} | ||||
|
||||
func (p *pgPubsub) listenReceive(notif *pq.Notification) { | ||||
func (p *PGPubsub) listenReceive(notif *pq.Notification) { | ||||
sizeLabel := messageSizeNormal | ||||
if len(notif.Extra) >= colossalThreshold { | ||||
sizeLabel = messageSizeColossal | ||||
} | ||||
spikecurtis marked this conversation as resolved.
Show resolved
Hide resolved
|
||||
p.messagesTotal.WithLabelValues(sizeLabel).Inc() | ||||
p.receivedBytesTotal.Add(float64(len(notif.Extra))) | ||||
|
||||
p.mut.Lock() | ||||
defer p.mut.Unlock() | ||||
queues, ok := p.queues[notif.Channel] | ||||
|
@@ -320,7 +342,7 @@ func (p *pgPubsub) listenReceive(notif *pq.Notification) { | |||
} | ||||
} | ||||
|
||||
func (p *pgPubsub) recordReconnect() { | ||||
func (p *PGPubsub) recordReconnect() { | ||||
p.mut.Lock() | ||||
defer p.mut.Unlock() | ||||
for _, listeners := range p.queues { | ||||
|
@@ -330,20 +352,23 @@ func (p *pgPubsub) recordReconnect() { | |||
} | ||||
} | ||||
|
||||
// New creates a new Pubsub implementation using a PostgreSQL connection. | ||||
func New(ctx context.Context, logger slog.Logger, database *sql.DB, connectURL string) (Pubsub, error) { | ||||
func (p *PGPubsub) startListener(ctx context.Context, connectURL string) error { | ||||
p.connected.Set(0) | ||||
// Creates a new listener using pq. | ||||
errCh := make(chan error) | ||||
listener := pq.NewListener(connectURL, time.Second, time.Minute, func(t pq.ListenerEventType, err error) { | ||||
p.pgListener = pq.NewListener(connectURL, time.Second, time.Minute, func(t pq.ListenerEventType, err error) { | ||||
switch t { | ||||
case pq.ListenerEventConnected: | ||||
logger.Info(ctx, "pubsub connected to postgres") | ||||
p.logger.Info(ctx, "pubsub connected to postgres") | ||||
p.connected.Set(1.0) | ||||
case pq.ListenerEventDisconnected: | ||||
logger.Error(ctx, "pubsub disconnected from postgres", slog.Error(err)) | ||||
p.logger.Error(ctx, "pubsub disconnected from postgres", slog.Error(err)) | ||||
p.connected.Set(0) | ||||
case pq.ListenerEventReconnected: | ||||
logger.Info(ctx, "pubsub reconnected to postgres") | ||||
p.logger.Info(ctx, "pubsub reconnected to postgres") | ||||
p.connected.Set(1) | ||||
case pq.ListenerEventConnectionAttemptFailed: | ||||
logger.Error(ctx, "pubsub failed to connect to postgres", slog.Error(err)) | ||||
p.logger.Error(ctx, "pubsub failed to connect to postgres", slog.Error(err)) | ||||
} | ||||
// This callback gets events whenever the connection state changes. | ||||
// Don't send if the errChannel has already been closed. | ||||
|
@@ -358,26 +383,141 @@ func New(ctx context.Context, logger slog.Logger, database *sql.DB, connectURL s | |||
select { | ||||
case err := <-errCh: | ||||
if err != nil { | ||||
_ = listener.Close() | ||||
return nil, xerrors.Errorf("create pq listener: %w", err) | ||||
_ = p.pgListener.Close() | ||||
return xerrors.Errorf("create pq listener: %w", err) | ||||
} | ||||
case <-ctx.Done(): | ||||
_ = listener.Close() | ||||
return nil, ctx.Err() | ||||
_ = p.pgListener.Close() | ||||
return ctx.Err() | ||||
} | ||||
return nil | ||||
} | ||||
|
||||
// these are the metrics we compute implicitly from our existing data structures | ||||
var ( | ||||
currentSubscribersDesc = prometheus.NewDesc( | ||||
"coder_pubsub_current_subscribers", | ||||
"The current number of active pubsub subscribers", | ||||
nil, nil, | ||||
) | ||||
currentEventsDesc = prometheus.NewDesc( | ||||
"coder_pubsub_current_events", | ||||
"The current number of pubsub event channels listened for", | ||||
nil, nil, | ||||
) | ||||
) | ||||
|
||||
// We'll track messages as size "normal" and "colossal", where the | ||||
// latter are messages larger than 7600 bytes, or 95% of the postgres | ||||
// notify limit. If we see a lot of colossal packets that's an indication that | ||||
// we might be trying to send too much data over the pubsub and are in danger of | ||||
// failing to publish. | ||||
const ( | ||||
colossalThreshold = 7600 | ||||
messageSizeNormal = "normal" | ||||
messageSizeColossal = "colossal" | ||||
) | ||||
|
||||
// Describe implements, along with Collect, the prometheus.Collector interface | ||||
// for metrics. | ||||
func (p *PGPubsub) Describe(descs chan<- *prometheus.Desc) { | ||||
// explicit metrics | ||||
p.publishesTotal.Describe(descs) | ||||
p.subscribesTotal.Describe(descs) | ||||
p.messagesTotal.Describe(descs) | ||||
p.publishedBytesTotal.Describe(descs) | ||||
p.receivedBytesTotal.Describe(descs) | ||||
p.disconnectionsTotal.Describe(descs) | ||||
p.connected.Describe(descs) | ||||
|
||||
// implicit metrics | ||||
descs <- currentSubscribersDesc | ||||
descs <- currentEventsDesc | ||||
} | ||||
|
||||
// Collect implements, along with Describe, the prometheus.Collector interface | ||||
// for metrics | ||||
func (p *PGPubsub) Collect(metrics chan<- prometheus.Metric) { | ||||
// explicit metrics | ||||
p.publishesTotal.Collect(metrics) | ||||
p.subscribesTotal.Collect(metrics) | ||||
p.messagesTotal.Collect(metrics) | ||||
p.publishedBytesTotal.Collect(metrics) | ||||
p.receivedBytesTotal.Collect(metrics) | ||||
p.disconnectionsTotal.Collect(metrics) | ||||
p.connected.Collect(metrics) | ||||
|
||||
// implicit metrics | ||||
p.mut.Lock() | ||||
events := len(p.queues) | ||||
subs := 0 | ||||
for _, subscriberMap := range p.queues { | ||||
subs += len(subscriberMap) | ||||
} | ||||
p.mut.Unlock() | ||||
metrics <- prometheus.MustNewConstMetric(currentSubscribersDesc, prometheus.GaugeValue, float64(subs)) | ||||
metrics <- prometheus.MustNewConstMetric(currentEventsDesc, prometheus.GaugeValue, float64(events)) | ||||
} | ||||
|
||||
// New creates a new Pubsub implementation using a PostgreSQL connection. | ||||
func New(startCtx context.Context, logger slog.Logger, database *sql.DB, connectURL string) (*PGPubsub, error) { | ||||
// Start a new context that will be canceled when the pubsub is closed. | ||||
ctx, cancel := context.WithCancel(context.Background()) | ||||
pgPubsub := &pgPubsub{ | ||||
p := &PGPubsub{ | ||||
ctx: ctx, | ||||
cancel: cancel, | ||||
logger: logger, | ||||
listenDone: make(chan struct{}), | ||||
db: database, | ||||
pgListener: listener, | ||||
queues: make(map[string]map[uuid.UUID]*msgQueue), | ||||
|
||||
publishesTotal: prometheus.NewCounterVec(prometheus.CounterOpts{ | ||||
Namespace: "coder", | ||||
Subsystem: "pubsub", | ||||
Name: "publishes_total", | ||||
Help: "Total number of calls to Publish", | ||||
}, []string{"success"}), | ||||
subscribesTotal: prometheus.NewCounterVec(prometheus.CounterOpts{ | ||||
Namespace: "coder", | ||||
Subsystem: "pubsub", | ||||
Name: "subscribes_total", | ||||
Help: "Total number of calls to Subscribe/SubscribeWithErr", | ||||
}, []string{"success"}), | ||||
messagesTotal: prometheus.NewCounterVec(prometheus.CounterOpts{ | ||||
Namespace: "coder", | ||||
Subsystem: "pubsub", | ||||
Name: "messages_total", | ||||
Help: "Total number of messages received from postgres", | ||||
}, []string{"size"}), | ||||
publishedBytesTotal: prometheus.NewCounter(prometheus.CounterOpts{ | ||||
Namespace: "coder", | ||||
Subsystem: "pubsub", | ||||
Name: "published_bytes_total", | ||||
Help: "Total number of bytes successfully published across all publishes", | ||||
}), | ||||
receivedBytesTotal: prometheus.NewCounter(prometheus.CounterOpts{ | ||||
Namespace: "coder", | ||||
Subsystem: "pubsub", | ||||
Name: "received_bytes_total", | ||||
Help: "Total number of bytes received across all messages", | ||||
}), | ||||
disconnectionsTotal: prometheus.NewCounter(prometheus.CounterOpts{ | ||||
Namespace: "coder", | ||||
Subsystem: "pubsub", | ||||
Name: "disconnections_total", | ||||
Help: "Total number of times we disconnected unexpectedly from postgres", | ||||
}), | ||||
connected: prometheus.NewGauge(prometheus.GaugeOpts{ | ||||
Namespace: "coder", | ||||
Subsystem: "pubsub", | ||||
Name: "connected", | ||||
Help: "Whether we are connected (1) or not connected (0) to postgres", | ||||
}), | ||||
spikecurtis marked this conversation as resolved.
Show resolved
Hide resolved
|
||||
} | ||||
if err := p.startListener(startCtx, connectURL); err != nil { | ||||
return nil, err | ||||
} | ||||
go pgPubsub.listen() | ||||
go p.listen() | ||||
logger.Info(ctx, "pubsub has started") | ||||
return pgPubsub, nil | ||||
return p, nil | ||||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How about including the event as well? Might be useful to see if there's an abnormal amount of subs/pubs/data going for a specific event.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That's a bad idea because some events include the UUID of the thing, e.g. workspace, which would make the set of label values unbounded, which is a cardinal sin in metrics.
Not for this PR, but if we decide we really want that information, we could standardize event channel names to be a struct with a static root and then zero or more specific IDs, and then we could label the publishes, subscribes and messages with the static root.