Skip to content

Commit ef2c90a

Browse files
committed
gracefully release locks in 2pc commit while replay
1 parent b91a28c commit ef2c90a

File tree

5 files changed

+133
-3
lines changed

5 files changed

+133
-3
lines changed

reinit.sh

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,58 @@ psql <<SQL
8585
SQL
8686

8787

88+
cat <<MSG
89+
###############################################################################
90+
# Check that WAL replay will delete memory state for commited 2pc tx.
91+
###############################################################################
92+
MSG
93+
94+
pkill -9 postgres
95+
reinit_master >> /dev/null
96+
psql <<SQL
97+
begin;
98+
insert into t values (42);
99+
prepare transaction 'x';
100+
commit prepared 'x';
101+
SQL
102+
pkill -9 postgres
103+
./install/bin/pg_ctl -w -D ./install/data -l ./install/data/logfile start
104+
# ./install/bin/pg_ctl -w -D ./install/data -l ./install/data/logfile restart
105+
psql <<SQL
106+
begin;
107+
insert into t values (42);
108+
109+
-- This prepare can fail due to 2pc identifier conflict if replay
110+
-- didn't clean proc and gxact on commit.
111+
prepare transaction 'x';
112+
SQL
113+
114+
115+
116+
# cat <<MSG
117+
# ###############################################################################
118+
# # Check that we can commit while running active sync slave and that there is no
119+
# # active prepared transaction on slave after that.
120+
# ###############################################################################
121+
# MSG
122+
123+
# pkill -9 postgres
124+
# reinit_master >> /dev/null
125+
# reinit_slave >> /dev/null
126+
# psql <<SQL
127+
# begin;
128+
# insert into t values (42);
129+
# prepare transaction 'x';
130+
# SQL
131+
# pkill -9 postgres
132+
# ./install/bin/pg_ctl -w -D ./install/data -l ./install/data/logfile start
133+
# psql <<SQL
134+
# commit prepared 'x';
135+
# SQL
136+
# echo "Following list should be empty:"
137+
# psql -tc 'select * from pg_prepared_xacts;' -p 5433
138+
139+
88140

89141

90142

src/backend/access/transam/twophase.c

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1549,6 +1549,78 @@ RecreateTwoPhaseFile(TransactionId xid, void *content, int len)
15491549
errmsg("could not close two-phase state file: %m")));
15501550
}
15511551

1552+
/*
1553+
*
1554+
*/
1555+
void
1556+
XlogRedoFinishPrepared(TransactionId xid)
1557+
{
1558+
GlobalTransaction gxact;
1559+
PGPROC *proc;
1560+
PGXACT *pgxact;
1561+
int i;
1562+
bool file_used = false;
1563+
char *buf;
1564+
char *bufptr;
1565+
TwoPhaseFileHeader *hdr;
1566+
1567+
fprintf(stderr, "===(%u) XlogRedoCleanupPrepared called for %x\n", getpid(), xid);
1568+
1569+
/* We can do that without the lock in replay, aren't we? */
1570+
for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
1571+
{
1572+
gxact = TwoPhaseState->prepXacts[i];
1573+
proc = &ProcGlobal->allProcs[gxact->pgprocno];
1574+
pgxact = &ProcGlobal->allPgXact[gxact->pgprocno];
1575+
1576+
if (xid == pgxact->xid)
1577+
{
1578+
fprintf(stderr, "cleaning memory state for %x\n", xid);
1579+
1580+
if (gxact->prepare_start_lsn)
1581+
{
1582+
XlogReadTwoPhaseData(gxact->prepare_start_lsn, &buf, NULL);
1583+
}
1584+
else
1585+
{
1586+
buf = ReadTwoPhaseFile(xid, true);
1587+
file_used = true;
1588+
}
1589+
1590+
/*
1591+
* Disassemble the header area
1592+
*/
1593+
hdr = (TwoPhaseFileHeader *) buf;
1594+
Assert(TransactionIdEquals(hdr->xid, xid));
1595+
1596+
bufptr = buf + MAXALIGN(sizeof(TwoPhaseFileHeader));
1597+
bufptr += MAXALIGN(hdr->nsubxacts * sizeof(TransactionId));
1598+
bufptr += MAXALIGN(hdr->ncommitrels * sizeof(RelFileNode));
1599+
bufptr += MAXALIGN(hdr->nabortrels * sizeof(RelFileNode));
1600+
bufptr += MAXALIGN(hdr->ninvalmsgs * sizeof(SharedInvalidationMessage));
1601+
1602+
ProcArrayRemove(proc, xid);
1603+
1604+
gxact->valid = false;
1605+
1606+
/* And now do the callbacks */
1607+
if (true)
1608+
ProcessRecords(bufptr, xid, twophase_postcommit_callbacks);
1609+
else
1610+
ProcessRecords(bufptr, xid, twophase_postabort_callbacks);
1611+
1612+
PredicateLockTwoPhaseFinish(xid, true);
1613+
1614+
RemoveGXact(gxact);
1615+
MyLockedGxact = NULL;
1616+
1617+
break;
1618+
}
1619+
}
1620+
}
1621+
1622+
1623+
15521624
/*
15531625
* CheckPointTwoPhase -- handle 2PC component of checkpointing.
15541626
*

src/backend/access/transam/xact.c

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5568,6 +5568,7 @@ xact_redo(XLogReaderState *record)
55685568
xact_redo_commit(&parsed, parsed.twophase_xid,
55695569
record->EndRecPtr, XLogRecGetOrigin(record));
55705570
RemoveTwoPhaseFile(parsed.twophase_xid, false);
5571+
XlogRedoFinishPrepared(parsed.twophase_xid);
55715572
}
55725573
}
55735574
else if (info == XLOG_XACT_ABORT || info == XLOG_XACT_ABORT_PREPARED)
@@ -5594,12 +5595,12 @@ xact_redo(XLogReaderState *record)
55945595
{
55955596
/* the record contents are exactly the 2PC file */
55965597
// RecreateTwoPhaseFile(XLogRecGetXid(record),
5597-
// XLogRecGetData(record), XLogRecGetDataLen(record));
5598+
// XLogRecGetData(record), XLogRecGetDataLen(record));
55985599

55995600

56005601
// RecoverPreparedTransaction(XLogRecGetXid(record),
56015602
// (char *) XLogRecGetData(record), XLogRecGetDataLen(record));
5602-
fprintf(stderr, "=== Recovering tx %lx : %lx \n", record->ReadRecPtr, record->EndRecPtr);
5603+
// fprintf(stderr, "=== Recovering tx %lx : %lx \n", record->ReadRecPtr, record->EndRecPtr);
56035604

56045605
RecoverPreparedFromXLOG(record);
56055606
}

src/backend/storage/lmgr/lock.c

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3246,7 +3246,7 @@ PostPrepare_Locks(TransactionId xid)
32463246
if (!hash_update_hash_key(LockMethodProcLockHash,
32473247
(void *) proclock,
32483248
(void *) &proclocktag))
3249-
elog(PANIC, "duplicate entry found while reassigning a prepared transaction's locks");
3249+
elog(PANIC, "(proc %u) duplicate entry found while reassigning a prepared transaction's locks", getpid());
32503250

32513251
/* Re-link into the new proc's proclock list */
32523252
SHMQueueInsertBefore(&(newproc->myProcLocks[partition]),
@@ -3683,6 +3683,8 @@ lock_twophase_recover(TransactionId xid, uint16 info,
36833683
LWLock *partitionLock;
36843684
LockMethod lockMethodTable;
36853685

3686+
fprintf(stderr, "=== (%u) lock_twophase_recover\n", getpid());
3687+
36863688
Assert(len == sizeof(TwoPhaseLockRecord));
36873689
locktag = &rec->locktag;
36883690
lockmode = rec->lockmode;

src/include/access/twophase.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
#include "access/xlogdefs.h"
1818
#include "datatype/timestamp.h"
1919
#include "storage/lock.h"
20+
#include "access/xlogreader.h"
2021

2122
/*
2223
* GlobalTransactionData is defined in twophase.c; other places have no
@@ -57,4 +58,6 @@ extern void CheckPointTwoPhase(XLogRecPtr redo_horizon);
5758

5859
extern void FinishPreparedTransaction(const char *gid, bool isCommit);
5960

61+
extern void XlogRedoFinishPrepared(TransactionId xid);
62+
6063
#endif /* TWOPHASE_H */

0 commit comments

Comments
 (0)