Skip to content

Commit e101dfa

Browse files
committed
For cascading replication, wake physical and logical walsenders separately
Physical walsenders can't send data until it's been flushed; logical walsenders can't decode and send data until it's been applied. On the standby, the WAL is flushed first, which will only wake up physical walsenders; and then applied, which will only wake up logical walsenders. Previously, all walsenders were awakened when the WAL was flushed. That was fine for logical walsenders on the primary; but on the standby the flushed WAL would not have been applied yet, so logical walsenders were awakened too early. Per idea from Jeff Davis and Amit Kapila. Author: "Drouvot, Bertrand" <bertranddrouvot.pg@gmail.com> Reviewed-by: Jeff Davis <pgsql@j-davis.com> Reviewed-by: Robert Haas <robertmhaas@gmail.com> Reviewed-by: Amit Kapila <amit.kapila16@gmail.com> Reviewed-by: Masahiko Sawada <sawada.mshk@gmail.com> Discussion: https://postgr.es/m/CAA4eK1+zO5LUeisabX10c81LU-fWMKO4M9Wyg1cdkbW7Hqh6vQ@mail.gmail.com
1 parent 2666975 commit e101dfa

File tree

7 files changed

+84
-29
lines changed

7 files changed

+84
-29
lines changed

src/backend/access/transam/xlog.c

+3-3
Original file line numberDiff line numberDiff line change
@@ -2645,7 +2645,7 @@ XLogFlush(XLogRecPtr record)
26452645
END_CRIT_SECTION();
26462646

26472647
/* wake up walsenders now that we've released heavily contended locks */
2648-
WalSndWakeupProcessRequests();
2648+
WalSndWakeupProcessRequests(true, !RecoveryInProgress());
26492649

26502650
/*
26512651
* If we still haven't flushed to the request point then we have a
@@ -2816,7 +2816,7 @@ XLogBackgroundFlush(void)
28162816
END_CRIT_SECTION();
28172817

28182818
/* wake up walsenders now that we've released heavily contended locks */
2819-
WalSndWakeupProcessRequests();
2819+
WalSndWakeupProcessRequests(true, !RecoveryInProgress());
28202820

28212821
/*
28222822
* Great, done. To take some work off the critical path, try to initialize
@@ -5765,7 +5765,7 @@ StartupXLOG(void)
57655765
* If there were cascading standby servers connected to us, nudge any wal
57665766
* sender processes to notice that we've been promoted.
57675767
*/
5768-
WalSndWakeup();
5768+
WalSndWakeup(true, true);
57695769

57705770
/*
57715771
* If this was a promotion, request an (online) checkpoint now. This isn't

src/backend/access/transam/xlogarchive.c

+1-1
Original file line numberDiff line numberDiff line change
@@ -421,7 +421,7 @@ KeepFileRestoredFromArchive(const char *path, const char *xlogfname)
421421
* if we restored something other than a WAL segment, but it does no harm
422422
* either.
423423
*/
424-
WalSndWakeup();
424+
WalSndWakeup(true, false);
425425
}
426426

427427
/*

src/backend/access/transam/xlogrecovery.c

+28-9
Original file line numberDiff line numberDiff line change
@@ -1935,6 +1935,31 @@ ApplyWalRecord(XLogReaderState *xlogreader, XLogRecord *record, TimeLineID *repl
19351935
XLogRecoveryCtl->lastReplayedTLI = *replayTLI;
19361936
SpinLockRelease(&XLogRecoveryCtl->info_lck);
19371937

1938+
/* ------
1939+
* Wakeup walsenders:
1940+
*
1941+
* On the standby, the WAL is flushed first (which will only wake up
1942+
* physical walsenders) and then applied, which will only wake up logical
1943+
* walsenders.
1944+
*
1945+
* Indeed, logical walsenders on standby can't decode and send data until
1946+
* it's been applied.
1947+
*
1948+
* Physical walsenders don't need to be woken up during replay unless
1949+
* cascading replication is allowed and a time line change occurred (so that
1950+
* they can notice that they are on a new time line).
1951+
*
1952+
* That's why the wake up conditions are for:
1953+
*
1954+
* - physical walsenders in case of new time line and cascade
1955+
* replication is allowed
1956+
* - logical walsenders in case cascade replication is allowed (could not
1957+
* be created otherwise)
1958+
* ------
1959+
*/
1960+
if (AllowCascadeReplication())
1961+
WalSndWakeup(switchedTLI, true);
1962+
19381963
/*
19391964
* If rm_redo called XLogRequestWalReceiverReply, then we wake up the
19401965
* receiver so that it notices the updated lastReplayedEndRecPtr and sends
@@ -1958,12 +1983,6 @@ ApplyWalRecord(XLogReaderState *xlogreader, XLogRecord *record, TimeLineID *repl
19581983
*/
19591984
RemoveNonParentXlogFiles(xlogreader->EndRecPtr, *replayTLI);
19601985

1961-
/*
1962-
* Wake up any walsenders to notice that we are on a new timeline.
1963-
*/
1964-
if (AllowCascadeReplication())
1965-
WalSndWakeup();
1966-
19671986
/* Reset the prefetcher. */
19681987
XLogPrefetchReconfigure();
19691988
}
@@ -3050,9 +3069,9 @@ ReadRecord(XLogPrefetcher *xlogprefetcher, int emode,
30503069
{
30513070
/*
30523071
* When we find that WAL ends in an incomplete record, keep track
3053-
* of that record. After recovery is done, we'll write a record to
3054-
* indicate to downstream WAL readers that that portion is to be
3055-
* ignored.
3072+
* of that record. After recovery is done, we'll write a record
3073+
* to indicate to downstream WAL readers that that portion is to
3074+
* be ignored.
30563075
*
30573076
* However, when ArchiveRecoveryRequested = true, we're going to
30583077
* switch to a new timeline at the end of recovery. We will only

src/backend/replication/walreceiver.c

+1-1
Original file line numberDiff line numberDiff line change
@@ -1010,7 +1010,7 @@ XLogWalRcvFlush(bool dying, TimeLineID tli)
10101010
/* Signal the startup process and walsender that new WAL has arrived */
10111011
WakeupRecovery();
10121012
if (AllowCascadeReplication())
1013-
WalSndWakeup();
1013+
WalSndWakeup(true, false);
10141014

10151015
/* Report XLOG streaming progress in PS display */
10161016
if (update_process_title)

src/backend/replication/walsender.c

+37-4
Original file line numberDiff line numberDiff line change
@@ -2603,6 +2603,23 @@ InitWalSenderSlot(void)
26032603
walsnd->sync_standby_priority = 0;
26042604
walsnd->latch = &MyProc->procLatch;
26052605
walsnd->replyTime = 0;
2606+
2607+
/*
2608+
* The kind assignment is done here and not in StartReplication()
2609+
* and StartLogicalReplication(). Indeed, the logical walsender
2610+
* needs to read WAL records (like snapshot of running
2611+
* transactions) during the slot creation. So it needs to be woken
2612+
* up based on its kind.
2613+
*
2614+
* The kind assignment could also be done in StartReplication(),
2615+
* StartLogicalReplication() and CREATE_REPLICATION_SLOT but it
2616+
* seems better to set it in one place.
2617+
*/
2618+
if (MyDatabaseId == InvalidOid)
2619+
walsnd->kind = REPLICATION_KIND_PHYSICAL;
2620+
else
2621+
walsnd->kind = REPLICATION_KIND_LOGICAL;
2622+
26062623
SpinLockRelease(&walsnd->mutex);
26072624
/* don't need the lock anymore */
26082625
MyWalSnd = (WalSnd *) walsnd;
@@ -3280,30 +3297,46 @@ WalSndShmemInit(void)
32803297
}
32813298

32823299
/*
3283-
* Wake up all walsenders
3300+
* Wake up physical, logical or both kinds of walsenders
3301+
*
3302+
* The distinction between physical and logical walsenders is made because:
3303+
* - physical walsenders can't send data until it's been flushed
3304+
* - logical walsenders on standby can't decode and send data until it's been
3305+
* applied
3306+
*
3307+
* For cascading replication we need to wake up physical walsenders separately
3308+
* from logical walsenders (see the comment before calling WalSndWakeup() in
3309+
* ApplyWalRecord() for more details).
32843310
*
32853311
* This will be called inside critical sections, so throwing an error is not
32863312
* advisable.
32873313
*/
32883314
void
3289-
WalSndWakeup(void)
3315+
WalSndWakeup(bool physical, bool logical)
32903316
{
32913317
int i;
32923318

32933319
for (i = 0; i < max_wal_senders; i++)
32943320
{
32953321
Latch *latch;
3322+
ReplicationKind kind;
32963323
WalSnd *walsnd = &WalSndCtl->walsnds[i];
32973324

32983325
/*
32993326
* Get latch pointer with spinlock held, for the unlikely case that
3300-
* pointer reads aren't atomic (as they're 8 bytes).
3327+
* pointer reads aren't atomic (as they're 8 bytes). While at it, also
3328+
* get kind.
33013329
*/
33023330
SpinLockAcquire(&walsnd->mutex);
33033331
latch = walsnd->latch;
3332+
kind = walsnd->kind;
33043333
SpinLockRelease(&walsnd->mutex);
33053334

3306-
if (latch != NULL)
3335+
if (latch == NULL)
3336+
continue;
3337+
3338+
if ((physical && kind == REPLICATION_KIND_PHYSICAL) ||
3339+
(logical && kind == REPLICATION_KIND_LOGICAL))
33073340
SetLatch(latch);
33083341
}
33093342
}

src/include/replication/walsender.h

+11-11
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ extern void WalSndResourceCleanup(bool isCommit);
4242
extern void WalSndSignals(void);
4343
extern Size WalSndShmemSize(void);
4444
extern void WalSndShmemInit(void);
45-
extern void WalSndWakeup(void);
45+
extern void WalSndWakeup(bool physical, bool logical);
4646
extern void WalSndInitStopping(void);
4747
extern void WalSndWaitStopping(void);
4848
extern void HandleWalSndInitStopping(void);
@@ -60,15 +60,15 @@ extern void WalSndRqstFileReload(void);
6060
/*
6161
* wakeup walsenders if there is work to be done
6262
*/
63-
#define WalSndWakeupProcessRequests() \
64-
do \
65-
{ \
66-
if (wake_wal_senders) \
67-
{ \
68-
wake_wal_senders = false; \
69-
if (max_wal_senders > 0) \
70-
WalSndWakeup(); \
71-
} \
72-
} while (0)
63+
static inline void
64+
WalSndWakeupProcessRequests(bool physical, bool logical)
65+
{
66+
if (wake_wal_senders)
67+
{
68+
wake_wal_senders = false;
69+
if (max_wal_senders > 0)
70+
WalSndWakeup(physical, logical);
71+
}
72+
}
7373

7474
#endif /* _WALSENDER_H */

src/include/replication/walsender_private.h

+3
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
#include "access/xlog.h"
1616
#include "lib/ilist.h"
1717
#include "nodes/nodes.h"
18+
#include "nodes/replnodes.h"
1819
#include "replication/syncrep.h"
1920
#include "storage/latch.h"
2021
#include "storage/shmem.h"
@@ -79,6 +80,8 @@ typedef struct WalSnd
7980
* Timestamp of the last message received from standby.
8081
*/
8182
TimestampTz replyTime;
83+
84+
ReplicationKind kind;
8285
} WalSnd;
8386

8487
extern PGDLLIMPORT WalSnd *MyWalSnd;

0 commit comments

Comments
 (0)