Skip to content

Commit abea380

Browse files
committed
some comments
1 parent 2eca05a commit abea380

File tree

4 files changed

+145
-91
lines changed

4 files changed

+145
-91
lines changed

src/backend/access/transam/twophase.c

Lines changed: 128 additions & 83 deletions
Original file line numberDiff line numberDiff line change
@@ -45,8 +45,8 @@
4545
* fsynced
4646
* * If COMMIT happens after checkpoint then backend reads state data from
4747
* files
48-
* * In case of crash replay will move data from xlog to files, if that
49-
* hasn't happened before. XXX TODO - move to shmem in replay also
48+
* * Simplified version of the same scenario happens during recovery and
49+
* replication. See the comments on the KnownPreparedXact structure.
5050
*
5151
*-------------------------------------------------------------------------
5252
*/
@@ -181,6 +181,35 @@ static GlobalTransaction MyLockedGxact = NULL;
181181

182182
static bool twophaseExitRegistered = false;
183183

184+
/*
185+
* During replay and replication KnownPreparedList holds info about active prepared
186+
* transactions that weren't moved to files yet. We will need that info by the end of
187+
* recovery (including promote) to restore memory state of those transactions.
188+
*
189+
* Naive approach here is to move each PREPARE record to disk, fsync it and don't have
190+
* that list at all, but that provokes a lot of unnecessary fsyncs on small files
191+
* causing replica to be slower than master.
192+
*
193+
* Replay of twophase records happens by the following rules:
194+
* * On PREPARE redo KnownPreparedAdd() is called to add that transaction to
195+
* KnownPreparedList and no more actions taken.
196+
* * On checkpoint we iterate through KnownPreparedList, move all prepare
197+
* records that are behind redo_horizon to files and delete those items from the list.
198+
* * On COMMIT/ABORT we delete file or entry in KnownPreparedList.
199+
* * At the end of recovery we move all known prepared transactions to disk
200+
* to allow RecoverPreparedTransactions/StandbyRecoverPreparedTransactions
201+
* to do their work.
202+
*/
203+
typedef struct KnownPreparedXact
204+
{
205+
TransactionId xid;
206+
XLogRecPtr prepare_start_lsn;
207+
XLogRecPtr prepare_end_lsn;
208+
dlist_node list_node;
209+
} KnownPreparedXact;
210+
211+
static dlist_head KnownPreparedList = DLIST_STATIC_INIT(KnownPreparedList);
212+
184213
static void RecordTransactionCommitPrepared(TransactionId xid,
185214
int nchildren,
186215
TransactionId *children,
@@ -200,82 +229,6 @@ static void RemoveGXact(GlobalTransaction gxact);
200229

201230
static void XlogReadTwoPhaseData(XLogRecPtr lsn, char **buf, int *len);
202231

203-
204-
dlist_head StandbyTwoPhaseStateData = DLIST_STATIC_INIT(StandbyTwoPhaseStateData);
205-
206-
typedef struct StandbyPreparedTransaction
207-
{
208-
TransactionId xid;
209-
XLogRecPtr prepare_start_lsn;
210-
XLogRecPtr prepare_end_lsn;
211-
dlist_node list_node;
212-
} StandbyPreparedTransaction;
213-
214-
void
215-
StandbyCheckPointTwoPhase(XLogRecPtr redo_horizon)
216-
{
217-
dlist_mutable_iter miter;
218-
int serialized_xacts = 0;
219-
220-
// Assert(RecoveryInProgress());
221-
222-
TRACE_POSTGRESQL_TWOPHASE_CHECKPOINT_START();
223-
224-
dlist_foreach_modify(miter, &StandbyTwoPhaseStateData)
225-
{
226-
StandbyPreparedTransaction *xact = dlist_container(StandbyPreparedTransaction,
227-
list_node, miter.cur);
228-
229-
if (redo_horizon == InvalidXLogRecPtr || xact->prepare_end_lsn <= redo_horizon)
230-
{
231-
char *buf;
232-
int len;
233-
234-
XlogReadTwoPhaseData(xact->prepare_start_lsn, &buf, &len);
235-
RecreateTwoPhaseFile(xact->xid, buf, len);
236-
pfree(buf);
237-
dlist_delete(miter.cur);
238-
serialized_xacts++;
239-
}
240-
}
241-
242-
TRACE_POSTGRESQL_TWOPHASE_CHECKPOINT_DONE();
243-
244-
if (log_checkpoints && serialized_xacts > 0)
245-
ereport(LOG,
246-
(errmsg_plural("%u two-phase state file was written "
247-
"for long-running prepared transactions",
248-
"%u two-phase state files were written "
249-
"for long-running prepared transactions",
250-
serialized_xacts,
251-
serialized_xacts)));
252-
}
253-
254-
// XXX: rename to remove_standby_state
255-
void
256-
StandbyAtCommit(TransactionId xid)
257-
{
258-
dlist_mutable_iter miter;
259-
260-
Assert(RecoveryInProgress());
261-
262-
dlist_foreach_modify(miter, &StandbyTwoPhaseStateData)
263-
{
264-
StandbyPreparedTransaction *xact = dlist_container(StandbyPreparedTransaction,
265-
list_node, miter.cur);
266-
267-
if (xact->xid == xid)
268-
{
269-
dlist_delete(miter.cur);
270-
return;
271-
}
272-
}
273-
274-
RemoveTwoPhaseFile(xid, false);
275-
}
276-
277-
278-
279232
/*
280233
* Initialization of shared memory
281234
*/
@@ -1729,18 +1682,25 @@ CheckPointTwoPhase(XLogRecPtr redo_horizon)
17291682
serialized_xacts)));
17301683
}
17311684

1685+
/*
1686+
* KnownPreparedAdd.
1687+
*
1688+
* Store correspondence of start/end lsn and xid in KnownPreparedList.
1689+
* This is called during redo of prepare record to have list of prepared
1690+
* transactions that aren't yet moved to 2PC files by the end of recovery.
1691+
*/
17321692
void
1733-
StandbyAtPrepare(XLogReaderState *record)
1693+
KnownPreparedAdd(XLogReaderState *record)
17341694
{
1735-
StandbyPreparedTransaction *xact;
1695+
KnownPreparedXact *xact;
17361696
TwoPhaseFileHeader *hdr = (TwoPhaseFileHeader *) XLogRecGetData(record);
17371697

1738-
xact = (StandbyPreparedTransaction *) palloc(sizeof(StandbyPreparedTransaction));
1698+
xact = (KnownPreparedXact *) palloc(sizeof(KnownPreparedXact));
17391699
xact->xid = hdr->xid;
17401700
xact->prepare_start_lsn = record->ReadRecPtr;
17411701
xact->prepare_end_lsn = record->EndRecPtr;
17421702

1743-
dlist_push_tail(&StandbyTwoPhaseStateData, &xact->list_node);
1703+
dlist_push_tail(&KnownPreparedList, &xact->list_node);
17441704
}
17451705

17461706
/*
@@ -1781,7 +1741,7 @@ PrescanPreparedTransactions(TransactionId **xids_p, int *nxids_p)
17811741
int nxids = 0;
17821742
int allocsize = 0;
17831743

1784-
StandbyCheckPointTwoPhase(0);
1744+
KnownPreparedRecreateFiles(InvalidXLogRecPtr);
17851745

17861746
cldir = AllocateDir(TWOPHASE_DIR);
17871747
while ((clde = ReadDir(cldir, TWOPHASE_DIR)) != NULL)
@@ -2254,3 +2214,88 @@ RecordTransactionAbortPrepared(TransactionId xid,
22542214
*/
22552215
SyncRepWaitForLSN(recptr, false);
22562216
}
2217+
2218+
/*
2219+
* KnownPreparedRemoveByXid
2220+
*
2221+
* Forget about a prepared transaction. Called during commit/abort.
2222+
*/
2223+
void
2224+
KnownPreparedRemoveByXid(TransactionId xid)
2225+
{
2226+
dlist_mutable_iter miter;
2227+
2228+
Assert(RecoveryInProgress());
2229+
2230+
dlist_foreach_modify(miter, &KnownPreparedList)
2231+
{
2232+
KnownPreparedXact *xact = dlist_container(KnownPreparedXact,
2233+
list_node, miter.cur);
2234+
2235+
if (xact->xid == xid)
2236+
{
2237+
dlist_delete(miter.cur);
2238+
/*
2239+
* Since we found entry in KnownPreparedList we know that file isn't
2240+
* on disk yet, so there is nothing more to do here.
2241+
*/
2242+
return;
2243+
}
2244+
}
2245+
2246+
/*
2247+
* Here we know that the file should already have been moved to disk. But aborting recovery because
2248+
* of absence of unnecessary file doesn't seem to be a good idea, so call remove
2249+
* with giveWarning=false.
2250+
*/
2251+
RemoveTwoPhaseFile(xid, false);
2252+
}
2253+
2254+
/*
2255+
* KnownPreparedRecreateFiles
2256+
*
2257+
* Moves prepare records from WAL to files. Called during checkpoint replay
2258+
* or PrescanPreparedTransactions.
2259+
*
2260+
* redo_horizon = InvalidXLogRecPtr indicates that all transactions from
2261+
* KnownPreparedList should be moved to disk.
2262+
*/
2263+
void
2264+
KnownPreparedRecreateFiles(XLogRecPtr redo_horizon)
2265+
{
2266+
dlist_mutable_iter miter;
2267+
int serialized_xacts = 0;
2268+
2269+
Assert(RecoveryInProgress());
2270+
2271+
TRACE_POSTGRESQL_TWOPHASE_CHECKPOINT_START();
2272+
2273+
dlist_foreach_modify(miter, &KnownPreparedList)
2274+
{
2275+
KnownPreparedXact *xact = dlist_container(KnownPreparedXact,
2276+
list_node, miter.cur);
2277+
2278+
if (xact->prepare_end_lsn <= redo_horizon || redo_horizon == InvalidXLogRecPtr)
2279+
{
2280+
char *buf;
2281+
int len;
2282+
2283+
XlogReadTwoPhaseData(xact->prepare_start_lsn, &buf, &len);
2284+
RecreateTwoPhaseFile(xact->xid, buf, len);
2285+
pfree(buf);
2286+
dlist_delete(miter.cur);
2287+
serialized_xacts++;
2288+
}
2289+
}
2290+
2291+
TRACE_POSTGRESQL_TWOPHASE_CHECKPOINT_DONE();
2292+
2293+
if (log_checkpoints && serialized_xacts > 0)
2294+
ereport(LOG,
2295+
(errmsg_plural("%u two-phase state file was written "
2296+
"for long-running prepared transactions",
2297+
"%u two-phase state files were written "
2298+
"for long-running prepared transactions",
2299+
serialized_xacts,
2300+
serialized_xacts)));
2301+
}

src/backend/access/transam/xact.c

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5604,7 +5604,9 @@ xact_redo(XLogReaderState *record)
56045604
Assert(TransactionIdIsValid(parsed.twophase_xid));
56055605
xact_redo_commit(&parsed, parsed.twophase_xid,
56065606
record->EndRecPtr, XLogRecGetOrigin(record));
5607-
StandbyAtCommit(parsed.twophase_xid);
5607+
5608+
/* Delete KnownPrepared entry or 2PC file. */
5609+
KnownPreparedRemoveByXid(parsed.twophase_xid);
56085610
}
56095611
}
56105612
else if (info == XLOG_XACT_ABORT || info == XLOG_XACT_ABORT_PREPARED)
@@ -5624,13 +5626,20 @@ xact_redo(XLogReaderState *record)
56245626
{
56255627
Assert(TransactionIdIsValid(parsed.twophase_xid));
56265628
xact_redo_abort(&parsed, parsed.twophase_xid);
5627-
StandbyAtCommit(parsed.twophase_xid);
5629+
5630+
/* Delete KnownPrepared entry or 2PC file. */
5631+
KnownPreparedRemoveByXid(parsed.twophase_xid);
56285632
}
56295633
}
56305634
else if (info == XLOG_XACT_PREPARE)
56315635
{
5632-
/* the record contents are exactly the 2PC file */
5633-
StandbyAtPrepare(record);
5636+
/*
5637+
* If that transaction will not be committed by the end of recovery then we
5638+
* will need the 2PC file (the record contents are exactly the 2PC file) to be able
5639+
* to commit it later.
5640+
* For now store xid and pointers to that record in KnownPreparedList.
5641+
*/
5642+
KnownPreparedAdd(record);
56345643
}
56355644
else if (info == XLOG_XACT_ASSIGNMENT)
56365645
{

src/backend/access/transam/xlog.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9547,8 +9547,8 @@ xlog_redo(XLogReaderState *record)
95479547
(errmsg("unexpected timeline ID %u (should be %u) in checkpoint record",
95489548
checkPoint.ThisTimeLineID, ThisTimeLineID)));
95499549

9550+
KnownPreparedRecreateFiles(checkPoint.redo);
95509551
RecoveryRestartPoint(&checkPoint);
9551-
StandbyCheckPointTwoPhase(checkPoint.redo);
95529552
}
95539553
else if (info == XLOG_END_OF_RECOVERY)
95549554
{

src/include/access/twophase.h

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -57,8 +57,8 @@ extern void CheckPointTwoPhase(XLogRecPtr redo_horizon);
5757

5858
extern void FinishPreparedTransaction(const char *gid, bool isCommit);
5959

60-
extern void StandbyAtCommit(TransactionId xid);
61-
extern void StandbyAtPrepare(XLogReaderState *record);
62-
extern void StandbyCheckPointTwoPhase(XLogRecPtr redo_horizon);
60+
extern void KnownPreparedAdd(XLogReaderState *record);
61+
extern void KnownPreparedRemoveByXid(TransactionId xid);
62+
extern void KnownPreparedRecreateFiles(XLogRecPtr redo_horizon);
6363

6464
#endif /* TWOPHASE_H */

0 commit comments

Comments
 (0)