Commit abd5071

Fix minor bug in logical-replication walsender shutdown
Logical walsender should exit when it catches up with sending WAL during shutdown; but there was a rare corner case when it failed to because of a race condition that puts it back to wait for more WAL instead -- but since there wasn't any, it'd not shut down immediately. It would only continue the shutdown when wal_sender_timeout terminates the sleep, which causes annoying waits during shutdown procedure. Restructure the code so that we no longer forget to set WalSndCaughtUp in that case.

This was an oversight in commit c6c3334.

Backpatch all the way down to 9.4.

Author: Craig Ringer, Álvaro Herrera
Discussion: https://postgr.es/m/CAMsr+YEuz4XwZX_QmnX_-2530XhyAmnK=zCmicEnq1vLr0aZ-g@mail.gmail.com
1 parent 080cf32 commit abd5071

1 file changed: +18 -29 lines changed

src/backend/replication/walsender.c

Lines changed: 18 additions & 29 deletions
@@ -1168,9 +1168,8 @@ WalSndWaitForWal(XLogRecPtr loc)
     int wakeEvents;
     static XLogRecPtr RecentFlushPtr = InvalidXLogRecPtr;
 
-
     /*
-     * Fast path to avoid acquiring the spinlock in the we already know we
+     * Fast path to avoid acquiring the spinlock in case we already know we
      * have enough WAL available. This is particularly interesting if we're
      * far behind.
      */
@@ -2488,6 +2487,7 @@ XLogSendLogical(void)
 {
     XLogRecord *record;
     char *errm;
+    XLogRecPtr flushPtr;
 
     /*
      * Don't know whether we've caught up yet. We'll set WalSndCaughtUp to
@@ -2504,40 +2504,29 @@ XLogSendLogical(void)
     if (errm != NULL)
         elog(ERROR, "%s", errm);
 
+    /*
+     * We'll use the current flush point to determine whether we've caught up.
+     */
+    flushPtr = GetFlushRecPtr();
+
     if (record != NULL)
     {
-        /* XXX: Note that logical decoding cannot be used while in recovery */
-        XLogRecPtr flushPtr = GetFlushRecPtr();
-
         LogicalDecodingProcessRecord(logical_decoding_ctx, record);
 
         sentPtr = logical_decoding_ctx->reader->EndRecPtr;
-
-        /*
-         * If we have sent a record that is at or beyond the flushed point, we
-         * have caught up.
-         */
-        if (sentPtr >= flushPtr)
-            WalSndCaughtUp = true;
     }
-    else
-    {
-        /*
-         * If the record we just wanted read is at or beyond the flushed
-         * point, then we're caught up.
-         */
-        if (logical_decoding_ctx->reader->EndRecPtr >= GetFlushRecPtr())
-        {
-            WalSndCaughtUp = true;
 
-            /*
-             * Have WalSndLoop() terminate the connection in an orderly
-             * manner, after writing out all the pending data.
-             */
-            if (got_STOPPING)
-                got_SIGUSR2 = true;
-        }
-    }
+    /* Set flag if we're caught up. */
+    if (logical_decoding_ctx->reader->EndRecPtr >= flushPtr)
+        WalSndCaughtUp = true;
+
+    /*
+     * If we're caught up and have been requested to stop, have WalSndLoop()
+     * terminate the connection in an orderly manner, after writing out all
+     * the pending data.
+     */
+    if (WalSndCaughtUp && got_STOPPING)
+        got_SIGUSR2 = true;
 
     /* Update shared memory status */
     {
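
The restructuring in XLogSendLogical() may be easier to follow as a simplified, standalone model of the two control flows. The sketch below is not PostgreSQL code: `WalPos`, `Decision`, `decide_old` and `decide_new` are hypothetical names introduced for illustration, and the flush position is passed in as a single value even though the pre-commit code sampled GetFlushRecPtr() separately in each branch. It only shows one state in which the two versions set different flags, namely a record was decoded, its end position has reached the flush position, and a stop has been requested.

```c
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical stand-in for XLogRecPtr; not the PostgreSQL definition. */
typedef uint64_t WalPos;

/* Flags that one pass through the decision logic would set. */
typedef struct
{
    bool caught_up;     /* models WalSndCaughtUp */
    bool request_stop;  /* models got_SIGUSR2 */
} Decision;

/*
 * Model of the pre-commit control flow: the stop request is only raised
 * in the branch where no record was read.
 */
Decision
decide_old(bool have_record, WalPos end_rec_ptr, WalPos flush_ptr,
           bool got_stopping)
{
    Decision d = {false, false};

    if (have_record)
    {
        if (end_rec_ptr >= flush_ptr)
            d.caught_up = true;
    }
    else if (end_rec_ptr >= flush_ptr)
    {
        d.caught_up = true;
        if (got_stopping)
            d.request_stop = true;
    }
    return d;
}

/*
 * Model of the post-commit control flow: both checks run regardless of
 * whether a record was read.
 */
Decision
decide_new(bool have_record, WalPos end_rec_ptr, WalPos flush_ptr,
           bool got_stopping)
{
    Decision d = {false, false};

    (void) have_record;     /* no longer affects the outcome */

    if (end_rec_ptr >= flush_ptr)
        d.caught_up = true;
    if (d.caught_up && got_stopping)
        d.request_stop = true;
    return d;
}
```

With `have_record` true, equal positions, and `got_stopping` true, both functions report `caught_up`, but only `decide_new` also sets `request_stop`. In this model that is the state where the old flow would go back to waiting instead of continuing the shutdown.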
