Skip to content

Commit 7834d20

Browse files
committed
Avoid slow shutdown of pg_basebackup.
pg_basebackup's child process did not pay any attention to the pipe from its parent while waiting for input from the source server. If no server data was arriving, it would only wake up and check the pipe every standby_message_timeout or so. This creates a problem since the parent process might determine and send the desired stop position only after the server has reached end-of-WAL and stopped sending data. In the src/test/recovery regression tests, the timing is repeatably such that it takes nearly 10 seconds for the child process to realize that it should shut down. It's not clear how often that would happen in real-world cases, but it sure seems like a bug --- and if the user turns off standby_message_timeout or sets it very large, the delay could be a lot worse.

To fix, expand the StreamCtl API to allow the pipe input FD to be passed down to the low-level wait routine, and watch both sockets when sleeping.

(Note: AFAICS this issue doesn't affect the Windows port, since it doesn't rely on a pipe to transfer the stop position to the child thread.)

Discussion: https://postgr.es/m/6456.1493263884@sss.pgh.pa.us
1 parent 9f11fce commit 7834d20

File tree

4 files changed

+59
-28
lines changed

4 files changed

+59
-28
lines changed

src/bin/pg_basebackup/pg_basebackup.c

+5
Original file line number | Diff line number | Diff line change
@@ -480,6 +480,11 @@ LogStreamerMain(logstreamer_param *param)
480480
stream.timeline = param->timeline;
481481
stream.sysidentifier = param->sysidentifier;
482482
stream.stream_stop = reached_end_position;
483+
#ifndef WIN32
484+
stream.stop_socket = bgpipe[0];
485+
#else
486+
stream.stop_socket = PGINVALID_SOCKET;
487+
#endif
483488
stream.standby_message_timeout = standby_message_timeout;
484489
stream.synchronous = false;
485490
stream.do_sync = do_sync;

src/bin/pg_basebackup/pg_receivewal.c

+1
Original file line number | Diff line number | Diff line change
@@ -409,6 +409,7 @@ StreamLog(void)
409409
stream.timeline);
410410

411411
stream.stream_stop = stop_streaming;
412+
stream.stop_socket = PGINVALID_SOCKET;
412413
stream.standby_message_timeout = standby_message_timeout;
413414
stream.synchronous = synchronous;
414415
stream.do_sync = true;

src/bin/pg_basebackup/receivelog.c

+50-28
Original file line number | Diff line number | Diff line change
@@ -39,8 +39,9 @@ static bool still_sending = true; /* feedback still needs to be sent? */
3939

4040
static PGresult *HandleCopyStream(PGconn *conn, StreamCtl *stream,
4141
XLogRecPtr *stoppos);
42-
static int CopyStreamPoll(PGconn *conn, long timeout_ms);
43-
static int CopyStreamReceive(PGconn *conn, long timeout, char **buffer);
42+
static int CopyStreamPoll(PGconn *conn, long timeout_ms, pgsocket stop_socket);
43+
static int CopyStreamReceive(PGconn *conn, long timeout, pgsocket stop_socket,
44+
char **buffer);
4445
static bool ProcessKeepaliveMsg(PGconn *conn, StreamCtl *stream, char *copybuf,
4546
int len, XLogRecPtr blockpos, TimestampTz *last_status);
4647
static bool ProcessXLogDataMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len,
@@ -417,8 +418,15 @@ CheckServerVersionForStreaming(PGconn *conn)
417418
* return. As long as it returns false, streaming will continue
418419
* indefinitely.
419420
*
421+
* If stream_stop() checks for external input, stop_socket should be set to
422+
* the FD it checks. This will allow such input to be detected promptly
423+
* rather than after standby_message_timeout (which might be indefinite).
424+
* Note that signals will interrupt waits for input as well, but that is
425+
* race-y since a signal received while busy won't interrupt the wait.
426+
*
420427
* standby_message_timeout controls how often we send a message
421428
* back to the master letting it know our progress, in milliseconds.
429+
* Zero means no messages are sent.
422430
* This message will only contain the write location, and never
423431
* flush or replay.
424432
*
@@ -825,7 +833,7 @@ HandleCopyStream(PGconn *conn, StreamCtl *stream,
825833
sleeptime = CalculateCopyStreamSleeptime(now, stream->standby_message_timeout,
826834
last_status);
827835

828-
r = CopyStreamReceive(conn, sleeptime, &copybuf);
836+
r = CopyStreamReceive(conn, sleeptime, stream->stop_socket, &copybuf);
829837
while (r != 0)
830838
{
831839
if (r == -1)
@@ -870,7 +878,7 @@ HandleCopyStream(PGconn *conn, StreamCtl *stream,
870878
* Process the received data, and any subsequent data we can read
871879
* without blocking.
872880
*/
873-
r = CopyStreamReceive(conn, 0, &copybuf);
881+
r = CopyStreamReceive(conn, 0, stream->stop_socket, &copybuf);
874882
}
875883
}
876884

@@ -881,28 +889,39 @@ HandleCopyStream(PGconn *conn, StreamCtl *stream,
881889
}
882890

883891
/*
884-
* Wait until we can read CopyData message, or timeout.
892+
* Wait until we can read a CopyData message,
893+
* or timeout, or occurrence of a signal or input on the stop_socket.
894+
* (timeout_ms < 0 means wait indefinitely; 0 means don't wait.)
885895
*
886896
* Returns 1 if data has become available for reading, 0 if timed out
887-
* or interrupted by signal, and -1 on an error.
897+
* or interrupted by signal or stop_socket input, and -1 on an error.
888898
*/
889899
static int
890-
CopyStreamPoll(PGconn *conn, long timeout_ms)
900+
CopyStreamPoll(PGconn *conn, long timeout_ms, pgsocket stop_socket)
891901
{
892902
int ret;
893903
fd_set input_mask;
904+
int connsocket;
905+
int maxfd;
894906
struct timeval timeout;
895907
struct timeval *timeoutptr;
896908

897-
if (PQsocket(conn) < 0)
909+
connsocket = PQsocket(conn);
910+
if (connsocket < 0)
898911
{
899912
fprintf(stderr, _("%s: invalid socket: %s"), progname,
900913
PQerrorMessage(conn));
901914
return -1;
902915
}
903916

904917
FD_ZERO(&input_mask);
905-
FD_SET(PQsocket(conn), &input_mask);
918+
FD_SET(connsocket, &input_mask);
919+
maxfd = connsocket;
920+
if (stop_socket != PGINVALID_SOCKET)
921+
{
922+
FD_SET(stop_socket, &input_mask);
923+
maxfd = Max(maxfd, stop_socket);
924+
}
906925

907926
if (timeout_ms < 0)
908927
timeoutptr = NULL;
@@ -913,17 +932,20 @@ CopyStreamPoll(PGconn *conn, long timeout_ms)
913932
timeoutptr = &timeout;
914933
}
915934

916-
ret = select(PQsocket(conn) + 1, &input_mask, NULL, NULL, timeoutptr);
917-
if (ret == 0 || (ret < 0 && errno == EINTR))
918-
return 0; /* Got a timeout or signal */
919-
else if (ret < 0)
935+
ret = select(maxfd + 1, &input_mask, NULL, NULL, timeoutptr);
936+
937+
if (ret < 0)
920938
{
939+
if (errno == EINTR)
940+
return 0; /* Got a signal, so not an error */
921941
fprintf(stderr, _("%s: select() failed: %s\n"),
922942
progname, strerror(errno));
923943
return -1;
924944
}
945+
if (ret > 0 && FD_ISSET(connsocket, &input_mask))
946+
return 1; /* Got input on connection socket */
925947

926-
return 1;
948+
return 0; /* Got timeout or input on stop_socket */
927949
}
928950

929951
/*
@@ -934,11 +956,13 @@ CopyStreamPoll(PGconn *conn, long timeout_ms)
934956
* point to a buffer holding the received message. The buffer is only valid
935957
* until the next CopyStreamReceive call.
936958
*
937-
* 0 if no data was available within timeout, or wait was interrupted
938-
* by signal. -1 on error. -2 if the server ended the COPY.
959+
* Returns 0 if no data was available within timeout, or if wait was
960+
* interrupted by signal or stop_socket input.
961+
* -1 on error. -2 if the server ended the COPY.
939962
*/
940963
static int
941-
CopyStreamReceive(PGconn *conn, long timeout, char **buffer)
964+
CopyStreamReceive(PGconn *conn, long timeout, pgsocket stop_socket,
965+
char **buffer)
942966
{
943967
char *copybuf = NULL;
944968
int rawlen;
@@ -951,20 +975,18 @@ CopyStreamReceive(PGconn *conn, long timeout, char **buffer)
951975
rawlen = PQgetCopyData(conn, &copybuf, 1);
952976
if (rawlen == 0)
953977
{
978+
int ret;
979+
954980
/*
955-
* No data available. Wait for some to appear, but not longer than the
956-
* specified timeout, so that we can ping the server.
981+
* No data available. Wait for some to appear, but not longer than
982+
* the specified timeout, so that we can ping the server. Also stop
983+
* waiting if input appears on stop_socket.
957984
*/
958-
if (timeout != 0)
959-
{
960-
int ret;
961-
962-
ret = CopyStreamPoll(conn, timeout);
963-
if (ret <= 0)
964-
return ret;
965-
}
985+
ret = CopyStreamPoll(conn, timeout, stop_socket);
986+
if (ret <= 0)
987+
return ret;
966988

967-
/* Else there is actually data on the socket */
989+
/* Now there is actually data on the socket */
968990
if (PQconsumeInput(conn) == 0)
969991
{
970992
fprintf(stderr,

src/bin/pg_basebackup/receivelog.h

+3
Original file line number | Diff line number | Diff line change
@@ -42,6 +42,9 @@ typedef struct StreamCtl
4242

4343
stream_stop_callback stream_stop; /* Stop streaming when returns true */
4444

45+
pgsocket stop_socket; /* if valid, watch for input on this socket
46+
* and check stream_stop() when there is any */
47+
4548
WalWriteMethod *walmethod; /* How to write the WAL */
4649
char *partial_suffix; /* Suffix appended to partially received files */
4750
char *replication_slot; /* Replication slot to use, or NULL */

0 commit comments

Comments
 (0)