Skip to content

Commit 597a87c

Browse files
committed
Use latch instead of select() in walreceiver
Replace use of poll()/select() by WaitLatchOrSocket(), which is more portable and flexible. Also change walreceiver to use its procLatch instead of a custom latch. From: Petr Jelinek <petr@2ndquadrant.com>
1 parent b999c24 commit 597a87c

File tree

6 files changed

+43
-89
lines changed

6 files changed

+43
-89
lines changed

src/backend/postmaster/pgstat.c

+3
Original file line numberDiff line numberDiff line change
@@ -3338,6 +3338,9 @@ pgstat_get_wait_client(WaitEventClient w)
33383338
case WAIT_EVENT_WAL_RECEIVER_WAIT_START:
33393339
event_name = "WalReceiverWaitStart";
33403340
break;
3341+
case WAIT_EVENT_LIBPQWALRECEIVER_READ:
3342+
event_name = "LibPQWalReceiverRead";
3343+
break;
33413344
case WAIT_EVENT_WAL_SENDER_WAIT_WAL:
33423345
event_name = "WalSenderWaitForWAL";
33433346
break;

src/backend/replication/libpqwalreceiver/libpqwalreceiver.c

+24-77
Original file line numberDiff line numberDiff line change
@@ -23,19 +23,11 @@
2323
#include "pqexpbuffer.h"
2424
#include "access/xlog.h"
2525
#include "miscadmin.h"
26+
#include "pgstat.h"
2627
#include "replication/walreceiver.h"
28+
#include "storage/proc.h"
2729
#include "utils/builtins.h"
2830

29-
#ifdef HAVE_POLL_H
30-
#include <poll.h>
31-
#endif
32-
#ifdef HAVE_SYS_POLL_H
33-
#include <sys/poll.h>
34-
#endif
35-
#ifdef HAVE_SYS_SELECT_H
36-
#include <sys/select.h>
37-
#endif
38-
3931
PG_MODULE_MAGIC;
4032

4133
void _PG_init(void);
@@ -59,7 +51,6 @@ static void libpqrcv_send(const char *buffer, int nbytes);
5951
static void libpqrcv_disconnect(void);
6052

6153
/* Prototypes for private functions */
62-
static bool libpq_select(int timeout_ms);
6354
static PGresult *libpqrcv_PQexec(const char *query);
6455

6556
/*
@@ -366,67 +357,6 @@ libpqrcv_readtimelinehistoryfile(TimeLineID tli,
366357
PQclear(res);
367358
}
368359

369-
/*
370-
* Wait until we can read WAL stream, or timeout.
371-
*
372-
* Returns true if data has become available for reading, false if timed out
373-
* or interrupted by signal.
374-
*
375-
* This is based on pqSocketCheck.
376-
*/
377-
static bool
378-
libpq_select(int timeout_ms)
379-
{
380-
int ret;
381-
382-
Assert(streamConn != NULL);
383-
if (PQsocket(streamConn) < 0)
384-
ereport(ERROR,
385-
(errcode_for_socket_access(),
386-
errmsg("invalid socket: %s", PQerrorMessage(streamConn))));
387-
388-
/* We use poll(2) if available, otherwise select(2) */
389-
{
390-
#ifdef HAVE_POLL
391-
struct pollfd input_fd;
392-
393-
input_fd.fd = PQsocket(streamConn);
394-
input_fd.events = POLLIN | POLLERR;
395-
input_fd.revents = 0;
396-
397-
ret = poll(&input_fd, 1, timeout_ms);
398-
#else /* !HAVE_POLL */
399-
400-
fd_set input_mask;
401-
struct timeval timeout;
402-
struct timeval *ptr_timeout;
403-
404-
FD_ZERO(&input_mask);
405-
FD_SET(PQsocket(streamConn), &input_mask);
406-
407-
if (timeout_ms < 0)
408-
ptr_timeout = NULL;
409-
else
410-
{
411-
timeout.tv_sec = timeout_ms / 1000;
412-
timeout.tv_usec = (timeout_ms % 1000) * 1000;
413-
ptr_timeout = &timeout;
414-
}
415-
416-
ret = select(PQsocket(streamConn) + 1, &input_mask,
417-
NULL, NULL, ptr_timeout);
418-
#endif /* HAVE_POLL */
419-
}
420-
421-
if (ret == 0 || (ret < 0 && errno == EINTR))
422-
return false;
423-
if (ret < 0)
424-
ereport(ERROR,
425-
(errcode_for_socket_access(),
426-
errmsg("select() failed: %m")));
427-
return true;
428-
}
429-
430360
/*
431361
* Send a query and wait for the results by using the asynchronous libpq
432362
* functions and the backend version of select().
@@ -470,14 +400,31 @@ libpqrcv_PQexec(const char *query)
470400
*/
471401
while (PQisBusy(streamConn))
472402
{
403+
int rc;
404+
473405
/*
474406
* We don't need to break down the sleep into smaller increments,
475-
* and check for interrupts after each nap, since we can just
476-
* elog(FATAL) within SIGTERM signal handler if the signal arrives
477-
* in the middle of establishment of replication connection.
407+
* since we'll get interrupted by signals and can either handle
408+
* interrupts here or elog(FATAL) within SIGTERM signal handler if
409+
* the signal arrives in the middle of establishment of
410+
* replication connection.
478411
*/
479-
if (!libpq_select(-1))
480-
continue; /* interrupted */
412+
ResetLatch(&MyProc->procLatch);
413+
rc = WaitLatchOrSocket(&MyProc->procLatch,
414+
WL_POSTMASTER_DEATH | WL_SOCKET_READABLE |
415+
WL_LATCH_SET,
416+
PQsocket(streamConn),
417+
0,
418+
WAIT_EVENT_LIBPQWALRECEIVER_READ);
419+
if (rc & WL_POSTMASTER_DEATH)
420+
exit(1);
421+
422+
/* interrupted */
423+
if (rc & WL_LATCH_SET)
424+
{
425+
CHECK_FOR_INTERRUPTS();
426+
continue;
427+
}
481428
if (PQconsumeInput(streamConn) == 0)
482429
return NULL; /* trouble */
483430
}

src/backend/replication/walreceiver.c

+10-8
Original file line numberDiff line numberDiff line change
@@ -261,7 +261,7 @@ WalReceiverMain(void)
261261
/* Arrange to clean up at walreceiver exit */
262262
on_shmem_exit(WalRcvDie, 0);
263263

264-
OwnLatch(&walrcv->latch);
264+
walrcv->latch = &MyProc->procLatch;
265265

266266
/* Properly accept or ignore signals the postmaster might send us */
267267
pqsignal(SIGHUP, WalRcvSigHupHandler); /* set flag to read config
@@ -483,15 +483,15 @@ WalReceiverMain(void)
483483
* avoiding some system calls.
484484
*/
485485
Assert(wait_fd != PGINVALID_SOCKET);
486-
rc = WaitLatchOrSocket(&walrcv->latch,
486+
rc = WaitLatchOrSocket(walrcv->latch,
487487
WL_POSTMASTER_DEATH | WL_SOCKET_READABLE |
488488
WL_TIMEOUT | WL_LATCH_SET,
489489
wait_fd,
490490
NAPTIME_PER_CYCLE,
491491
WAIT_EVENT_WAL_RECEIVER_MAIN);
492492
if (rc & WL_LATCH_SET)
493493
{
494-
ResetLatch(&walrcv->latch);
494+
ResetLatch(walrcv->latch);
495495
if (walrcv->force_reply)
496496
{
497497
/*
@@ -652,7 +652,7 @@ WalRcvWaitForStartPosition(XLogRecPtr *startpoint, TimeLineID *startpointTLI)
652652
WakeupRecovery();
653653
for (;;)
654654
{
655-
ResetLatch(&walrcv->latch);
655+
ResetLatch(walrcv->latch);
656656

657657
/*
658658
* Emergency bailout if postmaster has died. This is to avoid the
@@ -687,7 +687,7 @@ WalRcvWaitForStartPosition(XLogRecPtr *startpoint, TimeLineID *startpointTLI)
687687
}
688688
SpinLockRelease(&walrcv->mutex);
689689

690-
WaitLatch(&walrcv->latch, WL_LATCH_SET | WL_POSTMASTER_DEATH, 0,
690+
WaitLatch(walrcv->latch, WL_LATCH_SET | WL_POSTMASTER_DEATH, 0,
691691
WAIT_EVENT_WAL_RECEIVER_WAIT_START);
692692
}
693693

@@ -763,7 +763,7 @@ WalRcvDie(int code, Datum arg)
763763
/* Ensure that all WAL records received are flushed to disk */
764764
XLogWalRcvFlush(true);
765765

766-
DisownLatch(&walrcv->latch);
766+
walrcv->latch = NULL;
767767

768768
SpinLockAcquire(&walrcv->mutex);
769769
Assert(walrcv->walRcvState == WALRCV_STREAMING ||
@@ -812,7 +812,8 @@ WalRcvShutdownHandler(SIGNAL_ARGS)
812812

813813
got_SIGTERM = true;
814814

815-
SetLatch(&WalRcv->latch);
815+
if (WalRcv->latch)
816+
SetLatch(WalRcv->latch);
816817

817818
/* Don't joggle the elbow of proc_exit */
818819
if (!proc_exit_inprogress && WalRcvImmediateInterruptOK)
@@ -1297,7 +1298,8 @@ void
12971298
WalRcvForceReply(void)
12981299
{
12991300
WalRcv->force_reply = true;
1300-
SetLatch(&WalRcv->latch);
1301+
if (WalRcv->latch)
1302+
SetLatch(WalRcv->latch);
13011303
}
13021304

13031305
/*

src/backend/replication/walreceiverfuncs.c

+3-3
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ WalRcvShmemInit(void)
6464
MemSet(WalRcv, 0, WalRcvShmemSize());
6565
WalRcv->walRcvState = WALRCV_STOPPED;
6666
SpinLockInit(&WalRcv->mutex);
67-
InitSharedLatch(&WalRcv->latch);
67+
WalRcv->latch = NULL;
6868
}
6969
}
7070

@@ -279,8 +279,8 @@ RequestXLogStreaming(TimeLineID tli, XLogRecPtr recptr, const char *conninfo,
279279

280280
if (launch)
281281
SendPostmasterSignal(PMSIGNAL_START_WALRECEIVER);
282-
else
283-
SetLatch(&walrcv->latch);
282+
else if (walrcv->latch)
283+
SetLatch(walrcv->latch);
284284
}
285285

286286
/*

src/include/pgstat.h

+1
Original file line numberDiff line numberDiff line change
@@ -763,6 +763,7 @@ typedef enum
763763
WAIT_EVENT_CLIENT_WRITE,
764764
WAIT_EVENT_SSL_OPEN_SERVER,
765765
WAIT_EVENT_WAL_RECEIVER_WAIT_START,
766+
WAIT_EVENT_LIBPQWALRECEIVER_READ,
766767
WAIT_EVENT_WAL_SENDER_WAIT_WAL,
767768
WAIT_EVENT_WAL_SENDER_WRITE_DATA
768769
} WaitEventClient;

src/include/replication/walreceiver.h

+2-1
Original file line numberDiff line numberDiff line change
@@ -127,8 +127,9 @@ typedef struct
127127
* where to start streaming (after setting receiveStart and
128128
* receiveStartTLI), and also to tell it to send apply feedback to the
129129
* primary whenever specially marked commit records are applied.
130+
* This is normally mapped to procLatch when walreceiver is running.
130131
*/
131-
Latch latch;
132+
Latch *latch;
132133
} WalRcvData;
133134

134135
extern WalRcvData *WalRcv;

0 commit comments

Comments
 (0)