Skip to content

Commit 8a812e5

Browse files
author
Amit Kapila
committedMar 8, 2021
Track replication origin progress for rollbacks.
Commit 1eb6d65 allowed to track replica origin replay progress for 2PC but it was not complete. It misses to properly track the progress for rollback prepared especially it missed updating the code for recovery. Additionally, we need to allow tracking it on subscriber nodes where wal_level might not be logical. It is required to track decoding of 2PC which is committed in PG14 (a271a1b) and also nobody complained about this till now so not backpatching it. Author: Amit Kapila Reviewed-by: Michael Paquier and Ajin Cherian Discussion: https://postgr.es/m/CAA4eK1L-kHmMnSdrRW6UhRbCjR7cgh04c+6psY15qzT6ktcd+g@mail.gmail.com
1 parent f9a0392 commit 8a812e5

File tree

2 files changed

+31
-6
lines changed

2 files changed

+31
-6
lines changed
 

‎src/backend/access/transam/twophase.c

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2276,6 +2276,14 @@ RecordTransactionAbortPrepared(TransactionId xid,
22762276
const char *gid)
22772277
{
22782278
XLogRecPtr recptr;
2279+
bool replorigin;
2280+
2281+
/*
2282+
* Are we using the replication origins feature? Or, in other words, are
2283+
* we replaying remote actions?
2284+
*/
2285+
replorigin = (replorigin_session_origin != InvalidRepOriginId &&
2286+
replorigin_session_origin != DoNotReplicateId);
22792287

22802288
/*
22812289
* Catch the scenario where we aborted partway through
@@ -2298,6 +2306,11 @@ RecordTransactionAbortPrepared(TransactionId xid,
22982306
MyXactFlags | XACT_FLAGS_ACQUIREDACCESSEXCLUSIVELOCK,
22992307
xid, gid);
23002308

2309+
if (replorigin)
2310+
/* Move LSNs forward for this replication origin */
2311+
replorigin_session_advance(replorigin_session_origin_lsn,
2312+
XactLastRecEnd);
2313+
23012314
/* Always flush, since we're about to remove the 2PC state file */
23022315
XLogFlush(recptr);
23032316

‎src/backend/access/transam/xact.c

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5714,10 +5714,12 @@ XactLogAbortRecord(TimestampTz abort_time,
57145714
xl_dbinfo.tsId = MyDatabaseTableSpace;
57155715
}
57165716

5717-
/* dump transaction origin information only for abort prepared */
5717+
/*
5718+
* Dump transaction origin information only for abort prepared. We need
5719+
* this during recovery to update the replication origin progress.
5720+
*/
57185721
if ((replorigin_session_origin != InvalidRepOriginId) &&
5719-
TransactionIdIsValid(twophase_xid) &&
5720-
XLogLogicalInfoActive())
5722+
TransactionIdIsValid(twophase_xid))
57215723
{
57225724
xl_xinfo.xinfo |= XACT_XINFO_HAS_ORIGIN;
57235725

@@ -5923,7 +5925,8 @@ xact_redo_commit(xl_xact_parsed_commit *parsed,
59235925
* because subtransaction commit is never WAL logged.
59245926
*/
59255927
static void
5926-
xact_redo_abort(xl_xact_parsed_abort *parsed, TransactionId xid)
5928+
xact_redo_abort(xl_xact_parsed_abort *parsed, TransactionId xid,
5929+
XLogRecPtr lsn, RepOriginId origin_id)
59275930
{
59285931
TransactionId max_xid;
59295932

@@ -5972,6 +5975,13 @@ xact_redo_abort(xl_xact_parsed_abort *parsed, TransactionId xid)
59725975
StandbyReleaseLockTree(xid, parsed->nsubxacts, parsed->subxacts);
59735976
}
59745977

5978+
if (parsed->xinfo & XACT_XINFO_HAS_ORIGIN)
5979+
{
5980+
/* recover apply progress */
5981+
replorigin_advance(origin_id, parsed->origin_lsn, lsn,
5982+
false /* backward */ , false /* WAL */ );
5983+
}
5984+
59755985
/* Make sure files supposed to be dropped are dropped */
59765986
DropRelationFiles(parsed->xnodes, parsed->nrels, true);
59775987
}
@@ -6013,15 +6023,17 @@ xact_redo(XLogReaderState *record)
60136023
xl_xact_parsed_abort parsed;
60146024

60156025
ParseAbortRecord(XLogRecGetInfo(record), xlrec, &parsed);
6016-
xact_redo_abort(&parsed, XLogRecGetXid(record));
6026+
xact_redo_abort(&parsed, XLogRecGetXid(record),
6027+
record->EndRecPtr, XLogRecGetOrigin(record));
60176028
}
60186029
else if (info == XLOG_XACT_ABORT_PREPARED)
60196030
{
60206031
xl_xact_abort *xlrec = (xl_xact_abort *) XLogRecGetData(record);
60216032
xl_xact_parsed_abort parsed;
60226033

60236034
ParseAbortRecord(XLogRecGetInfo(record), xlrec, &parsed);
6024-
xact_redo_abort(&parsed, parsed.twophase_xid);
6035+
xact_redo_abort(&parsed, parsed.twophase_xid,
6036+
record->EndRecPtr, XLogRecGetOrigin(record));
60256037

60266038
/* Delete TwoPhaseState gxact entry and/or 2PC file. */
60276039
LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);

0 commit comments

Comments
 (0)