@@ -990,8 +990,6 @@ StartPrepare(GlobalTransaction gxact)
990
990
991
991
save_state_data (& hdr , sizeof (TwoPhaseFileHeader ));
992
992
993
- // fprintf(stderr, "StartPrepare: %s=(%d,%d,%d,%d)\n", hdr.gid, hdr.nsubxacts, hdr.ncommitrels, hdr.nabortrels, hdr.ninvalmsgs);
994
-
995
993
/*
996
994
* Add the additional info about subxacts, deletable files and cache
997
995
* invalidation messages.
@@ -1108,11 +1106,6 @@ EndPrepare(GlobalTransaction gxact)
1108
1106
1109
1107
END_CRIT_SECTION ();
1110
1108
1111
-
1112
- fprintf (stderr , "EndPrepare: %s=(%d,%d,%d,%d,%d)\n" , gxact -> gid , hdr -> xid , hdr -> nsubxacts , hdr -> ncommitrels , hdr -> nabortrels , hdr -> ninvalmsgs );
1113
- fprintf (stderr , "EndPrepare: %s={xlogptr:%lX,lsn:%lX,delta:%lX}\n" , gxact -> gid , gxact -> prepare_xlogptr , gxact -> prepare_lsn , gxact -> prepare_lsn - gxact -> prepare_xlogptr );
1114
-
1115
-
1116
1109
/*
1117
1110
* Wait for synchronous replication, if required.
1118
1111
*
@@ -1169,7 +1162,7 @@ ReadTwoPhaseFile(TransactionId xid, bool give_warnings)
1169
1162
if (give_warnings )
1170
1163
ereport (WARNING ,
1171
1164
(errcode_for_file_access (),
1172
- errmsg ("ReadTwoPhaseFile: could not open two-phase state file \"%s\": %m" ,
1165
+ errmsg ("could not open two-phase state file \"%s\": %m" ,
1173
1166
path )));
1174
1167
return NULL ;
1175
1168
}
@@ -1297,6 +1290,7 @@ FinishPreparedTransaction(const char *gid, bool isCommit)
1297
1290
int ndelrels ;
1298
1291
SharedInvalidationMessage * invalmsgs ;
1299
1292
int i ;
1293
+ bool file_used = false;
1300
1294
1301
1295
/*
1302
1296
* Validate the GID, and lock the GXACT to ensure that two backends do not
@@ -1308,12 +1302,22 @@ FinishPreparedTransaction(const char *gid, bool isCommit)
1308
1302
xid = pgxact -> xid ;
1309
1303
1310
1304
/*
1311
- * Read and validate the state file
1305
+ * Read and validate 2PC state data.
1306
+ * NB: Here we can face the situation where checkpoint can happend
1307
+ * between condition check and xlog read. To prevent that I'm holding
1308
+ * delayChkpt. Other possible scenario is try to read xlog and if it fails
1309
+ * try to read file.
1312
1310
*/
1313
- if (gxact -> prepare_lsn <= GetRedoRecPtr ())
1311
+ MyPgXact -> delayChkpt = true;
1312
+ if (gxact -> prepare_lsn <= GetRedoRecPtr ()){
1314
1313
buf = ReadTwoPhaseFile (xid , true);
1314
+ file_used = true;
1315
+ }
1315
1316
else
1317
+ {
1316
1318
XlogReadTwoPhaseData (gxact -> prepare_xlogptr , & buf , NULL );
1319
+ }
1320
+ MyPgXact -> delayChkpt = false;
1317
1321
1318
1322
/*
1319
1323
* Disassemble the header area
@@ -1333,14 +1337,6 @@ FinishPreparedTransaction(const char *gid, bool isCommit)
1333
1337
/* compute latestXid among all children */
1334
1338
latestXid = TransactionIdLatest (xid , hdr -> nsubxacts , children );
1335
1339
1336
- fprintf (stderr , "FinishPrepared: %s=(%d,%d,%d,%d,%d)\n" , gxact -> gid , hdr -> xid , hdr -> nsubxacts , hdr -> ncommitrels , hdr -> nabortrels , hdr -> ninvalmsgs );
1337
- fprintf (stderr , "FinishPrepared: %s={xlogptr:%lX,lsn:%lX,delta:%lX}\n" , gxact -> gid , gxact -> prepare_xlogptr , gxact -> prepare_lsn , gxact -> prepare_lsn - gxact -> prepare_xlogptr );
1338
-
1339
- Assert (hdr -> nsubxacts == 0 );
1340
- Assert (hdr -> ncommitrels == 0 );
1341
- Assert (hdr -> nabortrels == 0 );
1342
- Assert (hdr -> ninvalmsgs == 0 );
1343
-
1344
1340
/*
1345
1341
* The order of operations here is critical: make the XLOG entry for
1346
1342
* commit or abort, then mark the transaction committed or aborted in
@@ -1423,12 +1419,13 @@ FinishPreparedTransaction(const char *gid, bool isCommit)
1423
1419
/*
1424
1420
* And now we can clean up our mess.
1425
1421
*/
1426
- // RemoveTwoPhaseFile(xid, true);
1422
+ if (file_used )
1423
+ RemoveTwoPhaseFile (xid , true);
1427
1424
1428
1425
RemoveGXact (gxact );
1429
1426
MyLockedGxact = NULL ;
1430
1427
1431
- // pfree(buf);
1428
+ pfree (buf );
1432
1429
}
1433
1430
1434
1431
/*
@@ -1489,8 +1486,6 @@ RecreateTwoPhaseFile(TransactionId xid, void *content, int len)
1489
1486
pg_crc32c statefile_crc ;
1490
1487
int fd ;
1491
1488
1492
- fprintf (stderr , "RecreateTwoPhaseFile called xid=%d, len=%d\n" , xid , len );
1493
-
1494
1489
/* Recompute CRC */
1495
1490
INIT_CRC32C (statefile_crc );
1496
1491
COMP_CRC32C (statefile_crc , content , len );
@@ -1561,99 +1556,30 @@ RecreateTwoPhaseFile(TransactionId xid, void *content, int len)
1561
1556
void
1562
1557
CheckPointTwoPhase (XLogRecPtr redo_horizon )
1563
1558
{
1564
- TransactionId * xids ;
1565
- XLogRecPtr * xlogptrs ;
1566
- int nxids ;
1567
- char path [MAXPGPATH ];
1568
1559
int i ;
1560
+ int len ;
1561
+ char * buf ;
1569
1562
1570
- /*
1571
- * We don't want to hold the TwoPhaseStateLock while doing I/O, so we grab
1572
- * it just long enough to make a list of the XIDs that require fsyncing,
1573
- * and then do the I/O afterwards.
1574
- *
1575
- * This approach creates a race condition: someone else could delete a
1576
- * GXACT between the time we release TwoPhaseStateLock and the time we try
1577
- * to open its state file. We handle this by special-casing ENOENT
1578
- * failures: if we see that, we verify that the GXACT is no longer valid,
1579
- * and if so ignore the failure.
1580
- */
1581
1563
if (max_prepared_xacts <= 0 )
1582
1564
return ; /* nothing to do */
1583
1565
1584
1566
TRACE_POSTGRESQL_TWOPHASE_CHECKPOINT_START ();
1585
1567
1586
- xids = (TransactionId * ) palloc (max_prepared_xacts * sizeof (TransactionId ));
1587
- xlogptrs = (XLogRecPtr * ) palloc (max_prepared_xacts * sizeof (XLogRecPtr ));
1588
- nxids = 0 ;
1589
-
1590
1568
LWLockAcquire (TwoPhaseStateLock , LW_SHARED );
1591
1569
1592
1570
for (i = 0 ; i < TwoPhaseState -> numPrepXacts ; i ++ )
1593
1571
{
1594
1572
GlobalTransaction gxact = TwoPhaseState -> prepXacts [i ];
1595
1573
PGXACT * pgxact = & ProcGlobal -> allPgXact [gxact -> pgprocno ];
1596
- int j ;
1597
1574
1598
1575
if (gxact -> valid && gxact -> prepare_lsn <= redo_horizon ){
1599
- j = nxids ++ ;
1600
- xids [j ] = pgxact -> xid ;
1601
- xlogptrs [j ] = gxact -> prepare_xlogptr ;
1576
+ XlogReadTwoPhaseData (gxact -> prepare_xlogptr , & buf , & len );
1577
+ RecreateTwoPhaseFile (pgxact -> xid , buf , len );
1602
1578
}
1603
-
1604
1579
}
1605
1580
1606
1581
LWLockRelease (TwoPhaseStateLock );
1607
1582
1608
- for (i = 0 ; i < nxids ; i ++ )
1609
- {
1610
- TransactionId xid = xids [i ];
1611
- int fd ;
1612
- int len ;
1613
- char * buf ;
1614
-
1615
- TwoPhaseFilePath (path , xid );
1616
-
1617
- fprintf (stderr , "CheckPointTwoPhase: %lX\n" , xlogptrs [i ]);
1618
-
1619
- fd = OpenTransientFile (path , O_RDWR | PG_BINARY , 0 );
1620
-
1621
- if (fd < 0 && errno == ENOENT )
1622
- {
1623
- fprintf (stderr , "CheckPointTwoPhase: %d <-> %d \n" , errno , ENOENT );
1624
-
1625
- /* OK if gxact is no longer valid */
1626
- if (!TransactionIdIsPrepared (xid ))
1627
- continue ;
1628
-
1629
- /* Re-create file */
1630
- XlogReadTwoPhaseData (xlogptrs [i ], & buf , & len );
1631
- RecreateTwoPhaseFile (xid , buf , len );
1632
- fd = OpenTransientFile (path , O_RDWR | PG_BINARY , 0 );
1633
-
1634
- if (fd < 0 )
1635
- ereport (ERROR ,
1636
- (errcode_for_file_access (),
1637
- errmsg ("CheckPointTwoPhase: could not open two-phase state file after re-creating \"%s\": %m" ,
1638
- path )));
1639
- }
1640
- else if (fd < 0 )
1641
- {
1642
- ereport (ERROR ,
1643
- (errcode_for_file_access (),
1644
- errmsg ("CheckPointTwoPhase: could not open two-phase state file \"%s\": %m" ,
1645
- path )));
1646
- }
1647
-
1648
- if (CloseTransientFile (fd ) != 0 )
1649
- ereport (ERROR ,
1650
- (errcode_for_file_access (),
1651
- errmsg ("could not close two-phase state file \"%s\": %m" ,
1652
- path )));
1653
- }
1654
-
1655
- pfree (xids );
1656
-
1657
1583
TRACE_POSTGRESQL_TWOPHASE_CHECKPOINT_DONE ();
1658
1584
}
1659
1585
@@ -2177,19 +2103,20 @@ XlogReadTwoPhaseData(XLogRecPtr lsn, char **buf, int *len)
2177
2103
2178
2104
xlogreader = XLogReaderAllocate (& logical_read_local_xlog_page , NULL );
2179
2105
if (xlogreader == NULL )
2180
- fprintf ( stderr , "xlogreader == NULL\n " );
2106
+ elog ( ERROR , "failed to open xlogreader for reading 2PC data " );
2181
2107
2182
2108
record = XLogReadRecord (xlogreader , lsn , & errormsg );
2183
2109
2184
2110
if (record == NULL )
2185
- fprintf ( stderr , "XLogReadRecord error\n " );
2111
+ elog ( ERROR , "failed to find 2PC data in xlog " );
2186
2112
2187
2113
if (len != NULL )
2188
2114
* len = XLogRecGetDataLen (xlogreader );
2189
- * buf = XLogRecGetData (xlogreader );
2190
- }
2191
-
2192
-
2193
-
2115
+ else
2116
+ elog (ERROR , "failed to read 2PC data from xlog: xore length" );
2194
2117
2118
+ * buf = palloc (sizeof (char )* (* len ));
2119
+ memcpy (* buf , XLogRecGetData (xlogreader ), sizeof (char )* (* len ));
2195
2120
2121
+ XLogReaderFree (xlogreader );
2122
+ }
0 commit comments