Skip to content

Commit c97b89d

Browse files
committed
feat: add logging to pgPubsub
1 parent 0c30dde commit c97b89d

File tree

4 files changed

+61
-14
lines changed

4 files changed

+61
-14
lines changed

cli/server.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -673,7 +673,7 @@ func (r *RootCmd) Server(newAPI func(context.Context, *coderd.Options) (*coderd.
673673
}()
674674

675675
options.Database = database.New(sqlDB)
676-
options.Pubsub, err = pubsub.New(ctx, sqlDB, dbURL)
676+
options.Pubsub, err = pubsub.New(ctx, logger.Named("pubsub"), sqlDB, dbURL)
677677
if err != nil {
678678
return xerrors.Errorf("create pubsub: %w", err)
679679
}

coderd/database/dbtestutil/db.go

+11-2
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@ import (
1717
"github.com/stretchr/testify/require"
1818
"golang.org/x/xerrors"
1919

20+
"cdr.dev/slog"
21+
"cdr.dev/slog/sloggers/slogtest"
2022
"github.com/coder/coder/v2/coderd/database"
2123
"github.com/coder/coder/v2/coderd/database/dbmem"
2224
"github.com/coder/coder/v2/coderd/database/postgres"
@@ -32,6 +34,7 @@ type options struct {
3234
fixedTimezone string
3335
dumpOnFailure bool
3436
returnSQLDB func(*sql.DB)
37+
logger slog.Logger
3538
}
3639

3740
type Option func(*options)
@@ -50,6 +53,12 @@ func WithDumpOnFailure() Option {
5053
}
5154
}
5255

56+
func WithLogger(logger slog.Logger) Option {
57+
return func(o *options) {
58+
o.logger = logger
59+
}
60+
}
61+
5362
func withReturnSQLDB(f func(*sql.DB)) Option {
5463
return func(o *options) {
5564
o.returnSQLDB = f
@@ -74,7 +83,7 @@ func NewDBWithSQLDB(t testing.TB, opts ...Option) (database.Store, pubsub.Pubsub
7483
func NewDB(t testing.TB, opts ...Option) (database.Store, pubsub.Pubsub) {
7584
t.Helper()
7685

77-
var o options
86+
o := options{logger: slogtest.Make(t, nil).Named("pubsub").Leveled(slog.LevelDebug)}
7887
for _, opt := range opts {
7988
opt(&o)
8089
}
@@ -118,7 +127,7 @@ func NewDB(t testing.TB, opts ...Option) (database.Store, pubsub.Pubsub) {
118127
}
119128
db = database.New(sqlDB)
120129

121-
ps, err = pubsub.New(context.Background(), sqlDB, connectionURL)
130+
ps, err = pubsub.New(context.Background(), o.logger, sqlDB, connectionURL)
122131
require.NoError(t, err)
123132
t.Cleanup(func() {
124133
_ = ps.Close()

coderd/database/pubsub/pubsub.go

+35-5
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@ import (
1010
"github.com/google/uuid"
1111
"github.com/lib/pq"
1212
"golang.org/x/xerrors"
13+
14+
"cdr.dev/slog"
1315
)
1416

1517
// Listener represents a pubsub handler.
@@ -164,6 +166,7 @@ func (q *msgQueue) dropped() {
164166
type pgPubsub struct {
165167
ctx context.Context
166168
cancel context.CancelFunc
169+
logger slog.Logger
167170
listenDone chan struct{}
168171
pgListener *pq.Listener
169172
db *sql.DB
@@ -198,6 +201,9 @@ func (p *pgPubsub) subscribeQueue(event string, newQ *msgQueue) (cancel func(),
198201
}()
199202

200203
err = p.pgListener.Listen(event)
204+
if err == nil {
205+
p.logger.Debug(p.ctx, "started listening to event channel", slog.F("event", event))
206+
}
201207
if errors.Is(err, pq.ErrChannelAlreadyOpen) {
202208
// It's ok if it's already open!
203209
err = nil
@@ -223,12 +229,18 @@ func (p *pgPubsub) subscribeQueue(event string, newQ *msgQueue) (cancel func(),
223229
delete(listeners, id)
224230

225231
if len(listeners) == 0 {
226-
_ = p.pgListener.Unlisten(event)
232+
uErr := p.pgListener.Unlisten(event)
233+
if uErr != nil && !p.closedListener {
234+
p.logger.Warn(p.ctx, "failed to unlisten", slog.Error(uErr), slog.F("event", event))
235+
} else {
236+
p.logger.Debug(p.ctx, "stopped listening to event channel", slog.F("event", event))
237+
}
227238
}
228239
}, nil
229240
}
230241

231242
func (p *pgPubsub) Publish(event string, message []byte) error {
243+
p.logger.Debug(p.ctx, "publish", slog.F("event", event))
232244
// This is safe because we are calling pq.QuoteLiteral. pg_notify doesn't
233245
// support the first parameter being a prepared statement.
234246
//nolint:gosec
@@ -241,9 +253,11 @@ func (p *pgPubsub) Publish(event string, message []byte) error {
241253

242254
// Close closes the pubsub instance.
243255
func (p *pgPubsub) Close() error {
256+
p.logger.Info(p.ctx, "pubsub is closing")
244257
p.cancel()
245258
err := p.closeListener()
246259
<-p.listenDone
260+
p.logger.Debug(p.ctx, "pubsub closed")
247261
return err
248262
}
249263

@@ -262,7 +276,11 @@ func (p *pgPubsub) closeListener() error {
262276
// listen begins receiving messages on the pq listener.
263277
func (p *pgPubsub) listen() {
264278
defer func() {
265-
_ = p.closeListener()
279+
p.logger.Info(p.ctx, "pubsub listen stopped receiving notify")
280+
cErr := p.closeListener()
281+
if cErr != nil {
282+
p.logger.Error(p.ctx, "failed to close listener")
283+
}
266284
close(p.listenDone)
267285
}()
268286

@@ -281,6 +299,7 @@ func (p *pgPubsub) listen() {
281299
}
282300
// A nil notification can be dispatched on reconnect.
283301
if notif == nil {
302+
p.logger.Debug(p.ctx, "notifying subscribers of a reconnection")
284303
p.recordReconnect()
285304
continue
286305
}
@@ -312,10 +331,20 @@ func (p *pgPubsub) recordReconnect() {
312331
}
313332

314333
// New creates a new Pubsub implementation using a PostgreSQL connection.
315-
func New(ctx context.Context, database *sql.DB, connectURL string) (Pubsub, error) {
334+
func New(ctx context.Context, logger slog.Logger, database *sql.DB, connectURL string) (Pubsub, error) {
316335
// Creates a new listener using pq.
317336
errCh := make(chan error)
318-
listener := pq.NewListener(connectURL, time.Second, time.Minute, func(_ pq.ListenerEventType, err error) {
337+
listener := pq.NewListener(connectURL, time.Second, time.Minute, func(t pq.ListenerEventType, err error) {
338+
switch t {
339+
case pq.ListenerEventConnected:
340+
logger.Info(ctx, "pubsub connected to postgres")
341+
case pq.ListenerEventDisconnected:
342+
logger.Error(ctx, "pubsub disconnected from postgres", slog.Error(err))
343+
case pq.ListenerEventReconnected:
344+
logger.Info(ctx, "pubsub reconnected to postgres")
345+
case pq.ListenerEventConnectionAttemptFailed:
346+
logger.Error(ctx, "pubsub failed to connect to postgres", slog.Error(err))
347+
}
319348
// This callback gets events whenever the connection state changes.
320349
// Don't send if the errChannel has already been closed.
321350
select {
@@ -342,12 +371,13 @@ func New(ctx context.Context, database *sql.DB, connectURL string) (Pubsub, erro
342371
pgPubsub := &pgPubsub{
343372
ctx: ctx,
344373
cancel: cancel,
374+
logger: logger,
345375
listenDone: make(chan struct{}),
346376
db: database,
347377
pgListener: listener,
348378
queues: make(map[string]map[uuid.UUID]*msgQueue),
349379
}
350380
go pgPubsub.listen()
351-
381+
logger.Info(ctx, "pubsub has started")
352382
return pgPubsub, nil
353383
}

coderd/database/pubsub/pubsub_test.go

+14-6
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@ import (
1515
"github.com/stretchr/testify/require"
1616
"golang.org/x/xerrors"
1717

18+
"cdr.dev/slog"
19+
"cdr.dev/slog/sloggers/slogtest"
1820
"github.com/coder/coder/v2/coderd/database/postgres"
1921
"github.com/coder/coder/v2/coderd/database/pubsub"
2022
"github.com/coder/coder/v2/testutil"
@@ -32,14 +34,15 @@ func TestPubsub(t *testing.T) {
3234
t.Run("Postgres", func(t *testing.T) {
3335
ctx, cancelFunc := context.WithCancel(context.Background())
3436
defer cancelFunc()
37+
logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug)
3538

3639
connectionURL, closePg, err := postgres.Open()
3740
require.NoError(t, err)
3841
defer closePg()
3942
db, err := sql.Open("postgres", connectionURL)
4043
require.NoError(t, err)
4144
defer db.Close()
42-
pubsub, err := pubsub.New(ctx, db, connectionURL)
45+
pubsub, err := pubsub.New(ctx, logger, db, connectionURL)
4346
require.NoError(t, err)
4447
defer pubsub.Close()
4548
event := "test"
@@ -61,13 +64,14 @@ func TestPubsub(t *testing.T) {
6164
t.Run("PostgresCloseCancel", func(t *testing.T) {
6265
ctx, cancelFunc := context.WithCancel(context.Background())
6366
defer cancelFunc()
67+
logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug)
6468
connectionURL, closePg, err := postgres.Open()
6569
require.NoError(t, err)
6670
defer closePg()
6771
db, err := sql.Open("postgres", connectionURL)
6872
require.NoError(t, err)
6973
defer db.Close()
70-
pubsub, err := pubsub.New(ctx, db, connectionURL)
74+
pubsub, err := pubsub.New(ctx, logger, db, connectionURL)
7175
require.NoError(t, err)
7276
defer pubsub.Close()
7377
cancelFunc()
@@ -76,13 +80,14 @@ func TestPubsub(t *testing.T) {
7680
t.Run("NotClosedOnCancelContext", func(t *testing.T) {
7781
ctx, cancel := context.WithCancel(context.Background())
7882
defer cancel()
83+
logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug)
7984
connectionURL, closePg, err := postgres.Open()
8085
require.NoError(t, err)
8186
defer closePg()
8287
db, err := sql.Open("postgres", connectionURL)
8388
require.NoError(t, err)
8489
defer db.Close()
85-
pubsub, err := pubsub.New(ctx, db, connectionURL)
90+
pubsub, err := pubsub.New(ctx, logger, db, connectionURL)
8691
require.NoError(t, err)
8792
defer pubsub.Close()
8893

@@ -108,13 +113,14 @@ func TestPubsub(t *testing.T) {
108113
t.Run("ClosePropagatesContextCancellationToSubscription", func(t *testing.T) {
109114
ctx, cancel := context.WithTimeout(context.Background(), testutil.WaitLong)
110115
defer cancel()
116+
logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug)
111117
connectionURL, closePg, err := postgres.Open()
112118
require.NoError(t, err)
113119
defer closePg()
114120
db, err := sql.Open("postgres", connectionURL)
115121
require.NoError(t, err)
116122
defer db.Close()
117-
pubsub, err := pubsub.New(ctx, db, connectionURL)
123+
pubsub, err := pubsub.New(ctx, logger, db, connectionURL)
118124
require.NoError(t, err)
119125
defer pubsub.Close()
120126

@@ -164,14 +170,15 @@ func TestPubsub_ordering(t *testing.T) {
164170

165171
ctx, cancelFunc := context.WithCancel(context.Background())
166172
defer cancelFunc()
173+
logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug)
167174

168175
connectionURL, closePg, err := postgres.Open()
169176
require.NoError(t, err)
170177
defer closePg()
171178
db, err := sql.Open("postgres", connectionURL)
172179
require.NoError(t, err)
173180
defer db.Close()
174-
ps, err := pubsub.New(ctx, db, connectionURL)
181+
ps, err := pubsub.New(ctx, logger, db, connectionURL)
175182
require.NoError(t, err)
176183
defer ps.Close()
177184
event := "test"
@@ -219,7 +226,8 @@ func TestPubsub_Disconnect(t *testing.T) {
219226

220227
ctx, cancelFunc := context.WithTimeout(context.Background(), testutil.WaitSuperLong)
221228
defer cancelFunc()
222-
ps, err := pubsub.New(ctx, db, connectionURL)
229+
logger := slogtest.Make(t, &slogtest.Options{IgnoreErrors: true}).Leveled(slog.LevelDebug)
230+
ps, err := pubsub.New(ctx, logger, db, connectionURL)
223231
require.NoError(t, err)
224232
defer ps.Close()
225233
event := "test"

0 commit comments

Comments
 (0)