Skip to content

Commit 74cbe96

Browse files
committed
Refactor pg_receivexlog main loop code, for readability.
Previously the source codes for receiving the data and for polling the socket were included in pg_receivexlog main loop. This commit splits out them as separate functions. This is useful for improving the readability of main loop code and making the future pg_receivexlog-related patch simpler.
1 parent 644d853 commit 74cbe96

File tree

1 file changed

+142
-67
lines changed

1 file changed

+142
-67
lines changed

src/bin/pg_basebackup/receivelog.c

Lines changed: 142 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,8 @@ static PGresult *HandleCopyStream(PGconn *conn, XLogRecPtr startpos,
3535
uint32 timeline, char *basedir,
3636
stream_stop_callback stream_stop, int standby_message_timeout,
3737
char *partial_suffix, XLogRecPtr *stoppos);
38+
static int CopyStreamPoll(PGconn *conn, long timeout_ms);
39+
static int CopyStreamReceive(PGconn *conn, long timeout, char **buffer);
3840

3941
static bool ReadEndOfStreamingResult(PGresult *res, XLogRecPtr *startpos,
4042
uint32 *timeline);
@@ -744,12 +746,7 @@ HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
744746
int bytes_written;
745747
int64 now;
746748
int hdr_len;
747-
748-
if (copybuf != NULL)
749-
{
750-
PQfreemem(copybuf);
751-
copybuf = NULL;
752-
}
749+
long sleeptime;
753750

754751
/*
755752
* Check if we should continue streaming, or abort at this point.
@@ -784,67 +781,38 @@ HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
784781
last_status = now;
785782
}
786783

787-
r = PQgetCopyData(conn, &copybuf, 1);
788-
if (r == 0)
784+
/*
785+
* Compute how long send/receive loops should sleep
786+
*/
787+
if (standby_message_timeout && still_sending)
789788
{
790-
/*
791-
* No data available. Wait for some to appear, but not longer than
792-
* the specified timeout, so that we can ping the server.
793-
*/
794-
fd_set input_mask;
795-
struct timeval timeout;
796-
struct timeval *timeoutptr;
797-
798-
FD_ZERO(&input_mask);
799-
FD_SET(PQsocket(conn), &input_mask);
800-
if (standby_message_timeout && still_sending)
789+
int64 targettime;
790+
long secs;
791+
int usecs;
792+
793+
targettime = last_status + (standby_message_timeout - 1) * ((int64) 1000);
794+
feTimestampDifference(now,
795+
targettime,
796+
&secs,
797+
&usecs);
798+
/* Always sleep at least 1 sec */
799+
if (secs <= 0)
801800
{
802-
int64 targettime;
803-
long secs;
804-
int usecs;
805-
806-
targettime = last_status + (standby_message_timeout - 1) * ((int64) 1000);
807-
feTimestampDifference(now,
808-
targettime,
809-
&secs,
810-
&usecs);
811-
if (secs <= 0)
812-
timeout.tv_sec = 1; /* Always sleep at least 1 sec */
813-
else
814-
timeout.tv_sec = secs;
815-
timeout.tv_usec = usecs;
816-
timeoutptr = &timeout;
801+
secs = 1;
802+
usecs = 0;
817803
}
818-
else
819-
timeoutptr = NULL;
820804

821-
r = select(PQsocket(conn) + 1, &input_mask, NULL, NULL, timeoutptr);
822-
if (r == 0 || (r < 0 && errno == EINTR))
823-
{
824-
/*
825-
* Got a timeout or signal. Continue the loop and either
826-
* deliver a status packet to the server or just go back into
827-
* blocking.
828-
*/
829-
continue;
830-
}
831-
else if (r < 0)
832-
{
833-
fprintf(stderr, _("%s: select() failed: %s\n"),
834-
progname, strerror(errno));
835-
goto error;
836-
}
837-
/* Else there is actually data on the socket */
838-
if (PQconsumeInput(conn) == 0)
839-
{
840-
fprintf(stderr,
841-
_("%s: could not receive data from WAL stream: %s"),
842-
progname, PQerrorMessage(conn));
843-
goto error;
844-
}
845-
continue;
805+
sleeptime = secs * 1000 + usecs / 1000;
846806
}
807+
else
808+
sleeptime = -1;
809+
810+
r = CopyStreamReceive(conn, sleeptime, &copybuf);
811+
if (r == 0)
812+
continue;
847813
if (r == -1)
814+
goto error;
815+
if (r == -2)
848816
{
849817
PGresult *res = PQgetResult(conn);
850818

@@ -877,15 +845,10 @@ HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
877845
}
878846
if (copybuf != NULL)
879847
PQfreemem(copybuf);
848+
copybuf = NULL;
880849
*stoppos = blockpos;
881850
return res;
882851
}
883-
if (r == -2)
884-
{
885-
fprintf(stderr, _("%s: could not read COPY data: %s"),
886-
progname, PQerrorMessage(conn));
887-
goto error;
888-
}
889852

890853
/* Check the message type. */
891854
if (copybuf[0] == 'k')
@@ -1056,3 +1019,115 @@ HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
10561019
PQfreemem(copybuf);
10571020
return NULL;
10581021
}
1022+
1023+
/*
1024+
* Wait until we can read CopyData message, or timeout.
1025+
*
1026+
* Returns 1 if data has become available for reading, 0 if timed out
1027+
* or interrupted by signal, and -1 on an error.
1028+
*/
1029+
static int
1030+
CopyStreamPoll(PGconn *conn, long timeout_ms)
1031+
{
1032+
int ret;
1033+
fd_set input_mask;
1034+
struct timeval timeout;
1035+
struct timeval *timeoutptr;
1036+
1037+
if (PQsocket(conn) < 0)
1038+
{
1039+
fprintf(stderr, _("%s: socket not open"), progname);
1040+
return -1;
1041+
}
1042+
1043+
FD_ZERO(&input_mask);
1044+
FD_SET(PQsocket(conn), &input_mask);
1045+
1046+
if (timeout_ms < 0)
1047+
timeoutptr = NULL;
1048+
else
1049+
{
1050+
timeout.tv_sec = timeout_ms / 1000L;
1051+
timeout.tv_usec = (timeout_ms % 1000L) * 1000L;
1052+
timeoutptr = &timeout;
1053+
}
1054+
1055+
ret = select(PQsocket(conn) + 1, &input_mask, NULL, NULL, timeoutptr);
1056+
if (ret == 0 || (ret < 0 && errno == EINTR))
1057+
return 0; /* Got a timeout or signal */
1058+
else if (ret < 0)
1059+
{
1060+
fprintf(stderr, _("%s: select() failed: %s\n"),
1061+
progname, strerror(errno));
1062+
return -1;
1063+
}
1064+
1065+
return 1;
1066+
}
1067+
1068+
/*
1069+
* Receive CopyData message available from XLOG stream, blocking for
1070+
* maximum of 'timeout' ms.
1071+
*
1072+
* If data was received, returns the length of the data. *buffer is set to
1073+
* point to a buffer holding the received message. The buffer is only valid
1074+
* until the next CopyStreamReceive call.
1075+
*
1076+
* 0 if no data was available within timeout, or wait was interrupted
1077+
* by signal. -1 on error. -2 if the server ended the COPY.
1078+
*/
1079+
static int
1080+
CopyStreamReceive(PGconn *conn, long timeout, char **buffer)
1081+
{
1082+
static char *copybuf = NULL;
1083+
int rawlen;
1084+
1085+
if (copybuf != NULL)
1086+
PQfreemem(copybuf);
1087+
copybuf = NULL;
1088+
*buffer = NULL;
1089+
1090+
/* Try to receive a CopyData message */
1091+
rawlen = PQgetCopyData(conn, &copybuf, 1);
1092+
if (rawlen == 0)
1093+
{
1094+
/*
1095+
* No data available. Wait for some to appear, but not longer than
1096+
* the specified timeout, so that we can ping the server.
1097+
*/
1098+
if (timeout > 0)
1099+
{
1100+
int ret;
1101+
1102+
ret = CopyStreamPoll(conn, timeout);
1103+
if (ret <= 0)
1104+
return ret;
1105+
}
1106+
1107+
/* Else there is actually data on the socket */
1108+
if (PQconsumeInput(conn) == 0)
1109+
{
1110+
fprintf(stderr,
1111+
_("%s: could not receive data from WAL stream: %s"),
1112+
progname, PQerrorMessage(conn));
1113+
return -1;
1114+
}
1115+
1116+
/* Now that we've consumed some input, try again */
1117+
rawlen = PQgetCopyData(conn, &copybuf, 1);
1118+
if (rawlen == 0)
1119+
return 0;
1120+
}
1121+
if (rawlen == -1) /* end-of-streaming or error */
1122+
return -2;
1123+
if (rawlen == -2)
1124+
{
1125+
fprintf(stderr, _("%s: could not read COPY data: %s"),
1126+
progname, PQerrorMessage(conn));
1127+
return -1;
1128+
}
1129+
1130+
/* Return received messages to caller */
1131+
*buffer = copybuf;
1132+
return rawlen;
1133+
}

0 commit comments

Comments
 (0)