@@ -121,7 +121,10 @@ typedef struct GlobalTransactionData
121
121
BackendId dummyBackendId ; /* similar to backend id for backends */
122
122
TimestampTz prepared_at ; /* time of preparation */
123
123
XLogRecPtr prepare_lsn ; /* XLOG offset of prepare record end */
124
- XLogRecPtr prepare_xlogptr ; /* XLOG offset of prepare record start */
124
+ XLogRecPtr prepare_xlogptr ; /* XLOG offset of prepare record start
125
+ * or NULL if twophase data moved to file
126
+ * after checkpoint.
127
+ */
125
128
Oid owner ; /* ID of user that executed the xact */
126
129
BackendId locking_backend ; /* backend currently working on the xact */
127
130
bool valid ; /* TRUE if PGPROC entry is in proc array */
@@ -1303,21 +1306,23 @@ FinishPreparedTransaction(const char *gid, bool isCommit)
1303
1306
1304
1307
/*
1305
1308
* Read and validate 2PC state data.
1306
- * NB: Here we can face the situation where checkpoint can happend
1307
- * between condition check and xlog read. To prevent that I'm holding
1308
- * delayChkpt. Other possible scenario is try to read xlog and if it fails
1309
- * try to read file.
1309
+ * State data can be stored in xlog or files depending on checkpoint
1310
+ * status. One way to read that data is to delay checkpoint (delayChkpt) and
1311
+ * compare gxact->prepare_lsn with current xlog horizon. But bearing in mind
1312
+ * that most of 2PC transactions will be committed right after prepare, we
1313
+ * can just try to read xlog and in case of error read file. Also that is
1314
+ * happening under LockGXact, so nobody can commit our transaction between
1315
+ * xlog and file reads.
1310
1316
*/
1311
- MyPgXact -> delayChkpt = true;
1312
- if (gxact -> prepare_lsn <= GetRedoRecPtr ()){
1313
- buf = ReadTwoPhaseFile (xid , true);
1314
- file_used = true;
1317
+ if (gxact -> prepare_lsn )
1318
+ {
1319
+ XlogReadTwoPhaseData (gxact -> prepare_xlogptr , & buf , NULL );
1315
1320
}
1316
1321
else
1317
1322
{
1318
- XlogReadTwoPhaseData (gxact -> prepare_xlogptr , & buf , NULL );
1323
+ buf = ReadTwoPhaseFile (xid , true);
1324
+ file_used = true;
1319
1325
}
1320
- MyPgXact -> delayChkpt = false;
1321
1326
1322
1327
/*
1323
1328
* Disassemble the header area
@@ -1560,24 +1565,35 @@ CheckPointTwoPhase(XLogRecPtr redo_horizon)
1560
1565
int len ;
1561
1566
char * buf ;
1562
1567
1568
+ fprintf (stderr , "=== Checkpoint: redo_horizon=%lX\n" , redo_horizon );
1569
+
1563
1570
if (max_prepared_xacts <= 0 )
1564
1571
return ; /* nothing to do */
1565
1572
1566
1573
TRACE_POSTGRESQL_TWOPHASE_CHECKPOINT_START ();
1567
1574
1575
+ /*
1576
+ * Here we do all the I/O while holding TwoPhaseStateLock.
1577
+ * It's also possible to move I/O out of the lock, but on
1578
+ * every error we should check whether somebody committed our
1579
+ * transaction in different backend. Let's leave this optimisation
1580
+ * for the future, if somebody spots that this place causes a
1581
+ * bottleneck.
1582
+ *
1583
+ */
1568
1584
LWLockAcquire (TwoPhaseStateLock , LW_SHARED );
1569
-
1570
1585
for (i = 0 ; i < TwoPhaseState -> numPrepXacts ; i ++ )
1571
1586
{
1572
1587
GlobalTransaction gxact = TwoPhaseState -> prepXacts [i ];
1573
1588
PGXACT * pgxact = & ProcGlobal -> allPgXact [gxact -> pgprocno ];
1574
1589
1575
- if (gxact -> valid && gxact -> prepare_lsn <= redo_horizon ){
1590
+ if (gxact -> valid && gxact -> prepare_lsn && gxact -> prepare_lsn <= redo_horizon ){
1576
1591
XlogReadTwoPhaseData (gxact -> prepare_xlogptr , & buf , & len );
1577
1592
RecreateTwoPhaseFile (pgxact -> xid , buf , len );
1593
+ gxact -> prepare_lsn = (XLogRecPtr ) NULL ;
1594
+ pfree (buf );
1578
1595
}
1579
1596
}
1580
-
1581
1597
LWLockRelease (TwoPhaseStateLock );
1582
1598
1583
1599
TRACE_POSTGRESQL_TWOPHASE_CHECKPOINT_DONE ();
@@ -2094,7 +2110,7 @@ RecordTransactionAbortPrepared(TransactionId xid,
2094
2110
2095
2111
/**********************************************************************************/
2096
2112
2097
- void
2113
+ static void
2098
2114
XlogReadTwoPhaseData (XLogRecPtr lsn , char * * buf , int * len )
2099
2115
{
2100
2116
XLogRecord * record ;
@@ -2106,17 +2122,14 @@ XlogReadTwoPhaseData(XLogRecPtr lsn, char **buf, int *len)
2106
2122
elog (ERROR , "failed to open xlogreader for reading 2PC data" );
2107
2123
2108
2124
record = XLogReadRecord (xlogreader , lsn , & errormsg );
2109
-
2110
2125
if (record == NULL )
2111
- elog (ERROR , "failed to find 2PC data in xlog" );
2126
+ elog (ERROR , "failed to read 2PC record from xlog" );
2112
2127
2113
2128
if (len != NULL )
2114
2129
* len = XLogRecGetDataLen (xlogreader );
2115
- else
2116
- elog (ERROR , "failed to read 2PC data from xlog: xore length" );
2117
2130
2118
- * buf = palloc (sizeof (char )* ( * len ));
2119
- memcpy (* buf , XLogRecGetData (xlogreader ), sizeof (char )* ( * len ));
2131
+ * buf = palloc (sizeof (char )* XLogRecGetDataLen ( xlogreader ));
2132
+ memcpy (* buf , XLogRecGetData (xlogreader ), sizeof (char )* XLogRecGetDataLen ( xlogreader ));
2120
2133
2121
2134
XLogReaderFree (xlogreader );
2122
2135
}
0 commit comments