@@ -213,6 +213,67 @@ logicalrep_rel_att_by_name(LogicalRepRelation *remoterel, const char *attname)
213
213
return -1 ;
214
214
}
215
215
216
+ /*
217
+ * Check if replica identity matches and mark the updatable flag.
218
+ *
219
+ * We allow for stricter replica identity (fewer columns) on subscriber as
220
+ * that will not stop us from finding unique tuple. IE, if publisher has
221
+ * identity (id,timestamp) and subscriber just (id) this will not be a
222
+ * problem, but in the opposite scenario it will.
223
+ *
224
+ * We just mark the relation entry as not updatable here if the local
225
+ * replica identity is found to be insufficient for applying
226
+ * updates/deletes (inserts don't care!) and leave it to
227
+ * check_relation_updatable() to throw the actual error if needed.
228
+ */
229
+ static void
230
+ logicalrep_rel_mark_updatable (LogicalRepRelMapEntry * entry )
231
+ {
232
+ Bitmapset * idkey ;
233
+ LogicalRepRelation * remoterel = & entry -> remoterel ;
234
+ int i ;
235
+
236
+ entry -> updatable = true;
237
+
238
+ idkey = RelationGetIndexAttrBitmap (entry -> localrel ,
239
+ INDEX_ATTR_BITMAP_IDENTITY_KEY );
240
+ /* fallback to PK if no replica identity */
241
+ if (idkey == NULL )
242
+ {
243
+ idkey = RelationGetIndexAttrBitmap (entry -> localrel ,
244
+ INDEX_ATTR_BITMAP_PRIMARY_KEY );
245
+
246
+ /*
247
+ * If no replica identity index and no PK, the published table must
248
+ * have replica identity FULL.
249
+ */
250
+ if (idkey == NULL && remoterel -> replident != REPLICA_IDENTITY_FULL )
251
+ entry -> updatable = false;
252
+ }
253
+
254
+ i = -1 ;
255
+ while ((i = bms_next_member (idkey , i )) >= 0 )
256
+ {
257
+ int attnum = i + FirstLowInvalidHeapAttributeNumber ;
258
+
259
+ if (!AttrNumberIsForUserDefinedAttr (attnum ))
260
+ ereport (ERROR ,
261
+ (errcode (ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE ),
262
+ errmsg ("logical replication target relation \"%s.%s\" uses "
263
+ "system columns in REPLICA IDENTITY index" ,
264
+ remoterel -> nspname , remoterel -> relname )));
265
+
266
+ attnum = AttrNumberGetAttrOffset (attnum );
267
+
268
+ if (entry -> attrmap -> attnums [attnum ] < 0 ||
269
+ !bms_is_member (entry -> attrmap -> attnums [attnum ], remoterel -> attkeys ))
270
+ {
271
+ entry -> updatable = false;
272
+ break ;
273
+ }
274
+ }
275
+ }
276
+
216
277
/*
217
278
* Open the local relation associated with the remote one.
218
279
*
@@ -272,7 +333,6 @@ logicalrep_rel_open(LogicalRepRelId remoteid, LOCKMODE lockmode)
272
333
{
273
334
Oid relid ;
274
335
int found ;
275
- Bitmapset * idkey ;
276
336
TupleDesc desc ;
277
337
MemoryContext oldctx ;
278
338
int i ;
@@ -332,54 +392,10 @@ logicalrep_rel_open(LogicalRepRelId remoteid, LOCKMODE lockmode)
332
392
remoterel -> nspname , remoterel -> relname )));
333
393
334
394
/*
335
- * Check that replica identity matches. We allow for stricter replica
336
- * identity (fewer columns) on subscriber as that will not stop us
337
- * from finding unique tuple. IE, if publisher has identity
338
- * (id,timestamp) and subscriber just (id) this will not be a problem,
339
- * but in the opposite scenario it will.
340
- *
341
- * Don't throw any error here just mark the relation entry as not
342
- * updatable, as replica identity is only for updates and deletes but
343
- * inserts can be replicated even without it.
395
+ * Set if the table's replica identity is enough to apply
396
+ * update/delete.
344
397
*/
345
- entry -> updatable = true;
346
- idkey = RelationGetIndexAttrBitmap (entry -> localrel ,
347
- INDEX_ATTR_BITMAP_IDENTITY_KEY );
348
- /* fallback to PK if no replica identity */
349
- if (idkey == NULL )
350
- {
351
- idkey = RelationGetIndexAttrBitmap (entry -> localrel ,
352
- INDEX_ATTR_BITMAP_PRIMARY_KEY );
353
-
354
- /*
355
- * If no replica identity index and no PK, the published table
356
- * must have replica identity FULL.
357
- */
358
- if (idkey == NULL && remoterel -> replident != REPLICA_IDENTITY_FULL )
359
- entry -> updatable = false;
360
- }
361
-
362
- i = -1 ;
363
- while ((i = bms_next_member (idkey , i )) >= 0 )
364
- {
365
- int attnum = i + FirstLowInvalidHeapAttributeNumber ;
366
-
367
- if (!AttrNumberIsForUserDefinedAttr (attnum ))
368
- ereport (ERROR ,
369
- (errcode (ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE ),
370
- errmsg ("logical replication target relation \"%s.%s\" uses "
371
- "system columns in REPLICA IDENTITY index" ,
372
- remoterel -> nspname , remoterel -> relname )));
373
-
374
- attnum = AttrNumberGetAttrOffset (attnum );
375
-
376
- if (entry -> attrmap -> attnums [attnum ] < 0 ||
377
- !bms_is_member (entry -> attrmap -> attnums [attnum ], remoterel -> attkeys ))
378
- {
379
- entry -> updatable = false;
380
- break ;
381
- }
382
- }
398
+ logicalrep_rel_mark_updatable (entry );
383
399
384
400
entry -> localrelvalid = true;
385
401
}
@@ -619,7 +635,8 @@ logicalrep_partition_open(LogicalRepRelMapEntry *root,
619
635
attrmap -> maplen * sizeof (AttrNumber ));
620
636
}
621
637
622
- entry -> updatable = root -> updatable ;
638
+ /* Set if the table's replica identity is enough to apply update/delete. */
639
+ logicalrep_rel_mark_updatable (entry );
623
640
624
641
entry -> localrelvalid = true;
625
642
0 commit comments