Skip to content

Commit 9f21be0

Browse files
author
Amit Kapila
committed
Fix data loss in logical replication.
Data loss can happen when DDLs such as ALTER PUBLICATION ... ADD TABLE ... or ALTER TYPE ... that don't take a strong lock on the table happen concurrently with DMLs on the tables involved in the DDL. This happens because logical decoding doesn't distribute invalidations to concurrent transactions and those transactions use stale cache data to decode the changes. The problem becomes bigger because we keep using the stale cache even after those in-progress transactions are finished and skip the changes required to be sent to the client. This commit fixes the issue by distributing invalidation messages from catalog-modifying transactions to all concurrent in-progress transactions. This allows the necessary rebuild of the catalog cache when decoding new changes after concurrent DDL. We observed performance regression primarily during frequent execution of *publication DDL* statements that modify the published tables. The regression is minor or nearly nonexistent for DDLs that do not affect the published tables or occur infrequently, making this a worthwhile cost to resolve a longstanding data loss issue. An alternative approach considered was to take a strong lock on each affected table during publication modification. However, this would only address issues related to publication DDLs (but not the ALTER TYPE ...) and require locking every relation in the database for publications created as FOR ALL TABLES, which is impractical. The bug exists in all supported branches, but we are backpatching till 14. The fix for 13 requires somewhat bigger changes than this fix, so the fix for that branch is still under discussion. 
Reported-by: hubert depesz lubaczewski <depesz@depesz.com> Reported-by: Tomas Vondra <tomas.vondra@enterprisedb.com> Author: Shlok Kyal <shlok.kyal.oss@gmail.com> Author: Hayato Kuroda <kuroda.hayato@fujitsu.com> Reviewed-by: Zhijie Hou <houzj.fnst@fujitsu.com> Reviewed-by: Masahiko Sawada <sawada.mshk@gmail.com> Reviewed-by: Amit Kapila <amit.kapila16@gmail.com> Tested-by: Benoit Lobréau <benoit.lobreau@dalibo.com> Backpatch-through: 14 Discussion: https://postgr.es/m/de52b282-1166-1180-45a2-8d8917ca74c6@enterprisedb.com Discussion: https://postgr.es/m/CAD21AoAenVqiMjpN-PvGHL1N9DWnHSq673bfgr6phmBUzx=kLQ@mail.gmail.com
1 parent 7c14294 commit 9f21be0

File tree

6 files changed

+133
-15
lines changed

6 files changed

+133
-15
lines changed

contrib/test_decoding/Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ REGRESS = ddl xact rewrite toast permissions decoding_in_xact \
99
ISOLATION = mxact delayed_startup ondisk_startup concurrent_ddl_dml \
1010
oldest_xmin snapshot_transfer subxact_without_top concurrent_stream \
1111
twophase_snapshot slot_creation_error catalog_change_snapshot \
12-
skip_snapshot_restore
12+
skip_snapshot_restore invalidation_distrubution
1313

1414
REGRESS_OPTS = --temp-config $(top_srcdir)/contrib/test_decoding/logical.conf
1515
ISOLATION_OPTS = --temp-config $(top_srcdir)/contrib/test_decoding/logical.conf
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
Parsed test spec with 2 sessions
2+
3+
starting permutation: s1_insert_tbl1 s1_begin s1_insert_tbl1 s2_alter_pub_add_tbl s1_commit s1_insert_tbl1 s2_get_binary_changes
4+
step s1_insert_tbl1: INSERT INTO tbl1 (val1, val2) VALUES (1, 1);
5+
step s1_begin: BEGIN;
6+
step s1_insert_tbl1: INSERT INTO tbl1 (val1, val2) VALUES (1, 1);
7+
step s2_alter_pub_add_tbl: ALTER PUBLICATION pub ADD TABLE tbl1;
8+
step s1_commit: COMMIT;
9+
step s1_insert_tbl1: INSERT INTO tbl1 (val1, val2) VALUES (1, 1);
10+
step s2_get_binary_changes: SELECT count(data) FROM pg_logical_slot_get_binary_changes('isolation_slot', NULL, NULL, 'proto_version', '3', 'publication_names', 'pub') WHERE get_byte(data, 0) = 73;
11+
count
12+
-----
13+
1
14+
(1 row)
15+
16+
?column?
17+
--------
18+
stop
19+
(1 row)
20+
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
# Test that catalog cache invalidation messages are distributed to ongoing
2+
# transactions, ensuring they can access the updated catalog content after
3+
# processing these messages.
4+
setup
5+
{
6+
SELECT 'init' FROM pg_create_logical_replication_slot('isolation_slot', 'pgoutput');
7+
CREATE TABLE tbl1(val1 integer, val2 integer);
8+
CREATE PUBLICATION pub;
9+
}
10+
11+
teardown
12+
{
13+
DROP TABLE tbl1;
14+
DROP PUBLICATION pub;
15+
SELECT 'stop' FROM pg_drop_replication_slot('isolation_slot');
16+
}
17+
18+
session "s1"
19+
setup { SET synchronous_commit=on; }
20+
21+
step "s1_begin" { BEGIN; }
22+
step "s1_insert_tbl1" { INSERT INTO tbl1 (val1, val2) VALUES (1, 1); }
23+
step "s1_commit" { COMMIT; }
24+
25+
session "s2"
26+
setup { SET synchronous_commit=on; }
27+
28+
step "s2_alter_pub_add_tbl" { ALTER PUBLICATION pub ADD TABLE tbl1; }
29+
step "s2_get_binary_changes" { SELECT count(data) FROM pg_logical_slot_get_binary_changes('isolation_slot', NULL, NULL, 'proto_version', '3', 'publication_names', 'pub') WHERE get_byte(data, 0) = 73; }
30+
31+
# Expect to get one insert change. LOGICAL_REP_MSG_INSERT = 'I' (ASCII 73, the value matched by get_byte above)
32+
permutation "s1_insert_tbl1" "s1_begin" "s1_insert_tbl1" "s2_alter_pub_add_tbl" "s1_commit" "s1_insert_tbl1" "s2_get_binary_changes"

src/backend/replication/logical/reorderbuffer.c

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5200,3 +5200,26 @@ ResolveCminCmaxDuringDecoding(HTAB *tuplecid_data,
52005200
*cmax = ent->cmax;
52015201
return true;
52025202
}
5203+
5204+
/*
5205+
* Fetch the invalidation messages recorded for the specified transaction.
5206+
*
5207+
* Returns the number of messages (txn->ninvalidations) and sets *msgs to the
5208+
* transaction's stored messages (txn->invalidations). If no transaction is
* found for xid, returns 0 and leaves *msgs unmodified, so callers should
* initialize it before the call.
5209+
*/
5210+
uint32
5211+
ReorderBufferGetInvalidations(ReorderBuffer *rb, TransactionId xid,
5212+
SharedInvalidationMessage **msgs)
5213+
{
5214+
ReorderBufferTXN *txn;
5215+
5216+
/*
* Look up the transaction for xid. NOTE(review): the "false" arguments
* presumably request a lookup-only call that does not create an entry --
* confirm against ReorderBufferTXNByXid().
*/
txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
5217+
false);
5218+
5219+
/* Nothing known about this xid: report zero messages, *msgs untouched. */
if (txn == NULL)
5220+
return 0;
5221+
5222+
/* Hand back the transaction's own pointer; no copy is made here. */
*msgs = txn->invalidations;
5223+
5224+
return txn->ninvalidations;
5225+
}

src/backend/replication/logical/snapbuild.c

Lines changed: 53 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -290,7 +290,7 @@ static void SnapBuildFreeSnapshot(Snapshot snap);
290290

291291
static void SnapBuildSnapIncRefcount(Snapshot snap);
292292

293-
static void SnapBuildDistributeNewCatalogSnapshot(SnapBuild *builder, XLogRecPtr lsn);
293+
static void SnapBuildDistributeSnapshotAndInval(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid);
294294

295295
/* xlog reading helper functions for SnapBuildProcessRunningXacts */
296296
static bool SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *running);
@@ -852,23 +852,24 @@ SnapBuildProcessNewCid(SnapBuild *builder, TransactionId xid,
852852
}
853853

854854
/*
855-
* Add a new Snapshot to all transactions we're decoding that currently are
856-
* in-progress so they can see new catalog contents made by the transaction
857-
* that just committed. This is necessary because those in-progress
858-
* transactions will use the new catalog's contents from here on (at the very
859-
* least everything they do needs to be compatible with newer catalog
860-
* contents).
855+
* Add a new Snapshot and invalidation messages to all transactions we're
856+
* decoding that currently are in-progress so they can see new catalog contents
857+
* made by the transaction that just committed. This is necessary because those
858+
* in-progress transactions will use the new catalog's contents from here on
859+
* (at the very least everything they do needs to be compatible with newer
860+
* catalog contents).
861861
*/
862862
static void
863-
SnapBuildDistributeNewCatalogSnapshot(SnapBuild *builder, XLogRecPtr lsn)
863+
SnapBuildDistributeSnapshotAndInval(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid)
864864
{
865865
dlist_iter txn_i;
866866
ReorderBufferTXN *txn;
867867

868868
/*
869869
* Iterate through all toplevel transactions. This can include
870870
* subtransactions which we just don't yet know to be that, but that's
871-
* fine, they will just get an unnecessary snapshot queued.
871+
* fine, they will just get an unnecessary snapshot and invalidations
872+
* queued.
872873
*/
873874
dlist_foreach(txn_i, &builder->reorder->toplevel_by_lsn)
874875
{
@@ -881,6 +882,14 @@ SnapBuildDistributeNewCatalogSnapshot(SnapBuild *builder, XLogRecPtr lsn)
881882
* transaction which in turn implies we don't yet need a snapshot at
882883
* all. We'll add a snapshot when the first change gets queued.
883884
*
885+
* Similarly, we don't need to add invalidations to a transaction whose
886+
* base snapshot is not yet set. Once a base snapshot is built, it will
887+
* include the xids of committed transactions that have modified the
888+
* catalog, thus reflecting the new catalog contents. The existing
889+
* catalog cache will have already been invalidated after processing
890+
* the invalidations in the transaction that modified catalogs,
891+
* ensuring that a fresh cache is constructed during decoding.
892+
*
884893
* NB: This works correctly even for subtransactions because
885894
* ReorderBufferAssignChild() takes care to transfer the base snapshot
886895
* to the top-level transaction, and while iterating the changequeue
@@ -890,13 +899,13 @@ SnapBuildDistributeNewCatalogSnapshot(SnapBuild *builder, XLogRecPtr lsn)
890899
continue;
891900

892901
/*
893-
* We don't need to add snapshot to prepared transactions as they
894-
* should not see the new catalog contents.
902+
* We don't need to add snapshot or invalidations to prepared
903+
* transactions as they should not see the new catalog contents.
895904
*/
896905
if (rbtxn_prepared(txn) || rbtxn_skip_prepared(txn))
897906
continue;
898907

899-
elog(DEBUG2, "adding a new snapshot to %u at %X/%X",
908+
elog(DEBUG2, "adding a new snapshot and invalidations to %u at %X/%X",
900909
txn->xid, LSN_FORMAT_ARGS(lsn));
901910

902911
/*
@@ -906,6 +915,33 @@ SnapBuildDistributeNewCatalogSnapshot(SnapBuild *builder, XLogRecPtr lsn)
906915
SnapBuildSnapIncRefcount(builder->snapshot);
907916
ReorderBufferAddSnapshot(builder->reorder, txn->xid, lsn,
908917
builder->snapshot);
918+
919+
/*
920+
* Add invalidation messages to the reorder buffer of in-progress
921+
* transactions except the current committed transaction, for which we
922+
* will execute invalidations at the end.
923+
*
924+
* This is required; otherwise, we would keep using the stale catcache
925+
* contents built by the current transaction even after it has been
926+
* decoded, although they should have been invalidated by the concurrent
927+
* catalog-changing transaction.
928+
*/
929+
if (txn->xid != xid)
930+
{
931+
uint32 ninvalidations;
932+
SharedInvalidationMessage *msgs = NULL;
933+
934+
ninvalidations = ReorderBufferGetInvalidations(builder->reorder,
935+
xid, &msgs);
936+
937+
if (ninvalidations > 0)
938+
{
939+
Assert(msgs != NULL);
940+
941+
ReorderBufferAddInvalidations(builder->reorder, txn->xid, lsn,
942+
ninvalidations, msgs);
943+
}
944+
}
909945
}
910946
}
911947

@@ -1184,8 +1220,11 @@ SnapBuildCommitTxn(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid,
11841220
/* refcount of the snapshot builder for the new snapshot */
11851221
SnapBuildSnapIncRefcount(builder->snapshot);
11861222

1187-
/* add a new catalog snapshot to all currently running transactions */
1188-
SnapBuildDistributeNewCatalogSnapshot(builder, lsn);
1223+
/*
1224+
* Add a new catalog snapshot and invalidation messages to all
1225+
* currently running transactions.
1226+
*/
1227+
SnapBuildDistributeSnapshotAndInval(builder, lsn, xid);
11891228
}
11901229
}
11911230

src/include/replication/reorderbuffer.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -680,6 +680,10 @@ extern TransactionId ReorderBufferGetOldestXmin(ReorderBuffer *rb);
680680

681681
extern void ReorderBufferSetRestartPoint(ReorderBuffer *, XLogRecPtr ptr);
682682

683+
extern uint32 ReorderBufferGetInvalidations(ReorderBuffer *rb,
684+
TransactionId xid,
685+
SharedInvalidationMessage **msgs);
686+
683687
extern void StartupReorderBuffer(void);
684688

685689
#endif

0 commit comments

Comments
 (0)