
Commit d5eb1fe

Make logical WAL sender report streaming state appropriately
WAL senders sending logically-decoded data fail to properly report in "streaming" state when starting up, hence as long as one extra record is not replayed, such WAL senders would remain in a "catchup" state, which is inconsistent with the physical cousin.

This can be easily reproduced by for example using pg_recvlogical and restarting the upstream server. The TAP tests have been slightly modified to detect the failure and strengthened so as future tests also make sure that a node is in streaming state when waiting for its catchup.

Backpatch down to 9.4 where this code has been introduced.

Reported-by: Sawada Masahiko
Author: Simon Riggs, Sawada Masahiko
Reviewed-by: Petr Jelinek, Michael Paquier, Vaishnavi Prabakaran
Discussion: https://postgr.es/m/CAD21AoB2ZbCCqOx=bgKMcLrAvs1V0ZMqzs7wBTuDySezTGtMZA@mail.gmail.com
1 parent 4b8860e commit d5eb1fe

1 file changed: +16 -6 lines changed

src/backend/replication/walsender.c

Lines changed: 16 additions & 6 deletions
@@ -1926,8 +1926,8 @@ WalSndLoop(WalSndSendDataCallback send_data)
             if (MyWalSnd->state == WALSNDSTATE_CATCHUP)
             {
                 ereport(DEBUG1,
-                        (errmsg("standby \"%s\" has now caught up with primary",
-                                application_name)));
+                        (errmsg("\"%s\" has now caught up with upstream server",
+                                application_name)));
                 WalSndSetState(WALSNDSTATE_STREAMING);
             }

@@ -2483,10 +2483,10 @@ XLogSendLogical(void)
     char *errm;

     /*
-     * Don't know whether we've caught up yet. We'll set it to true in
-     * WalSndWaitForWal, if we're actually waiting. We also set to true if
-     * XLogReadRecord() had to stop reading but WalSndWaitForWal didn't wait -
-     * i.e. when we're shutting down.
+     * Don't know whether we've caught up yet. We'll set WalSndCaughtUp to
+     * true in WalSndWaitForWal, if we're actually waiting. We also set to
+     * true if XLogReadRecord() had to stop reading but WalSndWaitForWal
+     * didn't wait - i.e. when we're shutting down.
      */
     WalSndCaughtUp = false;

@@ -2499,9 +2499,19 @@ XLogSendLogical(void)

     if (record != NULL)
     {
+        /* XXX: Note that logical decoding cannot be used while in recovery */
+        XLogRecPtr flushPtr = GetFlushRecPtr();
+
         LogicalDecodingProcessRecord(logical_decoding_ctx, logical_decoding_ctx->reader);

         sentPtr = logical_decoding_ctx->reader->EndRecPtr;
+
+        /*
+         * If we have sent a record that is at or beyond the flushed point, we
+         * have caught up.
+         */
+        if (sentPtr >= flushPtr)
+            WalSndCaughtUp = true;
     }
     else
     {
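
The effect of the new `sentPtr >= flushPtr` check can be illustrated with a small standalone model. This is only a sketch under simplifying assumptions, not PostgreSQL code: `WalPos`, `ModelSender`, `model_send_record` and the `with_fix` switch are hypothetical names invented for the illustration, WAL positions are plain integers, and pending-output handling in `WalSndLoop` is ignored. It models one send cycle in which a record is decoded and the catchup-to-streaming transition is then evaluated.

```c
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical stand-ins for illustration; not PostgreSQL's real types. */
typedef uint64_t WalPos;

typedef enum
{
    SENDER_CATCHUP,
    SENDER_STREAMING
} SenderState;

typedef struct
{
    WalPos      sent_ptr;   /* end of the last record sent */
    bool        caught_up;  /* models WalSndCaughtUp */
    SenderState state;      /* models MyWalSnd->state */
} ModelSender;

/*
 * Model one send cycle for a record ending at record_end, with the flush
 * position flush_ptr sampled before decoding. with_fix toggles the check
 * added by this commit (an assumption of the model, not a real option).
 */
static void
model_send_record(ModelSender *s, WalPos record_end, WalPos flush_ptr,
                  bool with_fix)
{
    /* Start each cycle not knowing whether we have caught up. */
    s->caught_up = false;

    /* "Decode and send" the record. */
    s->sent_ptr = record_end;

    /* New check: at or beyond the flushed point means caught up. */
    if (with_fix && s->sent_ptr >= flush_ptr)
        s->caught_up = true;

    /* Simplified form of the transition done in the main sender loop. */
    if (s->caught_up && s->state == SENDER_CATCHUP)
        s->state = SENDER_STREAMING;
}
```

In this model, a sender that has just sent the last flushed record stays in the catchup state when `with_fix` is false and moves to streaming when it is true, which matches the behavior the commit message describes.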
