Skip to content

Commit 5d3fcb5

Browse files
committed
Fix bogus tuple-slot management in logical replication UPDATE handling.
slot_modify_cstrings seriously abused the TupleTableSlot API by relying on a slot's underlying data to stay valid across ExecClearTuple. Since this abuse was also quite undocumented, it's little surprise that the case got broken during the v12 slot rewrites. As reported in bug #16129 from Ondřej Jirman, this could lead to crashes or data corruption when a logical replication subscriber processes a row update. Problems would only arise if the subscriber's table contained columns of pass-by-ref types that were not being copied from the publisher.

Fix by explicitly copying the datum/isnull arrays from the source slot that the old row was in already. This ends up being about the same thing that happened pre-v12, but hopefully in a less opaque and fragile way.

We might've caught the problem sooner if there were any test cases dealing with updates involving non-replicated or dropped columns. Now there are.

Back-patch to v10 where this code came in. Even though the failure does not manifest before v12, IMO this code is too fragile to leave as-is. In any case we certainly want the additional test coverage.

Patch by me; thanks to Tomas Vondra for initial investigation.

Discussion: https://postgr.es/m/16129-a0c0f48e71741e5f@postgresql.org
1 parent b9f3d7a commit 5d3fcb5

File tree

2 files changed

+72
-17
lines changed

2 files changed

+72
-17
lines changed

src/backend/replication/logical/worker.c

Lines changed: 24 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -377,24 +377,39 @@ slot_store_cstrings(TupleTableSlot *slot, LogicalRepRelMapEntry *rel,
377377
}
378378

379379
/*
380-
* Modify slot with user data provided as C strings.
380+
* Replace selected columns with user data provided as C strings.
381381
* This is somewhat similar to heap_modify_tuple but also calls the type
382-
* input function on the user data as the input is the text representation
383-
* of the types.
382+
* input functions on the user data.
383+
* "slot" is filled with a copy of the tuple in "srcslot", with
384+
* columns selected by the "replaces" array replaced with data values
385+
* from "values".
386+
* Caution: unreplaced pass-by-ref columns in "slot" will point into the
387+
* storage for "srcslot". This is OK for current usage, but someday we may
388+
* need to materialize "slot" at the end to make it independent of "srcslot".
384389
*/
385390
static void
386-
slot_modify_cstrings(TupleTableSlot *slot, LogicalRepRelMapEntry *rel,
391+
slot_modify_cstrings(TupleTableSlot *slot, TupleTableSlot *srcslot,
392+
LogicalRepRelMapEntry *rel,
387393
char **values, bool *replaces)
388394
{
389395
int natts = slot->tts_tupleDescriptor->natts;
390396
int i;
391397
SlotErrCallbackArg errarg;
392398
ErrorContextCallback errcallback;
393399

394-
slot_getallattrs(slot);
400+
/* We'll fill "slot" with a virtual tuple, so we must start with ... */
395401
ExecClearTuple(slot);
396402

397-
/* Push callback + info on the error context stack */
403+
/*
404+
* Copy all the column data from srcslot, so that we'll have valid values
405+
* for unreplaced columns.
406+
*/
407+
Assert(natts == srcslot->tts_tupleDescriptor->natts);
408+
slot_getallattrs(srcslot);
409+
memcpy(slot->tts_values, srcslot->tts_values, natts * sizeof(Datum));
410+
memcpy(slot->tts_isnull, srcslot->tts_isnull, natts * sizeof(bool));
411+
412+
/* For error reporting, push callback + info on the error context stack */
398413
errarg.rel = rel;
399414
errarg.local_attnum = -1;
400415
errarg.remote_attnum = -1;
@@ -442,6 +457,7 @@ slot_modify_cstrings(TupleTableSlot *slot, LogicalRepRelMapEntry *rel,
442457
/* Pop the error context stack */
443458
error_context_stack = errcallback.previous;
444459

460+
/* And finally, declare that "slot" contains a valid virtual tuple */
445461
ExecStoreVirtualTuple(slot);
446462
}
447463

@@ -752,8 +768,8 @@ apply_handle_update(StringInfo s)
752768
{
753769
/* Process and store remote tuple in the slot */
754770
oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
755-
ExecStoreTuple(localslot->tts_tuple, remoteslot, InvalidBuffer, false);
756-
slot_modify_cstrings(remoteslot, rel, newtup.values, newtup.changed);
771+
slot_modify_cstrings(remoteslot, localslot, rel,
772+
newtup.values, newtup.changed);
757773
MemoryContextSwitchTo(oldctx);
758774

759775
EvalPlanQualSetSlot(&epqstate, remoteslot);

src/test/subscription/t/001_rep_changes.pl

Lines changed: 48 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
use warnings;
44
use PostgresNode;
55
use TestLib;
6-
use Test::More tests => 16;
6+
use Test::More tests => 19;
77

88
# Initialize publisher node
99
my $node_publisher = get_new_node('publisher');
@@ -28,9 +28,9 @@
2828
$node_publisher->safe_psql('postgres',
2929
"CREATE TABLE tab_rep (a int primary key)");
3030
$node_publisher->safe_psql('postgres',
31-
"CREATE TABLE tab_mixed (a int primary key, b text)");
31+
"CREATE TABLE tab_mixed (a int primary key, b text, c numeric)");
3232
$node_publisher->safe_psql('postgres',
33-
"INSERT INTO tab_mixed (a, b) VALUES (1, 'foo')");
33+
"INSERT INTO tab_mixed (a, b, c) VALUES (1, 'foo', 1.1)");
3434

3535
# Setup structure on subscriber
3636
$node_subscriber->safe_psql('postgres', "CREATE TABLE tab_notrep (a int)");
@@ -42,7 +42,8 @@
4242

4343
# different column count and order than on publisher
4444
$node_subscriber->safe_psql('postgres',
45-
"CREATE TABLE tab_mixed (c text, b text, a int primary key)");
45+
"CREATE TABLE tab_mixed (d text default 'local', c numeric, b text, a int primary key)"
46+
);
4647

4748
# Setup logical replication
4849
my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
@@ -91,7 +92,7 @@
9192
$node_publisher->safe_psql('postgres', "UPDATE tab_rep SET a = -a");
9293

9394
$node_publisher->safe_psql('postgres',
94-
"INSERT INTO tab_mixed VALUES (2, 'bar')");
95+
"INSERT INTO tab_mixed VALUES (2, 'bar', 2.2)");
9596

9697
$node_publisher->poll_query_until('postgres', $caughtup_query)
9798
or die "Timed out while waiting for subscriber to catch up";
@@ -104,10 +105,9 @@
104105
"SELECT count(*), min(a), max(a) FROM tab_rep");
105106
is($result, qq(20|-20|-1), 'check replicated changes on subscriber');
106107

107-
$result =
108-
$node_subscriber->safe_psql('postgres', "SELECT c, b, a FROM tab_mixed");
109-
is( $result, qq(|foo|1
110-
|bar|2), 'check replicated changes with different column order');
108+
$result = $node_subscriber->safe_psql('postgres', "SELECT * FROM tab_mixed");
109+
is( $result, qq(local|1.1|foo|1
110+
local|2.2|bar|2), 'check replicated changes with different column order');
111111

112112
# insert some duplicate rows
113113
$node_publisher->safe_psql('postgres',
@@ -126,11 +126,14 @@
126126
"ALTER TABLE tab_ins REPLICA IDENTITY FULL");
127127
$node_subscriber->safe_psql('postgres',
128128
"ALTER TABLE tab_ins REPLICA IDENTITY FULL");
129+
# tab_mixed can use DEFAULT, since it has a primary key
129130

130131
# and do the updates
131132
$node_publisher->safe_psql('postgres', "UPDATE tab_full SET a = a * a");
132133
$node_publisher->safe_psql('postgres',
133134
"UPDATE tab_full2 SET x = 'bb' WHERE x = 'b'");
135+
$node_publisher->safe_psql('postgres',
136+
"UPDATE tab_mixed SET b = 'baz' WHERE a = 1");
134137

135138
# Wait for subscription to catch up
136139
$node_publisher->poll_query_until('postgres', $caughtup_query)
@@ -148,6 +151,42 @@
148151
bb),
149152
'update works with REPLICA IDENTITY FULL and text datums');
150153

154+
$result = $node_subscriber->safe_psql('postgres',
155+
"SELECT * FROM tab_mixed ORDER BY a");
156+
is( $result, qq(local|1.1|baz|1
157+
local|2.2|bar|2),
158+
'update works with different column order and subscriber local values');
159+
160+
# check behavior with dropped columns
161+
162+
$node_publisher->safe_psql('postgres', "ALTER TABLE tab_mixed DROP COLUMN b");
163+
$node_publisher->safe_psql('postgres',
164+
"UPDATE tab_mixed SET c = 11.11 WHERE a = 1");
165+
166+
$node_publisher->poll_query_until('postgres', $caughtup_query)
167+
or die "Timed out while waiting for subscriber to catch up";
168+
169+
$result = $node_subscriber->safe_psql('postgres',
170+
"SELECT * FROM tab_mixed ORDER BY a");
171+
is( $result, qq(local|11.11|baz|1
172+
local|2.2|bar|2),
173+
'update works with dropped publisher column');
174+
175+
$node_subscriber->safe_psql('postgres',
176+
"ALTER TABLE tab_mixed DROP COLUMN d");
177+
178+
$node_publisher->safe_psql('postgres',
179+
"UPDATE tab_mixed SET c = 22.22 WHERE a = 2");
180+
181+
$node_publisher->poll_query_until('postgres', $caughtup_query)
182+
or die "Timed out while waiting for subscriber to catch up";
183+
184+
$result = $node_subscriber->safe_psql('postgres',
185+
"SELECT * FROM tab_mixed ORDER BY a");
186+
is( $result, qq(11.11|baz|1
187+
22.22|bar|2),
188+
'update works with dropped subscriber column');
189+
151190
# check that change of connection string and/or publication list causes
152191
# restart of subscription workers. Not all of these are registered as tests
153192
# as we need to poll for a change but the test suite will fail none the less

0 commit comments

Comments
 (0)