|
4 | 4 | "context"
|
5 | 5 | "database/sql"
|
6 | 6 | "errors"
|
| 7 | + "net" |
7 | 8 | "sync"
|
8 | 9 | "time"
|
9 | 10 |
|
@@ -352,11 +353,63 @@ func (p *PGPubsub) recordReconnect() {
|
352 | 353 | }
|
353 | 354 | }
|
354 | 355 |
|
| 356 | +// logDialer is a pq.Dialer and pq.DialerContext that logs when it starts |
| 357 | +// connecting and when the TCP connection is established. |
| 358 | +type logDialer struct { |
| 359 | + logger slog.Logger |
| 360 | + d net.Dialer |
| 361 | +} |
| 362 | + |
| 363 | +var ( |
| 364 | + _ pq.Dialer = logDialer{} |
| 365 | + _ pq.DialerContext = logDialer{} |
| 366 | +) |
| 367 | + |
| 368 | +func (d logDialer) Dial(network, address string) (net.Conn, error) { |
| 369 | + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) |
| 370 | + defer cancel() |
| 371 | + return d.DialContext(ctx, network, address) |
| 372 | +} |
| 373 | + |
| 374 | +func (d logDialer) DialTimeout(network, address string, timeout time.Duration) (net.Conn, error) { |
| 375 | + ctx, cancel := context.WithTimeout(context.Background(), timeout) |
| 376 | + defer cancel() |
| 377 | + return d.DialContext(ctx, network, address) |
| 378 | +} |
| 379 | + |
| 380 | +func (d logDialer) DialContext(ctx context.Context, network, address string) (net.Conn, error) { |
| 381 | + deadline, hasDeadline := ctx.Deadline() |
| 382 | + timeoutMS := 0 |
| 383 | + if hasDeadline { |
| 384 | + timeoutMS = int(time.Until(deadline) / time.Millisecond) |
| 385 | + } |
| 386 | + |
| 387 | + logger := d.logger.With(slog.F("network", network), slog.F("address", address), slog.F("timeout_ms", timeoutMS)) |
| 388 | + |
| 389 | + logger.Info(ctx, "pubsub dialing postgres") |
| 390 | + start := time.Now() |
| 391 | + conn, err := d.d.DialContext(ctx, network, address) |
| 392 | + if err != nil { |
| 393 | + logger.Error(ctx, "pubsub failed to dial postgres") |
| 394 | + return nil, err |
| 395 | + } |
| 396 | + elapsed := time.Since(start) |
| 397 | + logger.Info(ctx, "pubsub postgres TCP connection established", slog.F("elapsed_ms", elapsed.Milliseconds())) |
| 398 | + return conn, nil |
| 399 | +} |
| 400 | + |
355 | 401 | func (p *PGPubsub) startListener(ctx context.Context, connectURL string) error {
|
356 | 402 | p.connected.Set(0)
|
357 | 403 | // Creates a new listener using pq.
|
358 |
| - errCh := make(chan error) |
359 |
| - p.pgListener = pq.NewListener(connectURL, time.Second, time.Minute, func(t pq.ListenerEventType, err error) { |
| 404 | + var ( |
| 405 | + errCh = make(chan error) |
| 406 | + dialer = logDialer{ |
| 407 | + logger: p.logger, |
| 408 | + // pq.defaultDialer uses a zero net.Dialer as well. |
| 409 | + d: net.Dialer{}, |
| 410 | + } |
| 411 | + ) |
| 412 | + p.pgListener = pq.NewDialListener(dialer, connectURL, time.Second, time.Minute, func(t pq.ListenerEventType, err error) { |
360 | 413 | switch t {
|
361 | 414 | case pq.ListenerEventConnected:
|
362 | 415 | p.logger.Info(ctx, "pubsub connected to postgres")
|
|
0 commit comments