Skip to content

Commit 250aeaa

Browse files
committed
clean up
1 parent 59ede55 commit 250aeaa

File tree

6 files changed

+25
-161
lines changed

6 files changed

+25
-161
lines changed

reinit.sh

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ make install > /dev/null
4343

4444
cat <<MSG
4545
###############################################################################
46-
# Check that we can commit after soft restart.
46+
# Check that we can commit and abort after soft restart.
4747
# Here checkpoint happens before shutdown and no WAL replay will not occur
4848
# during start. So code should re-create memory state from files.
4949
###############################################################################
@@ -55,17 +55,21 @@ psql <<SQL
5555
begin;
5656
insert into t values (42);
5757
prepare transaction 'x';
58+
begin;
59+
insert into t values (43);
60+
prepare transaction 'y';
5861
SQL
5962
./install/bin/pg_ctl -sw -D ./install/data -l ./install/data/logfile restart
6063
psql <<SQL
6164
commit prepared 'x';
65+
rollback prepared 'y';
6266
SQL
6367

6468

6569

6670
cat <<MSG
6771
###############################################################################
68-
# Check that we can commit after hard restart.
72+
# Check that we can commit and abort after hard restart.
6973
# On startup WAL replay will re-create memory for global transactions that
7074
# happend after last checkpoint and stored. After that
7175
###############################################################################
@@ -77,11 +81,15 @@ psql <<SQL
7781
begin;
7882
insert into t values (42);
7983
prepare transaction 'x';
84+
begin;
85+
insert into t values (43);
86+
prepare transaction 'y';
8087
SQL
8188
pkill -9 postgres
8289
./install/bin/pg_ctl -sw -D ./install/data -l ./install/data/logfile start
8390
psql <<SQL
8491
commit prepared 'x';
92+
rollback prepared 'y';
8593
SQL
8694

8795

@@ -188,8 +196,8 @@ cat <<MSG
188196
MSG
189197

190198
pkill -9 postgres
191-
reinit_master
192-
reinit_slave
199+
reinit_master >> /dev/null
200+
reinit_slave >> /dev/null
193201
psql <<SQL
194202
begin;
195203
insert into t values (42);
@@ -214,8 +222,8 @@ cat <<MSG
214222
MSG
215223

216224
pkill -9 postgres
217-
reinit_master
218-
reinit_slave
225+
reinit_master >> /dev/null
226+
reinit_slave >> /dev/null
219227
psql <<SQL
220228
begin;
221229
insert into t values (42);

src/backend/access/transam/twophase.c

Lines changed: 9 additions & 126 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,6 @@
4747
* files
4848
* * In case of crash replay will move data from xlog to files, if that
4949
* hasn't happened before. XXX TODO - move to shmem in replay also
50-
5150
*
5251
*-------------------------------------------------------------------------
5352
*/
@@ -79,7 +78,6 @@
7978
#include "replication/origin.h"
8079
#include "replication/syncrep.h"
8180
#include "replication/walsender.h"
82-
#include "replication/logicalfuncs.h"
8381
#include "storage/fd.h"
8482
#include "storage/ipc.h"
8583
#include "storage/predicate.h"
@@ -89,7 +87,6 @@
8987
#include "storage/smgr.h"
9088
#include "utils/builtins.h"
9189
#include "utils/memutils.h"
92-
#include "utils/snapmgr.h"
9390
#include "utils/timestamp.h"
9491

9592

@@ -1251,8 +1248,6 @@ XlogReadTwoPhaseData(XLogRecPtr lsn, char **buf, int *len)
12511248
XLogReaderState *xlogreader;
12521249
char *errormsg;
12531250

1254-
Assert(!RecoveryInProgress());
1255-
12561251
xlogreader = XLogReaderAllocate(&read_local_xlog_page, NULL);
12571252
if (!xlogreader)
12581253
ereport(ERROR,
@@ -1280,7 +1275,6 @@ XlogReadTwoPhaseData(XLogRecPtr lsn, char **buf, int *len)
12801275
*len = XLogRecGetDataLen(xlogreader);
12811276

12821277
*buf = palloc(sizeof(char)*XLogRecGetDataLen(xlogreader));
1283-
12841278
memcpy(*buf, XLogRecGetData(xlogreader), sizeof(char) * XLogRecGetDataLen(xlogreader));
12851279

12861280
XLogReaderFree(xlogreader);
@@ -1297,14 +1291,12 @@ StandbyTransactionIdIsPrepared(TransactionId xid)
12971291
TwoPhaseFileHeader *hdr;
12981292
bool result;
12991293

1300-
fprintf(stderr, "===(%u) StandbyTransactionIdIsPrepared(%u) \n", getpid(), xid);
1301-
13021294
Assert(TransactionIdIsValid(xid));
13031295

13041296
if (max_prepared_xacts <= 0)
13051297
return false; /* nothing to do */
13061298

1307-
// check for in-memory tx here too
1299+
// NB: check for in-memory tx here too
13081300

13091301
/* Read and validate file */
13101302
buf = ReadTwoPhaseFile(xid, false);
@@ -1340,7 +1332,6 @@ FinishPreparedTransaction(const char *gid, bool isCommit)
13401332
int ndelrels;
13411333
SharedInvalidationMessage *invalmsgs;
13421334
int i;
1343-
bool file_used = false;
13441335

13451336
/*
13461337
* Validate the GID, and lock the GXACT to ensure that two backends do not
@@ -1362,6 +1353,7 @@ FinishPreparedTransaction(const char *gid, bool isCommit)
13621353
else
13631354
XlogReadTwoPhaseData(gxact->prepare_start_lsn, &buf, NULL);
13641355

1356+
13651357
/*
13661358
* Disassemble the header area
13671359
*/
@@ -1529,8 +1521,6 @@ RecreateTwoPhaseFile(TransactionId xid, void *content, int len)
15291521
pg_crc32c statefile_crc;
15301522
int fd;
15311523

1532-
fprintf(stderr, "===(%u) RecreateTwoPhaseFile called\n", getpid());
1533-
15341524
/* Recompute CRC */
15351525
INIT_CRC32C(statefile_crc);
15361526
COMP_CRC32C(statefile_crc, content, len);
@@ -1596,9 +1586,10 @@ XlogRedoFinishPrepared(TransactionId xid)
15961586
char *bufptr;
15971587
TwoPhaseFileHeader *hdr;
15981588

1599-
fprintf(stderr, "===(%u) XlogRedoCleanupPrepared called for %x\n", getpid(), xid);
1589+
// NB: take care about file state file removal
1590+
1591+
// NB: put lock there
16001592

1601-
/* We can do that without the lock in replay, aren't we? */
16021593
for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
16031594
{
16041595
gxact = TwoPhaseState->prepXacts[i];
@@ -1761,28 +1752,17 @@ TransactionId
17611752
PrescanPreparedTransactions(TransactionId **xids_p, int *nxids_p)
17621753
{
17631754
TransactionId origNextXid = ShmemVariableCache->nextXid;
1764-
1765-
/* By this we will take into account xacts restored to memory */
1766-
// TransactionId origNextXid = GetOldestSafeDecodingTransactionId();
1767-
17681755
TransactionId result = origNextXid;
17691756
DIR *cldir;
17701757
struct dirent *clde;
17711758
TransactionId *xids = NULL;
17721759
int nxids = 0;
17731760
int allocsize = 0;
17741761

1775-
// !!! take care of TransactionId **xids_p, int *nxids_p
1776-
1777-
fprintf(stderr, "===(%u) PrescanPreparedTransactions called. result = %u\n", getpid(), result);
1762+
// NB: take care of TransactionId **xids_p, int *nxids_p
17781763

1779-
/*
1780-
* Since we want to find minimum among prepared xacts we can use that function ignoring
1781-
* KnownAssignedXids.
1782-
* Other option is just to iterate here through the procarray.
1783-
*/
1764+
// NB: just iterate through preparedXacts here
17841765
result = GetOldestActiveTransactionId();
1785-
// fprintf(stderr, "===(%u) PrescanPreparedTransactions called. should be = %u\n", getpid(), GetOldestActiveTransactionId());
17861766

17871767
cldir = AllocateDir(TWOPHASE_DIR);
17881768
while ((clde = ReadDir(cldir, TWOPHASE_DIR)) != NULL)
@@ -1898,95 +1878,9 @@ PrescanPreparedTransactions(TransactionId **xids_p, int *nxids_p)
18981878
*nxids_p = nxids;
18991879
}
19001880

1901-
fprintf(stderr, "===(%u) PrescanPreparedTransactions ended. result = %u\n", getpid(), result);
19021881
return result;
19031882
}
19041883

1905-
// /*
1906-
// * StandbyRecoverPreparedTransactions
1907-
// *
1908-
// * Scan the pg_twophase directory and setup all the required information to
1909-
// * allow standby queries to treat prepared transactions as still active.
1910-
// * This is never called at the end of recovery - we use
1911-
// * RecoverPreparedTransactions() at that point.
1912-
// *
1913-
// * Currently we simply call SubTransSetParent() for any subxids of prepared
1914-
// * transactions. If overwriteOK is true, it's OK if some XIDs have already
1915-
// * been marked in pg_subtrans.
1916-
// */
1917-
// void
1918-
// StandbyRecoverPreparedTransactions(bool overwriteOK)
1919-
// {
1920-
// DIR *cldir;
1921-
// struct dirent *clde;
1922-
1923-
// fprintf(stderr, "!===(%u) StandbyRecoverPreparedTransactions called, overwriteOK = %u\n", getpid(), (int)overwriteOK);
1924-
1925-
// cldir = AllocateDir(TWOPHASE_DIR);
1926-
// while ((clde = ReadDir(cldir, TWOPHASE_DIR)) != NULL)
1927-
// {
1928-
// if (strlen(clde->d_name) == 8 &&
1929-
// strspn(clde->d_name, "0123456789ABCDEF") == 8)
1930-
// {
1931-
// TransactionId xid;
1932-
// char *buf;
1933-
// TwoPhaseFileHeader *hdr;
1934-
// TransactionId *subxids;
1935-
// int i;
1936-
1937-
// xid = (TransactionId) strtoul(clde->d_name, NULL, 16);
1938-
1939-
// /* Already processed? */
1940-
// if (TransactionIdDidCommit(xid) || TransactionIdDidAbort(xid))
1941-
// {
1942-
// ereport(WARNING,
1943-
// (errmsg("removing stale two-phase state file \"%s\"",
1944-
// clde->d_name)));
1945-
// RemoveTwoPhaseFile(xid, true);
1946-
// continue;
1947-
// }
1948-
1949-
// /* Read and validate file */
1950-
// buf = ReadTwoPhaseFile(xid, true);
1951-
// if (buf == NULL)
1952-
// {
1953-
// ereport(WARNING,
1954-
// (errmsg("removing corrupt two-phase state file \"%s\"",
1955-
// clde->d_name)));
1956-
// RemoveTwoPhaseFile(xid, true);
1957-
// continue;
1958-
// }
1959-
1960-
// /* Deconstruct header */
1961-
// hdr = (TwoPhaseFileHeader *) buf;
1962-
// if (!TransactionIdEquals(hdr->xid, xid))
1963-
// {
1964-
// ereport(WARNING,
1965-
// (errmsg("removing corrupt two-phase state file \"%s\"",
1966-
// clde->d_name)));
1967-
// RemoveTwoPhaseFile(xid, true);
1968-
// pfree(buf);
1969-
// continue;
1970-
// }
1971-
1972-
// /*
1973-
// * Examine subtransaction XIDs ... they should all follow main
1974-
// * XID.
1975-
// */
1976-
// subxids = (TransactionId *)
1977-
// (buf + MAXALIGN(sizeof(TwoPhaseFileHeader)));
1978-
// for (i = 0; i < hdr->nsubxacts; i++)
1979-
// {
1980-
// TransactionId subxid = subxids[i];
1981-
1982-
// Assert(TransactionIdFollows(subxid, xid));
1983-
// SubTransSetParent(xid, subxid, overwriteOK);
1984-
// }
1985-
// }
1986-
// }
1987-
// FreeDir(cldir);
1988-
// }
1989-
19901884
/*
19911885
* RecoverPreparedFromFiles
19921886
*
@@ -2000,9 +1894,8 @@ RecoverPreparedFromFiles(bool overwriteOK)
20001894
char dir[MAXPGPATH];
20011895
DIR *cldir;
20021896
struct dirent *clde;
2003-
// bool overwriteOK = false;
20041897

2005-
fprintf(stderr, "===(%u) RecoverPreparedFromFiles called\n", getpid());
1898+
// NB: look carefully at case overwriteOK=true
20061899

20071900
snprintf(dir, MAXPGPATH, "%s", TWOPHASE_DIR);
20081901

@@ -2024,24 +1917,16 @@ RecoverPreparedFromFiles(bool overwriteOK)
20241917

20251918
xid = (TransactionId) strtoul(clde->d_name, NULL, 16);
20261919

2027-
// /* Already recovered from WAL? */
2028-
// if (TransactionIdIsInProgress(xid))
2029-
// {
2030-
// fprintf(stderr, "! xid %x is in progress\n", xid);
2031-
// continue;
2032-
// }
1920+
// NB: put lock here
20331921

20341922
/* Already recovered from WAL? */
20351923
for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
20361924
{
20371925
gxact = TwoPhaseState->prepXacts[i];
20381926
pgxact = &ProcGlobal->allPgXact[gxact->pgprocno];
2039-
20401927

20411928
fprintf(stderr, "! %x ?= %x\n", xid, pgxact->xid);
20421929

2043-
2044-
20451930
if (xid == pgxact->xid)
20461931
goto next_file;
20471932
}
@@ -2152,8 +2037,6 @@ RecoverPreparedFromXLOG(XLogReaderState *record)
21522037
GlobalTransaction gxact;
21532038
int i;
21542039

2155-
fprintf(stderr, "===(%u) RecoverPreparedFromXLOG called\n", getpid());
2156-
21572040
/* Deconstruct header */
21582041
hdr = (TwoPhaseFileHeader *) buf;
21592042
Assert(TransactionIdEquals(hdr->xid, xid));

src/backend/access/transam/xact.c

Lines changed: 1 addition & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -5567,7 +5567,6 @@ xact_redo(XLogReaderState *record)
55675567
Assert(TransactionIdIsValid(parsed.twophase_xid));
55685568
xact_redo_commit(&parsed, parsed.twophase_xid,
55695569
record->EndRecPtr, XLogRecGetOrigin(record));
5570-
RemoveTwoPhaseFile(parsed.twophase_xid, false);
55715570
XlogRedoFinishPrepared(parsed.twophase_xid);
55725571
}
55735572
}
@@ -5588,20 +5587,11 @@ xact_redo(XLogReaderState *record)
55885587
{
55895588
Assert(TransactionIdIsValid(parsed.twophase_xid));
55905589
xact_redo_abort(&parsed, parsed.twophase_xid);
5591-
RemoveTwoPhaseFile(parsed.twophase_xid, false);
5590+
XlogRedoFinishPrepared(parsed.twophase_xid);
55925591
}
55935592
}
55945593
else if (info == XLOG_XACT_PREPARE)
55955594
{
5596-
/* the record contents are exactly the 2PC file */
5597-
// RecreateTwoPhaseFile(XLogRecGetXid(record),
5598-
// XLogRecGetData(record), XLogRecGetDataLen(record));
5599-
5600-
5601-
// RecoverPreparedTransaction(XLogRecGetXid(record),
5602-
// (char *) XLogRecGetData(record), XLogRecGetDataLen(record));
5603-
// fprintf(stderr, "=== Recovering tx %lx : %lx \n", record->ReadRecPtr, record->EndRecPtr);
5604-
56055595
RecoverPreparedFromXLOG(record);
56065596
}
56075597
else if (info == XLOG_XACT_ASSIGNMENT)

0 commit comments

Comments
 (0)