Skip to content

Commit af35e66

Browse files
committed
Fix bugs in cascading replication with recovery_target_timeline='latest'
The cascading replication code assumed that the current RecoveryTargetTLI never changes, but that's not true with recovery_target_timeline='latest'. The obvious upshot of that is that RecoveryTargetTLI in shared memory needs to be protected by a lock. A less obvious consequence is that when a cascading standby is connected, and the standby switches to a new target timeline after scanning the archive, it will continue to stream WAL to the cascading standby, but from a wrong file, ie. the file of the previous timeline. For example, if the standby is currently streaming from the middle of file 000000010000000000000005, and the timeline changes, the standby will continue to stream from that file. However, the WAL on the new timeline is in file 000000020000000000000005, so the standby sends garbage from 000000010000000000000005 to the cascading standby, instead of the correct WAL from file 000000020000000000000005. This also fixes a related bug where a partial WAL segment is restored from the archive and streamed to a cascading standby. The code assumed that when a WAL segment is copied from the archive, it can immediately be fully streamed to a cascading standby. However, if the segment is only partially filled, ie. has the right size, but only N first bytes contain valid WAL, that's not safe. That can happen if a partial WAL segment is manually copied to the archive, or if a partial WAL segment is archived because a server is started up on a new timeline within that segment. The cascading standby will get confused if the WAL it received is not valid, and will get stuck until it's restarted. This patch fixes that problem by not allowing WAL restored from the archive to be streamed to a cascading standby until it's been replayed, and thus validated.
1 parent 3e22659 commit af35e66

File tree

3 files changed

+59
-36
lines changed

3 files changed

+59
-36
lines changed

src/backend/access/transam/xlog.c

Lines changed: 31 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -406,7 +406,6 @@ typedef struct XLogCtlData
406406
XLogRecPtr *xlblocks; /* 1st byte ptr-s + XLOG_BLCKSZ */
407407
int XLogCacheBlck; /* highest allocated xlog buffer index */
408408
TimeLineID ThisTimeLineID;
409-
TimeLineID RecoveryTargetTLI;
410409

411410
/*
412411
* archiveCleanupCommand is read from recovery.conf but needs to be in
@@ -455,14 +454,14 @@ typedef struct XLogCtlData
455454
XLogRecPtr recoveryLastRecPtr;
456455
/* timestamp of last COMMIT/ABORT record replayed (or being replayed) */
457456
TimestampTz recoveryLastXTime;
457+
/* current effective recovery target timeline */
458+
TimeLineID RecoveryTargetTLI;
458459

459460
/*
460461
* timestamp of when we started replaying the current chunk of WAL data,
461462
* only relevant for replication or archive recovery
462463
*/
463464
TimestampTz currentChunkStartTime;
464-
/* end of the last record restored from the archive */
465-
XLogRecPtr restoreLastRecPtr;
466465
/* Are we requested to pause recovery? */
467466
bool recoveryPause;
468467

@@ -2880,19 +2879,6 @@ XLogFileRead(uint32 log, uint32 seg, int emode, TimeLineID tli,
28802879
if (reload)
28812880
WalSndRqstFileReload();
28822881

2883-
/*
2884-
* Calculate the end location of the restored WAL file and save it in
2885-
* shmem. It's used as current standby flush position, and cascading
2886-
* walsenders try to send WAL records up to this location.
2887-
*/
2888-
endptr.xlogid = log;
2889-
endptr.xrecoff = seg * XLogSegSize;
2890-
XLByteAdvance(endptr, XLogSegSize);
2891-
2892-
SpinLockAcquire(&xlogctl->info_lck);
2893-
xlogctl->restoreLastRecPtr = endptr;
2894-
SpinLockRelease(&xlogctl->info_lck);
2895-
28962882
/* Signal walsender that new WAL has arrived */
28972883
if (AllowCascadeReplication())
28982884
WalSndWakeup();
@@ -4467,12 +4453,17 @@ rescanLatestTimeLine(void)
44674453
ThisTimeLineID)));
44684454
else
44694455
{
4456+
/* use volatile pointer to prevent code rearrangement */
4457+
volatile XLogCtlData *xlogctl = XLogCtl;
4458+
44704459
/* Switch target */
44714460
recoveryTargetTLI = newtarget;
44724461
list_free(expectedTLIs);
44734462
expectedTLIs = newExpectedTLIs;
44744463

4475-
XLogCtl->RecoveryTargetTLI = recoveryTargetTLI;
4464+
SpinLockAcquire(&xlogctl->info_lck);
4465+
xlogctl->RecoveryTargetTLI = recoveryTargetTLI;
4466+
SpinLockRelease(&xlogctl->info_lck);
44764467

44774468
ereport(LOG,
44784469
(errmsg("new target timeline is %u",
@@ -7519,13 +7510,20 @@ GetNextXidAndEpoch(TransactionId *xid, uint32 *epoch)
75197510
}
75207511

75217512
/*
7522-
* GetRecoveryTargetTLI - get the recovery target timeline ID
7513+
* GetRecoveryTargetTLI - get the current recovery target timeline ID
75237514
*/
75247515
TimeLineID
75257516
GetRecoveryTargetTLI(void)
75267517
{
7527-
/* RecoveryTargetTLI doesn't change so we need no lock to copy it */
7528-
return XLogCtl->RecoveryTargetTLI;
7518+
/* use volatile pointer to prevent code rearrangement */
7519+
volatile XLogCtlData *xlogctl = XLogCtl;
7520+
TimeLineID result;
7521+
7522+
SpinLockAcquire(&xlogctl->info_lck);
7523+
result = xlogctl->RecoveryTargetTLI;
7524+
SpinLockRelease(&xlogctl->info_lck);
7525+
7526+
return result;
75297527
}
75307528

75317529
/*
@@ -8321,7 +8319,7 @@ CreateRestartPoint(int flags)
83218319
XLogRecPtr endptr;
83228320

83238321
/* Get the current (or recent) end of xlog */
8324-
endptr = GetStandbyFlushRecPtr();
8322+
endptr = GetStandbyFlushRecPtr(NULL);
83258323

83268324
KeepLogSeg(endptr, &_logId, &_logSeg);
83278325
PrevLogSeg(_logId, _logSeg);
@@ -9837,23 +9835,22 @@ do_pg_abort_backup(void)
98379835
/*
98389836
* Get latest redo apply position.
98399837
*
9840-
* Optionally, returns the end byte position of the last restored
9841-
* WAL segment. Callers not interested in that value may pass
9842-
* NULL for restoreLastRecPtr.
9838+
* Optionally, returns the current recovery target timeline. Callers not
9839+
* interested in that may pass NULL for targetTLI.
98439840
*
98449841
* Exported to allow WALReceiver to read the pointer directly.
98459842
*/
98469843
XLogRecPtr
9847-
GetXLogReplayRecPtr(XLogRecPtr *restoreLastRecPtr)
9844+
GetXLogReplayRecPtr(TimeLineID *targetTLI)
98489845
{
98499846
/* use volatile pointer to prevent code rearrangement */
98509847
volatile XLogCtlData *xlogctl = XLogCtl;
98519848
XLogRecPtr recptr;
98529849

98539850
SpinLockAcquire(&xlogctl->info_lck);
98549851
recptr = xlogctl->recoveryLastRecPtr;
9855-
if (restoreLastRecPtr)
9856-
*restoreLastRecPtr = xlogctl->restoreLastRecPtr;
9852+
if (targetTLI)
9853+
*targetTLI = xlogctl->RecoveryTargetTLI;
98579854
SpinLockRelease(&xlogctl->info_lck);
98589855

98599856
return recptr;
@@ -9862,21 +9859,23 @@ GetXLogReplayRecPtr(XLogRecPtr *restoreLastRecPtr)
98629859
/*
98639860
* Get current standby flush position, ie, the last WAL position
98649861
* known to be fsync'd to disk in standby.
9862+
*
9863+
* If 'targetTLI' is not NULL, it's set to the current recovery target
9864+
* timeline.
98659865
*/
98669866
XLogRecPtr
9867-
GetStandbyFlushRecPtr(void)
9867+
GetStandbyFlushRecPtr(TimeLineID *targetTLI)
98689868
{
98699869
XLogRecPtr receivePtr;
98709870
XLogRecPtr replayPtr;
9871-
XLogRecPtr restorePtr;
98729871

98739872
receivePtr = GetWalRcvWriteRecPtr(NULL);
9874-
replayPtr = GetXLogReplayRecPtr(&restorePtr);
9873+
replayPtr = GetXLogReplayRecPtr(targetTLI);
98759874

98769875
if (XLByteLT(receivePtr, replayPtr))
9877-
return XLByteLT(replayPtr, restorePtr) ? restorePtr : replayPtr;
9876+
return replayPtr;
98789877
else
9879-
return XLByteLT(receivePtr, restorePtr) ? restorePtr : receivePtr;
9878+
return receivePtr;
98809879
}
98819880

98829881
/*

src/backend/replication/walsender.c

Lines changed: 26 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -299,7 +299,7 @@ IdentifySystem(void)
299299
GetSystemIdentifier());
300300
snprintf(tli, sizeof(tli), "%u", ThisTimeLineID);
301301

302-
logptr = am_cascading_walsender ? GetStandbyFlushRecPtr() : GetInsertRecPtr();
302+
logptr = am_cascading_walsender ? GetStandbyFlushRecPtr(NULL) : GetInsertRecPtr();
303303

304304
snprintf(xpos, sizeof(xpos), "%X/%X",
305305
logptr.xlogid, logptr.xrecoff);
@@ -1144,7 +1144,31 @@ XLogSend(char *msgbuf, bool *caughtup)
11441144
* subsequently crashes and restarts, slaves must not have applied any WAL
11451145
* that gets lost on the master.
11461146
*/
1147-
SendRqstPtr = am_cascading_walsender ? GetStandbyFlushRecPtr() : GetFlushRecPtr();
1147+
if (am_cascading_walsender)
1148+
{
1149+
TimeLineID currentTargetTLI;
1150+
SendRqstPtr = GetStandbyFlushRecPtr(&currentTargetTLI);
1151+
1152+
/*
1153+
* If the recovery target timeline changed, bail out. It's a bit
1154+
* unfortunate that we have to just disconnect, but there is no way
1155+
* to tell the client that the timeline changed. We also don't know
1156+
* exactly where the switch happened, so we cannot safely try to send
1157+
* up to the switchover point before disconnecting.
1158+
*/
1159+
if (currentTargetTLI != ThisTimeLineID)
1160+
{
1161+
if (!walsender_ready_to_stop)
1162+
ereport(LOG,
1163+
(errmsg("terminating walsender process to force cascaded standby "
1164+
"to update timeline and reconnect")));
1165+
walsender_ready_to_stop = true;
1166+
*caughtup = true;
1167+
return;
1168+
}
1169+
}
1170+
else
1171+
SendRqstPtr = GetFlushRecPtr();
11481172

11491173
/* Quick exit if nothing to do */
11501174
if (XLByteLE(SendRqstPtr, sentPtr))

src/include/access/xlog.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -286,8 +286,8 @@ extern bool RecoveryInProgress(void);
286286
extern bool HotStandbyActive(void);
287287
extern bool XLogInsertAllowed(void);
288288
extern void GetXLogReceiptTime(TimestampTz *rtime, bool *fromStream);
289-
extern XLogRecPtr GetXLogReplayRecPtr(XLogRecPtr *restoreLastRecPtr);
290-
extern XLogRecPtr GetStandbyFlushRecPtr(void);
289+
extern XLogRecPtr GetXLogReplayRecPtr(TimeLineID *targetTLI);
290+
extern XLogRecPtr GetStandbyFlushRecPtr(TimeLineID *targetTLI);
291291
extern XLogRecPtr GetXLogInsertRecPtr(void);
292292
extern XLogRecPtr GetXLogWriteRecPtr(void);
293293
extern bool RecoveryIsPaused(void);

0 commit comments

Comments
 (0)