-
Notifications
You must be signed in to change notification settings - Fork 928
feat: add logging to pgPubsub #11953
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -10,6 +10,8 @@ import ( | |
"github.com/google/uuid" | ||
"github.com/lib/pq" | ||
"golang.org/x/xerrors" | ||
|
||
"cdr.dev/slog" | ||
) | ||
|
||
// Listener represents a pubsub handler. | ||
|
@@ -164,6 +166,7 @@ func (q *msgQueue) dropped() { | |
type pgPubsub struct { | ||
ctx context.Context | ||
cancel context.CancelFunc | ||
logger slog.Logger | ||
listenDone chan struct{} | ||
pgListener *pq.Listener | ||
db *sql.DB | ||
|
@@ -198,6 +201,9 @@ func (p *pgPubsub) subscribeQueue(event string, newQ *msgQueue) (cancel func(), | |
}() | ||
|
||
err = p.pgListener.Listen(event) | ||
if err == nil { | ||
p.logger.Debug(p.ctx, "started listening to event channel", slog.F("event", event)) | ||
} | ||
if errors.Is(err, pq.ErrChannelAlreadyOpen) { | ||
// It's ok if it's already open! | ||
err = nil | ||
|
@@ -223,12 +229,18 @@ func (p *pgPubsub) subscribeQueue(event string, newQ *msgQueue) (cancel func(), | |
delete(listeners, id) | ||
|
||
if len(listeners) == 0 { | ||
_ = p.pgListener.Unlisten(event) | ||
uErr := p.pgListener.Unlisten(event) | ||
if uErr != nil && !p.closedListener { | ||
p.logger.Warn(p.ctx, "failed to unlisten", slog.Error(uErr), slog.F("event", event)) | ||
} else { | ||
p.logger.Debug(p.ctx, "stopped listening to event channel", slog.F("event", event)) | ||
} | ||
} | ||
}, nil | ||
} | ||
|
||
func (p *pgPubsub) Publish(event string, message []byte) error { | ||
p.logger.Debug(p.ctx, "publish", slog.F("event", event), slog.F("message_len", len(message))) | ||
// This is safe because we are calling pq.QuoteLiteral. pg_notify doesn't | ||
// support the first parameter being a prepared statement. | ||
//nolint:gosec | ||
|
@@ -241,9 +253,11 @@ func (p *pgPubsub) Publish(event string, message []byte) error { | |
|
||
// Close closes the pubsub instance. | ||
func (p *pgPubsub) Close() error { | ||
p.logger.Info(p.ctx, "pubsub is closing") | ||
p.cancel() | ||
err := p.closeListener() | ||
<-p.listenDone | ||
p.logger.Debug(p.ctx, "pubsub closed") | ||
return err | ||
} | ||
|
||
|
@@ -262,7 +276,11 @@ func (p *pgPubsub) closeListener() error { | |
// listen begins receiving messages on the pq listener. | ||
func (p *pgPubsub) listen() { | ||
defer func() { | ||
_ = p.closeListener() | ||
p.logger.Info(p.ctx, "pubsub listen stopped receiving notify") | ||
cErr := p.closeListener() | ||
if cErr != nil { | ||
p.logger.Error(p.ctx, "failed to close listener") | ||
} | ||
close(p.listenDone) | ||
}() | ||
|
||
|
@@ -281,6 +299,7 @@ func (p *pgPubsub) listen() { | |
} | ||
// A nil notification can be dispatched on reconnect. | ||
if notif == nil { | ||
p.logger.Debug(p.ctx, "notifying subscribers of a reconnection") | ||
p.recordReconnect() | ||
continue | ||
} | ||
|
@@ -312,10 +331,20 @@ func (p *pgPubsub) recordReconnect() { | |
} | ||
|
||
// New creates a new Pubsub implementation using a PostgreSQL connection. | ||
func New(ctx context.Context, database *sql.DB, connectURL string) (Pubsub, error) { | ||
func New(ctx context.Context, logger slog.Logger, database *sql.DB, connectURL string) (Pubsub, error) { | ||
// Creates a new listener using pq. | ||
errCh := make(chan error) | ||
listener := pq.NewListener(connectURL, time.Second, time.Minute, func(_ pq.ListenerEventType, err error) { | ||
listener := pq.NewListener(connectURL, time.Second, time.Minute, func(t pq.ListenerEventType, err error) { | ||
switch t { | ||
case pq.ListenerEventConnected: | ||
logger.Info(ctx, "pubsub connected to postgres") | ||
case pq.ListenerEventDisconnected: | ||
logger.Error(ctx, "pubsub disconnected from postgres", slog.Error(err)) | ||
Review thread on the `pq.ListenerEventDisconnected` log level:

> I'd say this is more of an info or warning than an error.

> That's not a "normal" disconnection, i.e. one where we asked it to disconnect. It means the connection to postgres dropped due to postgres going down or a network error, and we're going to have to try to reconnect. In the meantime, we could lose some messages sent during the time we are disconnected. It's bad news and should be an error.

> Alright, that makes sense. I didn't think about us potentially losing messages. The disconnect/reconnect I see as expected in any scenario where we're dealing with networking, data/state loss not so much.

> This got me thinking, how do we propagate this "potential loss of state"? If someone is viewing the dashboard for instance, and the disconnect happens to coincide with a workspace state update, we need a way to propagate that once connection is re-established. A "simple" re-fetch of the relevant information and send it to the user seems sensible, a diff/change detection if we want to be extreme.

> I updated the Pubsub interface to include a [comment truncated in source]
||
case pq.ListenerEventReconnected: | ||
logger.Info(ctx, "pubsub reconnected to postgres") | ||
case pq.ListenerEventConnectionAttemptFailed: | ||
logger.Error(ctx, "pubsub failed to connect to postgres", slog.Error(err)) | ||
} | ||
// This callback gets events whenever the connection state changes. | ||
// Don't send if the errChannel has already been closed. | ||
select { | ||
|
@@ -342,12 +371,13 @@ func New(ctx context.Context, database *sql.DB, connectURL string) (Pubsub, erro | |
pgPubsub := &pgPubsub{ | ||
ctx: ctx, | ||
cancel: cancel, | ||
logger: logger, | ||
listenDone: make(chan struct{}), | ||
db: database, | ||
pgListener: listener, | ||
queues: make(map[string]map[uuid.UUID]*msgQueue), | ||
} | ||
go pgPubsub.listen() | ||
|
||
logger.Info(ctx, "pubsub has started") | ||
return pgPubsub, nil | ||
} |
Uh oh!
There was an error while loading. Please reload this page.