Skip to content

Commit 4d63969

Browse files
committed
WIP. Recover 2pc state during WAL record replay
1 parent 070e12f commit 4d63969

File tree

4 files changed

+220
-32
lines changed

4 files changed

+220
-32
lines changed

reinit.sh

Lines changed: 94 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -1,37 +1,101 @@
11
#!/bin/sh
2-
pkill -9 postgres
3-
make install
4-
rm -rf install/data
5-
rm -rf install/data_slave
6-
rm -rf /cores/*
7-
8-
9-
# Node master
10-
./install/bin/initdb -D ./install/data
11-
echo "max_prepared_transactions = 100" >> ./install/data/postgresql.conf
12-
echo "shared_buffers = 512MB" >> ./install/data/postgresql.conf
13-
echo "fsync = off" >> ./install/data/postgresql.conf
14-
echo "log_checkpoints = on" >> ./install/data/postgresql.conf
15-
echo "max_wal_size = 48MB" >> ./install/data/postgresql.conf
16-
echo "min_wal_size = 32MB" >> ./install/data/postgresql.conf
17-
echo "wal_level = hot_standby" >> ./install/data/postgresql.conf
18-
echo "wal_keep_segments = 64" >> ./install/data/postgresql.conf
19-
echo "max_wal_senders = 2" >> ./install/data/postgresql.conf
20-
echo "max_replication_slots = 2" >> ./install/data/postgresql.conf
21-
22-
echo '' > ./install/data/logfile
23-
echo 'local replication stas trust' >> ./install/data/pg_hba.conf
242

3+
reinit_master() {
4+
rm -rf install/data
5+
6+
./install/bin/initdb -D ./install/data
7+
8+
echo "max_prepared_transactions = 100" >> ./install/data/postgresql.conf
9+
echo "shared_buffers = 512MB" >> ./install/data/postgresql.conf
10+
echo "fsync = off" >> ./install/data/postgresql.conf
11+
echo "log_checkpoints = on" >> ./install/data/postgresql.conf
12+
echo "max_wal_size = 48MB" >> ./install/data/postgresql.conf
13+
echo "min_wal_size = 32MB" >> ./install/data/postgresql.conf
14+
echo "wal_level = hot_standby" >> ./install/data/postgresql.conf
15+
echo "wal_keep_segments = 64" >> ./install/data/postgresql.conf
16+
echo "max_wal_senders = 2" >> ./install/data/postgresql.conf
17+
echo "max_replication_slots = 2" >> ./install/data/postgresql.conf
18+
19+
echo '' > ./install/data/logfile
20+
21+
echo 'local replication stas trust' >> ./install/data/pg_hba.conf
22+
23+
./install/bin/pg_ctl -w -D ./install/data -l ./install/data/logfile start
24+
./install/bin/createdb stas
25+
./install/bin/psql -c "create table t(id int);"
26+
}
27+
28+
reinit_slave() {
29+
rm -rf install/data_slave
30+
31+
./install/bin/pg_basebackup -D ./install/data_slave/ -R
32+
33+
echo "port = 5433" >> ./install/data_slave/postgresql.conf
34+
echo "hot_standby = on" >> ./install/data_slave/postgresql.conf
35+
36+
echo '' > ./install/data_slave/logfile
37+
38+
./install/bin/pg_ctl -w -D ./install/data_slave -l ./install/data_slave/logfile start
39+
}
40+
41+
make install > /dev/null
42+
43+
cat <<MSG
44+
###############################################################################
45+
# Check that we can commit after hard restart
46+
###############################################################################
47+
MSG
48+
49+
pkill -9 postgres
50+
reinit_master >> /dev/null
51+
psql <<SQL
52+
begin;
53+
insert into t values (42);
54+
prepare transaction 'x';
55+
SQL
56+
pkill -9 postgres
2557
./install/bin/pg_ctl -w -D ./install/data -l ./install/data/logfile start
26-
./install/bin/createdb stas
27-
./install/bin/psql -c "create table t(id int);"
58+
psql <<SQL
59+
commit prepared 'x';
60+
SQL
61+
62+
63+
64+
# cat <<MSG
65+
# ###############################################################################
66+
# # Check that we can commit after soft restart
67+
# ###############################################################################
68+
# MSG
69+
70+
# pkill -9 postgres
71+
# reinit_master >> /dev/null
72+
# psql <<SQL
73+
# begin;
74+
# insert into t values (42);
75+
# prepare transaction 'x';
76+
# SQL
77+
# ./install/bin/pg_ctl -w -D ./install/data -l ./install/data/logfile restart
78+
# psql <<SQL
79+
# commit prepared 'x';
80+
# SQL
81+
82+
83+
84+
85+
86+
87+
88+
89+
90+
91+
92+
93+
94+
95+
96+
97+
2898

29-
./install/bin/pg_basebackup -D ./install/data_slave/ -R
3099

31-
# Node slave
32-
echo "port = 5433" >> ./install/data_slave/postgresql.conf
33-
echo "hot_standby = on" >> ./install/data_slave/postgresql.conf
34-
echo '' > ./install/data_slave/logfile
35-
./install/bin/pg_ctl -w -D ./install/data_slave -l ./install/data_slave/logfile start
36100

37101

src/backend/access/transam/twophase.c

Lines changed: 116 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1056,6 +1056,8 @@ EndPrepare(GlobalTransaction gxact)
10561056
/* Store record's start location to read that later on Commit */
10571057
gxact->prepare_start_lsn = ProcLastRecPtr;
10581058

1059+
fprintf(stderr, "=== Prepared tx stored at %lx : %lx \n", gxact->prepare_start_lsn, gxact->prepare_end_lsn);
1060+
10591061
/*
10601062
* Mark the prepared transaction as valid. As soon as xact.c marks
10611063
* MyPgXact as not running our XID (which it will do immediately after
@@ -1495,6 +1497,8 @@ RecreateTwoPhaseFile(TransactionId xid, void *content, int len)
14951497
pg_crc32c statefile_crc;
14961498
int fd;
14971499

1500+
fprintf(stderr, "===(%u) RecreateTwoPhaseFile called\n", getpid());
1501+
14981502
/* Recompute CRC */
14991503
INIT_CRC32C(statefile_crc);
15001504
COMP_CRC32C(statefile_crc, content, len);
@@ -1639,6 +1643,8 @@ PrescanPreparedTransactions(TransactionId **xids_p, int *nxids_p)
16391643
int nxids = 0;
16401644
int allocsize = 0;
16411645

1646+
fprintf(stderr, "===(%u) PrescanPreparedTransactions called\n", getpid());
1647+
16421648
cldir = AllocateDir(TWOPHASE_DIR);
16431649
while ((clde = ReadDir(cldir, TWOPHASE_DIR)) != NULL)
16441650
{
@@ -1774,6 +1780,8 @@ StandbyRecoverPreparedTransactions(bool overwriteOK)
17741780
DIR *cldir;
17751781
struct dirent *clde;
17761782

1783+
fprintf(stderr, "===(%u) StandbyRecoverPreparedTransactions called\n", getpid());
1784+
17771785
cldir = AllocateDir(TWOPHASE_DIR);
17781786
while ((clde = ReadDir(cldir, TWOPHASE_DIR)) != NULL)
17791787
{
@@ -1854,6 +1862,8 @@ RecoverPreparedTransactions(void)
18541862
struct dirent *clde;
18551863
bool overwriteOK = false;
18561864

1865+
fprintf(stderr, "===(%u) RecoverPreparedTransactions called\n", getpid());
1866+
18571867
snprintf(dir, MAXPGPATH, "%s", TWOPHASE_DIR);
18581868

18591869
cldir = AllocateDir(dir);
@@ -1963,6 +1973,112 @@ RecoverPreparedTransactions(void)
19631973
FreeDir(cldir);
19641974
}
19651975

1976+
1977+
void
1978+
RecoverPreparedTransaction(XLogReaderState *record)
1979+
{
1980+
bool overwriteOK = false;
1981+
TransactionId xid = XLogRecGetXid(record);
1982+
char *buf = (char *) XLogRecGetData(record);
1983+
char *bufptr;
1984+
TwoPhaseFileHeader *hdr;
1985+
TransactionId *subxids;
1986+
GlobalTransaction gxact;
1987+
int i;
1988+
1989+
fprintf(stderr, "===(%u) RecoverPreparedTransactioNN called\n", getpid());
1990+
1991+
// /* Already processed? */
1992+
// if (TransactionIdDidCommit(xid) || TransactionIdDidAbort(xid))
1993+
// {
1994+
// ereport(WARNING,
1995+
// (errmsg("removing stale two-phase state file \"%s\"",
1996+
// clde->d_name)));
1997+
// RemoveTwoPhaseFile(xid, true);
1998+
// continue;
1999+
// }
2000+
2001+
// /* Read and validate file */
2002+
// buf = ReadTwoPhaseFile(xid, true);
2003+
// if (buf == NULL)
2004+
// {
2005+
// ereport(WARNING,
2006+
// (errmsg("removing corrupt two-phase state file \"%s\"",
2007+
// clde->d_name)));
2008+
// RemoveTwoPhaseFile(xid, true);
2009+
// continue;
2010+
// }
2011+
2012+
// ereport(LOG,
2013+
// (errmsg("recovering prepared transaction %u", xid)));
2014+
2015+
/* Deconstruct header */
2016+
hdr = (TwoPhaseFileHeader *) buf;
2017+
Assert(TransactionIdEquals(hdr->xid, xid));
2018+
bufptr = buf + MAXALIGN(sizeof(TwoPhaseFileHeader));
2019+
subxids = (TransactionId *) bufptr;
2020+
bufptr += MAXALIGN(hdr->nsubxacts * sizeof(TransactionId));
2021+
bufptr += MAXALIGN(hdr->ncommitrels * sizeof(RelFileNode));
2022+
bufptr += MAXALIGN(hdr->nabortrels * sizeof(RelFileNode));
2023+
bufptr += MAXALIGN(hdr->ninvalmsgs * sizeof(SharedInvalidationMessage));
2024+
2025+
/*
2026+
* It's possible that SubTransSetParent has been set before, if
2027+
* the prepared transaction generated xid assignment records. Test
2028+
* here must match one used in AssignTransactionId().
2029+
*/
2030+
if (InHotStandby && (hdr->nsubxacts >= PGPROC_MAX_CACHED_SUBXIDS ||
2031+
XLogLogicalInfoActive()))
2032+
overwriteOK = true;
2033+
2034+
/*
2035+
* Reconstruct subtrans state for the transaction --- needed
2036+
* because pg_subtrans is not preserved over a restart. Note that
2037+
* we are linking all the subtransactions directly to the
2038+
* top-level XID; there may originally have been a more complex
2039+
* hierarchy, but there's no need to restore that exactly.
2040+
*/
2041+
for (i = 0; i < hdr->nsubxacts; i++)
2042+
SubTransSetParent(subxids[i], xid, overwriteOK);
2043+
2044+
/*
2045+
* Recreate its GXACT and dummy PGPROC
2046+
*
2047+
* MarkAsPreparing sets prepare_start_lsn to InvalidXLogRecPtr
2048+
* so next checkpoint will skip that transaction.
2049+
*/
2050+
gxact = MarkAsPreparing(xid, hdr->gid,
2051+
hdr->prepared_at,
2052+
hdr->owner, hdr->database);
2053+
GXactLoadSubxactData(gxact, hdr->nsubxacts, subxids);
2054+
MarkAsPrepared(gxact);
2055+
2056+
gxact->prepare_start_lsn = record->ReadRecPtr;
2057+
gxact->prepare_end_lsn = record->EndRecPtr;
2058+
2059+
/*
2060+
* Recover other state (notably locks) using resource managers
2061+
*/
2062+
ProcessRecords(bufptr, xid, twophase_recover_callbacks);
2063+
2064+
/*
2065+
* Release locks held by the standby process after we process each
2066+
* prepared transaction. As a result, we don't need too many
2067+
* additional locks at any one time.
2068+
*/
2069+
if (InHotStandby)
2070+
StandbyReleaseLockTree(xid, hdr->nsubxacts, subxids);
2071+
2072+
/*
2073+
* We're done with recovering this transaction. Clear
2074+
* MyLockedGxact, like we do in PrepareTransaction() during normal
2075+
* operation.
2076+
*/
2077+
PostPrepare_Twophase();
2078+
}
2079+
2080+
2081+
19662082
/*
19672083
* RecordTransactionCommitPrepared
19682084
*

src/backend/access/transam/xact.c

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5593,8 +5593,15 @@ xact_redo(XLogReaderState *record)
55935593
else if (info == XLOG_XACT_PREPARE)
55945594
{
55955595
/* the record contents are exactly the 2PC file */
5596-
RecreateTwoPhaseFile(XLogRecGetXid(record),
5597-
XLogRecGetData(record), XLogRecGetDataLen(record));
5596+
// RecreateTwoPhaseFile(XLogRecGetXid(record),
5597+
// XLogRecGetData(record), XLogRecGetDataLen(record));
5598+
5599+
5600+
// RecoverPreparedTransaction(XLogRecGetXid(record),
5601+
// (char *) XLogRecGetData(record), XLogRecGetDataLen(record));
5602+
fprintf(stderr, "=== Recovering tx %lx : %lx \n", record->ReadRecPtr, record->EndRecPtr);
5603+
5604+
RecoverPreparedTransaction(record);
55985605
}
55995606
else if (info == XLOG_XACT_ASSIGNMENT)
56005607
{

src/include/access/twophase.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ extern TransactionId PrescanPreparedTransactions(TransactionId **xids_p,
4848
int *nxids_p);
4949
extern void StandbyRecoverPreparedTransactions(bool overwriteOK);
5050
extern void RecoverPreparedTransactions(void);
51+
extern void RecoverPreparedTransaction(XLogReaderState *record);
5152

5253
extern void RecreateTwoPhaseFile(TransactionId xid, void *content, int len);
5354
extern void RemoveTwoPhaseFile(TransactionId xid, bool giveWarning);

0 commit comments

Comments
 (0)