Skip to content

Commit 247ee94

Browse files
author
Amit Kapila
committed
Fix data loss in logical replication.
This commit is a backpatch of commit 4909b38 for 13. Data loss can happen when the DDLs like ALTER PUBLICATION ... ADD TABLE ... or ALTER TYPE ... that don't take a strong lock on table happens concurrently to DMLs on the tables involved in the DDL. This happens because logical decoding doesn't distribute invalidations to concurrent transactions and those transactions use stale cache data to decode the changes. The problem becomes bigger because we keep using the stale cache even after those in-progress transactions are finished and skip the changes required to be sent to the client. This commit fixes the issue by distributing invalidation messages from catalog-modifying transactions to all concurrent in-progress transactions. This allows the necessary rebuild of the catalog cache when decoding new changes after concurrent DDL. The fix for 13 is different from what we did in branches 14 and above, such that for 13, the concurrent DDL changes (from DDL types mentioned earlier) will be visible for any newly started transactions. To make them visible in concurrent transactions, we need to introduce a new change type REORDER_BUFFER_CHANGE_INVALIDATION, already present in branches 14 and greater. We decided not to take the risk of a bigger change and fix the issue partially in 13. Reported-by: hubert depesz lubaczewski <depesz@depesz.com> Reported-by: Tomas Vondra <tomas.vondra@enterprisedb.com> Author: Shlok Kyal <shlok.kyal.oss@gmail.com> Author: Hayato Kuroda <kuroda.hayato@fujitsu.com> Reviewed-by: Zhijie Hou <houzj.fnst@fujitsu.com> Reviewed-by: Masahiko Sawada <sawada.mshk@gmail.com> Reviewed-by: Amit Kapila <amit.kapila16@gmail.com> Tested-by: Benoit Lobréau <benoit.lobreau@dalibo.com> Discussion: https://postgr.es/m/de52b282-1166-1180-45a2-8d8917ca74c6@enterprisedb.com Discussion: https://postgr.es/m/CAD21AoAenVqiMjpN-PvGHL1N9DWnHSq673bfgr6phmBUzx=kLQ@mail.gmail.com Discussion: https://postgr.es/m/CAD21AoAhU3kp8shYqP=ExiFDZ9sZxpFb5WzLa0p+vEe5j+7CWQ@mail.gmail.com
1 parent c5ba351 commit 247ee94

File tree

6 files changed

+164
-21
lines changed

6 files changed

+164
-21
lines changed

contrib/test_decoding/Makefile

+1-1
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ REGRESS = ddl xact rewrite toast permissions decoding_in_xact \
88
spill slot truncate
99
ISOLATION = mxact delayed_startup ondisk_startup concurrent_ddl_dml \
1010
oldest_xmin snapshot_transfer subxact_without_top catalog_change_snapshot \
11-
skip_snapshot_restore
11+
skip_snapshot_restore invalidation_distrubution
1212

1313
REGRESS_OPTS = --temp-config $(top_srcdir)/contrib/test_decoding/logical.conf
1414
ISOLATION_OPTS = --temp-config $(top_srcdir)/contrib/test_decoding/logical.conf
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
Parsed test spec with 2 sessions
2+
3+
starting permutation: s1_insert_tbl1 s1_begin s1_insert_tbl1 s2_alter_pub_add_tbl s1_commit s1_insert_tbl1 s2_get_binary_changes
4+
step s1_insert_tbl1: INSERT INTO tbl1 (val1, val2) VALUES (1, 1);
5+
step s1_begin: BEGIN;
6+
step s1_insert_tbl1: INSERT INTO tbl1 (val1, val2) VALUES (1, 1);
7+
step s2_alter_pub_add_tbl: ALTER PUBLICATION pub ADD TABLE tbl1;
8+
step s1_commit: COMMIT;
9+
step s1_insert_tbl1: INSERT INTO tbl1 (val1, val2) VALUES (1, 1);
10+
step s2_get_binary_changes: SELECT count(data) FROM pg_logical_slot_get_binary_changes('isolation_slot', NULL, NULL, 'proto_version', '1', 'publication_names', 'pub') WHERE get_byte(data, 0) = 73;
11+
count
12+
-----
13+
1
14+
(1 row)
15+
16+
?column?
17+
--------
18+
stop
19+
(1 row)
20+
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
# Test that catalog cache invalidation messages are distributed to ongoing
2+
# transactions, ensuring they can access the updated catalog content after
3+
# processing these messages.
4+
setup
5+
{
6+
SELECT 'init' FROM pg_create_logical_replication_slot('isolation_slot', 'pgoutput');
7+
CREATE TABLE tbl1(val1 integer, val2 integer);
8+
CREATE PUBLICATION pub;
9+
}
10+
11+
teardown
12+
{
13+
DROP TABLE tbl1;
14+
DROP PUBLICATION pub;
15+
SELECT 'stop' FROM pg_drop_replication_slot('isolation_slot');
16+
}
17+
18+
session "s1"
19+
setup { SET synchronous_commit=on; }
20+
21+
step "s1_begin" { BEGIN; }
22+
step "s1_insert_tbl1" { INSERT INTO tbl1 (val1, val2) VALUES (1, 1); }
23+
step "s1_commit" { COMMIT; }
24+
25+
session "s2"
26+
setup { SET synchronous_commit=on; }
27+
28+
step "s2_alter_pub_add_tbl" { ALTER PUBLICATION pub ADD TABLE tbl1; }
29+
step "s2_get_binary_changes" { SELECT count(data) FROM pg_logical_slot_get_binary_changes('isolation_slot', NULL, NULL, 'proto_version', '1', 'publication_names', 'pub') WHERE get_byte(data, 0) = 73; }
30+
31+
# Expect to get one insert change. LOGICAL_REP_MSG_INSERT = 'I'
32+
permutation "s1_insert_tbl1" "s1_begin" "s1_insert_tbl1" "s2_alter_pub_add_tbl" "s1_commit" "s1_insert_tbl1" "s2_get_binary_changes"

src/backend/replication/logical/reorderbuffer.c

+56-8
Original file line numberDiff line numberDiff line change
@@ -2264,20 +2264,45 @@ ReorderBufferAddInvalidations(ReorderBuffer *rb, TransactionId xid,
22642264
SharedInvalidationMessage *msgs)
22652265
{
22662266
ReorderBufferTXN *txn;
2267+
MemoryContext oldcontext;
22672268

22682269
txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
22692270

2270-
if (txn->ninvalidations != 0)
2271-
elog(ERROR, "only ever add one set of invalidations");
2271+
oldcontext = MemoryContextSwitchTo(rb->context);
2272+
2273+
/*
2274+
* Collect all the invalidations under the top transaction, if available,
2275+
* so that we can execute them all together.
2276+
*/
2277+
if (txn->toplevel_xid)
2278+
{
2279+
txn = ReorderBufferTXNByXid(rb, txn->toplevel_xid, true, NULL, lsn,
2280+
true);
2281+
}
22722282

22732283
Assert(nmsgs > 0);
22742284

2275-
txn->ninvalidations = nmsgs;
2276-
txn->invalidations = (SharedInvalidationMessage *)
2277-
MemoryContextAlloc(rb->context,
2278-
sizeof(SharedInvalidationMessage) * nmsgs);
2279-
memcpy(txn->invalidations, msgs,
2280-
sizeof(SharedInvalidationMessage) * nmsgs);
2285+
/* Accumulate invalidations. */
2286+
if (txn->ninvalidations == 0)
2287+
{
2288+
txn->ninvalidations = nmsgs;
2289+
txn->invalidations = (SharedInvalidationMessage *)
2290+
palloc(sizeof(SharedInvalidationMessage) * nmsgs);
2291+
memcpy(txn->invalidations, msgs,
2292+
sizeof(SharedInvalidationMessage) * nmsgs);
2293+
}
2294+
else
2295+
{
2296+
txn->invalidations = (SharedInvalidationMessage *)
2297+
repalloc(txn->invalidations, sizeof(SharedInvalidationMessage) *
2298+
(txn->ninvalidations + nmsgs));
2299+
2300+
memcpy(txn->invalidations + txn->ninvalidations, msgs,
2301+
nmsgs * sizeof(SharedInvalidationMessage));
2302+
txn->ninvalidations += nmsgs;
2303+
}
2304+
2305+
MemoryContextSwitchTo(oldcontext);
22812306
}
22822307

22832308
/*
@@ -3895,3 +3920,26 @@ ResolveCminCmaxDuringDecoding(HTAB *tuplecid_data,
38953920
*cmax = ent->cmax;
38963921
return true;
38973922
}
3923+
3924+
/*
3925+
* Count invalidation messages of specified transaction.
3926+
*
3927+
* Returns number of messages, and msgs is set to the pointer of the linked
3928+
* list for the messages.
3929+
*/
3930+
uint32
3931+
ReorderBufferGetInvalidations(ReorderBuffer *rb, TransactionId xid,
3932+
SharedInvalidationMessage **msgs)
3933+
{
3934+
ReorderBufferTXN *txn;
3935+
3936+
txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
3937+
false);
3938+
3939+
if (txn == NULL)
3940+
return 0;
3941+
3942+
*msgs = txn->invalidations;
3943+
3944+
return txn->ninvalidations;
3945+
}

src/backend/replication/logical/snapbuild.c

+51-12
Original file line numberDiff line numberDiff line change
@@ -292,7 +292,7 @@ static void SnapBuildFreeSnapshot(Snapshot snap);
292292

293293
static void SnapBuildSnapIncRefcount(Snapshot snap);
294294

295-
static void SnapBuildDistributeNewCatalogSnapshot(SnapBuild *builder, XLogRecPtr lsn);
295+
static void SnapBuildDistributeSnapshotAndInval(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid);
296296

297297
/* xlog reading helper functions for SnapBuildProcessRunningXacts */
298298
static bool SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *running);
@@ -861,23 +861,24 @@ SnapBuildProcessNewCid(SnapBuild *builder, TransactionId xid,
861861
}
862862

863863
/*
864-
* Add a new Snapshot to all transactions we're decoding that currently are
865-
* in-progress so they can see new catalog contents made by the transaction
866-
* that just committed. This is necessary because those in-progress
867-
* transactions will use the new catalog's contents from here on (at the very
868-
* least everything they do needs to be compatible with newer catalog
869-
* contents).
864+
* Add a new Snapshot and invalidation messages to all transactions we're
865+
* decoding that currently are in-progress so they can see new catalog contents
866+
* made by the transaction that just committed. This is necessary because those
867+
* in-progress transactions will use the new catalog's contents from here on
868+
* (at the very least everything they do needs to be compatible with newer
869+
* catalog contents).
870870
*/
871871
static void
872-
SnapBuildDistributeNewCatalogSnapshot(SnapBuild *builder, XLogRecPtr lsn)
872+
SnapBuildDistributeSnapshotAndInval(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid)
873873
{
874874
dlist_iter txn_i;
875875
ReorderBufferTXN *txn;
876876

877877
/*
878878
* Iterate through all toplevel transactions. This can include
879879
* subtransactions which we just don't yet know to be that, but that's
880-
* fine, they will just get an unnecessary snapshot queued.
880+
* fine, they will just get an unnecessary snapshot and invalidations
881+
* queued.
881882
*/
882883
dlist_foreach(txn_i, &builder->reorder->toplevel_by_lsn)
883884
{
@@ -890,6 +891,14 @@ SnapBuildDistributeNewCatalogSnapshot(SnapBuild *builder, XLogRecPtr lsn)
890891
* transaction which in turn implies we don't yet need a snapshot at
891892
* all. We'll add a snapshot when the first change gets queued.
892893
*
894+
* Similarly, we don't need to add invalidations to a transaction whose
895+
* base snapshot is not yet set. Once a base snapshot is built, it will
896+
* include the xids of committed transactions that have modified the
897+
* catalog, thus reflecting the new catalog contents. The existing
898+
* catalog cache will have already been invalidated after processing
899+
* the invalidations in the transaction that modified catalogs,
900+
* ensuring that a fresh cache is constructed during decoding.
901+
*
893902
* NB: This works correctly even for subtransactions because
894903
* ReorderBufferAssignChild() takes care to transfer the base snapshot
895904
* to the top-level transaction, and while iterating the changequeue
@@ -898,7 +907,7 @@ SnapBuildDistributeNewCatalogSnapshot(SnapBuild *builder, XLogRecPtr lsn)
898907
if (!ReorderBufferXidHasBaseSnapshot(builder->reorder, txn->xid))
899908
continue;
900909

901-
elog(DEBUG2, "adding a new snapshot to %u at %X/%X",
910+
elog(DEBUG2, "adding a new snapshot and invalidations to %u at %X/%X",
902911
txn->xid, (uint32) (lsn >> 32), (uint32) lsn);
903912

904913
/*
@@ -908,6 +917,33 @@ SnapBuildDistributeNewCatalogSnapshot(SnapBuild *builder, XLogRecPtr lsn)
908917
SnapBuildSnapIncRefcount(builder->snapshot);
909918
ReorderBufferAddSnapshot(builder->reorder, txn->xid, lsn,
910919
builder->snapshot);
920+
921+
/*
922+
* Add invalidation messages to the reorder buffer of in-progress
923+
* transactions except the current committed transaction, for which we
924+
* will execute invalidations at the end.
925+
*
926+
* It is required, otherwise, we will end up using the stale catcache
927+
* contents built by the current transaction even after its decoding,
928+
* which should have been invalidated due to concurrent catalog
929+
* changing transaction.
930+
*/
931+
if (txn->xid != xid)
932+
{
933+
uint32 ninvalidations;
934+
SharedInvalidationMessage *msgs = NULL;
935+
936+
ninvalidations = ReorderBufferGetInvalidations(builder->reorder,
937+
xid, &msgs);
938+
939+
if (ninvalidations > 0)
940+
{
941+
Assert(msgs != NULL);
942+
943+
ReorderBufferAddInvalidations(builder->reorder, txn->xid, lsn,
944+
ninvalidations, msgs);
945+
}
946+
}
911947
}
912948
}
913949

@@ -1186,8 +1222,11 @@ SnapBuildCommitTxn(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid,
11861222
/* refcount of the snapshot builder for the new snapshot */
11871223
SnapBuildSnapIncRefcount(builder->snapshot);
11881224

1189-
/* add a new catalog snapshot to all currently running transactions */
1190-
SnapBuildDistributeNewCatalogSnapshot(builder, lsn);
1225+
/*
1226+
* Add a new catalog snapshot and invalidations messages to all
1227+
* currently running transactions.
1228+
*/
1229+
SnapBuildDistributeSnapshotAndInval(builder, lsn, xid);
11911230
}
11921231
}
11931232

src/include/replication/reorderbuffer.h

+4
Original file line numberDiff line numberDiff line change
@@ -463,6 +463,10 @@ TransactionId ReorderBufferGetOldestXmin(ReorderBuffer *rb);
463463

464464
void ReorderBufferSetRestartPoint(ReorderBuffer *, XLogRecPtr ptr);
465465

466+
uint32 ReorderBufferGetInvalidations(ReorderBuffer *rb,
467+
TransactionId xid,
468+
SharedInvalidationMessage **msgs);
469+
466470
void StartupReorderBuffer(void);
467471

468472
#endif

0 commit comments

Comments
 (0)