From ab040dce36a83cfa659e2f1d7a65593014299082 Mon Sep 17 00:00:00 2001 From: Dean Sheather Date: Mon, 5 Feb 2024 15:11:03 +0000 Subject: [PATCH 1/3] chore: add logs to pq notification dialer --- coderd/database/pubsub/pubsub.go | 54 ++++++++++++++++++++++++++++++-- 1 file changed, 52 insertions(+), 2 deletions(-) diff --git a/coderd/database/pubsub/pubsub.go b/coderd/database/pubsub/pubsub.go index 6bab8d279bdc1..f5796052da341 100644 --- a/coderd/database/pubsub/pubsub.go +++ b/coderd/database/pubsub/pubsub.go @@ -4,6 +4,7 @@ import ( "context" "database/sql" "errors" + "net" "sync" "time" @@ -352,11 +353,60 @@ func (p *PGPubsub) recordReconnect() { } } +// logDialer is a pq.Dialer and pq.DialerContext that logs when it starts +// connecting and when the TCP connection is established. +type logDialer struct { + logCtx context.Context + logger slog.Logger + d net.Dialer +} + +var _ pq.Dialer = logDialer{} +var _ pq.DialerContext = logDialer{} + +func (d logDialer) Dial(network, address string) (net.Conn, error) { + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + return d.DialContext(ctx, network, address) +} + +func (d logDialer) DialTimeout(network, address string, timeout time.Duration) (net.Conn, error) { + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + return d.DialContext(ctx, network, address) +} + +func (d logDialer) DialContext(ctx context.Context, network, address string) (net.Conn, error) { + deadline, hasDeadline := ctx.Deadline() + timeoutMS := 0 + if hasDeadline { + timeoutMS = int(time.Until(deadline) / time.Millisecond) + } + + d.logger.Info(d.logCtx, "pubsub dialing postgres", slog.F("network", network), slog.F("address", address), slog.F("timeout_ms", timeoutMS)) + start := time.Now() + conn, err := d.d.DialContext(ctx, network, address) + if err != nil { + d.logger.Error(d.logCtx, "pubsub failed to dial postgres", slog.F("network", network), slog.F("address", address), slog.F("timeout_ms", timeoutMS), slog.Error(err)) + return nil, err + } + elapsed := time.Since(start) + d.logger.Info(d.logCtx, "pubsub postgres TCP connection established", slog.F("elapsed_ms", elapsed.Milliseconds())) + return conn, nil +} + func (p *PGPubsub) startListener(ctx context.Context, connectURL string) error { p.connected.Set(0) // Creates a new listener using pq. - errCh := make(chan error) - p.pgListener = pq.NewListener(connectURL, time.Second, time.Minute, func(t pq.ListenerEventType, err error) { + var ( + errCh = make(chan error) + dialer = logDialer{ + logCtx: ctx, + logger: p.logger, + d: net.Dialer{}, + } + ) + p.pgListener = pq.NewDialListener(dialer, connectURL, time.Second, time.Minute, func(t pq.ListenerEventType, err error) { switch t { case pq.ListenerEventConnected: p.logger.Info(ctx, "pubsub connected to postgres") From 0047c7030c51663b5af084e5fb90ba9cca5dc21a Mon Sep 17 00:00:00 2001 From: Dean Sheather Date: Mon, 5 Feb 2024 15:12:13 +0000 Subject: [PATCH 2/3] fixup! chore: add logs to pq notification dialer --- coderd/database/pubsub/pubsub.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/coderd/database/pubsub/pubsub.go b/coderd/database/pubsub/pubsub.go index f5796052da341..1f388c887bda3 100644 --- a/coderd/database/pubsub/pubsub.go +++ b/coderd/database/pubsub/pubsub.go @@ -403,7 +403,8 @@ func (p *PGPubsub) startListener(ctx context.Context, connectURL string) error { dialer = logDialer{ logCtx: ctx, logger: p.logger, - d: net.Dialer{}, + // pq.defaultDialer uses a zero net.Dialer as well. + d: net.Dialer{}, } ) p.pgListener = pq.NewDialListener(dialer, connectURL, time.Second, time.Minute, func(t pq.ListenerEventType, err error) { From 87cea55564be4a62ac95814f0e48b2b917eee4d9 Mon Sep 17 00:00:00 2001 From: Dean Sheather Date: Tue, 6 Feb 2024 11:19:04 +0000 Subject: [PATCH 3/3] PR comments --- coderd/database/pubsub/pubsub.go | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/coderd/database/pubsub/pubsub.go b/coderd/database/pubsub/pubsub.go index 1f388c887bda3..33b3c083b66f8 100644 --- a/coderd/database/pubsub/pubsub.go +++ b/coderd/database/pubsub/pubsub.go @@ -356,13 +356,14 @@ func (p *PGPubsub) recordReconnect() { // logDialer is a pq.Dialer and pq.DialerContext that logs when it starts // connecting and when the TCP connection is established. type logDialer struct { - logCtx context.Context logger slog.Logger d net.Dialer } -var _ pq.Dialer = logDialer{} -var _ pq.DialerContext = logDialer{} +var ( + _ pq.Dialer = logDialer{} + _ pq.DialerContext = logDialer{} +) func (d logDialer) Dial(network, address string) (net.Conn, error) { ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) @@ -383,15 +384,17 @@ func (d logDialer) DialContext(ctx context.Context, network, address string) (ne timeoutMS = int(time.Until(deadline) / time.Millisecond) } - d.logger.Info(d.logCtx, "pubsub dialing postgres", slog.F("network", network), slog.F("address", address), slog.F("timeout_ms", timeoutMS)) + logger := d.logger.With(slog.F("network", network), slog.F("address", address), slog.F("timeout_ms", timeoutMS)) + + logger.Info(ctx, "pubsub dialing postgres") start := time.Now() conn, err := d.d.DialContext(ctx, network, address) if err != nil { - d.logger.Error(d.logCtx, "pubsub failed to dial postgres", slog.F("network", network), slog.F("address", address), slog.F("timeout_ms", timeoutMS), slog.Error(err)) + logger.Error(ctx, "pubsub failed to dial postgres") return nil, err } elapsed := time.Since(start) - d.logger.Info(d.logCtx, "pubsub postgres TCP connection established", slog.F("elapsed_ms", elapsed.Milliseconds())) + logger.Info(ctx, "pubsub postgres TCP connection established", slog.F("elapsed_ms", elapsed.Milliseconds())) return conn, nil } @@ -401,7 +404,6 @@ func (p *PGPubsub) startListener(ctx context.Context, connectURL string) error { var ( errCh = make(chan error) dialer = logDialer{ - logCtx: ctx, logger: p.logger, // pq.defaultDialer uses a zero net.Dialer as well. d: net.Dialer{},