Skip to content

Commit 5b15778

Browse files
committed
log gids only when wal_level>=logical; store origin info when it is needed
1 parent de6ba77 commit 5b15778

File tree

4 files changed

+103
-15
lines changed

4 files changed

+103
-15
lines changed

src/backend/access/rmgrdesc/xactdesc.c

Lines changed: 23 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -98,13 +98,15 @@ ParseCommitRecord(uint8 info, xl_xact_commit *xlrec, xl_xact_parsed_commit *pars
9898
if (parsed->xinfo & XACT_XINFO_HAS_TWOPHASE)
9999
{
100100
xl_xact_twophase *xl_twophase = (xl_xact_twophase *) data;
101-
uint8 gidlen = xl_twophase->gidlen;
102101

103102
parsed->twophase_xid = xl_twophase->xid;
104103
data += MinSizeOfXactTwophase;
105104

106-
strcpy(parsed->twophase_gid, data);
107-
data += gidlen;
105+
if (parsed->xinfo & XACT_XINFO_HAS_GID)
106+
{
107+
strcpy(parsed->twophase_gid, data);
108+
data += strlen(parsed->twophase_gid) + 1;
109+
}
108110
}
109111

110112
if (parsed->xinfo & XACT_XINFO_HAS_ORIGIN)
@@ -177,13 +179,28 @@ ParseAbortRecord(uint8 info, xl_xact_abort *xlrec, xl_xact_parsed_abort *parsed)
177179
if (parsed->xinfo & XACT_XINFO_HAS_TWOPHASE)
178180
{
179181
xl_xact_twophase *xl_twophase = (xl_xact_twophase *) data;
180-
uint8 gidlen = xl_twophase->gidlen;
181182

182183
parsed->twophase_xid = xl_twophase->xid;
183184
data += MinSizeOfXactTwophase;
184185

185-
strcpy(parsed->twophase_gid, data);
186-
data += gidlen;
186+
if (parsed->xinfo & XACT_XINFO_HAS_GID)
187+
{
188+
strcpy(parsed->twophase_gid, data);
189+
data += strlen(parsed->twophase_gid) + 1;
190+
}
191+
}
192+
193+
if (parsed->xinfo & XACT_XINFO_HAS_ORIGIN)
194+
{
195+
xl_xact_origin xl_origin;
196+
197+
/* we're only guaranteed 4 byte alignment, so copy onto stack */
198+
memcpy(&xl_origin, data, sizeof(xl_origin));
199+
200+
parsed->origin_lsn = xl_origin.origin_lsn;
201+
parsed->origin_timestamp = xl_origin.origin_timestamp;
202+
203+
data += sizeof(xl_xact_origin);
187204
}
188205
}
189206

src/backend/access/transam/twophase.c

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -855,7 +855,7 @@ TwoPhaseGetDummyProc(TransactionId xid)
855855
/*
856856
* Header for a 2PC state file
857857
*/
858-
#define TWOPHASE_MAGIC 0x57F94533 /* format identifier */
858+
#define TWOPHASE_MAGIC 0x57F94534 /* format identifier */
859859

860860
typedef struct TwoPhaseFileHeader
861861
{
@@ -871,6 +871,8 @@ typedef struct TwoPhaseFileHeader
871871
int32 ninvalmsgs; /* number of cache invalidation messages */
872872
bool initfileinval; /* does relcache init file need invalidation? */
873873
uint16 gidlen; /* length of the GID - GID follows the header */
874+
XLogRecPtr origin_lsn; /* lsn of this record at origin node */
875+
TimestampTz origin_timestamp; /* time of prepare at origin node */
874876
} TwoPhaseFileHeader;
875877

876878
/*
@@ -1022,6 +1024,7 @@ EndPrepare(GlobalTransaction gxact)
10221024
{
10231025
TwoPhaseFileHeader *hdr;
10241026
StateFileChunk *record;
1027+
bool replorigin;
10251028

10261029
/* Add the end sentinel to the list of 2PC records */
10271030
RegisterTwoPhaseRecord(TWOPHASE_RM_END_ID, 0,
@@ -1032,6 +1035,21 @@ EndPrepare(GlobalTransaction gxact)
10321035
Assert(hdr->magic == TWOPHASE_MAGIC);
10331036
hdr->total_len = records.total_len + sizeof(pg_crc32c);
10341037

1038+
replorigin = (replorigin_session_origin != InvalidRepOriginId &&
1039+
replorigin_session_origin != DoNotReplicateId);
1040+
1041+
if (replorigin)
1042+
{
1043+
Assert(replorigin_session_origin_lsn != InvalidXLogRecPtr);
1044+
hdr->origin_lsn = replorigin_session_origin_lsn;
1045+
hdr->origin_timestamp = replorigin_session_origin_timestamp;
1046+
}
1047+
else
1048+
{
1049+
hdr->origin_lsn = InvalidXLogRecPtr;
1050+
hdr->origin_timestamp = 0;
1051+
}
1052+
10351053
/*
10361054
* If the data size exceeds MaxAllocSize, we won't be able to read it in
10371055
* ReadTwoPhaseFile. Check for that now, rather than fail in the case
@@ -1062,9 +1080,19 @@ EndPrepare(GlobalTransaction gxact)
10621080
MyPgXact->delayChkpt = true;
10631081

10641082
XLogBeginInsert();
1083+
10651084
for (record = records.head; record != NULL; record = record->next)
10661085
XLogRegisterData(record->data, record->len);
1086+
1087+
XLogSetRecordFlags(XLOG_INCLUDE_ORIGIN);
1088+
10671089
gxact->prepare_end_lsn = XLogInsert(RM_XACT_ID, XLOG_XACT_PREPARE);
1090+
1091+
if (replorigin)
1092+
/* Move LSNs forward for this replication origin */
1093+
replorigin_session_advance(replorigin_session_origin_lsn,
1094+
gxact->prepare_end_lsn);
1095+
10681096
XLogFlush(gxact->prepare_end_lsn);
10691097

10701098
/* If we crash now, we have prepared: WAL replay will fix things */
@@ -1252,6 +1280,8 @@ ParsePrepareRecord(uint8 info, char *xlrec, xl_xact_parsed_prepare *parsed)
12521280
hdr = (TwoPhaseFileHeader *) xlrec;
12531281
bufptr = xlrec + MAXALIGN(sizeof(TwoPhaseFileHeader));
12541282

1283+
parsed->origin_lsn = hdr->origin_lsn;
1284+
parsed->origin_timestamp = hdr->origin_timestamp;
12551285
parsed->twophase_xid = hdr->xid;
12561286
parsed->dbId = hdr->database;
12571287
parsed->nsubxacts = hdr->nsubxacts;

src/backend/access/transam/xact.c

Lines changed: 40 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5120,6 +5120,7 @@ XactLogCommitRecord(TimestampTz commit_time,
51205120
xl_xact_origin xl_origin;
51215121

51225122
uint8 info;
5123+
int gidlen = 0;
51235124

51245125
Assert(CritSectionCount > 0);
51255126

@@ -5180,7 +5181,13 @@ XactLogCommitRecord(TimestampTz commit_time,
51805181
{
51815182
xl_xinfo.xinfo |= XACT_XINFO_HAS_TWOPHASE;
51825183
xl_twophase.xid = twophase_xid;
5183-
xl_twophase.gidlen = strlen(twophase_gid) + 1; /* Include '\0' */
5184+
Assert(twophase_gid != NULL);
5185+
5186+
if (XLogLogicalInfoActive())
5187+
{
5188+
xl_xinfo.xinfo |= XACT_XINFO_HAS_GID;
5189+
gidlen = strlen(twophase_gid) + 1; /* include '\0' */
5190+
}
51845191
}
51855192

51865193
/* dump transaction origin information */
@@ -5233,7 +5240,9 @@ XactLogCommitRecord(TimestampTz commit_time,
52335240
if (xl_xinfo.xinfo & XACT_XINFO_HAS_TWOPHASE)
52345241
{
52355242
XLogRegisterData((char *) (&xl_twophase), MinSizeOfXactTwophase);
5236-
XLogRegisterData((char *) twophase_gid, xl_twophase.gidlen);
5243+
5244+
if (xl_xinfo.xinfo & XACT_XINFO_HAS_GID)
5245+
XLogRegisterData((char *) twophase_gid, gidlen);
52375246
}
52385247

52395248
if (xl_xinfo.xinfo & XACT_XINFO_HAS_ORIGIN)
@@ -5263,8 +5272,10 @@ XactLogAbortRecord(TimestampTz abort_time,
52635272
xl_xact_relfilenodes xl_relfilenodes;
52645273
xl_xact_twophase xl_twophase;
52655274
xl_xact_dbinfo xl_dbinfo;
5275+
xl_xact_origin xl_origin;
52665276

52675277
uint8 info;
5278+
int gidlen = 0;
52685279

52695280
Assert(CritSectionCount > 0);
52705281

@@ -5276,7 +5287,6 @@ XactLogAbortRecord(TimestampTz abort_time,
52765287
else
52775288
info = XLOG_XACT_ABORT_PREPARED;
52785289

5279-
52805290
/* First figure out and collect all the information needed */
52815291

52825292
xlrec.xact_time = abort_time;
@@ -5297,7 +5307,13 @@ XactLogAbortRecord(TimestampTz abort_time,
52975307
{
52985308
xl_xinfo.xinfo |= XACT_XINFO_HAS_TWOPHASE;
52995309
xl_twophase.xid = twophase_xid;
5300-
xl_twophase.gidlen = strlen(twophase_gid) + 1; /* Include '\0' */
5310+
Assert(twophase_gid != NULL);
5311+
5312+
if (XLogLogicalInfoActive())
5313+
{
5314+
xl_xinfo.xinfo |= XACT_XINFO_HAS_GID;
5315+
gidlen = strlen(twophase_gid) + 1; /* include '\0' */
5316+
}
53015317
}
53025318

53035319
if (TransactionIdIsValid(twophase_xid) && XLogLogicalInfoActive())
@@ -5307,6 +5323,17 @@ XactLogAbortRecord(TimestampTz abort_time,
53075323
xl_dbinfo.tsId = MyDatabaseTableSpace;
53085324
}
53095325

5326+
/* dump transaction origin information only for abort prepared */
5327+
if ( (replorigin_session_origin != InvalidRepOriginId) &&
5328+
TransactionIdIsValid(twophase_xid) &&
5329+
XLogLogicalInfoActive())
5330+
{
5331+
xl_xinfo.xinfo |= XACT_XINFO_HAS_ORIGIN;
5332+
5333+
xl_origin.origin_lsn = replorigin_session_origin_lsn;
5334+
xl_origin.origin_timestamp = replorigin_session_origin_timestamp;
5335+
}
5336+
53105337
if (xl_xinfo.xinfo != 0)
53115338
info |= XLOG_XACT_HAS_INFO;
53125339

@@ -5341,12 +5368,20 @@ XactLogAbortRecord(TimestampTz abort_time,
53415368
if (xl_xinfo.xinfo & XACT_XINFO_HAS_TWOPHASE)
53425369
{
53435370
XLogRegisterData((char *) (&xl_twophase), MinSizeOfXactTwophase);
5344-
XLogRegisterData((char *) twophase_gid, xl_twophase.gidlen);
5371+
5372+
if (xl_xinfo.xinfo & XACT_XINFO_HAS_GID)
5373+
XLogRegisterData((char *) twophase_gid, gidlen);
53455374
}
53465375

53475376
if (xl_xinfo.xinfo & XACT_XINFO_HAS_DBINFO)
53485377
XLogRegisterData((char *) (&xl_dbinfo), sizeof(xl_dbinfo));
53495378

5379+
if (xl_xinfo.xinfo & XACT_XINFO_HAS_ORIGIN)
5380+
XLogRegisterData((char *) (&xl_origin), sizeof(xl_xact_origin));
5381+
5382+
if (TransactionIdIsValid(twophase_xid))
5383+
XLogSetRecordFlags(XLOG_INCLUDE_ORIGIN);
5384+
53505385
return XLogInsert(RM_XACT_ID, info);
53515386
}
53525387

src/include/access/xact.h

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222
#include "utils/datetime.h"
2323

2424
/*
25-
* Maximum size of Global Transaction ID.
25+
* Maximum size of Global Transaction ID (including '\0').
2626
*/
2727
#define GIDSIZE 200
2828

@@ -141,6 +141,7 @@ typedef void (*SubXactCallback) (SubXactEvent event, SubTransactionId mySubid,
141141
#define XACT_XINFO_HAS_INVALS (1U << 3)
142142
#define XACT_XINFO_HAS_TWOPHASE (1U << 4)
143143
#define XACT_XINFO_HAS_ORIGIN (1U << 5)
144+
#define XACT_XINFO_HAS_GID (1U << 6)
144145

145146
/*
146147
* Also stored in xinfo, these indicating a variety of additional actions that
@@ -228,8 +229,7 @@ typedef struct xl_xact_invals
228229
typedef struct xl_xact_twophase
229230
{
230231
TransactionId xid;
231-
uint8 gidlen;
232-
char gid[GIDSIZE];
232+
char gid[GIDSIZE];
233233
} xl_xact_twophase;
234234
#define MinSizeOfXactTwophase offsetof(xl_xact_twophase, gid)
235235

@@ -314,6 +314,9 @@ typedef struct xl_xact_parsed_prepare
314314

315315
TransactionId twophase_xid;
316316
char twophase_gid[GIDSIZE];
317+
318+
XLogRecPtr origin_lsn;
319+
TimestampTz origin_timestamp;
317320
} xl_xact_parsed_prepare;
318321

319322
typedef struct xl_xact_parsed_abort
@@ -332,6 +335,9 @@ typedef struct xl_xact_parsed_abort
332335

333336
TransactionId twophase_xid; /* only for 2PC */
334337
char twophase_gid[GIDSIZE];
338+
339+
XLogRecPtr origin_lsn;
340+
TimestampTz origin_timestamp;
335341
} xl_xact_parsed_abort;
336342

337343

0 commit comments

Comments
 (0)