Skip to content

Commit 031c3f6

Browse files
committed
dirty, but working version
1 parent 17cd2c4 commit 031c3f6

File tree

4 files changed

+25
-3
lines changed

4 files changed

+25
-3
lines changed

src/backend/access/transam/twophase.c

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -217,7 +217,9 @@ StandbyCheckPointTwoPhase(XLogRecPtr redo_horizon)
217217
dlist_mutable_iter miter;
218218
int serialized_xacts = 0;
219219

220-
Assert(RecoveryInProgress());
220+
// Assert(RecoveryInProgress());
221+
222+
elog(WARNING, "StandbyCheckPointTwoPhase");
221223

222224
TRACE_POSTGRESQL_TWOPHASE_CHECKPOINT_START();
223225

@@ -226,11 +228,13 @@ StandbyCheckPointTwoPhase(XLogRecPtr redo_horizon)
226228
StandbyPreparedTransaction *xact = dlist_container(StandbyPreparedTransaction,
227229
list_node, miter.cur);
228230

229-
if (xact->prepare_end_lsn <= redo_horizon)
231+
if (redo_horizon == 0 || xact->prepare_end_lsn <= redo_horizon)
232+
// if (true)
230233
{
231234
char *buf;
232235
int len;
233236

237+
fprintf(stderr, "2PC: checkpoint: %x --> %d (horizon: %x)\n", xact->prepare_start_lsn, xact->xid, redo_horizon);
234238
XlogReadTwoPhaseData(xact->prepare_start_lsn, &buf, &len);
235239
RecreateTwoPhaseFile(xact->xid, buf, len);
236240
pfree(buf);
@@ -267,6 +271,7 @@ StandbyAtCommit(TransactionId xid)
267271
if (xact->xid == xid)
268272
{
269273
// pfree(xact);
274+
fprintf(stderr, "2PC: commit: %x/%d\n", xact->prepare_start_lsn, xact->xid);
270275
dlist_delete(miter.cur);
271276
return;
272277
}
@@ -1741,6 +1746,8 @@ StandbyAtPrepare(XLogReaderState *record)
17411746
xact->prepare_start_lsn = record->ReadRecPtr;
17421747
xact->prepare_end_lsn = record->EndRecPtr;
17431748

1749+
fprintf(stderr, "2PC: at_prepare: %x/%d\n", xact->prepare_start_lsn, xact->xid);
1750+
17441751
dlist_push_tail(&StandbyTwoPhaseStateData, &xact->list_node);
17451752
}
17461753

@@ -1782,6 +1789,9 @@ PrescanPreparedTransactions(TransactionId **xids_p, int *nxids_p)
17821789
int nxids = 0;
17831790
int allocsize = 0;
17841791

1792+
fprintf(stderr, "--- PrescanPreparedTransactions\n");
1793+
StandbyCheckPointTwoPhase(0);
1794+
17851795
cldir = AllocateDir(TWOPHASE_DIR);
17861796
while ((clde = ReadDir(cldir, TWOPHASE_DIR)) != NULL)
17871797
{
@@ -1918,6 +1928,8 @@ StandbyRecoverPreparedTransactions(bool overwriteOK)
19181928
DIR *cldir;
19191929
struct dirent *clde;
19201930

1931+
fprintf(stderr, "--- StandbyRecoverPreparedTransactions\n");
1932+
19211933
cldir = AllocateDir(TWOPHASE_DIR);
19221934
while ((clde = ReadDir(cldir, TWOPHASE_DIR)) != NULL)
19231935
{
@@ -2001,6 +2013,8 @@ RecoverPreparedTransactions(void)
20012013
struct dirent *clde;
20022014
bool overwriteOK = false;
20032015

2016+
fprintf(stderr, "--- RecoverPreparedTransactions\n");
2017+
20042018
snprintf(dir, MAXPGPATH, "%s", TWOPHASE_DIR);
20052019

20062020
cldir = AllocateDir(dir);

src/backend/access/transam/xact.c

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5635,6 +5635,7 @@ xact_redo(XLogReaderState *record)
56355635
// elog(WARNING, "2PC: RecreateTwoPhaseFile");
56365636
// RecreateTwoPhaseFile(XLogRecGetXid(record),
56375637
// XLogRecGetData(record), XLogRecGetDataLen(record));
5638+
elog(WARNING, "reading prepared tx");
56385639
StandbyAtPrepare(record);
56395640
}
56405641
else if (info == XLOG_XACT_ASSIGNMENT)

src/backend/access/transam/xlog.c

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6734,6 +6734,7 @@ StartupXLOG(void)
67346734

67356735
ProcArrayApplyRecoveryInfo(&running);
67366736

6737+
StandbyCheckPointTwoPhase(0);
67376738
StandbyRecoverPreparedTransactions(false);
67386739
}
67396740
}
@@ -8773,6 +8774,7 @@ CreateEndOfRecoveryRecord(void)
87738774
END_CRIT_SECTION();
87748775

87758776
LocalXLogInsertAllowed = -1; /* return to "check" state */
8777+
// StandbyCheckPointTwoPhase(0);
87768778
}
87778779

87788780
/*
@@ -8796,6 +8798,7 @@ CheckPointGuts(XLogRecPtr checkPointRedo, int flags)
87968798
CheckPointBuffers(flags); /* performs all required fsyncs */
87978799
CheckPointReplicationOrigin();
87988800
/* We deliberately delay 2PC checkpointing as long as possible */
8801+
StandbyCheckPointTwoPhase(checkPointRedo);
87998802
CheckPointTwoPhase(checkPointRedo);
88008803
}
88018804

@@ -9479,6 +9482,8 @@ xlog_redo(XLogReaderState *record)
94799482

94809483
ProcArrayApplyRecoveryInfo(&running);
94819484

9485+
fprintf(stderr, "--- aaa\n");
9486+
StandbyCheckPointTwoPhase(0);
94829487
StandbyRecoverPreparedTransactions(true);
94839488
}
94849489

@@ -9571,6 +9576,7 @@ xlog_redo(XLogReaderState *record)
95719576
ereport(PANIC,
95729577
(errmsg("unexpected timeline ID %u (should be %u) in checkpoint record",
95739578
xlrec.ThisTimeLineID, ThisTimeLineID)));
9579+
// StandbyCheckPointTwoPhase(0);
95749580
}
95759581
else if (info == XLOG_NOOP)
95769582
{

src/test/recovery/t/000_twophase.pl

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
$node_master->init(allows_streaming => 1);
1111
$node_master->append_conf('postgresql.conf', qq(
1212
max_prepared_transactions = 10
13+
log_checkpoints = true
1314
));
1415
$node_master->start;
1516
$node_master->backup('master_backup');
@@ -311,4 +312,4 @@
311312

312313
$node_master->psql('postgres', "select count(*) from t",
313314
stdout => \$psql_out);
314-
is($psql_out, '6', "Check nextXid handling for prepared subtransactions");
315+
is($psql_out, '6', "Check nextXid handling for prepared subtransactions");

0 commit comments

Comments
 (0)