Skip to content

Commit a9dad56

Browse files
committed
Teach pg_basebackup and pg_receivexlog to reply to server keepalives.
Without this, the connection will be killed after timeout if wal_sender_timeout is set in the server. Original patch by Amit Kapila, modified by me to fit recent changes in the code.
1 parent 9e45e03 commit a9dad56

File tree

3 files changed

+45
-16
lines changed

3 files changed

+45
-16
lines changed

doc/src/sgml/ref/pg_basebackup.sgml

+4-4
Original file line numberDiff line numberDiff line change
@@ -377,10 +377,10 @@ PostgreSQL documentation
377377
<listitem>
378378
<para>
379379
Specifies the number of seconds between status packets sent back to the
380-
server. This is required when streaming the transaction log (using
381-
<literal>--xlog=stream</literal>) if replication timeout is configured
382-
on the server, and allows for easier monitoring. A value of zero disables
383-
the status updates completely. The default value is 10 seconds.
380+
server. This allows for easier monitoring of the progress from server.
381+
A value of zero disables the periodic status updates completely,
382+
although an update will still be sent when requested by the server, to
383+
avoid timeout disconnect. The default value is 10 seconds.
384384
</para>
385385
</listitem>
386386
</varlistentry>

doc/src/sgml/ref/pg_receivexlog.sgml

+4-3
Original file line numberDiff line numberDiff line change
@@ -155,9 +155,10 @@ PostgreSQL documentation
155155
<listitem>
156156
<para>
157157
Specifies the number of seconds between status packets sent back to the
158-
server. This is required if replication timeout is configured on the
159-
server, and allows for easier monitoring. A value of zero disables the
160-
status updates completely. The default value is 10 seconds.
158+
server. This allows for easier monitoring of the progress from server.
159+
A value of zero disables the periodic status updates completely,
160+
although an update will still be sent when requested by the server, to
161+
avoid timeout disconnect. The default value is 10 seconds.
161162
</para>
162163
</listitem>
163164
</varlistentry>

src/bin/pg_basebackup/receivelog.c

+37-9
Original file line numberDiff line numberDiff line change
@@ -287,7 +287,7 @@ recvint64(char *buf)
287287
* Send a Standby Status Update message to server.
288288
*/
289289
static bool
290-
sendFeedback(PGconn *conn, XLogRecPtr blockpos, int64 now)
290+
sendFeedback(PGconn *conn, XLogRecPtr blockpos, int64 now, bool replyRequested)
291291
{
292292
char replybuf[1 + 8 + 8 + 8 + 8 + 1];
293293
int len = 0;
@@ -302,7 +302,7 @@ sendFeedback(PGconn *conn, XLogRecPtr blockpos, int64 now)
302302
len += 8;
303303
sendint64(now, &replybuf[len]); /* sendTime */
304304
len += 8;
305-
replybuf[len] = 0; /* replyRequested */
305+
replybuf[len] = replyRequested ? 1 : 0; /* replyRequested */
306306
len += 1;
307307

308308
if (PQputCopyData(conn, replybuf, len) <= 0 || PQflush(conn))
@@ -413,6 +413,7 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
413413
int bytes_left;
414414
int bytes_written;
415415
int64 now;
416+
int hdr_len;
416417

417418
if (copybuf != NULL)
418419
{
@@ -441,7 +442,7 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
441442
standby_message_timeout))
442443
{
443444
/* Time to send feedback! */
444-
if (!sendFeedback(conn, blockpos, now))
445+
if (!sendFeedback(conn, blockpos, now, false))
445446
goto error;
446447
last_status = now;
447448
}
@@ -520,10 +521,34 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
520521
/* Check the message type. */
521522
if (copybuf[0] == 'k')
522523
{
524+
int pos;
525+
bool replyRequested;
526+
523527
/*
524-
* keepalive message, sent in 9.2 and newer. We just ignore this
525-
* message completely, but need to skip past it in the stream.
528+
* Parse the keepalive message, enclosed in the CopyData message.
529+
* We just check if the server requested a reply, and ignore the
530+
* rest.
526531
*/
532+
pos = 1; /* skip msgtype 'k' */
533+
pos += 8; /* skip walEnd */
534+
pos += 8; /* skip sendTime */
535+
536+
if (r < pos + 1)
537+
{
538+
fprintf(stderr, _("%s: streaming header too small: %d\n"),
539+
progname, r);
540+
goto error;
541+
}
542+
replyRequested = copybuf[pos];
543+
544+
/* If the server requested an immediate reply, send one. */
545+
if (replyRequested)
546+
{
547+
now = localGetCurrentTimestamp();
548+
if (!sendFeedback(conn, blockpos, now, false))
549+
goto error;
550+
last_status = now;
551+
}
527552
continue;
528553
}
529554
else if (copybuf[0] != 'w')
@@ -538,8 +563,11 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
538563
* message. We only need the WAL location field (dataStart), the rest
539564
* of the header is ignored.
540565
*/
541-
#define STREAMING_HEADER_SIZE (1 /* msgtype */ + 8 /* dataStart */ + 8 /* walEnd */ + 8 /* sendTime */)
542-
if (r < STREAMING_HEADER_SIZE + 1)
566+
hdr_len = 1; /* msgtype 'w' */
567+
hdr_len += 8; /* dataStart */
568+
hdr_len += 8; /* walEnd */
569+
hdr_len += 8; /* sendTime */
570+
if (r < hdr_len + 1)
543571
{
544572
fprintf(stderr, _("%s: streaming header too small: %d\n"),
545573
progname, r);
@@ -578,7 +606,7 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
578606
}
579607
}
580608

581-
bytes_left = r - STREAMING_HEADER_SIZE;
609+
bytes_left = r - hdr_len;
582610
bytes_written = 0;
583611

584612
while (bytes_left)
@@ -604,7 +632,7 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
604632
}
605633

606634
if (write(walfile,
607-
copybuf + STREAMING_HEADER_SIZE + bytes_written,
635+
copybuf + hdr_len + bytes_written,
608636
bytes_to_write) != bytes_to_write)
609637
{
610638
fprintf(stderr,

0 commit comments

Comments
 (0)