Skip to content

Commit 5a97b13

Browse files
author
Amit Kapila
committed
Fix cache look-up failures while applying changes in logical replication.
While building a new attrmap which maps partition attribute numbers to remoterel's attribute numbers, we incorrectly updated the map for dropped column attributes. Later, this caused a cache look-up failure when we tried to use the map to fetch information about the attributes. This also fixes the partition map cache invalidation, which was using the wrong type cast to fetch the entry. We were using a stale partition map entry after invalidation, which led to an assertion failure or a cache look-up failure. Reported-by: Shi Yu Author: Hou Zhijie, Shi Yu Reviewed-by: Amit Langote, Amit Kapila Backpatch-through: 13, where it was introduced Discussion: https://postgr.es/m/OSZPR01MB6310F46CD425A967E4AEF736FDA49@OSZPR01MB6310.jpnprd01.prod.outlook.com
1 parent a3ff08e commit 5a97b13

File tree

2 files changed

+89
-29
lines changed

2 files changed

+89
-29
lines changed

src/backend/replication/logical/relation.c

+37-25
Original file line numberDiff line numberDiff line change
@@ -451,7 +451,7 @@ logicalrep_rel_close(LogicalRepRelMapEntry *rel, LOCKMODE lockmode)
451451
static void
452452
logicalrep_partmap_invalidate_cb(Datum arg, Oid reloid)
453453
{
454-
LogicalRepRelMapEntry *entry;
454+
LogicalRepPartMapEntry *entry;
455455

456456
/* Just to be sure. */
457457
if (LogicalRepPartMap == NULL)
@@ -464,11 +464,11 @@ logicalrep_partmap_invalidate_cb(Datum arg, Oid reloid)
464464
hash_seq_init(&status, LogicalRepPartMap);
465465

466466
/* TODO, use inverse lookup hashtable? */
467-
while ((entry = (LogicalRepRelMapEntry *) hash_seq_search(&status)) != NULL)
467+
while ((entry = (LogicalRepPartMapEntry *) hash_seq_search(&status)) != NULL)
468468
{
469-
if (entry->localreloid == reloid)
469+
if (entry->relmapentry.localreloid == reloid)
470470
{
471-
entry->localrelvalid = false;
471+
entry->relmapentry.localrelvalid = false;
472472
hash_seq_term(&status);
473473
break;
474474
}
@@ -481,8 +481,8 @@ logicalrep_partmap_invalidate_cb(Datum arg, Oid reloid)
481481

482482
hash_seq_init(&status, LogicalRepPartMap);
483483

484-
while ((entry = (LogicalRepRelMapEntry *) hash_seq_search(&status)) != NULL)
485-
entry->localrelvalid = false;
484+
while ((entry = (LogicalRepPartMapEntry *) hash_seq_search(&status)) != NULL)
485+
entry->relmapentry.localrelvalid = false;
486486
}
487487
}
488488

@@ -534,7 +534,6 @@ logicalrep_partition_open(LogicalRepRelMapEntry *root,
534534
Oid partOid = RelationGetRelid(partrel);
535535
AttrMap *attrmap = root->attrmap;
536536
bool found;
537-
int i;
538537
MemoryContext oldctx;
539538

540539
if (LogicalRepPartMap == NULL)
@@ -545,31 +544,40 @@ logicalrep_partition_open(LogicalRepRelMapEntry *root,
545544
(void *) &partOid,
546545
HASH_ENTER, &found);
547546

548-
if (found)
549-
return &part_entry->relmapentry;
547+
entry = &part_entry->relmapentry;
550548

551-
memset(part_entry, 0, sizeof(LogicalRepPartMapEntry));
549+
if (found && entry->localrelvalid)
550+
return entry;
552551

553552
/* Switch to longer-lived context. */
554553
oldctx = MemoryContextSwitchTo(LogicalRepPartMapContext);
555554

556-
part_entry->partoid = partOid;
555+
if (!found)
556+
{
557+
memset(part_entry, 0, sizeof(LogicalRepPartMapEntry));
558+
part_entry->partoid = partOid;
559+
}
557560

558-
/* Remote relation is copied as-is from the root entry. */
559-
entry = &part_entry->relmapentry;
560-
entry->remoterel.remoteid = remoterel->remoteid;
561-
entry->remoterel.nspname = pstrdup(remoterel->nspname);
562-
entry->remoterel.relname = pstrdup(remoterel->relname);
563-
entry->remoterel.natts = remoterel->natts;
564-
entry->remoterel.attnames = palloc(remoterel->natts * sizeof(char *));
565-
entry->remoterel.atttyps = palloc(remoterel->natts * sizeof(Oid));
566-
for (i = 0; i < remoterel->natts; i++)
561+
if (!entry->remoterel.remoteid)
567562
{
568-
entry->remoterel.attnames[i] = pstrdup(remoterel->attnames[i]);
569-
entry->remoterel.atttyps[i] = remoterel->atttyps[i];
563+
int i;
564+
565+
/* Remote relation is copied as-is from the root entry. */
566+
entry = &part_entry->relmapentry;
567+
entry->remoterel.remoteid = remoterel->remoteid;
568+
entry->remoterel.nspname = pstrdup(remoterel->nspname);
569+
entry->remoterel.relname = pstrdup(remoterel->relname);
570+
entry->remoterel.natts = remoterel->natts;
571+
entry->remoterel.attnames = palloc(remoterel->natts * sizeof(char *));
572+
entry->remoterel.atttyps = palloc(remoterel->natts * sizeof(Oid));
573+
for (i = 0; i < remoterel->natts; i++)
574+
{
575+
entry->remoterel.attnames[i] = pstrdup(remoterel->attnames[i]);
576+
entry->remoterel.atttyps[i] = remoterel->atttyps[i];
577+
}
578+
entry->remoterel.replident = remoterel->replident;
579+
entry->remoterel.attkeys = bms_copy(remoterel->attkeys);
570580
}
571-
entry->remoterel.replident = remoterel->replident;
572-
entry->remoterel.attkeys = bms_copy(remoterel->attkeys);
573581

574582
entry->localrel = partrel;
575583
entry->localreloid = partOid;
@@ -594,7 +602,11 @@ logicalrep_partition_open(LogicalRepRelMapEntry *root,
594602
{
595603
AttrNumber root_attno = map->attnums[attno];
596604

597-
entry->attrmap->attnums[attno] = attrmap->attnums[root_attno - 1];
605+
/* 0 means it's a dropped attribute. See comments atop AttrMap. */
606+
if (root_attno == 0)
607+
entry->attrmap->attnums[attno] = -1;
608+
else
609+
entry->attrmap->attnums[attno] = attrmap->attnums[root_attno - 1];
598610
}
599611
}
600612
else

src/test/subscription/t/013_partition.pl

+52-4
Original file line numberDiff line numberDiff line change
@@ -800,9 +800,57 @@ BEGIN
800800
qr/logical replication did not find row to be deleted in replication target relation "tab2_1"/,
801801
'delete target row is missing in tab2_1');
802802

803-
# No need for this until more tests are added.
804-
# $node_subscriber1->append_conf('postgresql.conf',
805-
# "log_min_messages = warning");
806-
# $node_subscriber1->reload;
803+
$node_subscriber1->append_conf('postgresql.conf',
804+
"log_min_messages = warning");
805+
$node_subscriber1->reload;
806+
807+
# Test that replication continues to work correctly after altering the
808+
# partition of a partitioned target table.
809+
810+
$node_publisher->safe_psql(
811+
'postgres', q{
812+
CREATE TABLE tab5 (a int NOT NULL, b int);
813+
CREATE UNIQUE INDEX tab5_a_idx ON tab5 (a);
814+
ALTER TABLE tab5 REPLICA IDENTITY USING INDEX tab5_a_idx;});
815+
816+
$node_subscriber2->safe_psql(
817+
'postgres', q{
818+
CREATE TABLE tab5 (a int NOT NULL, b int, c int) PARTITION BY LIST (a);
819+
CREATE TABLE tab5_1 PARTITION OF tab5 DEFAULT;
820+
CREATE UNIQUE INDEX tab5_a_idx ON tab5 (a);
821+
ALTER TABLE tab5 REPLICA IDENTITY USING INDEX tab5_a_idx;
822+
ALTER TABLE tab5_1 REPLICA IDENTITY USING INDEX tab5_1_a_idx;});
823+
824+
$node_subscriber2->safe_psql('postgres',
825+
"ALTER SUBSCRIPTION sub2 REFRESH PUBLICATION");
826+
827+
$node_subscriber2->poll_query_until('postgres', $synced_query)
828+
or die "Timed out while waiting for subscriber to synchronize data";
829+
830+
# Make partition map cache
831+
$node_publisher->safe_psql('postgres', "INSERT INTO tab5 VALUES (1, 1)");
832+
$node_publisher->safe_psql('postgres', "UPDATE tab5 SET a = 2 WHERE a = 1");
833+
834+
$node_publisher->wait_for_catchup('sub2');
835+
836+
$result = $node_subscriber2->safe_psql('postgres',
837+
"SELECT a, b FROM tab5 ORDER BY 1");
838+
is($result, qq(2|1), 'updates of tab5 replicated correctly');
839+
840+
# Change the column order of partition on subscriber
841+
$node_subscriber2->safe_psql(
842+
'postgres', q{
843+
ALTER TABLE tab5 DETACH PARTITION tab5_1;
844+
ALTER TABLE tab5_1 DROP COLUMN b;
845+
ALTER TABLE tab5_1 ADD COLUMN b int;
846+
ALTER TABLE tab5 ATTACH PARTITION tab5_1 DEFAULT});
847+
848+
$node_publisher->safe_psql('postgres', "UPDATE tab5 SET a = 3 WHERE a = 2");
849+
850+
$node_publisher->wait_for_catchup('sub2');
851+
852+
$result = $node_subscriber2->safe_psql('postgres',
853+
"SELECT a, b, c FROM tab5 ORDER BY 1");
854+
is($result, qq(3|1|), 'updates of tab5 replicated correctly after altering table on subscriber');
807855

808856
done_testing();

0 commit comments

Comments
 (0)