Skip to content

Commit aee8c2b

Browse files
Fix possibility of logical decoding partial transaction changes.
When creating and initializing a logical slot, the restart_lsn is set to the latest WAL insertion point (or the latest replay point on standbys). Subsequently, WAL records are decoded from that point to find the start point for extracting changes in the DecodingContextFindStartpoint() function. Since the initial restart_lsn could be in the middle of a transaction, the start point must be a consistent point where we won't see the data for partial transactions. Previously, when not building a full snapshot, serialized snapshots were restored, and the SnapBuild jumped to the consistent state even while finding the start point. Consequently, the slot's restart_lsn and confirmed_flush could be set to the middle of a transaction. This could lead to various unexpected consequences. Specifically, there were reports of logical decoding decoding partial transactions, and assertion failures occurred because only subtransactions were decoded without decoding their top-level transaction until decoding the commit record. To resolve this issue, the changes prevent restoring the serialized snapshot and jumping to the consistent state while finding the start point. On v17 and HEAD, a flag indicating whether snapshot restores should be skipped has been added to the SnapBuild struct, and SNAPBUILD_VERSION has been bumped. On backbranches, the flag is stored in the LogicalDecodingContext instead, preserving on-disk compatibility. Backpatch to all supported versions. Reported-by: Drew Callahan Reviewed-by: Amit Kapila, Hayato Kuroda Discussion: https://postgr.es/m/2444AA15-D21B-4CCE-8052-52C7C2DAFE5C%40amazon.com Backpatch-through: 12
1 parent f68d6aa commit aee8c2b

File tree

6 files changed

+118
-8
lines changed

6 files changed

+118
-8
lines changed

contrib/test_decoding/Makefile

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,8 @@ REGRESS = ddl xact rewrite toast permissions decoding_in_xact \
88
spill slot truncate stream stats twophase twophase_stream
99
ISOLATION = mxact delayed_startup ondisk_startup concurrent_ddl_dml \
1010
oldest_xmin snapshot_transfer subxact_without_top concurrent_stream \
11-
twophase_snapshot slot_creation_error catalog_change_snapshot
11+
twophase_snapshot slot_creation_error catalog_change_snapshot \
12+
skip_snapshot_restore
1213

1314
REGRESS_OPTS = --temp-config $(top_srcdir)/contrib/test_decoding/logical.conf
1415
ISOLATION_OPTS = --temp-config $(top_srcdir)/contrib/test_decoding/logical.conf
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
Parsed test spec with 3 sessions
2+
3+
starting permutation: s0_init s0_begin s0_insert1 s1_init s2_checkpoint s2_get_changes_slot0 s0_insert2 s0_commit s1_get_changes_slot0 s1_get_changes_slot1
4+
step s0_init: SELECT 'init' FROM pg_create_logical_replication_slot('slot0', 'test_decoding');
5+
?column?
6+
--------
7+
init
8+
(1 row)
9+
10+
step s0_begin: BEGIN;
11+
step s0_insert1: INSERT INTO tbl VALUES (1);
12+
step s1_init: SELECT 'init' FROM pg_create_logical_replication_slot('slot1', 'test_decoding'); <waiting ...>
13+
step s2_checkpoint: CHECKPOINT;
14+
step s2_get_changes_slot0: SELECT data FROM pg_logical_slot_get_changes('slot0', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0');
15+
data
16+
----
17+
(0 rows)
18+
19+
step s0_insert2: INSERT INTO tbl VALUES (2);
20+
step s0_commit: COMMIT;
21+
step s1_init: <... completed>
22+
?column?
23+
--------
24+
init
25+
(1 row)
26+
27+
step s1_get_changes_slot0: SELECT data FROM pg_logical_slot_get_changes('slot0', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0');
28+
data
29+
-----------------------------------------
30+
BEGIN
31+
table public.tbl: INSERT: val1[integer]:1
32+
table public.tbl: INSERT: val1[integer]:2
33+
COMMIT
34+
(4 rows)
35+
36+
step s1_get_changes_slot1: SELECT data FROM pg_logical_slot_get_changes('slot1', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0');
37+
data
38+
----
39+
(0 rows)
40+
41+
?column?
42+
--------
43+
stop
44+
(1 row)
45+
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
# Test that slot creation skips restoring a serialized snapshot to reach
2+
# the consistent state.
3+
4+
setup
5+
{
6+
DROP TABLE IF EXISTS tbl;
7+
CREATE TABLE tbl (val1 integer);
8+
}
9+
10+
teardown
11+
{
12+
DROP TABLE tbl;
13+
SELECT 'stop' FROM pg_drop_replication_slot('slot0');
14+
SELECT 'stop' FROM pg_drop_replication_slot('slot1');
15+
}
16+
17+
session "s0"
18+
setup { SET synchronous_commit = on; }
19+
step "s0_init" { SELECT 'init' FROM pg_create_logical_replication_slot('slot0', 'test_decoding'); }
20+
step "s0_begin" { BEGIN; }
21+
step "s0_insert1" { INSERT INTO tbl VALUES (1); }
22+
step "s0_insert2" { INSERT INTO tbl VALUES (2); }
23+
step "s0_commit" { COMMIT; }
24+
25+
session "s1"
26+
setup { SET synchronous_commit = on; }
27+
step "s1_init" { SELECT 'init' FROM pg_create_logical_replication_slot('slot1', 'test_decoding'); }
28+
step "s1_get_changes_slot0" { SELECT data FROM pg_logical_slot_get_changes('slot0', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0'); }
29+
step "s1_get_changes_slot1" { SELECT data FROM pg_logical_slot_get_changes('slot1', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0'); }
30+
31+
session "s2"
32+
setup { SET synchronous_commit = on ;}
33+
step "s2_checkpoint" { CHECKPOINT; }
34+
step "s2_get_changes_slot0" { SELECT data FROM pg_logical_slot_get_changes('slot0', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0'); }
35+
36+
37+
# While 'slot1' creation by "s1_init" waits for s0-transaction to commit, the
38+
# RUNNING_XACTS record is written by "s2_checkpoint" and "s2_get_changes_slot0"
39+
# serializes consistent snapshots to the disk at LSNs that are before
40+
# s0-transaction's commit. After s0-transaction commits, "s1_init" resumes but
41+
# must not restore any serialized snapshots and will reach the consistent state
42+
# when decoding a RUNNING_XACTS record generated after s0-transaction's commit.
43+
# We check if the get_changes on 'slot1' will not return any s0-transaction's
44+
# changes as its confirmed_flush_lsn will be after the s0-transaction's commit
45+
# record.
46+
permutation "s0_init" "s0_begin" "s0_insert1" "s1_init" "s2_checkpoint" "s2_get_changes_slot0" "s0_insert2" "s0_commit" "s1_get_changes_slot0" "s1_get_changes_slot1"

src/backend/replication/logical/logical.c

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -148,6 +148,7 @@ StartupDecodingContext(List *output_plugin_options,
148148
TransactionId xmin_horizon,
149149
bool need_full_snapshot,
150150
bool fast_forward,
151+
bool in_create,
151152
XLogReaderRoutine *xl_routine,
152153
LogicalOutputPluginWriterPrepareWrite prepare_write,
153154
LogicalOutputPluginWriterWrite do_write,
@@ -287,6 +288,8 @@ StartupDecodingContext(List *output_plugin_options,
287288

288289
ctx->fast_forward = fast_forward;
289290

291+
ctx->in_create = in_create;
292+
290293
MemoryContextSwitchTo(old_context);
291294

292295
return ctx;
@@ -422,7 +425,7 @@ CreateInitDecodingContext(const char *plugin,
422425
ReplicationSlotSave();
423426

424427
ctx = StartupDecodingContext(NIL, restart_lsn, xmin_horizon,
425-
need_full_snapshot, false,
428+
need_full_snapshot, false, true,
426429
xl_routine, prepare_write, do_write,
427430
update_progress);
428431

@@ -535,7 +538,7 @@ CreateDecodingContext(XLogRecPtr start_lsn,
535538

536539
ctx = StartupDecodingContext(output_plugin_options,
537540
start_lsn, InvalidTransactionId, false,
538-
fast_forward, xl_routine, prepare_write,
541+
fast_forward, false, xl_routine, prepare_write,
539542
do_write, update_progress);
540543

541544
/* call output plugin initialization callback */

src/backend/replication/logical/snapbuild.c

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1305,6 +1305,8 @@ SnapBuildProcessRunningXacts(SnapBuild *builder, XLogRecPtr lsn, xl_running_xact
13051305
static bool
13061306
SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *running)
13071307
{
1308+
LogicalDecodingContext *ctx = (LogicalDecodingContext *) builder->reorder->private_data;
1309+
13081310
/* ---
13091311
* Build catalog decoding snapshot incrementally using information about
13101312
* the currently running transactions. There are several ways to do that:
@@ -1314,10 +1316,12 @@ SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *runn
13141316
* state while waiting on c)'s sub-states.
13151317
*
13161318
* b) This (in a previous run) or another decoding slot serialized a
1317-
* snapshot to disk that we can use. Can't use this method for the
1318-
* initial snapshot when slot is being created and needs full snapshot
1319-
* for export or direct use, as that snapshot will only contain catalog
1320-
* modifying transactions.
1319+
* snapshot to disk that we can use. Can't use this method while finding
1320+
* the start point for decoding changes as the restart LSN would be an
1321+
* arbitrary LSN but we need to find the start point to extract changes
1322+
* where we won't see the data for partial transactions. Also, we cannot
1323+
* use this method when a slot needs a full snapshot for export or direct
1324+
* use, as that snapshot will only contain catalog modifying transactions.
13211325
*
13221326
* c) First incrementally build a snapshot for catalog tuples
13231327
* (BUILDING_SNAPSHOT), that requires all, already in-progress,
@@ -1382,8 +1386,13 @@ SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *runn
13821386

13831387
return false;
13841388
}
1385-
/* b) valid on disk state and not building full snapshot */
1389+
1390+
/*
1391+
* b) valid on disk state and while neither building full snapshot nor
1392+
* creating a slot.
1393+
*/
13861394
else if (!builder->building_full_snapshot &&
1395+
!ctx->in_create &&
13871396
SnapBuildRestore(builder, lsn))
13881397
{
13891398
int nxacts = running->subxcnt + running->xcnt;

src/include/replication/logical.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,12 @@ typedef struct LogicalDecodingContext
109109
TransactionId write_xid;
110110
/* Are we processing the end LSN of a transaction? */
111111
bool end_xact;
112+
113+
/*
114+
* True if the logical decoding context is being used for the creation
115+
* of a logical replication slot.
116+
*/
117+
bool in_create;
112118
} LogicalDecodingContext;
113119

114120

0 commit comments

Comments
 (0)