Skip to content

Commit 552f945

Browse files
committed
add origin info to prepare
1 parent bde6eeb commit 552f945

File tree

3 files changed

+62
-1
lines changed

3 files changed

+62
-1
lines changed

src/backend/access/transam/twophase.c

Lines changed: 50 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1022,6 +1022,9 @@ EndPrepare(GlobalTransaction gxact)
10221022
{
10231023
TwoPhaseFileHeader *hdr;
10241024
StateFileChunk *record;
1025+
xl_xact_origin xl_origin;
1026+
xl_xact_xinfo xl_xinfo;
1027+
uint8 info = XLOG_XACT_PREPARE;
10251028

10261029
/* Add the end sentinel to the list of 2PC records */
10271030
RegisterTwoPhaseRecord(TWOPHASE_RM_END_ID, 0,
@@ -1059,12 +1062,33 @@ EndPrepare(GlobalTransaction gxact)
10591062

10601063
START_CRIT_SECTION();
10611064

1065+
/* dump transaction origin information */
1066+
if (replorigin_session_origin != InvalidRepOriginId)
1067+
{
1068+
xl_xinfo.xinfo |= XACT_XINFO_HAS_ORIGIN;
1069+
1070+
xl_origin.origin_lsn = replorigin_session_origin_lsn;
1071+
xl_origin.origin_timestamp = replorigin_session_origin_timestamp;
1072+
}
1073+
1074+
if (xl_xinfo.xinfo != 0)
1075+
info |= XLOG_XACT_HAS_INFO;
1076+
10621077
MyPgXact->delayChkpt = true;
10631078

10641079
XLogBeginInsert();
10651080
for (record = records.head; record != NULL; record = record->next)
10661081
XLogRegisterData(record->data, record->len);
1067-
gxact->prepare_end_lsn = XLogInsert(RM_XACT_ID, XLOG_XACT_PREPARE);
1082+
1083+
if (xl_xinfo.xinfo != 0)
1084+
XLogRegisterData((char *) (&xl_xinfo.xinfo), sizeof(xl_xinfo.xinfo));
1085+
1086+
if (xl_xinfo.xinfo & XACT_XINFO_HAS_ORIGIN)
1087+
XLogRegisterData((char *) (&xl_origin), sizeof(xl_xact_origin));
1088+
1089+
XLogIncludeOrigin();
1090+
1091+
gxact->prepare_end_lsn = XLogInsert(RM_XACT_ID, info);
10681092
XLogFlush(gxact->prepare_end_lsn);
10691093

10701094
/* If we crash now, we have prepared: WAL replay will fix things */
@@ -1269,6 +1293,31 @@ ParsePrepareRecord(uint8 info, char *xlrec, xl_xact_parsed_prepare *parsed)
12691293

12701294
parsed->msgs = (SharedInvalidationMessage *) bufptr;
12711295
bufptr += MAXALIGN(hdr->ninvalmsgs * sizeof(SharedInvalidationMessage));
1296+
1297+
/*
1298+
* Parse xinfo now.
1299+
*/
1300+
1301+
parsed->xinfo = 0;
1302+
1303+
if (info & XLOG_XACT_HAS_INFO)
1304+
{
1305+
xl_xact_xinfo *xl_xinfo = (xl_xact_xinfo *) bufptr;
1306+
parsed->xinfo = xl_xinfo->xinfo;
1307+
bufptr += sizeof(xl_xact_xinfo);
1308+
}
1309+
1310+
1311+
if (parsed->xinfo & XACT_XINFO_HAS_ORIGIN)
1312+
{
1313+
xl_xact_origin xl_origin;
1314+
/* we're only guaranteed 4 byte alignment, so copy onto stack */
1315+
memcpy(&xl_origin, bufptr, sizeof(xl_origin));
1316+
parsed->origin_lsn = xl_origin.origin_lsn;
1317+
parsed->origin_timestamp = xl_origin.origin_timestamp;
1318+
bufptr += sizeof(xl_xact_origin);
1319+
}
1320+
12721321
}
12731322

12741323

src/backend/replication/logical/decode.c

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -581,6 +581,13 @@ DecodePrepare(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
581581
XLogRecPtr origin_id = XLogRecGetOrigin(buf->record);
582582
int i;
583583
TransactionId xid = parsed->twophase_xid;
584+
585+
if (parsed->xinfo & XACT_XINFO_HAS_ORIGIN)
586+
{
587+
origin_lsn = parsed->origin_lsn;
588+
commit_time = parsed->origin_timestamp;
589+
}
590+
584591
strcpy(ctx->reorder->gid, parsed->twophase_gid);
585592

586593
/*

src/include/access/xact.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -304,6 +304,8 @@ typedef struct xl_xact_parsed_commit
304304

305305
typedef struct xl_xact_parsed_prepare
306306
{
307+
uint32 xinfo;
308+
307309
Oid dbId; /* MyDatabaseId */
308310

309311
int nsubxacts;
@@ -317,6 +319,9 @@ typedef struct xl_xact_parsed_prepare
317319

318320
TransactionId twophase_xid;
319321
char twophase_gid[GIDSIZE];
322+
323+
XLogRecPtr origin_lsn;
324+
TimestampTz origin_timestamp;
320325
} xl_xact_parsed_prepare;
321326

322327
typedef struct xl_xact_parsed_abort

0 commit comments

Comments
 (0)