Skip to content

Commit 0bead9a

Browse files
author
Amit Kapila
committed
Immediately WAL-log subtransaction and top-level XID association.
The logical decoding infrastructure needs to know which top-level transaction the subxact belongs to, in order to decode all the changes. Until now that might be delayed until commit, due to the caching (GPROC_MAX_CACHED_SUBXIDS), preventing features requiring incremental decoding. So we also write the assignment info into WAL immediately, as part of the next WAL record (to minimize overhead) only when wal_level=logical. We can not remove the existing XLOG_XACT_ASSIGNMENT WAL as that is required for avoiding overflow in the hot standby snapshot. Bump XLOG_PAGE_MAGIC, since this introduces XLR_BLOCK_ID_TOPLEVEL_XID. Author: Tomas Vondra, Dilip Kumar, Amit Kapila Reviewed-by: Amit Kapila Tested-by: Neha Sharma and Mahendra Singh Thalor Discussion: https://postgr.es/m/688b0b7f-2f6c-d827-c27b-216a8e3ea700@2ndquadrant.com
1 parent d05b172 commit 0bead9a

File tree

9 files changed

+108
-24
lines changed

9 files changed

+108
-24
lines changed

src/backend/access/transam/xact.c

+50
Original file line numberDiff line numberDiff line change
@@ -191,6 +191,7 @@ typedef struct TransactionStateData
191191
bool didLogXid; /* has xid been included in WAL record? */
192192
int parallelModeLevel; /* Enter/ExitParallelMode counter */
193193
bool chain; /* start a new block after this one */
194+
bool assigned; /* assigned to top-level XID */
194195
struct TransactionStateData *parent; /* back link to parent */
195196
} TransactionStateData;
196197

@@ -223,6 +224,7 @@ typedef struct SerializedTransactionState
223224
static TransactionStateData TopTransactionStateData = {
224225
.state = TRANS_DEFAULT,
225226
.blockState = TBLOCK_DEFAULT,
227+
.assigned = false,
226228
};
227229

228230
/*
@@ -5120,6 +5122,7 @@ PushTransaction(void)
51205122
GetUserIdAndSecContext(&s->prevUser, &s->prevSecContext);
51215123
s->prevXactReadOnly = XactReadOnly;
51225124
s->parallelModeLevel = 0;
5125+
s->assigned = false;
51235126

51245127
CurrentTransactionState = s;
51255128

@@ -6022,3 +6025,50 @@ xact_redo(XLogReaderState *record)
60226025
else
60236026
elog(PANIC, "xact_redo: unknown op code %u", info);
60246027
}
6028+
6029+
/*
6030+
* IsSubTransactionAssignmentPending
6031+
*
6032+
* This is used to decide whether we need to WAL log the top-level XID for
6033+
* operation in a subtransaction. We require that for logical decoding, see
6034+
* LogicalDecodingProcessRecord.
6035+
*
6036+
* This returns true if wal_level >= logical and we are inside a valid
6037+
* subtransaction, for which the assignment was not yet written to any WAL
6038+
* record.
6039+
*/
6040+
bool
6041+
IsSubTransactionAssignmentPending(void)
6042+
{
6043+
/* wal_level has to be logical */
6044+
if (!XLogLogicalInfoActive())
6045+
return false;
6046+
6047+
/* we need to be in a transaction state */
6048+
if (!IsTransactionState())
6049+
return false;
6050+
6051+
/* it has to be a subtransaction */
6052+
if (!IsSubTransaction())
6053+
return false;
6054+
6055+
/* the subtransaction has to have a XID assigned */
6056+
if (!TransactionIdIsValid(GetCurrentTransactionIdIfAny()))
6057+
return false;
6058+
6059+
/* and it should not be already 'assigned' */
6060+
return !CurrentTransactionState->assigned;
6061+
}
6062+
6063+
/*
6064+
* MarkSubTransactionAssigned
6065+
*
6066+
* Mark the subtransaction assignment as completed.
6067+
*/
6068+
void
6069+
MarkSubTransactionAssigned(void)
6070+
{
6071+
Assert(IsSubTransactionAssignmentPending());
6072+
6073+
CurrentTransactionState->assigned = true;
6074+
}

src/backend/access/transam/xloginsert.c

+21-2
Original file line numberDiff line numberDiff line change
@@ -89,11 +89,13 @@ static XLogRecData hdr_rdt;
8989
static char *hdr_scratch = NULL;
9090

9191
#define SizeOfXlogOrigin (sizeof(RepOriginId) + sizeof(char))
92+
#define SizeOfXLogTransactionId (sizeof(TransactionId) + sizeof(char))
9293

9394
#define HEADER_SCRATCH_SIZE \
9495
(SizeOfXLogRecord + \
9596
MaxSizeOfXLogRecordBlockHeader * (XLR_MAX_BLOCK_ID + 1) + \
96-
SizeOfXLogRecordDataHeaderLong + SizeOfXlogOrigin)
97+
SizeOfXLogRecordDataHeaderLong + SizeOfXlogOrigin + \
98+
SizeOfXLogTransactionId)
9799

98100
/*
99101
* An array of XLogRecData structs, to hold registered data.
@@ -195,6 +197,10 @@ XLogResetInsertion(void)
195197
{
196198
int i;
197199

200+
/* reset the subxact assignment flag (if needed) */
201+
if (curinsert_flags & XLOG_INCLUDE_XID)
202+
MarkSubTransactionAssigned();
203+
198204
for (i = 0; i < max_registered_block_id; i++)
199205
registered_buffers[i].in_use = false;
200206

@@ -398,7 +404,7 @@ void
398404
XLogSetRecordFlags(uint8 flags)
399405
{
400406
Assert(begininsert_called);
401-
curinsert_flags = flags;
407+
curinsert_flags |= flags;
402408
}
403409

404410
/*
@@ -748,6 +754,19 @@ XLogRecordAssemble(RmgrId rmid, uint8 info,
748754
scratch += sizeof(replorigin_session_origin);
749755
}
750756

757+
/* followed by toplevel XID, if not already included in previous record */
758+
if (IsSubTransactionAssignmentPending())
759+
{
760+
TransactionId xid = GetTopTransactionIdIfAny();
761+
762+
/* update the flag (later used by XLogResetInsertion) */
763+
XLogSetRecordFlags(XLOG_INCLUDE_XID);
764+
765+
*(scratch++) = (char) XLR_BLOCK_ID_TOPLEVEL_XID;
766+
memcpy(scratch, &xid, sizeof(TransactionId));
767+
scratch += sizeof(TransactionId);
768+
}
769+
751770
/* followed by main data, if any */
752771
if (mainrdata_len > 0)
753772
{

src/backend/access/transam/xlogreader.c

+5
Original file line numberDiff line numberDiff line change
@@ -1197,6 +1197,7 @@ DecodeXLogRecord(XLogReaderState *state, XLogRecord *record, char **errormsg)
11971197

11981198
state->decoded_record = record;
11991199
state->record_origin = InvalidRepOriginId;
1200+
state->toplevel_xid = InvalidTransactionId;
12001201

12011202
ptr = (char *) record;
12021203
ptr += SizeOfXLogRecord;
@@ -1235,6 +1236,10 @@ DecodeXLogRecord(XLogReaderState *state, XLogRecord *record, char **errormsg)
12351236
{
12361237
COPY_HEADER_FIELD(&state->record_origin, sizeof(RepOriginId));
12371238
}
1239+
else if (block_id == XLR_BLOCK_ID_TOPLEVEL_XID)
1240+
{
1241+
COPY_HEADER_FIELD(&state->toplevel_xid, sizeof(TransactionId));
1242+
}
12381243
else if (block_id <= XLR_MAX_BLOCK_ID)
12391244
{
12401245
/* XLogRecordBlockHeader */

src/backend/replication/logical/decode.c

+23-21
Original file line numberDiff line numberDiff line change
@@ -94,11 +94,27 @@ void
9494
LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *record)
9595
{
9696
XLogRecordBuffer buf;
97+
TransactionId txid;
9798

9899
buf.origptr = ctx->reader->ReadRecPtr;
99100
buf.endptr = ctx->reader->EndRecPtr;
100101
buf.record = record;
101102

103+
txid = XLogRecGetTopXid(record);
104+
105+
/*
106+
* If the top-level xid is valid, we need to assign the subxact to the
107+
* top-level xact. We need to do this for all records, hence we do it
108+
* before the switch.
109+
*/
110+
if (TransactionIdIsValid(txid))
111+
{
112+
ReorderBufferAssignChild(ctx->reorder,
113+
txid,
114+
record->decoded_record->xl_xid,
115+
buf.origptr);
116+
}
117+
102118
/* cast so we get a warning when new rmgrs are added */
103119
switch ((RmgrId) XLogRecGetRmid(record))
104120
{
@@ -216,13 +232,8 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
216232
/*
217233
* If the snapshot isn't yet fully built, we cannot decode anything, so
218234
* bail out.
219-
*
220-
* However, it's critical to process XLOG_XACT_ASSIGNMENT records even
221-
* when the snapshot is being built: it is possible to get later records
222-
* that require subxids to be properly assigned.
223235
*/
224-
if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT &&
225-
info != XLOG_XACT_ASSIGNMENT)
236+
if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT)
226237
return;
227238

228239
switch (info)
@@ -264,22 +275,13 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
264275
break;
265276
}
266277
case XLOG_XACT_ASSIGNMENT:
267-
{
268-
xl_xact_assignment *xlrec;
269-
int i;
270-
TransactionId *sub_xid;
271278

272-
xlrec = (xl_xact_assignment *) XLogRecGetData(r);
273-
274-
sub_xid = &xlrec->xsub[0];
275-
276-
for (i = 0; i < xlrec->nsubxacts; i++)
277-
{
278-
ReorderBufferAssignChild(reorder, xlrec->xtop,
279-
*(sub_xid++), buf->origptr);
280-
}
281-
break;
282-
}
279+
/*
280+
* We assign subxact to the toplevel xact while processing each
281+
* record if required. So, we don't need to do anything here.
282+
* See LogicalDecodingProcessRecord.
283+
*/
284+
break;
283285
case XLOG_XACT_PREPARE:
284286

285287
/*

src/include/access/xact.h

+3
Original file line numberDiff line numberDiff line change
@@ -428,6 +428,9 @@ extern void UnregisterXactCallback(XactCallback callback, void *arg);
428428
extern void RegisterSubXactCallback(SubXactCallback callback, void *arg);
429429
extern void UnregisterSubXactCallback(SubXactCallback callback, void *arg);
430430

431+
extern bool IsSubTransactionAssignmentPending(void);
432+
extern void MarkSubTransactionAssigned(void);
433+
431434
extern int xactGetCommittedChildren(TransactionId **ptr);
432435

433436
extern XLogRecPtr XactLogCommitRecord(TimestampTz commit_time,

src/include/access/xlog.h

+1
Original file line numberDiff line numberDiff line change
@@ -237,6 +237,7 @@ extern bool XLOG_DEBUG;
237237
*/
238238
#define XLOG_INCLUDE_ORIGIN 0x01 /* include the replication origin */
239239
#define XLOG_MARK_UNIMPORTANT 0x02 /* record not important for durability */
240+
#define XLOG_INCLUDE_XID 0x04 /* include XID of top-level xact */
240241

241242

242243
/* Checkpoint statistics */

src/include/access/xlog_internal.h

+1-1
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@
3131
/*
3232
* Each page of XLOG file has a header like this:
3333
*/
34-
#define XLOG_PAGE_MAGIC 0xD106 /* can be used as WAL version indicator */
34+
#define XLOG_PAGE_MAGIC 0xD107 /* can be used as WAL version indicator */
3535

3636
typedef struct XLogPageHeaderData
3737
{

src/include/access/xlogreader.h

+3
Original file line numberDiff line numberDiff line change
@@ -191,6 +191,8 @@ struct XLogReaderState
191191

192192
RepOriginId record_origin;
193193

194+
TransactionId toplevel_xid; /* XID of top-level transaction */
195+
194196
/* information about blocks referenced by the record. */
195197
DecodedBkpBlock blocks[XLR_MAX_BLOCK_ID + 1];
196198

@@ -304,6 +306,7 @@ extern bool DecodeXLogRecord(XLogReaderState *state, XLogRecord *record,
304306
#define XLogRecGetRmid(decoder) ((decoder)->decoded_record->xl_rmid)
305307
#define XLogRecGetXid(decoder) ((decoder)->decoded_record->xl_xid)
306308
#define XLogRecGetOrigin(decoder) ((decoder)->record_origin)
309+
#define XLogRecGetTopXid(decoder) ((decoder)->toplevel_xid)
307310
#define XLogRecGetData(decoder) ((decoder)->main_data)
308311
#define XLogRecGetDataLen(decoder) ((decoder)->main_data_len)
309312
#define XLogRecHasAnyBlockRefs(decoder) ((decoder)->max_block_id >= 0)

src/include/access/xlogrecord.h

+1
Original file line numberDiff line numberDiff line change
@@ -223,5 +223,6 @@ typedef struct XLogRecordDataHeaderLong
223223
#define XLR_BLOCK_ID_DATA_SHORT 255
224224
#define XLR_BLOCK_ID_DATA_LONG 254
225225
#define XLR_BLOCK_ID_ORIGIN 253
226+
#define XLR_BLOCK_ID_TOPLEVEL_XID 252
226227

227228
#endif /* XLOGRECORD_H */

0 commit comments

Comments
 (0)