Skip to content

Commit c55040c

Browse files
author
Amit Kapila
committed
WAL Log invalidations at command end with wal_level=logical.
When wal_level=logical, write invalidations at command end into WAL so that decoding can use this information. This patch is required to allow the streaming of in-progress transactions in logical decoding. The actual work to allow streaming will be committed as a separate patch. We still add the invalidations to the cache and write them to WAL at commit time in RecordTransactionCommit(). This uses the existing XLOG_INVALIDATIONS xlog record type, from the RM_STANDBY_ID resource manager (see LogStandbyInvalidations for details). So existing code relying on those invalidations (e.g. redo) does not need to be changed. The invalidations written at command end use a new xlog record type XLOG_XACT_INVALIDATIONS, from RM_XACT_ID resource manager. See LogLogicalInvalidations for details. These new xlog records are ignored by existing redo procedures, which still rely on the invalidations written to commit records. The invalidations are decoded and accumulated in top-transaction, and then executed during replay. This obviates the need to decode the invalidations as part of a commit record. Bump XLOG_PAGE_MAGIC, since this introduces XLOG_XACT_INVALIDATIONS. Author: Dilip Kumar, Tomas Vondra, Amit Kapila Reviewed-by: Amit Kapila Tested-by: Neha Sharma and Mahendra Singh Thalor Discussion: https://postgr.es/m/688b0b7f-2f6c-d827-c27b-216a8e3ea700@2ndquadrant.com
1 parent 38f60f1 commit c55040c

File tree

9 files changed

+166
-34
lines changed

9 files changed

+166
-34
lines changed

src/backend/access/rmgrdesc/xactdesc.c

+10
Original file line numberDiff line numberDiff line change
@@ -396,6 +396,13 @@ xact_desc(StringInfo buf, XLogReaderState *record)
396396
appendStringInfo(buf, "xtop %u: ", xlrec->xtop);
397397
xact_desc_assignment(buf, xlrec);
398398
}
399+
else if (info == XLOG_XACT_INVALIDATIONS)
400+
{
401+
xl_xact_invals *xlrec = (xl_xact_invals *) rec;
402+
403+
standby_desc_invalidations(buf, xlrec->nmsgs, xlrec->msgs, InvalidOid,
404+
InvalidOid, false);
405+
}
399406
}
400407

401408
const char *
@@ -423,6 +430,9 @@ xact_identify(uint8 info)
423430
case XLOG_XACT_ASSIGNMENT:
424431
id = "ASSIGNMENT";
425432
break;
433+
case XLOG_XACT_INVALIDATIONS:
434+
id = "INVALIDATION";
435+
break;
426436
}
427437

428438
return id;

src/backend/access/transam/xact.c

+17
Original file line numberDiff line numberDiff line change
@@ -1224,6 +1224,16 @@ RecordTransactionCommit(void)
12241224
bool RelcacheInitFileInval = false;
12251225
bool wrote_xlog;
12261226

1227+
/*
1228+
* Log pending invalidations for logical decoding of in-progress
1229+
* transactions. Normally for DDLs, we log this at each command end,
1230+
* however, for certain cases where we directly update the system table
1231+
* without a transaction block, the invalidations are not logged till this
1232+
* time.
1233+
*/
1234+
if (XLogLogicalInfoActive())
1235+
LogLogicalInvalidations();
1236+
12271237
/* Get data needed for commit record */
12281238
nrels = smgrGetPendingDeletes(true, &rels);
12291239
nchildren = xactGetCommittedChildren(&children);
@@ -6022,6 +6032,13 @@ xact_redo(XLogReaderState *record)
60226032
ProcArrayApplyXidAssignment(xlrec->xtop,
60236033
xlrec->nsubxacts, xlrec->xsub);
60246034
}
6035+
else if (info == XLOG_XACT_INVALIDATIONS)
6036+
{
6037+
/*
6038+
* XXX we do ignore this for now, what matters are invalidations
6039+
* written into the commit record.
6040+
*/
6041+
}
60256042
else
60266043
elog(PANIC, "xact_redo: unknown op code %u", info);
60276044
}

src/backend/replication/logical/decode.c

+35-23
Original file line numberDiff line numberDiff line change
@@ -278,10 +278,39 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
278278

279279
/*
280280
* We assign subxact to the toplevel xact while processing each
281-
* record if required. So, we don't need to do anything here.
282-
* See LogicalDecodingProcessRecord.
281+
* record if required. So, we don't need to do anything here. See
282+
* LogicalDecodingProcessRecord.
283283
*/
284284
break;
285+
case XLOG_XACT_INVALIDATIONS:
286+
{
287+
TransactionId xid;
288+
xl_xact_invals *invals;
289+
290+
xid = XLogRecGetXid(r);
291+
invals = (xl_xact_invals *) XLogRecGetData(r);
292+
293+
/*
294+
* Execute the invalidations for xid-less transactions,
295+
* otherwise, accumulate them so that they can be processed at
296+
* the commit time.
297+
*/
298+
if (TransactionIdIsValid(xid))
299+
{
300+
if (!ctx->fast_forward)
301+
ReorderBufferAddInvalidations(reorder, xid,
302+
buf->origptr,
303+
invals->nmsgs,
304+
invals->msgs);
305+
ReorderBufferXidSetCatalogChanges(ctx->reorder, xid,
306+
buf->origptr);
307+
}
308+
else if ((!ctx->fast_forward))
309+
ReorderBufferImmediateInvalidation(ctx->reorder,
310+
invals->nmsgs,
311+
invals->msgs);
312+
}
313+
break;
285314
case XLOG_XACT_PREPARE:
286315

287316
/*
@@ -334,15 +363,11 @@ DecodeStandbyOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
334363
case XLOG_STANDBY_LOCK:
335364
break;
336365
case XLOG_INVALIDATIONS:
337-
{
338-
xl_invalidations *invalidations =
339-
(xl_invalidations *) XLogRecGetData(r);
340366

341-
if (!ctx->fast_forward)
342-
ReorderBufferImmediateInvalidation(ctx->reorder,
343-
invalidations->nmsgs,
344-
invalidations->msgs);
345-
}
367+
/*
368+
* We are processing the invalidations at the command level via
369+
* XLOG_XACT_INVALIDATIONS. So we don't need to do anything here.
370+
*/
346371
break;
347372
default:
348373
elog(ERROR, "unexpected RM_STANDBY_ID record type: %u", info);
@@ -573,19 +598,6 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
573598
commit_time = parsed->origin_timestamp;
574599
}
575600

576-
/*
577-
* Process invalidation messages, even if we're not interested in the
578-
* transaction's contents, since the various caches need to always be
579-
* consistent.
580-
*/
581-
if (parsed->nmsgs > 0)
582-
{
583-
if (!ctx->fast_forward)
584-
ReorderBufferAddInvalidations(ctx->reorder, xid, buf->origptr,
585-
parsed->nmsgs, parsed->msgs);
586-
ReorderBufferXidSetCatalogChanges(ctx->reorder, xid, buf->origptr);
587-
}
588-
589601
SnapBuildCommitTxn(ctx->snapshot_builder, buf->origptr, xid,
590602
parsed->nsubxacts, parsed->subxacts);
591603

src/backend/replication/logical/reorderbuffer.c

+43-9
Original file line numberDiff line numberDiff line change
@@ -856,6 +856,9 @@ ReorderBufferAssignChild(ReorderBuffer *rb, TransactionId xid,
856856
subtxn->toplevel_xid = xid;
857857
Assert(subtxn->nsubtxns == 0);
858858

859+
/* set the reference to top-level transaction */
860+
subtxn->toptxn = txn;
861+
859862
/* add to subtransaction list */
860863
dlist_push_tail(&txn->subtxns, &subtxn->node);
861864
txn->nsubtxns++;
@@ -2201,7 +2204,11 @@ ReorderBufferAddNewTupleCids(ReorderBuffer *rb, TransactionId xid,
22012204
/*
22022205
* Setup the invalidation of the toplevel transaction.
22032206
*
2204-
* This needs to be done before ReorderBufferCommit is called!
2207+
* This needs to be called for each XLOG_XACT_INVALIDATIONS message and
2208+
* accumulates all the invalidation messages in the toplevel transaction.
2209+
* This is required because in some cases where we skip processing the
2210+
* transaction (see ReorderBufferForget), we need to execute all the
2211+
* invalidations together.
22052212
*/
22062213
void
22072214
ReorderBufferAddInvalidations(ReorderBuffer *rb, TransactionId xid,
@@ -2212,17 +2219,35 @@ ReorderBufferAddInvalidations(ReorderBuffer *rb, TransactionId xid,
22122219

22132220
txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
22142221

2215-
if (txn->ninvalidations != 0)
2216-
elog(ERROR, "only ever add one set of invalidations");
2222+
/*
2223+
* We collect all the invalidations under the top transaction so that we
2224+
* can execute them all together.
2225+
*/
2226+
if (txn->toptxn)
2227+
txn = txn->toptxn;
22172228

22182229
Assert(nmsgs > 0);
22192230

2220-
txn->ninvalidations = nmsgs;
2221-
txn->invalidations = (SharedInvalidationMessage *)
2222-
MemoryContextAlloc(rb->context,
2223-
sizeof(SharedInvalidationMessage) * nmsgs);
2224-
memcpy(txn->invalidations, msgs,
2225-
sizeof(SharedInvalidationMessage) * nmsgs);
2231+
/* Accumulate invalidations. */
2232+
if (txn->ninvalidations == 0)
2233+
{
2234+
txn->ninvalidations = nmsgs;
2235+
txn->invalidations = (SharedInvalidationMessage *)
2236+
MemoryContextAlloc(rb->context,
2237+
sizeof(SharedInvalidationMessage) * nmsgs);
2238+
memcpy(txn->invalidations, msgs,
2239+
sizeof(SharedInvalidationMessage) * nmsgs);
2240+
}
2241+
else
2242+
{
2243+
txn->invalidations = (SharedInvalidationMessage *)
2244+
repalloc(txn->invalidations, sizeof(SharedInvalidationMessage) *
2245+
(txn->ninvalidations + nmsgs));
2246+
2247+
memcpy(txn->invalidations + txn->ninvalidations, msgs,
2248+
nmsgs * sizeof(SharedInvalidationMessage));
2249+
txn->ninvalidations += nmsgs;
2250+
}
22262251
}
22272252

22282253
/*
@@ -2250,6 +2275,15 @@ ReorderBufferXidSetCatalogChanges(ReorderBuffer *rb, TransactionId xid,
22502275
txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
22512276

22522277
txn->txn_flags |= RBTXN_HAS_CATALOG_CHANGES;
2278+
2279+
/*
2280+
* Mark top-level transaction as having catalog changes too if one of its
2281+
* children has so that the ReorderBufferBuildTupleCidHash can
2282+
* conveniently check just top-level transaction and decide whether to
2283+
* build the hash table or not.
2284+
*/
2285+
if (txn->toptxn != NULL)
2286+
txn->toptxn->txn_flags |= RBTXN_HAS_CATALOG_CHANGES;
22532287
}
22542288

22552289
/*

src/backend/utils/cache/inval.c

+54
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,9 @@
8585
* worth trying to avoid sending such inval traffic in the future, if those
8686
* problems can be overcome cheaply.
8787
*
88+
* When wal_level=logical, write invalidations into WAL at each command end to
89+
* support the decoding of the in-progress transactions. See
90+
* CommandEndInvalidationMessages.
8891
*
8992
* Portions Copyright (c) 1996-2020, PostgreSQL Global Development Group
9093
* Portions Copyright (c) 1994, Regents of the University of California
@@ -1094,6 +1097,11 @@ CommandEndInvalidationMessages(void)
10941097

10951098
ProcessInvalidationMessages(&transInvalInfo->CurrentCmdInvalidMsgs,
10961099
LocalExecuteInvalidationMessage);
1100+
1101+
/* WAL Log per-command invalidation messages for wal_level=logical */
1102+
if (XLogLogicalInfoActive())
1103+
LogLogicalInvalidations();
1104+
10971105
AppendInvalidationMessages(&transInvalInfo->PriorCmdInvalidMsgs,
10981106
&transInvalInfo->CurrentCmdInvalidMsgs);
10991107
}
@@ -1501,3 +1509,49 @@ CallSyscacheCallbacks(int cacheid, uint32 hashvalue)
15011509
i = ccitem->link - 1;
15021510
}
15031511
}
1512+
1513+
/*
1514+
* LogLogicalInvalidations
1515+
*
1516+
* Emit WAL for invalidations. This is currently only used for logging
1517+
* invalidations at the command end or at commit time if any invalidations
1518+
* are pending.
1519+
*/
1520+
void
1521+
LogLogicalInvalidations()
1522+
{
1523+
xl_xact_invals xlrec;
1524+
SharedInvalidationMessage *invalMessages;
1525+
int nmsgs = 0;
1526+
1527+
/* Quick exit if we haven't done anything with invalidation messages. */
1528+
if (transInvalInfo == NULL)
1529+
return;
1530+
1531+
ProcessInvalidationMessagesMulti(&transInvalInfo->CurrentCmdInvalidMsgs,
1532+
MakeSharedInvalidMessagesArray);
1533+
1534+
Assert(!(numSharedInvalidMessagesArray > 0 &&
1535+
SharedInvalidMessagesArray == NULL));
1536+
1537+
invalMessages = SharedInvalidMessagesArray;
1538+
nmsgs = numSharedInvalidMessagesArray;
1539+
SharedInvalidMessagesArray = NULL;
1540+
numSharedInvalidMessagesArray = 0;
1541+
1542+
if (nmsgs > 0)
1543+
{
1544+
/* prepare record */
1545+
memset(&xlrec, 0, MinSizeOfXactInvals);
1546+
xlrec.nmsgs = nmsgs;
1547+
1548+
/* perform insertion */
1549+
XLogBeginInsert();
1550+
XLogRegisterData((char *) (&xlrec), MinSizeOfXactInvals);
1551+
XLogRegisterData((char *) invalMessages,
1552+
nmsgs * sizeof(SharedInvalidationMessage));
1553+
XLogInsert(RM_XACT_ID, XLOG_XACT_INVALIDATIONS);
1554+
1555+
pfree(invalMessages);
1556+
}
1557+
}

src/include/access/xact.h

+1-1
Original file line numberDiff line numberDiff line change
@@ -146,7 +146,7 @@ typedef void (*SubXactCallback) (SubXactEvent event, SubTransactionId mySubid,
146146
#define XLOG_XACT_COMMIT_PREPARED 0x30
147147
#define XLOG_XACT_ABORT_PREPARED 0x40
148148
#define XLOG_XACT_ASSIGNMENT 0x50
149-
/* free opcode 0x60 */
149+
#define XLOG_XACT_INVALIDATIONS 0x60
150150
/* free opcode 0x70 */
151151

152152
/* mask for filtering opcodes out of xl_info */

src/include/access/xlog_internal.h

+1-1
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@
3131
/*
3232
* Each page of XLOG file has a header like this:
3333
*/
34-
#define XLOG_PAGE_MAGIC 0xD107 /* can be used as WAL version indicator */
34+
#define XLOG_PAGE_MAGIC 0xD108 /* can be used as WAL version indicator */
3535

3636
typedef struct XLogPageHeaderData
3737
{

src/include/replication/reorderbuffer.h

+3
Original file line numberDiff line numberDiff line change
@@ -220,6 +220,9 @@ typedef struct ReorderBufferTXN
220220
*/
221221
XLogRecPtr end_lsn;
222222

223+
/* Toplevel transaction for this subxact (NULL for top-level). */
224+
struct ReorderBufferTXN *toptxn;
225+
223226
/*
224227
* LSN of the last lsn at which snapshot information reside, so we can
225228
* restart decoding from there and fully recover this transaction from

src/include/utils/inval.h

+2
Original file line numberDiff line numberDiff line change
@@ -61,4 +61,6 @@ extern void CacheRegisterRelcacheCallback(RelcacheCallbackFunction func,
6161
extern void CallSyscacheCallbacks(int cacheid, uint32 hashvalue);
6262

6363
extern void InvalidateSystemCaches(void);
64+
65+
extern void LogLogicalInvalidations(void);
6466
#endif /* INVAL_H */

0 commit comments

Comments
 (0)