Skip to content

Commit 62d662f

Browse files
committed
DRY code for RecoverPreparedFromFiles and RecoverPreparedFromXLOG
1 parent 2c7c465 commit 62d662f

File tree

3 files changed

+165
-224
lines changed

3 files changed

+165
-224
lines changed

src/backend/access/transam/twophase.c

Lines changed: 89 additions & 221 deletions
Original file line numberDiff line numberDiff line change
@@ -98,64 +98,6 @@
9898
/* GUC variable, can't be changed after startup */
9999
int max_prepared_xacts = 0;
100100

101-
/*
102-
* This struct describes one global transaction that is in prepared state
103-
* or attempting to become prepared.
104-
*
105-
* The lifecycle of a global transaction is:
106-
*
107-
* 1. After checking that the requested GID is not in use, set up an entry in
108-
* the TwoPhaseState->prepXacts array with the correct GID and valid = false,
109-
* and mark it as locked by my backend.
110-
*
111-
* 2. After successfully completing prepare, set valid = true and enter the
112-
* referenced PGPROC into the global ProcArray.
113-
*
114-
* 3. To begin COMMIT PREPARED or ROLLBACK PREPARED, check that the entry is
115-
* valid and not locked, then mark the entry as locked by storing my current
116-
* backend ID into locking_backend. This prevents concurrent attempts to
117-
* commit or rollback the same prepared xact.
118-
*
119-
* 4. On completion of COMMIT PREPARED or ROLLBACK PREPARED, remove the entry
120-
* from the ProcArray and the TwoPhaseState->prepXacts array and return it to
121-
* the freelist.
122-
*
123-
* Note that if the preparing transaction fails between steps 1 and 2, the
124-
* entry must be removed so that the GID and the GlobalTransaction struct
125-
* can be reused. See AtAbort_Twophase().
126-
*
127-
* typedef struct GlobalTransactionData *GlobalTransaction appears in
128-
* twophase.h
129-
*
130-
* Note that the max value of GIDSIZE must fit in the uint16 gidlen,
131-
* specified in TwoPhaseFileHeader.
132-
*/
133-
#define GIDSIZE 200
134-
135-
typedef struct GlobalTransactionData
136-
{
137-
GlobalTransaction next; /* list link for free list */
138-
int pgprocno; /* ID of associated dummy PGPROC */
139-
BackendId dummyBackendId; /* similar to backend id for backends */
140-
TimestampTz prepared_at; /* time of preparation */
141-
142-
/*
143-
* Note that we need to keep track of two LSNs for each GXACT.
144-
* We keep track of the start LSN because this is the address we must
145-
* use to read state data back from WAL when committing a prepared GXACT.
146-
* We keep track of the end LSN because that is the LSN we need to wait
147-
* for prior to commit.
148-
*/
149-
XLogRecPtr prepare_start_lsn; /* XLOG offset of prepare record start */
150-
XLogRecPtr prepare_end_lsn; /* XLOG offset of prepare record end */
151-
152-
Oid owner; /* ID of user that executed the xact */
153-
BackendId locking_backend; /* backend currently working on the xact */
154-
bool valid; /* TRUE if PGPROC entry is in proc array */
155-
bool ondisk; /* TRUE if prepare state file is on disk */
156-
char gid[GIDSIZE]; /* The GID assigned to the prepared xact */
157-
} GlobalTransactionData;
158-
159101
/*
160102
* Two Phase Commit shared state. Access to this struct is protected
161103
* by TwoPhaseStateLock.
@@ -487,7 +429,7 @@ GXactLoadSubxactData(GlobalTransaction gxact, int nsubxacts,
487429
* MarkAsPrepared
488430
* Mark the GXACT as fully valid, and enter it into the global ProcArray.
489431
*/
490-
static void
432+
void
491433
MarkAsPrepared(GlobalTransaction gxact)
492434
{
493435
/* Lock here may be overkill, but I'm not convinced of that ... */
@@ -1967,6 +1909,93 @@ PrescanPreparedTransactions(TransactionId **xids_p, int *nxids_p)
19671909
return result;
19681910
}
19691911

1912+
/*
1913+
* RecoverPreparedFromBuffer
1914+
*
1915+
* Parse data in given buffer (that can be a pointer to WAL record or file)
1916+
* and load shared-memory state for that prepared transaction.
1917+
*
1918+
* It's caller responsibility to call MarkAsPrepared() on returned gxact.
1919+
*
1920+
*/
1921+
GlobalTransaction
1922+
RecoverPreparedFromBuffer(char *buf, bool forceOverwriteOK)
1923+
{
1924+
char *bufptr;
1925+
const char *gid;
1926+
TransactionId *subxids;
1927+
bool overwriteOK = false;
1928+
int i;
1929+
GlobalTransaction gxact;
1930+
TwoPhaseFileHeader *hdr;
1931+
1932+
/* Deconstruct header */
1933+
hdr = (TwoPhaseFileHeader *) buf;
1934+
bufptr = buf + MAXALIGN(sizeof(TwoPhaseFileHeader));
1935+
gid = (const char *) bufptr;
1936+
bufptr += MAXALIGN(hdr->gidlen);
1937+
subxids = (TransactionId *) bufptr;
1938+
bufptr += MAXALIGN(hdr->nsubxacts * sizeof(TransactionId));
1939+
bufptr += MAXALIGN(hdr->ncommitrels * sizeof(RelFileNode));
1940+
bufptr += MAXALIGN(hdr->nabortrels * sizeof(RelFileNode));
1941+
bufptr += MAXALIGN(hdr->ninvalmsgs * sizeof(SharedInvalidationMessage));
1942+
1943+
/*
1944+
* It's possible that SubTransSetParent has been set before, if
1945+
* the prepared transaction generated xid assignment records. Test
1946+
* here must match one used in AssignTransactionId().
1947+
*/
1948+
if (InHotStandby && (hdr->nsubxacts >= PGPROC_MAX_CACHED_SUBXIDS ||
1949+
XLogLogicalInfoActive()))
1950+
overwriteOK = true;
1951+
1952+
/*
1953+
* Caller can also force overwriteOK.
1954+
*/
1955+
if (forceOverwriteOK)
1956+
overwriteOK = true;
1957+
1958+
/*
1959+
* Reconstruct subtrans state for the transaction --- needed
1960+
* because pg_subtrans is not preserved over a restart. Note that
1961+
* we are linking all the subtransactions directly to the
1962+
* top-level XID; there may originally have been a more complex
1963+
* hierarchy, but there's no need to restore that exactly.
1964+
*/
1965+
for (i = 0; i < hdr->nsubxacts; i++)
1966+
SubTransSetParent(subxids[i], hdr->xid, overwriteOK);
1967+
1968+
/*
1969+
* Recreate its GXACT and dummy PGPROC
1970+
*/
1971+
gxact = MarkAsPreparing(hdr->xid, gid,
1972+
hdr->prepared_at,
1973+
hdr->owner, hdr->database);
1974+
GXactLoadSubxactData(gxact, hdr->nsubxacts, subxids);
1975+
1976+
/*
1977+
* Recover other state (notably locks) using resource managers
1978+
*/
1979+
ProcessRecords(bufptr, hdr->xid, twophase_recover_callbacks);
1980+
1981+
/*
1982+
* Release locks held by the standby process after we process each
1983+
* prepared transaction. As a result, we don't need too many
1984+
* additional locks at any one time.
1985+
*/
1986+
if (InHotStandby)
1987+
StandbyReleaseLockTree(hdr->xid, hdr->nsubxacts, subxids);
1988+
1989+
/*
1990+
* We're done with recovering this transaction. Clear
1991+
* MyLockedGxact, like we do in PrepareTransaction() during normal
1992+
* operation.
1993+
*/
1994+
PostPrepare_Twophase();
1995+
1996+
return gxact;
1997+
}
1998+
19701999
/*
19712000
* RecoverPreparedFromFiles
19722001
*
@@ -1980,7 +2009,6 @@ RecoverPreparedFromFiles(bool forceOverwriteOK)
19802009
char dir[MAXPGPATH];
19812010
DIR *cldir;
19822011
struct dirent *clde;
1983-
bool overwriteOK = false;
19842012

19852013
snprintf(dir, MAXPGPATH, "%s", TWOPHASE_DIR);
19862014

@@ -1992,11 +2020,7 @@ RecoverPreparedFromFiles(bool forceOverwriteOK)
19922020
{
19932021
TransactionId xid;
19942022
char *buf;
1995-
char *bufptr;
1996-
TwoPhaseFileHeader *hdr;
1997-
TransactionId *subxids;
19982023
GlobalTransaction gxact;
1999-
const char *gid;
20002024
int i;
20012025
PGXACT *pgxact;
20022026

@@ -2041,73 +2065,10 @@ RecoverPreparedFromFiles(bool forceOverwriteOK)
20412065
ereport(LOG,
20422066
(errmsg("recovering prepared transaction %u", xid)));
20432067

2044-
/* Deconstruct header */
2045-
hdr = (TwoPhaseFileHeader *) buf;
2046-
Assert(TransactionIdEquals(hdr->xid, xid));
2047-
bufptr = buf + MAXALIGN(sizeof(TwoPhaseFileHeader));
2048-
gid = (const char *) bufptr;
2049-
bufptr += MAXALIGN(hdr->gidlen);
2050-
subxids = (TransactionId *) bufptr;
2051-
bufptr += MAXALIGN(hdr->nsubxacts * sizeof(TransactionId));
2052-
bufptr += MAXALIGN(hdr->ncommitrels * sizeof(RelFileNode));
2053-
bufptr += MAXALIGN(hdr->nabortrels * sizeof(RelFileNode));
2054-
bufptr += MAXALIGN(hdr->ninvalmsgs * sizeof(SharedInvalidationMessage));
2055-
2056-
/*
2057-
* It's possible that SubTransSetParent has been set before, if
2058-
* the prepared transaction generated xid assignment records. Test
2059-
* here must match one used in AssignTransactionId().
2060-
*/
2061-
if (InHotStandby && (hdr->nsubxacts >= PGPROC_MAX_CACHED_SUBXIDS ||
2062-
XLogLogicalInfoActive()))
2063-
overwriteOK = true;
2064-
2065-
/*
2066-
* Caller can also force overwriteOK.
2067-
*/
2068-
if (forceOverwriteOK)
2069-
overwriteOK = true;
2070-
2071-
/*
2072-
* Reconstruct subtrans state for the transaction --- needed
2073-
* because pg_subtrans is not preserved over a restart. Note that
2074-
* we are linking all the subtransactions directly to the
2075-
* top-level XID; there may originally have been a more complex
2076-
* hierarchy, but there's no need to restore that exactly.
2077-
*/
2078-
for (i = 0; i < hdr->nsubxacts; i++)
2079-
SubTransSetParent(subxids[i], xid, overwriteOK);
2080-
2081-
/*
2082-
* Recreate its GXACT and dummy PGPROC
2083-
*/
2084-
gxact = MarkAsPreparing(xid, gid,
2085-
hdr->prepared_at,
2086-
hdr->owner, hdr->database);
2068+
gxact = RecoverPreparedFromBuffer(buf, forceOverwriteOK);
20872069
gxact->ondisk = true;
2088-
GXactLoadSubxactData(gxact, hdr->nsubxacts, subxids);
20892070
MarkAsPrepared(gxact);
20902071

2091-
/*
2092-
* Recover other state (notably locks) using resource managers
2093-
*/
2094-
ProcessRecords(bufptr, xid, twophase_recover_callbacks);
2095-
2096-
/*
2097-
* Release locks held by the standby process after we process each
2098-
* prepared transaction. As a result, we don't need too many
2099-
* additional locks at any one time.
2100-
*/
2101-
if (InHotStandby)
2102-
StandbyReleaseLockTree(xid, hdr->nsubxacts, subxids);
2103-
2104-
/*
2105-
* We're done with recovering this transaction. Clear
2106-
* MyLockedGxact, like we do in PrepareTransaction() during normal
2107-
* operation.
2108-
*/
2109-
PostPrepare_Twophase();
2110-
21112072
pfree(buf);
21122073
}
21132074

@@ -2119,99 +2080,6 @@ RecoverPreparedFromFiles(bool forceOverwriteOK)
21192080
}
21202081

21212082

2122-
/*
2123-
* RecoverPreparedFromXLOG
2124-
*
2125-
* To avoid creation of state files during replay we registering
2126-
* prepare xlog records in shared memory in the same way as it happens
2127-
* while not in recovery. If replay faces commit xlog record before
2128-
* checkpoint/restartpoint happens then we avoid using files at all.
2129-
*
2130-
* We need this behaviour because the speed of the 2PC replay on the replica
2131-
* should be at least the same as the 2PC transaction speed of the master.
2132-
*/
2133-
void
2134-
RecoverPreparedFromXLOG(XLogReaderState *record)
2135-
{
2136-
bool overwriteOK = false;
2137-
TransactionId xid = XLogRecGetXid(record);
2138-
char *buf = (char *) XLogRecGetData(record);
2139-
char *bufptr;
2140-
const char *gid;
2141-
TwoPhaseFileHeader *hdr;
2142-
TransactionId *subxids;
2143-
GlobalTransaction gxact;
2144-
int i;
2145-
2146-
/* Deconstruct header */
2147-
hdr = (TwoPhaseFileHeader *) buf;
2148-
Assert(TransactionIdEquals(hdr->xid, xid));
2149-
bufptr = buf + MAXALIGN(sizeof(TwoPhaseFileHeader));
2150-
gid = (const char *) bufptr;
2151-
bufptr += MAXALIGN(hdr->gidlen);
2152-
subxids = (TransactionId *) bufptr;
2153-
bufptr += MAXALIGN(hdr->nsubxacts * sizeof(TransactionId));
2154-
bufptr += MAXALIGN(hdr->ncommitrels * sizeof(RelFileNode));
2155-
bufptr += MAXALIGN(hdr->nabortrels * sizeof(RelFileNode));
2156-
bufptr += MAXALIGN(hdr->ninvalmsgs * sizeof(SharedInvalidationMessage));
2157-
2158-
/*
2159-
* It's possible that SubTransSetParent has been set before, if
2160-
* the prepared transaction generated xid assignment records. Test
2161-
* here must match one used in AssignTransactionId().
2162-
*/
2163-
if (InHotStandby && (hdr->nsubxacts >= PGPROC_MAX_CACHED_SUBXIDS ||
2164-
XLogLogicalInfoActive()))
2165-
overwriteOK = true;
2166-
2167-
/*
2168-
* Reconstruct subtrans state for the transaction --- needed
2169-
* because pg_subtrans is not preserved over a restart. Note that
2170-
* we are linking all the subtransactions directly to the
2171-
* top-level XID; there may originally have been a more complex
2172-
* hierarchy, but there's no need to restore that exactly.
2173-
*/
2174-
for (i = 0; i < hdr->nsubxacts; i++)
2175-
SubTransSetParent(subxids[i], xid, overwriteOK);
2176-
2177-
/*
2178-
* Recreate its GXACT and dummy PGPROC
2179-
*
2180-
* MarkAsPreparing sets prepare_start_lsn to InvalidXLogRecPtr
2181-
* so next checkpoint will skip that transaction.
2182-
*/
2183-
gxact = MarkAsPreparing(xid, gid,
2184-
hdr->prepared_at,
2185-
hdr->owner, hdr->database);
2186-
GXactLoadSubxactData(gxact, hdr->nsubxacts, subxids);
2187-
MarkAsPrepared(gxact);
2188-
2189-
gxact->prepare_start_lsn = record->ReadRecPtr;
2190-
gxact->prepare_end_lsn = record->EndRecPtr;
2191-
2192-
/*
2193-
* Recover other state (notably locks) using resource managers
2194-
*/
2195-
ProcessRecords(bufptr, xid, twophase_recover_callbacks);
2196-
2197-
/*
2198-
* Release locks held by the standby process after we process each
2199-
* prepared transaction. As a result, we don't need too many
2200-
* additional locks at any one time.
2201-
*/
2202-
if (InHotStandby)
2203-
StandbyReleaseLockTree(xid, hdr->nsubxacts, subxids);
2204-
2205-
/*
2206-
* We're done with recovering this transaction. Clear
2207-
* MyLockedGxact, like we do in PrepareTransaction() during normal
2208-
* operation.
2209-
*/
2210-
PostPrepare_Twophase();
2211-
}
2212-
2213-
2214-
22152083
/*
22162084
* RecordTransactionCommitPrepared
22172085
*

src/backend/access/transam/xact.c

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5599,7 +5599,21 @@ xact_redo(XLogReaderState *record)
55995599
}
56005600
else if (info == XLOG_XACT_PREPARE)
56015601
{
5602-
RecoverPreparedFromXLOG(record);
5602+
GlobalTransaction gxact;
5603+
5604+
/*
5605+
* To avoid creation of state files during replay we registering
5606+
* prepare xlog records in shared memory in the same way as it happens
5607+
* while not in recovery. If replay faces commit xlog record before
5608+
* checkpoint/restartpoint happens then we avoid using files at all.
5609+
*
5610+
* We need this behaviour because the speed of the 2PC replay on the replica
5611+
* should be at least the same as the 2PC transaction speed of the master.
5612+
*/
5613+
gxact = RecoverPreparedFromBuffer((char *) XLogRecGetData(record), false);
5614+
gxact->prepare_start_lsn = record->ReadRecPtr;
5615+
gxact->prepare_end_lsn = record->EndRecPtr;
5616+
MarkAsPrepared(gxact);
56035617
}
56045618
else if (info == XLOG_XACT_ASSIGNMENT)
56055619
{

0 commit comments

Comments
 (0)