Skip to content

Commit 11abea3

Browse files
committed
Make logical WAL sender report streaming state appropriately
WAL senders sending logically-decoded data fail to properly report in "streaming" state when starting up, hence as long as one extra record is not replayed, such WAL senders would remain in a "catchup" state, which is inconsistent with the physical cousin. This can be easily reproduced by for example using pg_recvlogical and restarting the upstream server. The TAP tests have been slightly modified to detect the failure and strengthened so as future tests also make sure that a node is in streaming state when waiting for its catchup. Backpatch down to 9.4 where this code has been introduced. Reported-by: Sawada Masahiko Author: Simon Riggs, Sawada Masahiko Reviewed-by: Petr Jelinek, Michael Paquier, Vaishnavi Prabakaran Discussion: https://postgr.es/m/CAD21AoB2ZbCCqOx=bgKMcLrAvs1V0ZMqzs7wBTuDySezTGtMZA@mail.gmail.com
1 parent c350320 commit 11abea3

File tree

1 file changed

+15
-5
lines changed

1 file changed

+15
-5
lines changed

src/backend/replication/walsender.c

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2168,7 +2168,7 @@ WalSndLoop(WalSndSendDataCallback send_data)
21682168
if (MyWalSnd->state == WALSNDSTATE_CATCHUP)
21692169
{
21702170
ereport(DEBUG1,
2171-
(errmsg("standby \"%s\" has now caught up with primary",
2171+
(errmsg("\"%s\" has now caught up with upstream server",
21722172
application_name)));
21732173
WalSndSetState(WALSNDSTATE_STREAMING);
21742174
}
@@ -2757,10 +2757,10 @@ XLogSendLogical(void)
27572757
char *errm;
27582758

27592759
/*
2760-
* Don't know whether we've caught up yet. We'll set it to true in
2761-
* WalSndWaitForWal, if we're actually waiting. We also set to true if
2762-
* XLogReadRecord() had to stop reading but WalSndWaitForWal didn't wait -
2763-
* i.e. when we're shutting down.
2760+
* Don't know whether we've caught up yet. We'll set WalSndCaughtUp to
2761+
* true in WalSndWaitForWal, if we're actually waiting. We also set to
2762+
* true if XLogReadRecord() had to stop reading but WalSndWaitForWal
2763+
* didn't wait - i.e. when we're shutting down.
27642764
*/
27652765
WalSndCaughtUp = false;
27662766

@@ -2773,6 +2773,9 @@ XLogSendLogical(void)
27732773

27742774
if (record != NULL)
27752775
{
2776+
/* XXX: Note that logical decoding cannot be used while in recovery */
2777+
XLogRecPtr flushPtr = GetFlushRecPtr();
2778+
27762779
/*
27772780
* Note the lack of any call to LagTrackerWrite() which is handled by
27782781
* WalSndUpdateProgress which is called by output plugin through
@@ -2781,6 +2784,13 @@ XLogSendLogical(void)
27812784
LogicalDecodingProcessRecord(logical_decoding_ctx, logical_decoding_ctx->reader);
27822785

27832786
sentPtr = logical_decoding_ctx->reader->EndRecPtr;
2787+
2788+
/*
2789+
* If we have sent a record that is at or beyond the flushed point, we
2790+
* have caught up.
2791+
*/
2792+
if (sentPtr >= flushPtr)
2793+
WalSndCaughtUp = true;
27842794
}
27852795
else
27862796
{

0 commit comments

Comments
 (0)