Skip to content

Commit 6910089

Browse files
committed
seems to be working
1 parent 2ef6fe9 commit 6910089

File tree

4 files changed

+107
-5
lines changed

4 files changed

+107
-5
lines changed

src/backend/access/transam/twophase.c

Lines changed: 92 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -200,6 +200,83 @@ static void RemoveGXact(GlobalTransaction gxact);
200200

201201
static void XlogReadTwoPhaseData(XLogRecPtr lsn, char **buf, int *len);
202202

203+
204+
dlist_head StandbyTwoPhaseStateData = DLIST_STATIC_INIT(StandbyTwoPhaseStateData);
205+
206+
typedef struct StandbyPreparedTransaction
207+
{
208+
TransactionId xid;
209+
XLogRecPtr prepare_start_lsn;
210+
XLogRecPtr prepare_end_lsn;
211+
dlist_node list_node;
212+
} StandbyPreparedTransaction;
213+
214+
void
215+
StandbyCheckPointTwoPhase(XLogRecPtr redo_horizon)
216+
{
217+
dlist_mutable_iter miter;
218+
int serialized_xacts = 0;
219+
220+
Assert(RecoveryInProgress());
221+
222+
TRACE_POSTGRESQL_TWOPHASE_CHECKPOINT_START();
223+
224+
dlist_foreach_modify(miter, &StandbyTwoPhaseStateData)
225+
{
226+
StandbyPreparedTransaction *xact = dlist_container(StandbyPreparedTransaction,
227+
list_node, miter.cur);
228+
229+
if (xact->prepare_end_lsn <= redo_horizon)
230+
{
231+
char *buf;
232+
int len;
233+
234+
XlogReadTwoPhaseData(xact->prepare_start_lsn, &buf, &len);
235+
RecreateTwoPhaseFile(xact->xid, buf, len);
236+
pfree(buf);
237+
dlist_delete(miter.cur);
238+
serialized_xacts++;
239+
}
240+
}
241+
242+
TRACE_POSTGRESQL_TWOPHASE_CHECKPOINT_DONE();
243+
244+
if (log_checkpoints && serialized_xacts > 0)
245+
ereport(LOG,
246+
(errmsg_plural("%u two-phase state file was written "
247+
"for long-running prepared transactions",
248+
"%u two-phase state files were written "
249+
"for long-running prepared transactions",
250+
serialized_xacts,
251+
serialized_xacts)));
252+
}
253+
254+
// XXX: rename to remove_standby_state
255+
void
256+
StandbyAtCommit(TransactionId xid)
257+
{
258+
dlist_mutable_iter miter;
259+
260+
Assert(RecoveryInProgress());
261+
262+
dlist_foreach_modify(miter, &StandbyTwoPhaseStateData)
263+
{
264+
StandbyPreparedTransaction *xact = dlist_container(StandbyPreparedTransaction,
265+
list_node, miter.cur);
266+
267+
if (xact->xid == xid)
268+
{
269+
// pfree(xact);
270+
dlist_delete(miter.cur);
271+
return;
272+
}
273+
}
274+
275+
RemoveTwoPhaseFile(xid, false);
276+
}
277+
278+
279+
203280
/*
204281
* Initialization of shared memory
205282
*/
@@ -1252,7 +1329,7 @@ XlogReadTwoPhaseData(XLogRecPtr lsn, char **buf, int *len)
12521329
XLogReaderState *xlogreader;
12531330
char *errormsg;
12541331

1255-
Assert(!RecoveryInProgress());
1332+
// Assert(!RecoveryInProgress());
12561333

12571334
xlogreader = XLogReaderAllocate(&read_local_xlog_page, NULL);
12581335
if (!xlogreader)
@@ -1653,6 +1730,20 @@ CheckPointTwoPhase(XLogRecPtr redo_horizon)
16531730
serialized_xacts)));
16541731
}
16551732

1733+
void
1734+
StandbyAtPrepare(XLogReaderState *record)
1735+
{
1736+
StandbyPreparedTransaction *xact;
1737+
TwoPhaseFileHeader *hdr = (TwoPhaseFileHeader *) XLogRecGetData(record);
1738+
1739+
xact = (StandbyPreparedTransaction *) palloc(sizeof(StandbyPreparedTransaction));
1740+
xact->xid = hdr->xid;
1741+
xact->prepare_start_lsn = record->ReadRecPtr;
1742+
xact->prepare_end_lsn = record->EndRecPtr;
1743+
1744+
dlist_push_tail(&StandbyTwoPhaseStateData, &xact->list_node);
1745+
}
1746+
16561747
/*
16571748
* PrescanPreparedTransactions
16581749
*

src/backend/access/transam/xact.c

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5604,7 +5604,8 @@ xact_redo(XLogReaderState *record)
56045604
Assert(TransactionIdIsValid(parsed.twophase_xid));
56055605
xact_redo_commit(&parsed, parsed.twophase_xid,
56065606
record->EndRecPtr, XLogRecGetOrigin(record));
5607-
RemoveTwoPhaseFile(parsed.twophase_xid, false);
5607+
// RemoveTwoPhaseFile(parsed.twophase_xid, false);
5608+
StandbyAtCommit(parsed.twophase_xid);
56085609
}
56095610
}
56105611
else if (info == XLOG_XACT_ABORT || info == XLOG_XACT_ABORT_PREPARED)
@@ -5624,14 +5625,17 @@ xact_redo(XLogReaderState *record)
56245625
{
56255626
Assert(TransactionIdIsValid(parsed.twophase_xid));
56265627
xact_redo_abort(&parsed, parsed.twophase_xid);
5627-
RemoveTwoPhaseFile(parsed.twophase_xid, false);
5628+
// RemoveTwoPhaseFile(parsed.twophase_xid, false);
5629+
StandbyAtCommit(parsed.twophase_xid);
56285630
}
56295631
}
56305632
else if (info == XLOG_XACT_PREPARE)
56315633
{
56325634
/* the record contents are exactly the 2PC file */
5633-
RecreateTwoPhaseFile(XLogRecGetXid(record),
5634-
XLogRecGetData(record), XLogRecGetDataLen(record));
5635+
// elog(WARNING, "2PC: RecreateTwoPhaseFile");
5636+
// RecreateTwoPhaseFile(XLogRecGetXid(record),
5637+
// XLogRecGetData(record), XLogRecGetDataLen(record));
5638+
StandbyAtPrepare(record);
56355639
}
56365640
else if (info == XLOG_XACT_ASSIGNMENT)
56375641
{

src/backend/access/transam/xlog.c

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9395,6 +9395,7 @@ xlog_redo(XLogReaderState *record)
93959395
Assert(info == XLOG_FPI || info == XLOG_FPI_FOR_HINT ||
93969396
!XLogRecHasAnyBlockRefs(record));
93979397

9398+
elog(WARNING, "2PC: xlog_redo, info=%x", info);
93989399
if (info == XLOG_NEXTOID)
93999400
{
94009401
Oid nextOid;
@@ -9548,6 +9549,7 @@ xlog_redo(XLogReaderState *record)
95489549
checkPoint.ThisTimeLineID, ThisTimeLineID)));
95499550

95509551
RecoveryRestartPoint(&checkPoint);
9552+
StandbyCheckPointTwoPhase(checkPoint.redo);
95519553
}
95529554
else if (info == XLOG_END_OF_RECOVERY)
95539555
{

src/include/access/twophase.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
#define TWOPHASE_H
1616

1717
#include "access/xlogdefs.h"
18+
#include "access/xlogreader.h"
1819
#include "datatype/timestamp.h"
1920
#include "storage/lock.h"
2021

@@ -56,4 +57,8 @@ extern void CheckPointTwoPhase(XLogRecPtr redo_horizon);
5657

5758
extern void FinishPreparedTransaction(const char *gid, bool isCommit);
5859

60+
extern void StandbyAtCommit(TransactionId xid);
61+
extern void StandbyAtPrepare(XLogReaderState *record);
62+
extern void StandbyCheckPointTwoPhase(XLogRecPtr redo_horizon);
63+
5964
#endif /* TWOPHASE_H */

0 commit comments

Comments
 (0)