Skip to content

Commit 52d5ea9

Browse files
author
Amit Kapila
committed
Fix partition table's REPLICA IDENTITY checking on the subscriber.
In logical replication, we check whether the target table on the subscriber is updatable by comparing the replica identity of the table on the publisher with that of the table on the subscriber. When the target table is a partitioned table, we check only the partitioned table's own replica identity and not that of its partitions. This leads to an assertion failure while applying changes for update/delete, because we expect those operations to succeed only when the corresponding partition has a primary key or a replica identity defined.

Fix it by checking the replica identity of the partition while applying changes.

Reported-by: Shi Yu
Author: Shi Yu, Hou Zhijie
Reviewed-by: Amit Langote, Amit Kapila
Backpatch-through: 13, where it was introduced
Discussion: https://postgr.es/m/OSZPR01MB6310F46CD425A967E4AEF736FDA49@OSZPR01MB6310.jpnprd01.prod.outlook.com
1 parent 0980adf commit 52d5ea9

File tree

3 files changed

+102
-56
lines changed

3 files changed

+102
-56
lines changed

src/backend/replication/logical/relation.c

Lines changed: 66 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -249,6 +249,67 @@ logicalrep_report_missing_attrs(LogicalRepRelation *remoterel,
249249
}
250250
}
251251

252+
/*
253+
* Check if replica identity matches and mark the updatable flag.
254+
*
255+
* We allow for stricter replica identity (fewer columns) on subscriber as
256+
* that will not stop us from finding unique tuple. IE, if publisher has
257+
* identity (id,timestamp) and subscriber just (id) this will not be a
258+
* problem, but in the opposite scenario it will.
259+
*
260+
* We just mark the relation entry as not updatable here if the local
261+
* replica identity is found to be insufficient for applying
262+
* updates/deletes (inserts don't care!) and leave it to
263+
* check_relation_updatable() to throw the actual error if needed.
264+
*/
265+
static void
266+
logicalrep_rel_mark_updatable(LogicalRepRelMapEntry *entry)
267+
{
268+
Bitmapset *idkey;
269+
LogicalRepRelation *remoterel = &entry->remoterel;
270+
int i;
271+
272+
entry->updatable = true;
273+
274+
idkey = RelationGetIndexAttrBitmap(entry->localrel,
275+
INDEX_ATTR_BITMAP_IDENTITY_KEY);
276+
/* fallback to PK if no replica identity */
277+
if (idkey == NULL)
278+
{
279+
idkey = RelationGetIndexAttrBitmap(entry->localrel,
280+
INDEX_ATTR_BITMAP_PRIMARY_KEY);
281+
282+
/*
283+
* If no replica identity index and no PK, the published table must
284+
* have replica identity FULL.
285+
*/
286+
if (idkey == NULL && remoterel->replident != REPLICA_IDENTITY_FULL)
287+
entry->updatable = false;
288+
}
289+
290+
i = -1;
291+
while ((i = bms_next_member(idkey, i)) >= 0)
292+
{
293+
int attnum = i + FirstLowInvalidHeapAttributeNumber;
294+
295+
if (!AttrNumberIsForUserDefinedAttr(attnum))
296+
ereport(ERROR,
297+
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
298+
errmsg("logical replication target relation \"%s.%s\" uses "
299+
"system columns in REPLICA IDENTITY index",
300+
remoterel->nspname, remoterel->relname)));
301+
302+
attnum = AttrNumberGetAttrOffset(attnum);
303+
304+
if (entry->attrmap->attnums[attnum] < 0 ||
305+
!bms_is_member(entry->attrmap->attnums[attnum], remoterel->attkeys))
306+
{
307+
entry->updatable = false;
308+
break;
309+
}
310+
}
311+
}
312+
252313
/*
253314
* Open the local relation associated with the remote one.
254315
*
@@ -307,7 +368,6 @@ logicalrep_rel_open(LogicalRepRelId remoteid, LOCKMODE lockmode)
307368
if (!entry->localrelvalid)
308369
{
309370
Oid relid;
310-
Bitmapset *idkey;
311371
TupleDesc desc;
312372
MemoryContext oldctx;
313373
int i;
@@ -366,54 +426,10 @@ logicalrep_rel_open(LogicalRepRelId remoteid, LOCKMODE lockmode)
366426
bms_free(missingatts);
367427

368428
/*
369-
* Check that replica identity matches. We allow for stricter replica
370-
* identity (fewer columns) on subscriber as that will not stop us
371-
* from finding unique tuple. IE, if publisher has identity
372-
* (id,timestamp) and subscriber just (id) this will not be a problem,
373-
* but in the opposite scenario it will.
374-
*
375-
* Don't throw any error here just mark the relation entry as not
376-
* updatable, as replica identity is only for updates and deletes but
377-
* inserts can be replicated even without it.
429+
* Set if the table's replica identity is enough to apply
430+
* update/delete.
378431
*/
379-
entry->updatable = true;
380-
idkey = RelationGetIndexAttrBitmap(entry->localrel,
381-
INDEX_ATTR_BITMAP_IDENTITY_KEY);
382-
/* fallback to PK if no replica identity */
383-
if (idkey == NULL)
384-
{
385-
idkey = RelationGetIndexAttrBitmap(entry->localrel,
386-
INDEX_ATTR_BITMAP_PRIMARY_KEY);
387-
388-
/*
389-
* If no replica identity index and no PK, the published table
390-
* must have replica identity FULL.
391-
*/
392-
if (idkey == NULL && remoterel->replident != REPLICA_IDENTITY_FULL)
393-
entry->updatable = false;
394-
}
395-
396-
i = -1;
397-
while ((i = bms_next_member(idkey, i)) >= 0)
398-
{
399-
int attnum = i + FirstLowInvalidHeapAttributeNumber;
400-
401-
if (!AttrNumberIsForUserDefinedAttr(attnum))
402-
ereport(ERROR,
403-
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
404-
errmsg("logical replication target relation \"%s.%s\" uses "
405-
"system columns in REPLICA IDENTITY index",
406-
remoterel->nspname, remoterel->relname)));
407-
408-
attnum = AttrNumberGetAttrOffset(attnum);
409-
410-
if (entry->attrmap->attnums[attnum] < 0 ||
411-
!bms_is_member(entry->attrmap->attnums[attnum], remoterel->attkeys))
412-
{
413-
entry->updatable = false;
414-
break;
415-
}
416-
}
432+
logicalrep_rel_mark_updatable(entry);
417433

418434
entry->localrelvalid = true;
419435
}
@@ -651,7 +667,8 @@ logicalrep_partition_open(LogicalRepRelMapEntry *root,
651667
attrmap->maplen * sizeof(AttrNumber));
652668
}
653669

654-
entry->updatable = root->updatable;
670+
/* Set if the table's replica identity is enough to apply update/delete. */
671+
logicalrep_rel_mark_updatable(entry);
655672

656673
entry->localrelvalid = true;
657674

src/backend/replication/logical/worker.c

Lines changed: 21 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1323,6 +1323,13 @@ apply_handle_insert_internal(ApplyExecutionData *edata,
13231323
static void
13241324
check_relation_updatable(LogicalRepRelMapEntry *rel)
13251325
{
1326+
/*
1327+
* For partitioned tables, we only need to care if the target partition is
1328+
* updatable (aka has PK or RI defined for it).
1329+
*/
1330+
if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
1331+
return;
1332+
13261333
/* Updatable, no error. */
13271334
if (rel->updatable)
13281335
return;
@@ -1676,6 +1683,8 @@ apply_handle_tuple_routing(ApplyExecutionData *edata,
16761683
TupleTableSlot *remoteslot_part;
16771684
TupleConversionMap *map;
16781685
MemoryContext oldctx;
1686+
LogicalRepRelMapEntry *part_entry = NULL;
1687+
AttrMap *attrmap = NULL;
16791688

16801689
/* ModifyTableState is needed for ExecFindPartition(). */
16811690
edata->mtstate = mtstate = makeNode(ModifyTableState);
@@ -1707,15 +1716,26 @@ apply_handle_tuple_routing(ApplyExecutionData *edata,
17071716
remoteslot_part = table_slot_create(partrel, &estate->es_tupleTable);
17081717
map = partrelinfo->ri_RootToPartitionMap;
17091718
if (map != NULL)
1710-
remoteslot_part = execute_attr_map_slot(map->attrMap, remoteslot,
1719+
{
1720+
attrmap = map->attrMap;
1721+
remoteslot_part = execute_attr_map_slot(attrmap, remoteslot,
17111722
remoteslot_part);
1723+
}
17121724
else
17131725
{
17141726
remoteslot_part = ExecCopySlot(remoteslot_part, remoteslot);
17151727
slot_getallattrs(remoteslot_part);
17161728
}
17171729
MemoryContextSwitchTo(oldctx);
17181730

1731+
/* Check if we can do the update or delete on the leaf partition. */
1732+
if (operation == CMD_UPDATE || operation == CMD_DELETE)
1733+
{
1734+
part_entry = logicalrep_partition_open(relmapentry, partrel,
1735+
attrmap);
1736+
check_relation_updatable(part_entry);
1737+
}
1738+
17191739
switch (operation)
17201740
{
17211741
case CMD_INSERT:
@@ -1737,15 +1757,10 @@ apply_handle_tuple_routing(ApplyExecutionData *edata,
17371757
* suitable partition.
17381758
*/
17391759
{
1740-
AttrMap *attrmap = map ? map->attrMap : NULL;
1741-
LogicalRepRelMapEntry *part_entry;
17421760
TupleTableSlot *localslot;
17431761
ResultRelInfo *partrelinfo_new;
17441762
bool found;
17451763

1746-
part_entry = logicalrep_partition_open(relmapentry, partrel,
1747-
attrmap);
1748-
17491764
/* Get the matching local tuple from the partition. */
17501765
found = FindReplTupleInLocalRel(estate, partrel,
17511766
&part_entry->remoterel,

src/test/subscription/t/013_partition.pl

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
use warnings;
77
use PostgresNode;
88
use TestLib;
9-
use Test::More tests => 70;
9+
use Test::More tests => 71;
1010

1111
# setup
1212

@@ -856,3 +856,17 @@ BEGIN
856856
$result = $node_subscriber2->safe_psql('postgres',
857857
"SELECT a, b, c FROM tab5 ORDER BY 1");
858858
is($result, qq(3||1), 'updates of tab5 replicated correctly after altering table on publisher');
859+
860+
# Test that replication works correctly as long as the leaf partition
861+
# has the necessary REPLICA IDENTITY, even though the actual target
862+
# partitioned table does not.
863+
$node_subscriber2->safe_psql('postgres',
864+
"ALTER TABLE tab5 REPLICA IDENTITY NOTHING");
865+
866+
$node_publisher->safe_psql('postgres', "UPDATE tab5 SET a = 4 WHERE a = 3");
867+
868+
$node_publisher->wait_for_catchup('sub2');
869+
870+
$result = $node_subscriber2->safe_psql('postgres',
871+
"SELECT a, b, c FROM tab5_1 ORDER BY 1");
872+
is($result, qq(4||1), 'updates of tab5 replicated correctly');

0 commit comments

Comments
 (0)