diff --git a/cli/server.go b/cli/server.go index 4631df82dfc44..1df5f49855909 100644 --- a/cli/server.go +++ b/cli/server.go @@ -673,7 +673,7 @@ func (r *RootCmd) Server(newAPI func(context.Context, *coderd.Options) (*coderd. }() options.Database = database.New(sqlDB) - options.Pubsub, err = pubsub.New(ctx, sqlDB, dbURL) + options.Pubsub, err = pubsub.New(ctx, logger.Named("pubsub"), sqlDB, dbURL) if err != nil { return xerrors.Errorf("create pubsub: %w", err) } diff --git a/coderd/database/dbtestutil/db.go b/coderd/database/dbtestutil/db.go index c179917c0594a..efcbe326511c4 100644 --- a/coderd/database/dbtestutil/db.go +++ b/coderd/database/dbtestutil/db.go @@ -17,6 +17,8 @@ import ( "github.com/stretchr/testify/require" "golang.org/x/xerrors" + "cdr.dev/slog" + "cdr.dev/slog/sloggers/slogtest" "github.com/coder/coder/v2/coderd/database" "github.com/coder/coder/v2/coderd/database/dbmem" "github.com/coder/coder/v2/coderd/database/postgres" @@ -32,6 +34,7 @@ type options struct { fixedTimezone string dumpOnFailure bool returnSQLDB func(*sql.DB) + logger slog.Logger } type Option func(*options) @@ -50,6 +53,12 @@ func WithDumpOnFailure() Option { } } +func WithLogger(logger slog.Logger) Option { + return func(o *options) { + o.logger = logger + } +} + func withReturnSQLDB(f func(*sql.DB)) Option { return func(o *options) { o.returnSQLDB = f @@ -74,7 +83,7 @@ func NewDBWithSQLDB(t testing.TB, opts ...Option) (database.Store, pubsub.Pubsub func NewDB(t testing.TB, opts ...Option) (database.Store, pubsub.Pubsub) { t.Helper() - var o options + o := options{logger: slogtest.Make(t, nil).Named("pubsub").Leveled(slog.LevelDebug)} for _, opt := range opts { opt(&o) } @@ -118,7 +127,7 @@ func NewDB(t testing.TB, opts ...Option) (database.Store, pubsub.Pubsub) { } db = database.New(sqlDB) - ps, err = pubsub.New(context.Background(), sqlDB, connectionURL) + ps, err = pubsub.New(context.Background(), o.logger, sqlDB, connectionURL) require.NoError(t, err) t.Cleanup(func() { _ = ps.Close() diff --git a/coderd/database/pubsub/pubsub.go b/coderd/database/pubsub/pubsub.go index 731466efd78e2..d70b5f5f9ce9a 100644 --- a/coderd/database/pubsub/pubsub.go +++ b/coderd/database/pubsub/pubsub.go @@ -10,6 +10,8 @@ import ( "github.com/google/uuid" "github.com/lib/pq" "golang.org/x/xerrors" + + "cdr.dev/slog" ) // Listener represents a pubsub handler. @@ -164,6 +166,7 @@ func (q *msgQueue) dropped() { type pgPubsub struct { ctx context.Context cancel context.CancelFunc + logger slog.Logger listenDone chan struct{} pgListener *pq.Listener db *sql.DB @@ -198,6 +201,9 @@ func (p *pgPubsub) subscribeQueue(event string, newQ *msgQueue) (cancel func(), }() err = p.pgListener.Listen(event) + if err == nil { + p.logger.Debug(p.ctx, "started listening to event channel", slog.F("event", event)) + } if errors.Is(err, pq.ErrChannelAlreadyOpen) { // It's ok if it's already open! err = nil @@ -223,12 +229,18 @@ func (p *pgPubsub) subscribeQueue(event string, newQ *msgQueue) (cancel func(), delete(listeners, id) if len(listeners) == 0 { - _ = p.pgListener.Unlisten(event) + uErr := p.pgListener.Unlisten(event) + if uErr != nil && !p.closedListener { + p.logger.Warn(p.ctx, "failed to unlisten", slog.Error(uErr), slog.F("event", event)) + } else { + p.logger.Debug(p.ctx, "stopped listening to event channel", slog.F("event", event)) + } } }, nil } func (p *pgPubsub) Publish(event string, message []byte) error { + p.logger.Debug(p.ctx, "publish", slog.F("event", event), slog.F("message_len", len(message))) // This is safe because we are calling pq.QuoteLiteral. pg_notify doesn't // support the first parameter being a prepared statement. //nolint:gosec @@ -241,9 +253,11 @@ func (p *pgPubsub) Publish(event string, message []byte) error { // Close closes the pubsub instance. func (p *pgPubsub) Close() error { + p.logger.Info(p.ctx, "pubsub is closing") p.cancel() err := p.closeListener() <-p.listenDone + p.logger.Debug(p.ctx, "pubsub closed") return err } @@ -262,7 +276,11 @@ func (p *pgPubsub) closeListener() error { // listen begins receiving messages on the pq listener. func (p *pgPubsub) listen() { defer func() { - _ = p.closeListener() + p.logger.Info(p.ctx, "pubsub listen stopped receiving notify") + cErr := p.closeListener() + if cErr != nil { + p.logger.Error(p.ctx, "failed to close listener") + } close(p.listenDone) }() @@ -281,6 +299,7 @@ func (p *pgPubsub) listen() { } // A nil notification can be dispatched on reconnect. if notif == nil { + p.logger.Debug(p.ctx, "notifying subscribers of a reconnection") p.recordReconnect() continue } @@ -312,10 +331,20 @@ func (p *pgPubsub) recordReconnect() { } // New creates a new Pubsub implementation using a PostgreSQL connection. -func New(ctx context.Context, database *sql.DB, connectURL string) (Pubsub, error) { +func New(ctx context.Context, logger slog.Logger, database *sql.DB, connectURL string) (Pubsub, error) { // Creates a new listener using pq. errCh := make(chan error) - listener := pq.NewListener(connectURL, time.Second, time.Minute, func(_ pq.ListenerEventType, err error) { + listener := pq.NewListener(connectURL, time.Second, time.Minute, func(t pq.ListenerEventType, err error) { + switch t { + case pq.ListenerEventConnected: + logger.Info(ctx, "pubsub connected to postgres") + case pq.ListenerEventDisconnected: + logger.Error(ctx, "pubsub disconnected from postgres", slog.Error(err)) + case pq.ListenerEventReconnected: + logger.Info(ctx, "pubsub reconnected to postgres") + case pq.ListenerEventConnectionAttemptFailed: + logger.Error(ctx, "pubsub failed to connect to postgres", slog.Error(err)) + } // This callback gets events whenever the connection state changes. // Don't send if the errChannel has already been closed. select { @@ -342,12 +371,13 @@ func New(ctx context.Context, database *sql.DB, connectURL string) (Pubsub, erro pgPubsub := &pgPubsub{ ctx: ctx, cancel: cancel, + logger: logger, listenDone: make(chan struct{}), db: database, pgListener: listener, queues: make(map[string]map[uuid.UUID]*msgQueue), } go pgPubsub.listen() - + logger.Info(ctx, "pubsub has started") return pgPubsub, nil } diff --git a/coderd/database/pubsub/pubsub_test.go b/coderd/database/pubsub/pubsub_test.go index 1d414d9edcd2c..c25af429a5d78 100644 --- a/coderd/database/pubsub/pubsub_test.go +++ b/coderd/database/pubsub/pubsub_test.go @@ -15,6 +15,8 @@ import ( "github.com/stretchr/testify/require" "golang.org/x/xerrors" + "cdr.dev/slog" + "cdr.dev/slog/sloggers/slogtest" "github.com/coder/coder/v2/coderd/database/postgres" "github.com/coder/coder/v2/coderd/database/pubsub" "github.com/coder/coder/v2/testutil" @@ -32,6 +34,7 @@ func TestPubsub(t *testing.T) { t.Run("Postgres", func(t *testing.T) { ctx, cancelFunc := context.WithCancel(context.Background()) defer cancelFunc() + logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug) connectionURL, closePg, err := postgres.Open() require.NoError(t, err) @@ -39,7 +42,7 @@ func TestPubsub(t *testing.T) { db, err := sql.Open("postgres", connectionURL) require.NoError(t, err) defer db.Close() - pubsub, err := pubsub.New(ctx, db, connectionURL) + pubsub, err := pubsub.New(ctx, logger, db, connectionURL) require.NoError(t, err) defer pubsub.Close() event := "test" @@ -61,13 +64,14 @@ func TestPubsub(t *testing.T) { t.Run("PostgresCloseCancel", func(t *testing.T) { ctx, cancelFunc := context.WithCancel(context.Background()) defer cancelFunc() + logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug) connectionURL, closePg, err := postgres.Open() require.NoError(t, err) defer closePg() db, err := sql.Open("postgres", connectionURL) require.NoError(t, err) defer db.Close() - pubsub, err := pubsub.New(ctx, db, connectionURL) + pubsub, err := pubsub.New(ctx, logger, db, connectionURL) require.NoError(t, err) defer pubsub.Close() cancelFunc() @@ -76,13 +80,14 @@ func TestPubsub(t *testing.T) { t.Run("NotClosedOnCancelContext", func(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() + logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug) connectionURL, closePg, err := postgres.Open() require.NoError(t, err) defer closePg() db, err := sql.Open("postgres", connectionURL) require.NoError(t, err) defer db.Close() - pubsub, err := pubsub.New(ctx, db, connectionURL) + pubsub, err := pubsub.New(ctx, logger, db, connectionURL) require.NoError(t, err) defer pubsub.Close() @@ -108,13 +113,14 @@ func TestPubsub(t *testing.T) { t.Run("ClosePropagatesContextCancellationToSubscription", func(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), testutil.WaitLong) defer cancel() + logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug) connectionURL, closePg, err := postgres.Open() require.NoError(t, err) defer closePg() db, err := sql.Open("postgres", connectionURL) require.NoError(t, err) defer db.Close() - pubsub, err := pubsub.New(ctx, db, connectionURL) + pubsub, err := pubsub.New(ctx, logger, db, connectionURL) require.NoError(t, err) defer pubsub.Close() @@ -164,6 +170,7 @@ func TestPubsub_ordering(t *testing.T) { ctx, cancelFunc := context.WithCancel(context.Background()) defer cancelFunc() + logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug) connectionURL, closePg, err := postgres.Open() require.NoError(t, err) @@ -171,7 +178,7 @@ func TestPubsub_ordering(t *testing.T) { db, err := sql.Open("postgres", connectionURL) require.NoError(t, err) defer db.Close() - ps, err := pubsub.New(ctx, db, connectionURL) + ps, err := pubsub.New(ctx, logger, db, connectionURL) require.NoError(t, err) defer ps.Close() event := "test" @@ -219,7 +226,8 @@ func TestPubsub_Disconnect(t *testing.T) { ctx, cancelFunc := context.WithTimeout(context.Background(), testutil.WaitSuperLong) defer cancelFunc() - ps, err := pubsub.New(ctx, db, connectionURL) + logger := slogtest.Make(t, &slogtest.Options{IgnoreErrors: true}).Leveled(slog.LevelDebug) + ps, err := pubsub.New(ctx, logger, db, connectionURL) require.NoError(t, err) defer ps.Close() event := "test"