@@ -287,7 +287,7 @@ recvint64(char *buf)
287
287
* Send a Standby Status Update message to server.
288
288
*/
289
289
static bool
290
- sendFeedback (PGconn * conn , XLogRecPtr blockpos , int64 now )
290
+ sendFeedback (PGconn * conn , XLogRecPtr blockpos , int64 now , bool replyRequested )
291
291
{
292
292
char replybuf [1 + 8 + 8 + 8 + 8 + 1 ];
293
293
int len = 0 ;
@@ -302,7 +302,7 @@ sendFeedback(PGconn *conn, XLogRecPtr blockpos, int64 now)
302
302
len += 8 ;
303
303
sendint64 (now , & replybuf [len ]); /* sendTime */
304
304
len += 8 ;
305
- replybuf [len ] = 0 ; /* replyRequested */
305
+ replybuf [len ] = replyRequested ? 1 : 0 ; /* replyRequested */
306
306
len += 1 ;
307
307
308
308
if (PQputCopyData (conn , replybuf , len ) <= 0 || PQflush (conn ))
@@ -413,6 +413,7 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
413
413
int bytes_left ;
414
414
int bytes_written ;
415
415
int64 now ;
416
+ int hdr_len ;
416
417
417
418
if (copybuf != NULL )
418
419
{
@@ -441,7 +442,7 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
441
442
standby_message_timeout ))
442
443
{
443
444
/* Time to send feedback! */
444
- if (!sendFeedback (conn , blockpos , now ))
445
+ if (!sendFeedback (conn , blockpos , now , false ))
445
446
goto error ;
446
447
last_status = now ;
447
448
}
@@ -520,10 +521,34 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
520
521
/* Check the message type. */
521
522
if (copybuf [0 ] == 'k' )
522
523
{
524
+ int pos ;
525
+ bool replyRequested ;
526
+
523
527
/*
524
- * keepalive message, sent in 9.2 and newer. We just ignore this
525
- * message completely, but need to skip past it in the stream.
528
+ * Parse the keepalive message, enclosed in the CopyData message.
529
+ * We just check if the server requested a reply, and ignore the
530
+ * rest.
526
531
*/
532
+ pos = 1 ; /* skip msgtype 'k' */
533
+ pos += 8 ; /* skip walEnd */
534
+ pos += 8 ; /* skip sendTime */
535
+
536
+ if (r < pos + 1 )
537
+ {
538
+ fprintf (stderr , _ ("%s: streaming header too small: %d\n" ),
539
+ progname , r );
540
+ goto error ;
541
+ }
542
+ replyRequested = copybuf [pos ];
543
+
544
+ /* If the server requested an immediate reply, send one. */
545
+ if (replyRequested )
546
+ {
547
+ now = localGetCurrentTimestamp ();
548
+ if (!sendFeedback (conn , blockpos , now , false))
549
+ goto error ;
550
+ last_status = now ;
551
+ }
527
552
continue ;
528
553
}
529
554
else if (copybuf [0 ] != 'w' )
@@ -538,8 +563,11 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
538
563
* message. We only need the WAL location field (dataStart), the rest
539
564
* of the header is ignored.
540
565
*/
541
- #define STREAMING_HEADER_SIZE (1 /* msgtype */ + 8 /* dataStart */ + 8 /* walEnd */ + 8 /* sendTime */ )
542
- if (r < STREAMING_HEADER_SIZE + 1 )
566
+ hdr_len = 1 ; /* msgtype 'w' */
567
+ hdr_len += 8 ; /* dataStart */
568
+ hdr_len += 8 ; /* walEnd */
569
+ hdr_len += 8 ; /* sendTime */
570
+ if (r < hdr_len + 1 )
543
571
{
544
572
fprintf (stderr , _ ("%s: streaming header too small: %d\n" ),
545
573
progname , r );
@@ -578,7 +606,7 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
578
606
}
579
607
}
580
608
581
- bytes_left = r - STREAMING_HEADER_SIZE ;
609
+ bytes_left = r - hdr_len ;
582
610
bytes_written = 0 ;
583
611
584
612
while (bytes_left )
@@ -604,7 +632,7 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
604
632
}
605
633
606
634
if (write (walfile ,
607
- copybuf + STREAMING_HEADER_SIZE + bytes_written ,
635
+ copybuf + hdr_len + bytes_written ,
608
636
bytes_to_write ) != bytes_to_write )
609
637
{
610
638
fprintf (stderr ,
0 commit comments