23
23
#include "pqexpbuffer.h"
24
24
#include "access/xlog.h"
25
25
#include "miscadmin.h"
26
+ #include "pgstat.h"
26
27
#include "replication/walreceiver.h"
28
+ #include "storage/proc.h"
27
29
#include "utils/builtins.h"
28
30
29
- #ifdef HAVE_POLL_H
30
- #include <poll.h>
31
- #endif
32
- #ifdef HAVE_SYS_POLL_H
33
- #include <sys/poll.h>
34
- #endif
35
- #ifdef HAVE_SYS_SELECT_H
36
- #include <sys/select.h>
37
- #endif
38
-
39
31
PG_MODULE_MAGIC ;
40
32
41
33
void _PG_init (void );
@@ -59,7 +51,6 @@ static void libpqrcv_send(const char *buffer, int nbytes);
59
51
static void libpqrcv_disconnect (void );
60
52
61
53
/* Prototypes for private functions */
62
- static bool libpq_select (int timeout_ms );
63
54
static PGresult * libpqrcv_PQexec (const char * query );
64
55
65
56
/*
@@ -366,67 +357,6 @@ libpqrcv_readtimelinehistoryfile(TimeLineID tli,
366
357
PQclear (res );
367
358
}
368
359
369
- /*
370
- * Wait until we can read WAL stream, or timeout.
371
- *
372
- * Returns true if data has become available for reading, false if timed out
373
- * or interrupted by signal.
374
- *
375
- * This is based on pqSocketCheck.
376
- */
377
- static bool
378
- libpq_select (int timeout_ms )
379
- {
380
- int ret ;
381
-
382
- Assert (streamConn != NULL );
383
- if (PQsocket (streamConn ) < 0 )
384
- ereport (ERROR ,
385
- (errcode_for_socket_access (),
386
- errmsg ("invalid socket: %s" , PQerrorMessage (streamConn ))));
387
-
388
- /* We use poll(2) if available, otherwise select(2) */
389
- {
390
- #ifdef HAVE_POLL
391
- struct pollfd input_fd ;
392
-
393
- input_fd .fd = PQsocket (streamConn );
394
- input_fd .events = POLLIN | POLLERR ;
395
- input_fd .revents = 0 ;
396
-
397
- ret = poll (& input_fd , 1 , timeout_ms );
398
- #else /* !HAVE_POLL */
399
-
400
- fd_set input_mask ;
401
- struct timeval timeout ;
402
- struct timeval * ptr_timeout ;
403
-
404
- FD_ZERO (& input_mask );
405
- FD_SET (PQsocket (streamConn ), & input_mask );
406
-
407
- if (timeout_ms < 0 )
408
- ptr_timeout = NULL ;
409
- else
410
- {
411
- timeout .tv_sec = timeout_ms / 1000 ;
412
- timeout .tv_usec = (timeout_ms % 1000 ) * 1000 ;
413
- ptr_timeout = & timeout ;
414
- }
415
-
416
- ret = select (PQsocket (streamConn ) + 1 , & input_mask ,
417
- NULL , NULL , ptr_timeout );
418
- #endif /* HAVE_POLL */
419
- }
420
-
421
- if (ret == 0 || (ret < 0 && errno == EINTR ))
422
- return false;
423
- if (ret < 0 )
424
- ereport (ERROR ,
425
- (errcode_for_socket_access (),
426
- errmsg ("select() failed: %m" )));
427
- return true;
428
- }
429
-
430
360
/*
431
361
* Send a query and wait for the results by using the asynchronous libpq
432
362
* functions and the backend version of select().
@@ -470,14 +400,31 @@ libpqrcv_PQexec(const char *query)
470
400
*/
471
401
while (PQisBusy (streamConn ))
472
402
{
403
+ int rc ;
404
+
473
405
/*
474
406
* We don't need to break down the sleep into smaller increments,
475
- * and check for interrupts after each nap, since we can just
476
- * elog(FATAL) within SIGTERM signal handler if the signal arrives
477
- * in the middle of establishment of replication connection.
407
+ * since we'll get interrupted by signals and can either handle
408
+ * interrupts here or elog(FATAL) within SIGTERM signal handler if
409
+ * the signal arrives in the middle of establishment of
410
+ * replication connection.
478
411
*/
479
- if (!libpq_select (-1 ))
480
- continue ; /* interrupted */
412
+ ResetLatch (& MyProc -> procLatch );
413
+ rc = WaitLatchOrSocket (& MyProc -> procLatch ,
414
+ WL_POSTMASTER_DEATH | WL_SOCKET_READABLE |
415
+ WL_LATCH_SET ,
416
+ PQsocket (streamConn ),
417
+ 0 ,
418
+ WAIT_EVENT_LIBPQWALRECEIVER_READ );
419
+ if (rc & WL_POSTMASTER_DEATH )
420
+ exit (1 );
421
+
422
+ /* interrupted */
423
+ if (rc & WL_LATCH_SET )
424
+ {
425
+ CHECK_FOR_INTERRUPTS ();
426
+ continue ;
427
+ }
481
428
if (PQconsumeInput (streamConn ) == 0 )
482
429
return NULL ; /* trouble */
483
430
}
0 commit comments