Skip to content

Commit 6f625d9

Browse files
committed
Do not use StandbyResover anymore. Seems like working PoC now.
1 parent af6be20 commit 6f625d9

File tree

3 files changed

+115
-108
lines changed

3 files changed

+115
-108
lines changed

src/backend/access/transam/twophase.c

Lines changed: 107 additions & 103 deletions
Original file line numberDiff line numberDiff line change
@@ -1264,11 +1264,15 @@ StandbyTransactionIdIsPrepared(TransactionId xid)
12641264
TwoPhaseFileHeader *hdr;
12651265
bool result;
12661266

1267+
fprintf(stderr, "===(%u) StandbyTransactionIdIsPrepared(%u) \n", getpid(), xid);
1268+
12671269
Assert(TransactionIdIsValid(xid));
12681270

12691271
if (max_prepared_xacts <= 0)
12701272
return false; /* nothing to do */
12711273

1274+
// check for in-memory tx here too
1275+
12721276
/* Read and validate file */
12731277
buf = ReadTwoPhaseFile(xid, false);
12741278
if (buf == NULL)
@@ -1848,90 +1852,90 @@ PrescanPreparedTransactions(TransactionId **xids_p, int *nxids_p)
18481852
return result;
18491853
}
18501854

1851-
/*
1852-
* StandbyRecoverPreparedTransactions
1853-
*
1854-
* Scan the pg_twophase directory and setup all the required information to
1855-
* allow standby queries to treat prepared transactions as still active.
1856-
* This is never called at the end of recovery - we use
1857-
* RecoverPreparedTransactions() at that point.
1858-
*
1859-
* Currently we simply call SubTransSetParent() for any subxids of prepared
1860-
* transactions. If overwriteOK is true, it's OK if some XIDs have already
1861-
* been marked in pg_subtrans.
1862-
*/
1863-
void
1864-
StandbyRecoverPreparedTransactions(bool overwriteOK)
1865-
{
1866-
DIR *cldir;
1867-
struct dirent *clde;
1868-
1869-
fprintf(stderr, "!===(%u) StandbyRecoverPreparedTransactions called, overwriteOK = %u\n", getpid(), (int)overwriteOK);
1870-
1871-
cldir = AllocateDir(TWOPHASE_DIR);
1872-
while ((clde = ReadDir(cldir, TWOPHASE_DIR)) != NULL)
1873-
{
1874-
if (strlen(clde->d_name) == 8 &&
1875-
strspn(clde->d_name, "0123456789ABCDEF") == 8)
1876-
{
1877-
TransactionId xid;
1878-
char *buf;
1879-
TwoPhaseFileHeader *hdr;
1880-
TransactionId *subxids;
1881-
int i;
1882-
1883-
xid = (TransactionId) strtoul(clde->d_name, NULL, 16);
1884-
1885-
/* Already processed? */
1886-
if (TransactionIdDidCommit(xid) || TransactionIdDidAbort(xid))
1887-
{
1888-
ereport(WARNING,
1889-
(errmsg("removing stale two-phase state file \"%s\"",
1890-
clde->d_name)));
1891-
RemoveTwoPhaseFile(xid, true);
1892-
continue;
1893-
}
1894-
1895-
/* Read and validate file */
1896-
buf = ReadTwoPhaseFile(xid, true);
1897-
if (buf == NULL)
1898-
{
1899-
ereport(WARNING,
1900-
(errmsg("removing corrupt two-phase state file \"%s\"",
1901-
clde->d_name)));
1902-
RemoveTwoPhaseFile(xid, true);
1903-
continue;
1904-
}
1905-
1906-
/* Deconstruct header */
1907-
hdr = (TwoPhaseFileHeader *) buf;
1908-
if (!TransactionIdEquals(hdr->xid, xid))
1909-
{
1910-
ereport(WARNING,
1911-
(errmsg("removing corrupt two-phase state file \"%s\"",
1912-
clde->d_name)));
1913-
RemoveTwoPhaseFile(xid, true);
1914-
pfree(buf);
1915-
continue;
1916-
}
1917-
1918-
/*
1919-
* Examine subtransaction XIDs ... they should all follow main
1920-
* XID.
1921-
*/
1922-
subxids = (TransactionId *)
1923-
(buf + MAXALIGN(sizeof(TwoPhaseFileHeader)));
1924-
for (i = 0; i < hdr->nsubxacts; i++)
1925-
{
1926-
TransactionId subxid = subxids[i];
1927-
1928-
Assert(TransactionIdFollows(subxid, xid));
1929-
SubTransSetParent(xid, subxid, overwriteOK);
1930-
}
1931-
}
1932-
}
1933-
FreeDir(cldir);
1934-
}
1855+
// /*
1856+
// * StandbyRecoverPreparedTransactions
1857+
// *
1858+
// * Scan the pg_twophase directory and setup all the required information to
1859+
// * allow standby queries to treat prepared transactions as still active.
1860+
// * This is never called at the end of recovery - we use
1861+
// * RecoverPreparedTransactions() at that point.
1862+
// *
1863+
// * Currently we simply call SubTransSetParent() for any subxids of prepared
1864+
// * transactions. If overwriteOK is true, it's OK if some XIDs have already
1865+
// * been marked in pg_subtrans.
1866+
// */
1867+
// void
1868+
// StandbyRecoverPreparedTransactions(bool overwriteOK)
1869+
// {
1870+
// DIR *cldir;
1871+
// struct dirent *clde;
1872+
1873+
// fprintf(stderr, "!===(%u) StandbyRecoverPreparedTransactions called, overwriteOK = %u\n", getpid(), (int)overwriteOK);
1874+
1875+
// cldir = AllocateDir(TWOPHASE_DIR);
1876+
// while ((clde = ReadDir(cldir, TWOPHASE_DIR)) != NULL)
1877+
// {
1878+
// if (strlen(clde->d_name) == 8 &&
1879+
// strspn(clde->d_name, "0123456789ABCDEF") == 8)
1880+
// {
1881+
// TransactionId xid;
1882+
// char *buf;
1883+
// TwoPhaseFileHeader *hdr;
1884+
// TransactionId *subxids;
1885+
// int i;
1886+
1887+
// xid = (TransactionId) strtoul(clde->d_name, NULL, 16);
1888+
1889+
// /* Already processed? */
1890+
// if (TransactionIdDidCommit(xid) || TransactionIdDidAbort(xid))
1891+
// {
1892+
// ereport(WARNING,
1893+
// (errmsg("removing stale two-phase state file \"%s\"",
1894+
// clde->d_name)));
1895+
// RemoveTwoPhaseFile(xid, true);
1896+
// continue;
1897+
// }
1898+
1899+
// /* Read and validate file */
1900+
// buf = ReadTwoPhaseFile(xid, true);
1901+
// if (buf == NULL)
1902+
// {
1903+
// ereport(WARNING,
1904+
// (errmsg("removing corrupt two-phase state file \"%s\"",
1905+
// clde->d_name)));
1906+
// RemoveTwoPhaseFile(xid, true);
1907+
// continue;
1908+
// }
1909+
1910+
// /* Deconstruct header */
1911+
// hdr = (TwoPhaseFileHeader *) buf;
1912+
// if (!TransactionIdEquals(hdr->xid, xid))
1913+
// {
1914+
// ereport(WARNING,
1915+
// (errmsg("removing corrupt two-phase state file \"%s\"",
1916+
// clde->d_name)));
1917+
// RemoveTwoPhaseFile(xid, true);
1918+
// pfree(buf);
1919+
// continue;
1920+
// }
1921+
1922+
// /*
1923+
// * Examine subtransaction XIDs ... they should all follow main
1924+
// * XID.
1925+
// */
1926+
// subxids = (TransactionId *)
1927+
// (buf + MAXALIGN(sizeof(TwoPhaseFileHeader)));
1928+
// for (i = 0; i < hdr->nsubxacts; i++)
1929+
// {
1930+
// TransactionId subxid = subxids[i];
1931+
1932+
// Assert(TransactionIdFollows(subxid, xid));
1933+
// SubTransSetParent(xid, subxid, overwriteOK);
1934+
// }
1935+
// }
1936+
// }
1937+
// FreeDir(cldir);
1938+
// }
19351939

19361940
/*
19371941
* RecoverPreparedFromFiles
@@ -1941,12 +1945,12 @@ StandbyRecoverPreparedTransactions(bool overwriteOK)
19411945
* startup.
19421946
*/
19431947
void
1944-
RecoverPreparedFromFiles(void)
1948+
RecoverPreparedFromFiles(bool overwriteOK)
19451949
{
19461950
char dir[MAXPGPATH];
19471951
DIR *cldir;
19481952
struct dirent *clde;
1949-
bool overwriteOK = false;
1953+
// bool overwriteOK = false;
19501954

19511955
fprintf(stderr, "===(%u) RecoverPreparedFromFiles called\n", getpid());
19521956

@@ -1965,32 +1969,32 @@ RecoverPreparedFromFiles(void)
19651969
TransactionId *subxids;
19661970
GlobalTransaction gxact;
19671971
int i;
1968-
// PGXACT *pgxact;
1972+
PGXACT *pgxact;
19691973

19701974

19711975
xid = (TransactionId) strtoul(clde->d_name, NULL, 16);
19721976

1973-
/* Already recovered from WAL? */
1974-
if (TransactionIdIsInProgress(xid))
1975-
{
1976-
fprintf(stderr, "! xid %x is in progress\n", xid);
1977-
continue;
1978-
}
1979-
19801977
// /* Already recovered from WAL? */
1981-
// for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
1978+
// if (TransactionIdIsInProgress(xid))
19821979
// {
1983-
// gxact = TwoPhaseState->prepXacts[i];
1984-
// pgxact = &ProcGlobal->allPgXact[gxact->pgprocno];
1980+
// fprintf(stderr, "! xid %x is in progress\n", xid);
1981+
// continue;
1982+
// }
1983+
1984+
/* Already recovered from WAL? */
1985+
for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
1986+
{
1987+
gxact = TwoPhaseState->prepXacts[i];
1988+
pgxact = &ProcGlobal->allPgXact[gxact->pgprocno];
19851989

19861990

1987-
// fprintf(stderr, "! %x ?= %x\n", xid, pgxact->xid);
1991+
fprintf(stderr, "! %x ?= %x\n", xid, pgxact->xid);
19881992

19891993

19901994

1991-
// if (xid == pgxact->xid)
1992-
// goto next_file;
1993-
// }
1995+
if (xid == pgxact->xid)
1996+
goto next_file;
1997+
}
19941998

19951999
/* Already processed? */
19962000
if (TransactionIdDidCommit(xid) || TransactionIdDidAbort(xid))
@@ -2080,8 +2084,8 @@ RecoverPreparedFromFiles(void)
20802084
pfree(buf);
20812085
}
20822086

2083-
// next_file:
2084-
// continue;
2087+
next_file:
2088+
continue;
20852089

20862090
}
20872091
FreeDir(cldir);

src/backend/access/transam/xlog.c

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6614,7 +6614,7 @@ StartupXLOG(void)
66146614

66156615
ProcArrayApplyRecoveryInfo(&running);
66166616

6617-
StandbyRecoverPreparedTransactions(false);
6617+
RecoverPreparedFromFiles(false);
66186618
}
66196619
}
66206620

@@ -7361,7 +7361,7 @@ StartupXLOG(void)
73617361
TrimMultiXact();
73627362

73637363
/* Reload shared-memory state for prepared transactions */
7364-
RecoverPreparedFromFiles();
7364+
RecoverPreparedFromFiles(false);
73657365

73667366
/*
73677367
* Shutdown the recovery environment. This must occur after
@@ -9284,7 +9284,10 @@ xlog_redo(XLogReaderState *record)
92849284

92859285
ProcArrayApplyRecoveryInfo(&running);
92869286

9287-
StandbyRecoverPreparedTransactions(true);
9287+
/*
9288+
* NB: is still okay to call Recover with overwriteOK = true?
9289+
*/
9290+
RecoverPreparedFromFiles(true);
92889291
}
92899292

92909293
/* ControlFile->checkPointCopy always tracks the latest ckpt XID */

src/include/access/twophase.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,8 +47,8 @@ extern bool StandbyTransactionIdIsPrepared(TransactionId xid);
4747

4848
extern TransactionId PrescanPreparedTransactions(TransactionId **xids_p,
4949
int *nxids_p);
50-
extern void StandbyRecoverPreparedTransactions(bool overwriteOK);
51-
extern void RecoverPreparedFromFiles(void);
50+
// extern void StandbyRecoverPreparedTransactions(bool overwriteOK);
51+
extern void RecoverPreparedFromFiles(bool overwriteOK);
5252
extern void RecoverPreparedFromXLOG(XLogReaderState *record);
5353

5454
extern void RecreateTwoPhaseFile(TransactionId xid, void *content, int len);

0 commit comments

Comments
 (0)