47
47
* files
48
48
* * In case of crash replay will move data from xlog to files, if that
49
49
* hasn't happened before. XXX TODO - move to shmem in replay also
50
-
51
50
*
52
51
*-------------------------------------------------------------------------
53
52
*/
79
78
#include "replication/origin.h"
80
79
#include "replication/syncrep.h"
81
80
#include "replication/walsender.h"
82
- #include "replication/logicalfuncs.h"
83
81
#include "storage/fd.h"
84
82
#include "storage/ipc.h"
85
83
#include "storage/predicate.h"
89
87
#include "storage/smgr.h"
90
88
#include "utils/builtins.h"
91
89
#include "utils/memutils.h"
92
- #include "utils/snapmgr.h"
93
90
#include "utils/timestamp.h"
94
91
95
92
@@ -1251,8 +1248,6 @@ XlogReadTwoPhaseData(XLogRecPtr lsn, char **buf, int *len)
1251
1248
XLogReaderState * xlogreader ;
1252
1249
char * errormsg ;
1253
1250
1254
- Assert (!RecoveryInProgress ());
1255
-
1256
1251
xlogreader = XLogReaderAllocate (& read_local_xlog_page , NULL );
1257
1252
if (!xlogreader )
1258
1253
ereport (ERROR ,
@@ -1280,7 +1275,6 @@ XlogReadTwoPhaseData(XLogRecPtr lsn, char **buf, int *len)
1280
1275
* len = XLogRecGetDataLen (xlogreader );
1281
1276
1282
1277
* buf = palloc (sizeof (char )* XLogRecGetDataLen (xlogreader ));
1283
-
1284
1278
memcpy (* buf , XLogRecGetData (xlogreader ), sizeof (char ) * XLogRecGetDataLen (xlogreader ));
1285
1279
1286
1280
XLogReaderFree (xlogreader );
@@ -1297,14 +1291,12 @@ StandbyTransactionIdIsPrepared(TransactionId xid)
1297
1291
TwoPhaseFileHeader * hdr ;
1298
1292
bool result ;
1299
1293
1300
- fprintf (stderr , "===(%u) StandbyTransactionIdIsPrepared(%u) \n" , getpid (), xid );
1301
-
1302
1294
Assert (TransactionIdIsValid (xid ));
1303
1295
1304
1296
if (max_prepared_xacts <= 0 )
1305
1297
return false; /* nothing to do */
1306
1298
1307
- // check for in-memory tx here too
1299
+ // NB: check for in-memory tx here too
1308
1300
1309
1301
/* Read and validate file */
1310
1302
buf = ReadTwoPhaseFile (xid , false);
@@ -1340,7 +1332,6 @@ FinishPreparedTransaction(const char *gid, bool isCommit)
1340
1332
int ndelrels ;
1341
1333
SharedInvalidationMessage * invalmsgs ;
1342
1334
int i ;
1343
- bool file_used = false;
1344
1335
1345
1336
/*
1346
1337
* Validate the GID, and lock the GXACT to ensure that two backends do not
@@ -1362,6 +1353,7 @@ FinishPreparedTransaction(const char *gid, bool isCommit)
1362
1353
else
1363
1354
XlogReadTwoPhaseData (gxact -> prepare_start_lsn , & buf , NULL );
1364
1355
1356
+
1365
1357
/*
1366
1358
* Disassemble the header area
1367
1359
*/
@@ -1529,8 +1521,6 @@ RecreateTwoPhaseFile(TransactionId xid, void *content, int len)
1529
1521
pg_crc32c statefile_crc ;
1530
1522
int fd ;
1531
1523
1532
- fprintf (stderr , "===(%u) RecreateTwoPhaseFile called\n" , getpid ());
1533
-
1534
1524
/* Recompute CRC */
1535
1525
INIT_CRC32C (statefile_crc );
1536
1526
COMP_CRC32C (statefile_crc , content , len );
@@ -1596,9 +1586,10 @@ XlogRedoFinishPrepared(TransactionId xid)
1596
1586
char * bufptr ;
1597
1587
TwoPhaseFileHeader * hdr ;
1598
1588
1599
- fprintf (stderr , "===(%u) XlogRedoCleanupPrepared called for %x\n" , getpid (), xid );
1589
+ // NB: take care about file state file removal
1590
+
1591
+ // NB: put lock there
1600
1592
1601
- /* We can do that without the lock in replay, aren't we? */
1602
1593
for (i = 0 ; i < TwoPhaseState -> numPrepXacts ; i ++ )
1603
1594
{
1604
1595
gxact = TwoPhaseState -> prepXacts [i ];
@@ -1761,28 +1752,17 @@ TransactionId
1761
1752
PrescanPreparedTransactions (TransactionId * * xids_p , int * nxids_p )
1762
1753
{
1763
1754
TransactionId origNextXid = ShmemVariableCache -> nextXid ;
1764
-
1765
- /* By this we will take into account xacts restored to memory */
1766
- // TransactionId origNextXid = GetOldestSafeDecodingTransactionId();
1767
-
1768
1755
TransactionId result = origNextXid ;
1769
1756
DIR * cldir ;
1770
1757
struct dirent * clde ;
1771
1758
TransactionId * xids = NULL ;
1772
1759
int nxids = 0 ;
1773
1760
int allocsize = 0 ;
1774
1761
1775
- // !!! take care of TransactionId **xids_p, int *nxids_p
1776
-
1777
- fprintf (stderr , "===(%u) PrescanPreparedTransactions called. result = %u\n" , getpid (), result );
1762
+ // NB: take care of TransactionId **xids_p, int *nxids_p
1778
1763
1779
- /*
1780
- * Since we want to find minimum among prepared xacts we can use that function ignoring
1781
- * KnownAssignedXids.
1782
- * Other option is just to iterate here through the procarray.
1783
- */
1764
+ // NB: just iterate through preparedXacts here
1784
1765
result = GetOldestActiveTransactionId ();
1785
- // fprintf(stderr, "===(%u) PrescanPreparedTransactions called. should be = %u\n", getpid(), GetOldestActiveTransactionId());
1786
1766
1787
1767
cldir = AllocateDir (TWOPHASE_DIR );
1788
1768
while ((clde = ReadDir (cldir , TWOPHASE_DIR )) != NULL )
@@ -1898,95 +1878,9 @@ PrescanPreparedTransactions(TransactionId **xids_p, int *nxids_p)
1898
1878
* nxids_p = nxids ;
1899
1879
}
1900
1880
1901
- fprintf (stderr , "===(%u) PrescanPreparedTransactions ended. result = %u\n" , getpid (), result );
1902
1881
return result ;
1903
1882
}
1904
1883
1905
- // /*
1906
- // * StandbyRecoverPreparedTransactions
1907
- // *
1908
- // * Scan the pg_twophase directory and setup all the required information to
1909
- // * allow standby queries to treat prepared transactions as still active.
1910
- // * This is never called at the end of recovery - we use
1911
- // * RecoverPreparedTransactions() at that point.
1912
- // *
1913
- // * Currently we simply call SubTransSetParent() for any subxids of prepared
1914
- // * transactions. If overwriteOK is true, it's OK if some XIDs have already
1915
- // * been marked in pg_subtrans.
1916
- // */
1917
- // void
1918
- // StandbyRecoverPreparedTransactions(bool overwriteOK)
1919
- // {
1920
- // DIR *cldir;
1921
- // struct dirent *clde;
1922
-
1923
- // fprintf(stderr, "!===(%u) StandbyRecoverPreparedTransactions called, overwriteOK = %u\n", getpid(), (int)overwriteOK);
1924
-
1925
- // cldir = AllocateDir(TWOPHASE_DIR);
1926
- // while ((clde = ReadDir(cldir, TWOPHASE_DIR)) != NULL)
1927
- // {
1928
- // if (strlen(clde->d_name) == 8 &&
1929
- // strspn(clde->d_name, "0123456789ABCDEF") == 8)
1930
- // {
1931
- // TransactionId xid;
1932
- // char *buf;
1933
- // TwoPhaseFileHeader *hdr;
1934
- // TransactionId *subxids;
1935
- // int i;
1936
-
1937
- // xid = (TransactionId) strtoul(clde->d_name, NULL, 16);
1938
-
1939
- // /* Already processed? */
1940
- // if (TransactionIdDidCommit(xid) || TransactionIdDidAbort(xid))
1941
- // {
1942
- // ereport(WARNING,
1943
- // (errmsg("removing stale two-phase state file \"%s\"",
1944
- // clde->d_name)));
1945
- // RemoveTwoPhaseFile(xid, true);
1946
- // continue;
1947
- // }
1948
-
1949
- // /* Read and validate file */
1950
- // buf = ReadTwoPhaseFile(xid, true);
1951
- // if (buf == NULL)
1952
- // {
1953
- // ereport(WARNING,
1954
- // (errmsg("removing corrupt two-phase state file \"%s\"",
1955
- // clde->d_name)));
1956
- // RemoveTwoPhaseFile(xid, true);
1957
- // continue;
1958
- // }
1959
-
1960
- // /* Deconstruct header */
1961
- // hdr = (TwoPhaseFileHeader *) buf;
1962
- // if (!TransactionIdEquals(hdr->xid, xid))
1963
- // {
1964
- // ereport(WARNING,
1965
- // (errmsg("removing corrupt two-phase state file \"%s\"",
1966
- // clde->d_name)));
1967
- // RemoveTwoPhaseFile(xid, true);
1968
- // pfree(buf);
1969
- // continue;
1970
- // }
1971
-
1972
- // /*
1973
- // * Examine subtransaction XIDs ... they should all follow main
1974
- // * XID.
1975
- // */
1976
- // subxids = (TransactionId *)
1977
- // (buf + MAXALIGN(sizeof(TwoPhaseFileHeader)));
1978
- // for (i = 0; i < hdr->nsubxacts; i++)
1979
- // {
1980
- // TransactionId subxid = subxids[i];
1981
-
1982
- // Assert(TransactionIdFollows(subxid, xid));
1983
- // SubTransSetParent(xid, subxid, overwriteOK);
1984
- // }
1985
- // }
1986
- // }
1987
- // FreeDir(cldir);
1988
- // }
1989
-
1990
1884
/*
1991
1885
* RecoverPreparedFromFiles
1992
1886
*
@@ -2000,9 +1894,8 @@ RecoverPreparedFromFiles(bool overwriteOK)
2000
1894
char dir [MAXPGPATH ];
2001
1895
DIR * cldir ;
2002
1896
struct dirent * clde ;
2003
- // bool overwriteOK = false;
2004
1897
2005
- fprintf ( stderr , "===(%u) RecoverPreparedFromFiles called\n" , getpid ());
1898
+ // NB: look carefully at case overwriteOK=true
2006
1899
2007
1900
snprintf (dir , MAXPGPATH , "%s" , TWOPHASE_DIR );
2008
1901
@@ -2024,24 +1917,16 @@ RecoverPreparedFromFiles(bool overwriteOK)
2024
1917
2025
1918
xid = (TransactionId ) strtoul (clde -> d_name , NULL , 16 );
2026
1919
2027
- // /* Already recovered from WAL? */
2028
- // if (TransactionIdIsInProgress(xid))
2029
- // {
2030
- // fprintf(stderr, "! xid %x is in progress\n", xid);
2031
- // continue;
2032
- // }
1920
+ // NB: put lock here
2033
1921
2034
1922
/* Already recovered from WAL? */
2035
1923
for (i = 0 ; i < TwoPhaseState -> numPrepXacts ; i ++ )
2036
1924
{
2037
1925
gxact = TwoPhaseState -> prepXacts [i ];
2038
1926
pgxact = & ProcGlobal -> allPgXact [gxact -> pgprocno ];
2039
-
2040
1927
2041
1928
fprintf (stderr , "! %x ?= %x\n" , xid , pgxact -> xid );
2042
1929
2043
-
2044
-
2045
1930
if (xid == pgxact -> xid )
2046
1931
goto next_file ;
2047
1932
}
@@ -2152,8 +2037,6 @@ RecoverPreparedFromXLOG(XLogReaderState *record)
2152
2037
GlobalTransaction gxact ;
2153
2038
int i ;
2154
2039
2155
- fprintf (stderr , "===(%u) RecoverPreparedFromXLOG called\n" , getpid ());
2156
-
2157
2040
/* Deconstruct header */
2158
2041
hdr = (TwoPhaseFileHeader * ) buf ;
2159
2042
Assert (TransactionIdEquals (hdr -> xid , xid ));
0 commit comments