Skip to content

feat: add logging to pgPubsub #11953

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jan 31, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cli/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -673,7 +673,7 @@ func (r *RootCmd) Server(newAPI func(context.Context, *coderd.Options) (*coderd.
}()

options.Database = database.New(sqlDB)
options.Pubsub, err = pubsub.New(ctx, sqlDB, dbURL)
options.Pubsub, err = pubsub.New(ctx, logger.Named("pubsub"), sqlDB, dbURL)
if err != nil {
return xerrors.Errorf("create pubsub: %w", err)
}
Expand Down
13 changes: 11 additions & 2 deletions coderd/database/dbtestutil/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ import (
"github.com/stretchr/testify/require"
"golang.org/x/xerrors"

"cdr.dev/slog"
"cdr.dev/slog/sloggers/slogtest"
"github.com/coder/coder/v2/coderd/database"
"github.com/coder/coder/v2/coderd/database/dbmem"
"github.com/coder/coder/v2/coderd/database/postgres"
Expand All @@ -32,6 +34,7 @@ type options struct {
fixedTimezone string
dumpOnFailure bool
returnSQLDB func(*sql.DB)
logger slog.Logger
}

type Option func(*options)
Expand All @@ -50,6 +53,12 @@ func WithDumpOnFailure() Option {
}
}

func WithLogger(logger slog.Logger) Option {
return func(o *options) {
o.logger = logger
}
}

func withReturnSQLDB(f func(*sql.DB)) Option {
return func(o *options) {
o.returnSQLDB = f
Expand All @@ -74,7 +83,7 @@ func NewDBWithSQLDB(t testing.TB, opts ...Option) (database.Store, pubsub.Pubsub
func NewDB(t testing.TB, opts ...Option) (database.Store, pubsub.Pubsub) {
t.Helper()

var o options
o := options{logger: slogtest.Make(t, nil).Named("pubsub").Leveled(slog.LevelDebug)}
for _, opt := range opts {
opt(&o)
}
Expand Down Expand Up @@ -118,7 +127,7 @@ func NewDB(t testing.TB, opts ...Option) (database.Store, pubsub.Pubsub) {
}
db = database.New(sqlDB)

ps, err = pubsub.New(context.Background(), sqlDB, connectionURL)
ps, err = pubsub.New(context.Background(), o.logger, sqlDB, connectionURL)
require.NoError(t, err)
t.Cleanup(func() {
_ = ps.Close()
Expand Down
40 changes: 35 additions & 5 deletions coderd/database/pubsub/pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import (
"github.com/google/uuid"
"github.com/lib/pq"
"golang.org/x/xerrors"

"cdr.dev/slog"
)

// Listener represents a pubsub handler.
Expand Down Expand Up @@ -164,6 +166,7 @@ func (q *msgQueue) dropped() {
type pgPubsub struct {
ctx context.Context
cancel context.CancelFunc
logger slog.Logger
listenDone chan struct{}
pgListener *pq.Listener
db *sql.DB
Expand Down Expand Up @@ -198,6 +201,9 @@ func (p *pgPubsub) subscribeQueue(event string, newQ *msgQueue) (cancel func(),
}()

err = p.pgListener.Listen(event)
if err == nil {
p.logger.Debug(p.ctx, "started listening to event channel", slog.F("event", event))
}
if errors.Is(err, pq.ErrChannelAlreadyOpen) {
// It's ok if it's already open!
err = nil
Expand All @@ -223,12 +229,18 @@ func (p *pgPubsub) subscribeQueue(event string, newQ *msgQueue) (cancel func(),
delete(listeners, id)

if len(listeners) == 0 {
_ = p.pgListener.Unlisten(event)
uErr := p.pgListener.Unlisten(event)
if uErr != nil && !p.closedListener {
p.logger.Warn(p.ctx, "failed to unlisten", slog.Error(uErr), slog.F("event", event))
} else {
p.logger.Debug(p.ctx, "stopped listening to event channel", slog.F("event", event))
}
}
}, nil
}

func (p *pgPubsub) Publish(event string, message []byte) error {
p.logger.Debug(p.ctx, "publish", slog.F("event", event), slog.F("message_len", len(message)))
// This is safe because we are calling pq.QuoteLiteral. pg_notify doesn't
// support the first parameter being a prepared statement.
//nolint:gosec
Expand All @@ -241,9 +253,11 @@ func (p *pgPubsub) Publish(event string, message []byte) error {

// Close closes the pubsub instance.
func (p *pgPubsub) Close() error {
p.logger.Info(p.ctx, "pubsub is closing")
p.cancel()
err := p.closeListener()
<-p.listenDone
p.logger.Debug(p.ctx, "pubsub closed")
return err
}

Expand All @@ -262,7 +276,11 @@ func (p *pgPubsub) closeListener() error {
// listen begins receiving messages on the pq listener.
func (p *pgPubsub) listen() {
defer func() {
_ = p.closeListener()
p.logger.Info(p.ctx, "pubsub listen stopped receiving notify")
cErr := p.closeListener()
if cErr != nil {
p.logger.Error(p.ctx, "failed to close listener")
}
close(p.listenDone)
}()

Expand All @@ -281,6 +299,7 @@ func (p *pgPubsub) listen() {
}
// A nil notification can be dispatched on reconnect.
if notif == nil {
p.logger.Debug(p.ctx, "notifying subscribers of a reconnection")
p.recordReconnect()
continue
}
Expand Down Expand Up @@ -312,10 +331,20 @@ func (p *pgPubsub) recordReconnect() {
}

// New creates a new Pubsub implementation using a PostgreSQL connection.
func New(ctx context.Context, database *sql.DB, connectURL string) (Pubsub, error) {
func New(ctx context.Context, logger slog.Logger, database *sql.DB, connectURL string) (Pubsub, error) {
// Creates a new listener using pq.
errCh := make(chan error)
listener := pq.NewListener(connectURL, time.Second, time.Minute, func(_ pq.ListenerEventType, err error) {
listener := pq.NewListener(connectURL, time.Second, time.Minute, func(t pq.ListenerEventType, err error) {
switch t {
case pq.ListenerEventConnected:
logger.Info(ctx, "pubsub connected to postgres")
case pq.ListenerEventDisconnected:
logger.Error(ctx, "pubsub disconnected from postgres", slog.Error(err))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd say this is more of an info or warning than an error.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's not a "normal" disconnection when we asked it to disconnect. It means the connection to postgres dropped due to postgres going down or a network error, and we're going to have to try to reconnect. In the meantime, we could lose some messages sent during the time we are disconnected. It's bad news and should be an error.

Copy link
Member

@mafredri mafredri Jan 31, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Alright, that makes sense. I didn't think about us potentially losing messages. The disconnect/reconnect I see as expected in any scenario where we're dealing with networking, data/state loss not so much.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This got me thinking, how do we propagate this "potential loss of state"? If someone is viewing the dashboard for instance, and the disconnect happens to coincide with a workspace state update, we need a way to propagate that once connection is re-established. A "simple" re-fetch of the relevant information and send it to the user seems sensible, a diff/change detection if we want to be extreme.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I updated the Pubsub interface to include a SubscribeWithErr function that informs the subscriber about potential data loss, so that it can resync or punt the error up to a higher layer to retry. Not everywhere uses it yet.

case pq.ListenerEventReconnected:
logger.Info(ctx, "pubsub reconnected to postgres")
case pq.ListenerEventConnectionAttemptFailed:
logger.Error(ctx, "pubsub failed to connect to postgres", slog.Error(err))
}
// This callback gets events whenever the connection state changes.
// Don't send if the errChannel has already been closed.
select {
Expand All @@ -342,12 +371,13 @@ func New(ctx context.Context, database *sql.DB, connectURL string) (Pubsub, erro
pgPubsub := &pgPubsub{
ctx: ctx,
cancel: cancel,
logger: logger,
listenDone: make(chan struct{}),
db: database,
pgListener: listener,
queues: make(map[string]map[uuid.UUID]*msgQueue),
}
go pgPubsub.listen()

logger.Info(ctx, "pubsub has started")
return pgPubsub, nil
}
20 changes: 14 additions & 6 deletions coderd/database/pubsub/pubsub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ import (
"github.com/stretchr/testify/require"
"golang.org/x/xerrors"

"cdr.dev/slog"
"cdr.dev/slog/sloggers/slogtest"
"github.com/coder/coder/v2/coderd/database/postgres"
"github.com/coder/coder/v2/coderd/database/pubsub"
"github.com/coder/coder/v2/testutil"
Expand All @@ -32,14 +34,15 @@ func TestPubsub(t *testing.T) {
t.Run("Postgres", func(t *testing.T) {
ctx, cancelFunc := context.WithCancel(context.Background())
defer cancelFunc()
logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug)

connectionURL, closePg, err := postgres.Open()
require.NoError(t, err)
defer closePg()
db, err := sql.Open("postgres", connectionURL)
require.NoError(t, err)
defer db.Close()
pubsub, err := pubsub.New(ctx, db, connectionURL)
pubsub, err := pubsub.New(ctx, logger, db, connectionURL)
require.NoError(t, err)
defer pubsub.Close()
event := "test"
Expand All @@ -61,13 +64,14 @@ func TestPubsub(t *testing.T) {
t.Run("PostgresCloseCancel", func(t *testing.T) {
ctx, cancelFunc := context.WithCancel(context.Background())
defer cancelFunc()
logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug)
connectionURL, closePg, err := postgres.Open()
require.NoError(t, err)
defer closePg()
db, err := sql.Open("postgres", connectionURL)
require.NoError(t, err)
defer db.Close()
pubsub, err := pubsub.New(ctx, db, connectionURL)
pubsub, err := pubsub.New(ctx, logger, db, connectionURL)
require.NoError(t, err)
defer pubsub.Close()
cancelFunc()
Expand All @@ -76,13 +80,14 @@ func TestPubsub(t *testing.T) {
t.Run("NotClosedOnCancelContext", func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug)
connectionURL, closePg, err := postgres.Open()
require.NoError(t, err)
defer closePg()
db, err := sql.Open("postgres", connectionURL)
require.NoError(t, err)
defer db.Close()
pubsub, err := pubsub.New(ctx, db, connectionURL)
pubsub, err := pubsub.New(ctx, logger, db, connectionURL)
require.NoError(t, err)
defer pubsub.Close()

Expand All @@ -108,13 +113,14 @@ func TestPubsub(t *testing.T) {
t.Run("ClosePropagatesContextCancellationToSubscription", func(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), testutil.WaitLong)
defer cancel()
logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug)
connectionURL, closePg, err := postgres.Open()
require.NoError(t, err)
defer closePg()
db, err := sql.Open("postgres", connectionURL)
require.NoError(t, err)
defer db.Close()
pubsub, err := pubsub.New(ctx, db, connectionURL)
pubsub, err := pubsub.New(ctx, logger, db, connectionURL)
require.NoError(t, err)
defer pubsub.Close()

Expand Down Expand Up @@ -164,14 +170,15 @@ func TestPubsub_ordering(t *testing.T) {

ctx, cancelFunc := context.WithCancel(context.Background())
defer cancelFunc()
logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug)

connectionURL, closePg, err := postgres.Open()
require.NoError(t, err)
defer closePg()
db, err := sql.Open("postgres", connectionURL)
require.NoError(t, err)
defer db.Close()
ps, err := pubsub.New(ctx, db, connectionURL)
ps, err := pubsub.New(ctx, logger, db, connectionURL)
require.NoError(t, err)
defer ps.Close()
event := "test"
Expand Down Expand Up @@ -219,7 +226,8 @@ func TestPubsub_Disconnect(t *testing.T) {

ctx, cancelFunc := context.WithTimeout(context.Background(), testutil.WaitSuperLong)
defer cancelFunc()
ps, err := pubsub.New(ctx, db, connectionURL)
logger := slogtest.Make(t, &slogtest.Options{IgnoreErrors: true}).Leveled(slog.LevelDebug)
ps, err := pubsub.New(ctx, logger, db, connectionURL)
require.NoError(t, err)
defer ps.Close()
event := "test"
Expand Down