Skip to content

Commit 5f113d6

Browse files
author
Amit Kapila
committed
Fix partition table's REPLICA IDENTITY checking on the subscriber.
In logical replication, we will check if the target table on the subscriber is updatable by comparing the replica identity of the table on the publisher with the table on the subscriber. When the target table is a partitioned table, we only check its replica identity but not for the partition tables. This leads to assertion failure while applying changes for update/delete as we expect those to succeed only when the corresponding partition table has a primary key or has a replica identity defined. Fix it by checking the replica identity of the partition table while applying changes. Reported-by: Shi Yu Author: Shi Yu, Hou Zhijie Reviewed-by: Amit Langote, Amit Kapila Backpatch-through: 13, where it was introduced Discussion: https://postgr.es/m/OSZPR01MB6310F46CD425A967E4AEF736FDA49@OSZPR01MB6310.jpnprd01.prod.outlook.com
1 parent 1f9a773 commit 5f113d6

File tree

3 files changed

+102
-56
lines changed

3 files changed

+102
-56
lines changed

src/backend/replication/logical/relation.c

Lines changed: 66 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -213,6 +213,67 @@ logicalrep_rel_att_by_name(LogicalRepRelation *remoterel, const char *attname)
213213
return -1;
214214
}
215215

216+
/*
217+
* Check if replica identity matches and mark the updatable flag.
218+
*
219+
* We allow for stricter replica identity (fewer columns) on subscriber as
220+
* that will not stop us from finding unique tuple. IE, if publisher has
221+
* identity (id,timestamp) and subscriber just (id) this will not be a
222+
* problem, but in the opposite scenario it will.
223+
*
224+
* We just mark the relation entry as not updatable here if the local
225+
* replica identity is found to be insufficient for applying
226+
* updates/deletes (inserts don't care!) and leave it to
227+
* check_relation_updatable() to throw the actual error if needed.
228+
*/
229+
static void
230+
logicalrep_rel_mark_updatable(LogicalRepRelMapEntry *entry)
231+
{
232+
Bitmapset *idkey;
233+
LogicalRepRelation *remoterel = &entry->remoterel;
234+
int i;
235+
236+
entry->updatable = true;
237+
238+
idkey = RelationGetIndexAttrBitmap(entry->localrel,
239+
INDEX_ATTR_BITMAP_IDENTITY_KEY);
240+
/* fallback to PK if no replica identity */
241+
if (idkey == NULL)
242+
{
243+
idkey = RelationGetIndexAttrBitmap(entry->localrel,
244+
INDEX_ATTR_BITMAP_PRIMARY_KEY);
245+
246+
/*
247+
* If no replica identity index and no PK, the published table must
248+
* have replica identity FULL.
249+
*/
250+
if (idkey == NULL && remoterel->replident != REPLICA_IDENTITY_FULL)
251+
entry->updatable = false;
252+
}
253+
254+
i = -1;
255+
while ((i = bms_next_member(idkey, i)) >= 0)
256+
{
257+
int attnum = i + FirstLowInvalidHeapAttributeNumber;
258+
259+
if (!AttrNumberIsForUserDefinedAttr(attnum))
260+
ereport(ERROR,
261+
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
262+
errmsg("logical replication target relation \"%s.%s\" uses "
263+
"system columns in REPLICA IDENTITY index",
264+
remoterel->nspname, remoterel->relname)));
265+
266+
attnum = AttrNumberGetAttrOffset(attnum);
267+
268+
if (entry->attrmap->attnums[attnum] < 0 ||
269+
!bms_is_member(entry->attrmap->attnums[attnum], remoterel->attkeys))
270+
{
271+
entry->updatable = false;
272+
break;
273+
}
274+
}
275+
}
276+
216277
/*
217278
* Open the local relation associated with the remote one.
218279
*
@@ -272,7 +333,6 @@ logicalrep_rel_open(LogicalRepRelId remoteid, LOCKMODE lockmode)
272333
{
273334
Oid relid;
274335
int found;
275-
Bitmapset *idkey;
276336
TupleDesc desc;
277337
MemoryContext oldctx;
278338
int i;
@@ -332,54 +392,10 @@ logicalrep_rel_open(LogicalRepRelId remoteid, LOCKMODE lockmode)
332392
remoterel->nspname, remoterel->relname)));
333393

334394
/*
335-
* Check that replica identity matches. We allow for stricter replica
336-
* identity (fewer columns) on subscriber as that will not stop us
337-
* from finding unique tuple. IE, if publisher has identity
338-
* (id,timestamp) and subscriber just (id) this will not be a problem,
339-
* but in the opposite scenario it will.
340-
*
341-
* Don't throw any error here just mark the relation entry as not
342-
* updatable, as replica identity is only for updates and deletes but
343-
* inserts can be replicated even without it.
395+
* Set if the table's replica identity is enough to apply
396+
* update/delete.
344397
*/
345-
entry->updatable = true;
346-
idkey = RelationGetIndexAttrBitmap(entry->localrel,
347-
INDEX_ATTR_BITMAP_IDENTITY_KEY);
348-
/* fallback to PK if no replica identity */
349-
if (idkey == NULL)
350-
{
351-
idkey = RelationGetIndexAttrBitmap(entry->localrel,
352-
INDEX_ATTR_BITMAP_PRIMARY_KEY);
353-
354-
/*
355-
* If no replica identity index and no PK, the published table
356-
* must have replica identity FULL.
357-
*/
358-
if (idkey == NULL && remoterel->replident != REPLICA_IDENTITY_FULL)
359-
entry->updatable = false;
360-
}
361-
362-
i = -1;
363-
while ((i = bms_next_member(idkey, i)) >= 0)
364-
{
365-
int attnum = i + FirstLowInvalidHeapAttributeNumber;
366-
367-
if (!AttrNumberIsForUserDefinedAttr(attnum))
368-
ereport(ERROR,
369-
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
370-
errmsg("logical replication target relation \"%s.%s\" uses "
371-
"system columns in REPLICA IDENTITY index",
372-
remoterel->nspname, remoterel->relname)));
373-
374-
attnum = AttrNumberGetAttrOffset(attnum);
375-
376-
if (entry->attrmap->attnums[attnum] < 0 ||
377-
!bms_is_member(entry->attrmap->attnums[attnum], remoterel->attkeys))
378-
{
379-
entry->updatable = false;
380-
break;
381-
}
382-
}
398+
logicalrep_rel_mark_updatable(entry);
383399

384400
entry->localrelvalid = true;
385401
}
@@ -619,7 +635,8 @@ logicalrep_partition_open(LogicalRepRelMapEntry *root,
619635
attrmap->maplen * sizeof(AttrNumber));
620636
}
621637

622-
entry->updatable = root->updatable;
638+
/* Set if the table's replica identity is enough to apply update/delete. */
639+
logicalrep_rel_mark_updatable(entry);
623640

624641
entry->localrelvalid = true;
625642

src/backend/replication/logical/worker.c

Lines changed: 21 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -730,6 +730,13 @@ apply_handle_insert_internal(ResultRelInfo *relinfo,
730730
static void
731731
check_relation_updatable(LogicalRepRelMapEntry *rel)
732732
{
733+
/*
734+
* For partitioned tables, we only need to care if the target partition is
735+
* updatable (aka has PK or RI defined for it).
736+
*/
737+
if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
738+
return;
739+
733740
/* Updatable, no error. */
734741
if (rel->updatable)
735742
return;
@@ -1064,6 +1071,8 @@ apply_handle_tuple_routing(ApplyExecutionData *edata,
10641071
PartitionRoutingInfo *partinfo;
10651072
TupleConversionMap *map;
10661073
MemoryContext oldctx;
1074+
LogicalRepRelMapEntry *part_entry = NULL;
1075+
AttrMap *attrmap = NULL;
10671076

10681077
/* ModifyTableState is needed for ExecFindPartition(). */
10691078
edata->mtstate = mtstate = makeNode(ModifyTableState);
@@ -1097,15 +1106,26 @@ apply_handle_tuple_routing(ApplyExecutionData *edata,
10971106
remoteslot_part = table_slot_create(partrel, &estate->es_tupleTable);
10981107
map = partinfo->pi_RootToPartitionMap;
10991108
if (map != NULL)
1100-
remoteslot_part = execute_attr_map_slot(map->attrMap, remoteslot,
1109+
{
1110+
attrmap = map->attrMap;
1111+
remoteslot_part = execute_attr_map_slot(attrmap, remoteslot,
11011112
remoteslot_part);
1113+
}
11021114
else
11031115
{
11041116
remoteslot_part = ExecCopySlot(remoteslot_part, remoteslot);
11051117
slot_getallattrs(remoteslot_part);
11061118
}
11071119
MemoryContextSwitchTo(oldctx);
11081120

1121+
/* Check if we can do the update or delete on the leaf partition. */
1122+
if (operation == CMD_UPDATE || operation == CMD_DELETE)
1123+
{
1124+
part_entry = logicalrep_partition_open(relmapentry, partrel,
1125+
attrmap);
1126+
check_relation_updatable(part_entry);
1127+
}
1128+
11091129
estate->es_result_relation_info = partrelinfo;
11101130
switch (operation)
11111131
{
@@ -1129,15 +1149,10 @@ apply_handle_tuple_routing(ApplyExecutionData *edata,
11291149
* suitable partition.
11301150
*/
11311151
{
1132-
AttrMap *attrmap = map ? map->attrMap : NULL;
1133-
LogicalRepRelMapEntry *part_entry;
11341152
TupleTableSlot *localslot;
11351153
ResultRelInfo *partrelinfo_new;
11361154
bool found;
11371155

1138-
part_entry = logicalrep_partition_open(relmapentry, partrel,
1139-
attrmap);
1140-
11411156
/* Get the matching local tuple from the partition. */
11421157
found = FindReplTupleInLocalRel(estate, partrel,
11431158
&part_entry->remoterel,

src/test/subscription/t/013_partition.pl

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
use warnings;
44
use PostgresNode;
55
use TestLib;
6-
use Test::More tests => 70;
6+
use Test::More tests => 71;
77

88
# setup
99

@@ -853,3 +853,17 @@ BEGIN
853853
$result = $node_subscriber2->safe_psql('postgres',
854854
"SELECT a, b, c FROM tab5 ORDER BY 1");
855855
is($result, qq(3||1), 'updates of tab5 replicated correctly after altering table on publisher');
856+
857+
# Test that replication works correctly as long as the leaf partition
858+
# has the necessary REPLICA IDENTITY, even though the actual target
859+
# partitioned table does not.
860+
$node_subscriber2->safe_psql('postgres',
861+
"ALTER TABLE tab5 REPLICA IDENTITY NOTHING");
862+
863+
$node_publisher->safe_psql('postgres', "UPDATE tab5 SET a = 4 WHERE a = 3");
864+
865+
$node_publisher->wait_for_catchup('sub2');
866+
867+
$result = $node_subscriber2->safe_psql('postgres',
868+
"SELECT a, b, c FROM tab5_1 ORDER BY 1");
869+
is($result, qq(4||1), 'updates of tab5 replicated correctly');

0 commit comments

Comments
 (0)