Skip to content

Commit 7f13ac8

Browse files
author
Amit Kapila
committed
Fix catalog lookup with the wrong snapshot during logical decoding.
Previously, we relied on HEAP2_NEW_CID records and XACT_INVALIDATION records to know if the transaction has modified the catalog, and that information is not serialized to the snapshot. Therefore, after the restart, if the logical decoding decodes only the commit record of the transaction that has actually modified a catalog, we will miss adding its XID to the snapshot. Thus, we will end up looking at catalogs with the wrong snapshot.

To fix this problem, this change adds the list of transaction IDs and sub-transaction IDs, that have modified catalogs and are running during snapshot serialization, to the serialized snapshot. After restart or otherwise, when we restore from such a serialized snapshot, the corresponding list is restored in memory. Now, when decoding a COMMIT record, we check both the list and the ReorderBuffer to see if the transaction has modified catalogs.

Since this adds additional information to the serialized snapshot, we cannot backpatch it. For back branches, we took another approach. We remember the last-running-xacts list of the decoded RUNNING_XACTS record after restoring the previously serialized snapshot. Then, we mark the transaction as containing catalog changes if it's in the list of initial running transactions and its commit record has XACT_XINFO_HAS_INVALS. This doesn't require any file format changes but the transaction will end up being added to the snapshot even if it has only relcache invalidations. But that won't be a problem since we use the snapshot built during decoding only to read system catalogs.

This commit bumps SNAPBUILD_VERSION because of a change in SnapBuild.

Reported-by: Mike Oh
Author: Masahiko Sawada
Reviewed-by: Amit Kapila, Shi yu, Takamichi Osumi, Kyotaro Horiguchi, Bertrand Drouvot, Ahsan Hadi
Backpatch-through: 10
Discussion: https://postgr.es/m/81D0D8B0-E7C4-4999-B616-1E5004DBDCD2%40amazon.com
1 parent 37a6e5d commit 7f13ac8

File tree

8 files changed

+353
-93
lines changed

8 files changed

+353
-93
lines changed

contrib/test_decoding/Makefile

+1-1
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ REGRESS = ddl xact rewrite toast permissions decoding_in_xact \
88
spill slot truncate stream stats twophase twophase_stream
99
ISOLATION = mxact delayed_startup ondisk_startup concurrent_ddl_dml \
1010
oldest_xmin snapshot_transfer subxact_without_top concurrent_stream \
11-
twophase_snapshot slot_creation_error
11+
twophase_snapshot slot_creation_error catalog_change_snapshot
1212

1313
REGRESS_OPTS = --temp-config $(top_srcdir)/contrib/test_decoding/logical.conf
1414
ISOLATION_OPTS = --temp-config $(top_srcdir)/contrib/test_decoding/logical.conf
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
Parsed test spec with 2 sessions
2+
3+
starting permutation: s0_init s0_begin s0_savepoint s0_truncate s1_checkpoint s1_get_changes s0_commit s0_begin s0_insert s1_checkpoint s1_get_changes s0_commit s1_get_changes
4+
step s0_init: SELECT 'init' FROM pg_create_logical_replication_slot('isolation_slot', 'test_decoding');
5+
?column?
6+
--------
7+
init
8+
(1 row)
9+
10+
step s0_begin: BEGIN;
11+
step s0_savepoint: SAVEPOINT sp1;
12+
step s0_truncate: TRUNCATE tbl1;
13+
step s1_checkpoint: CHECKPOINT;
14+
step s1_get_changes: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0');
15+
data
16+
----
17+
(0 rows)
18+
19+
step s0_commit: COMMIT;
20+
step s0_begin: BEGIN;
21+
step s0_insert: INSERT INTO tbl1 VALUES (1);
22+
step s1_checkpoint: CHECKPOINT;
23+
step s1_get_changes: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0');
24+
data
25+
---------------------------------------
26+
BEGIN
27+
table public.tbl1: TRUNCATE: (no-flags)
28+
COMMIT
29+
(3 rows)
30+
31+
step s0_commit: COMMIT;
32+
step s1_get_changes: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0');
33+
data
34+
-------------------------------------------------------------
35+
BEGIN
36+
table public.tbl1: INSERT: val1[integer]:1 val2[integer]:null
37+
COMMIT
38+
(3 rows)
39+
40+
?column?
41+
--------
42+
stop
43+
(1 row)
44+
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
# Test decoding only the commit record of the transaction that has
2+
# modified catalogs.
3+
setup
4+
{
5+
DROP TABLE IF EXISTS tbl1;
6+
CREATE TABLE tbl1 (val1 integer, val2 integer);
7+
}
8+
9+
teardown
10+
{
11+
DROP TABLE tbl1;
12+
SELECT 'stop' FROM pg_drop_replication_slot('isolation_slot');
13+
}
14+
15+
session "s0"
16+
setup { SET synchronous_commit=on; }
17+
step "s0_init" { SELECT 'init' FROM pg_create_logical_replication_slot('isolation_slot', 'test_decoding'); }
18+
step "s0_begin" { BEGIN; }
19+
step "s0_savepoint" { SAVEPOINT sp1; }
20+
step "s0_truncate" { TRUNCATE tbl1; }
21+
step "s0_insert" { INSERT INTO tbl1 VALUES (1); }
22+
step "s0_commit" { COMMIT; }
23+
24+
session "s1"
25+
setup { SET synchronous_commit=on; }
26+
step "s1_checkpoint" { CHECKPOINT; }
27+
step "s1_get_changes" { SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0'); }
28+
29+
# For the transaction that TRUNCATEd the table tbl1, the last decoding decodes
30+
# only its COMMIT record, because it starts from the RUNNING_XACTS record emitted
31+
# during the first checkpoint execution. This transaction must be marked as
32+
# containing catalog changes while decoding the COMMIT record and the decoding
33+
# of the INSERT record must read pg_class with the correct historic snapshot.
34+
#
35+
# Note that in a case where bgwriter wrote the RUNNING_XACTS record between "s0_commit"
36+
# and "s0_begin", this doesn't happen as the decoding starts from the RUNNING_XACTS
37+
# record written by bgwriter. One might think we can either stop the bgwriter or
38+
# increase LOG_SNAPSHOT_INTERVAL_MS, but neither is practical in tests.
39+
permutation "s0_init" "s0_begin" "s0_savepoint" "s0_truncate" "s1_checkpoint" "s1_get_changes" "s0_commit" "s0_begin" "s0_insert" "s1_checkpoint" "s1_get_changes" "s0_commit" "s1_get_changes"

src/backend/replication/logical/decode.c

+2-1
Original file line numberDiff line numberDiff line change
@@ -628,7 +628,8 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
628628
}
629629

630630
SnapBuildCommitTxn(ctx->snapshot_builder, buf->origptr, xid,
631-
parsed->nsubxacts, parsed->subxacts);
631+
parsed->nsubxacts, parsed->subxacts,
632+
parsed->xinfo);
632633

633634
/* ----
634635
* Check whether we are interested in this specific transaction, and tell

src/backend/replication/logical/reorderbuffer.c

+66-5
Original file line numberDiff line numberDiff line change
@@ -349,6 +349,8 @@ ReorderBufferAllocate(void)
349349
buffer->by_txn_last_xid = InvalidTransactionId;
350350
buffer->by_txn_last_txn = NULL;
351351

352+
buffer->catchange_ntxns = 0;
353+
352354
buffer->outbuf = NULL;
353355
buffer->outbufsize = 0;
354356
buffer->size = 0;
@@ -366,6 +368,7 @@ ReorderBufferAllocate(void)
366368

367369
dlist_init(&buffer->toplevel_by_lsn);
368370
dlist_init(&buffer->txns_by_base_snapshot_lsn);
371+
dlist_init(&buffer->catchange_txns);
369372

370373
/*
371374
* Ensure there's no stale data from prior uses of this slot, in case some
@@ -1526,14 +1529,22 @@ ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
15261529
}
15271530

15281531
/*
1529-
* Remove TXN from its containing list.
1532+
* Remove TXN from its containing lists.
15301533
*
15311534
* Note: if txn is known as subxact, we are deleting the TXN from its
15321535
* parent's list of known subxacts; this leaves the parent's nsubxacts
15331536
* count too high, but we don't care. Otherwise, we are deleting the TXN
1534-
* from the LSN-ordered list of toplevel TXNs.
1537+
* from the LSN-ordered list of toplevel TXNs. We remove the TXN from the
1538+
* list of catalog modifying transactions as well.
15351539
*/
15361540
dlist_delete(&txn->node);
1541+
if (rbtxn_has_catalog_changes(txn))
1542+
{
1543+
dlist_delete(&txn->catchange_node);
1544+
rb->catchange_ntxns--;
1545+
1546+
Assert(rb->catchange_ntxns >= 0);
1547+
}
15371548

15381549
/* now remove reference from buffer */
15391550
hash_search(rb->by_txn,
@@ -3275,19 +3286,69 @@ ReorderBufferXidSetCatalogChanges(ReorderBuffer *rb, TransactionId xid,
32753286
XLogRecPtr lsn)
32763287
{
32773288
ReorderBufferTXN *txn;
3289+
ReorderBufferTXN *toptxn;
32783290

32793291
txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
32803292

3281-
txn->txn_flags |= RBTXN_HAS_CATALOG_CHANGES;
3293+
if (!rbtxn_has_catalog_changes(txn))
3294+
{
3295+
txn->txn_flags |= RBTXN_HAS_CATALOG_CHANGES;
3296+
dlist_push_tail(&rb->catchange_txns, &txn->catchange_node);
3297+
rb->catchange_ntxns++;
3298+
}
32823299

32833300
/*
32843301
* Mark top-level transaction as having catalog changes too if one of its
32853302
* children has so that the ReorderBufferBuildTupleCidHash can
32863303
* conveniently check just top-level transaction and decide whether to
32873304
* build the hash table or not.
32883305
*/
3289-
if (txn->toptxn != NULL)
3290-
txn->toptxn->txn_flags |= RBTXN_HAS_CATALOG_CHANGES;
3306+
toptxn = txn->toptxn;
3307+
if (toptxn != NULL && !rbtxn_has_catalog_changes(toptxn))
3308+
{
3309+
toptxn->txn_flags |= RBTXN_HAS_CATALOG_CHANGES;
3310+
dlist_push_tail(&rb->catchange_txns, &toptxn->catchange_node);
3311+
rb->catchange_ntxns++;
3312+
}
3313+
}
3314+
3315+
/*
3316+
* Return palloc'ed array of the transactions that have changed catalogs.
3317+
* The returned array is sorted in xidComparator order.
3318+
*
3319+
* Returns NULL if there are no such transactions; otherwise the caller must free the returned array when done with it.
3320+
*/
3321+
TransactionId *
3322+
ReorderBufferGetCatalogChangesXacts(ReorderBuffer *rb)
3323+
{
3324+
dlist_iter iter;
3325+
TransactionId *xids = NULL;
3326+
size_t xcnt = 0;
3327+
3328+
/* Quick return if the list is empty */
3329+
if (rb->catchange_ntxns == 0)
3330+
{
3331+
Assert(dlist_is_empty(&rb->catchange_txns));
3332+
return NULL;
3333+
}
3334+
3335+
/* Initialize XID array */
3336+
xids = (TransactionId *) palloc(sizeof(TransactionId) * rb->catchange_ntxns);
3337+
dlist_foreach(iter, &rb->catchange_txns)
3338+
{
3339+
ReorderBufferTXN *txn = dlist_container(ReorderBufferTXN,
3340+
catchange_node,
3341+
iter.cur);
3342+
3343+
Assert(rbtxn_has_catalog_changes(txn));
3344+
3345+
xids[xcnt++] = txn->xid;
3346+
}
3347+
3348+
qsort(xids, xcnt, sizeof(TransactionId), xidComparator);
3349+
3350+
Assert(xcnt == rb->catchange_ntxns);
3351+
return xids;
32913352
}
32923353

32933354
/*

0 commit comments

Comments
 (0)