Skip to content

Commit 16f5a8d

Browse files
author
Amit Kapila
committed
Fix cache look-up failures while applying changes in logical replication.
While building a new attrmap which maps partition attribute numbers to remoterel's, we incorrectly update the map for dropped column attributes. Later, it caused cache look-up failure when we tried to use the map to fetch the information about attributes. This also fixes the partition map cache invalidation which was using the wrong type cast to fetch the entry. We were using stale partition map entry after invalidation which leads to the assertion or cache look-up failure. Reported-by: Shi Yu Author: Hou Zhijie, Shi Yu Reviewed-by: Amit Langote, Amit Kapila Backpatch-through: 13, where it was introduced Discussion: https://postgr.es/m/OSZPR01MB6310F46CD425A967E4AEF736FDA49@OSZPR01MB6310.jpnprd01.prod.outlook.com
1 parent 12b8fb3 commit 16f5a8d

File tree

2 files changed

+90
-30
lines changed

2 files changed

+90
-30
lines changed

src/backend/replication/logical/relation.c

Lines changed: 37 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -418,7 +418,7 @@ logicalrep_rel_close(LogicalRepRelMapEntry *rel, LOCKMODE lockmode)
418418
static void
419419
logicalrep_partmap_invalidate_cb(Datum arg, Oid reloid)
420420
{
421-
LogicalRepRelMapEntry *entry;
421+
LogicalRepPartMapEntry *entry;
422422

423423
/* Just to be sure. */
424424
if (LogicalRepPartMap == NULL)
@@ -431,11 +431,11 @@ logicalrep_partmap_invalidate_cb(Datum arg, Oid reloid)
431431
hash_seq_init(&status, LogicalRepPartMap);
432432

433433
/* TODO, use inverse lookup hashtable? */
434-
while ((entry = (LogicalRepRelMapEntry *) hash_seq_search(&status)) != NULL)
434+
while ((entry = (LogicalRepPartMapEntry *) hash_seq_search(&status)) != NULL)
435435
{
436-
if (entry->localreloid == reloid)
436+
if (entry->relmapentry.localreloid == reloid)
437437
{
438-
entry->localrelvalid = false;
438+
entry->relmapentry.localrelvalid = false;
439439
hash_seq_term(&status);
440440
break;
441441
}
@@ -448,8 +448,8 @@ logicalrep_partmap_invalidate_cb(Datum arg, Oid reloid)
448448

449449
hash_seq_init(&status, LogicalRepPartMap);
450450

451-
while ((entry = (LogicalRepRelMapEntry *) hash_seq_search(&status)) != NULL)
452-
entry->localrelvalid = false;
451+
while ((entry = (LogicalRepPartMapEntry *) hash_seq_search(&status)) != NULL)
452+
entry->relmapentry.localrelvalid = false;
453453
}
454454
}
455455

@@ -502,7 +502,6 @@ logicalrep_partition_open(LogicalRepRelMapEntry *root,
502502
Oid partOid = RelationGetRelid(partrel);
503503
AttrMap *attrmap = root->attrmap;
504504
bool found;
505-
int i;
506505
MemoryContext oldctx;
507506

508507
if (LogicalRepPartMap == NULL)
@@ -513,31 +512,40 @@ logicalrep_partition_open(LogicalRepRelMapEntry *root,
513512
(void *) &partOid,
514513
HASH_ENTER, &found);
515514

516-
if (found)
517-
return &part_entry->relmapentry;
515+
entry = &part_entry->relmapentry;
518516

519-
memset(part_entry, 0, sizeof(LogicalRepPartMapEntry));
517+
if (found && entry->localrelvalid)
518+
return entry;
520519

521520
/* Switch to longer-lived context. */
522521
oldctx = MemoryContextSwitchTo(LogicalRepPartMapContext);
523522

524-
part_entry->partoid = partOid;
523+
if (!found)
524+
{
525+
memset(part_entry, 0, sizeof(LogicalRepPartMapEntry));
526+
part_entry->partoid = partOid;
527+
}
525528

526-
/* Remote relation is copied as-is from the root entry. */
527-
entry = &part_entry->relmapentry;
528-
entry->remoterel.remoteid = remoterel->remoteid;
529-
entry->remoterel.nspname = pstrdup(remoterel->nspname);
530-
entry->remoterel.relname = pstrdup(remoterel->relname);
531-
entry->remoterel.natts = remoterel->natts;
532-
entry->remoterel.attnames = palloc(remoterel->natts * sizeof(char *));
533-
entry->remoterel.atttyps = palloc(remoterel->natts * sizeof(Oid));
534-
for (i = 0; i < remoterel->natts; i++)
529+
if (!entry->remoterel.remoteid)
535530
{
536-
entry->remoterel.attnames[i] = pstrdup(remoterel->attnames[i]);
537-
entry->remoterel.atttyps[i] = remoterel->atttyps[i];
531+
int i;
532+
533+
/* Remote relation is copied as-is from the root entry. */
534+
entry = &part_entry->relmapentry;
535+
entry->remoterel.remoteid = remoterel->remoteid;
536+
entry->remoterel.nspname = pstrdup(remoterel->nspname);
537+
entry->remoterel.relname = pstrdup(remoterel->relname);
538+
entry->remoterel.natts = remoterel->natts;
539+
entry->remoterel.attnames = palloc(remoterel->natts * sizeof(char *));
540+
entry->remoterel.atttyps = palloc(remoterel->natts * sizeof(Oid));
541+
for (i = 0; i < remoterel->natts; i++)
542+
{
543+
entry->remoterel.attnames[i] = pstrdup(remoterel->attnames[i]);
544+
entry->remoterel.atttyps[i] = remoterel->atttyps[i];
545+
}
546+
entry->remoterel.replident = remoterel->replident;
547+
entry->remoterel.attkeys = bms_copy(remoterel->attkeys);
538548
}
539-
entry->remoterel.replident = remoterel->replident;
540-
entry->remoterel.attkeys = bms_copy(remoterel->attkeys);
541549

542550
entry->localrel = partrel;
543551
entry->localreloid = partOid;
@@ -562,7 +570,11 @@ logicalrep_partition_open(LogicalRepRelMapEntry *root,
562570
{
563571
AttrNumber root_attno = map->attnums[attno];
564572

565-
entry->attrmap->attnums[attno] = attrmap->attnums[root_attno - 1];
573+
/* 0 means it's a dropped attribute. See comments atop AttrMap. */
574+
if (root_attno == 0)
575+
entry->attrmap->attnums[attno] = -1;
576+
else
577+
entry->attrmap->attnums[attno] = attrmap->attnums[root_attno - 1];
566578
}
567579
}
568580
else

src/test/subscription/t/013_partition.pl

Lines changed: 53 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
use warnings;
44
use PostgresNode;
55
use TestLib;
6-
use Test::More tests => 67;
6+
use Test::More tests => 69;
77

88
# setup
99

@@ -786,7 +786,55 @@ BEGIN
786786
qr/logical replication did not find row to be deleted in replication target relation "tab2_1"/,
787787
'delete target row is missing in tab2_1');
788788

789-
# No need for this until more tests are added.
790-
# $node_subscriber1->append_conf('postgresql.conf',
791-
# "log_min_messages = warning");
792-
# $node_subscriber1->reload;
789+
$node_subscriber1->append_conf('postgresql.conf',
790+
"log_min_messages = warning");
791+
$node_subscriber1->reload;
792+
793+
# Test that replication continues to work correctly after altering the
794+
# partition of a partitioned target table.
795+
796+
$node_publisher->safe_psql(
797+
'postgres', q{
798+
CREATE TABLE tab5 (a int NOT NULL, b int);
799+
CREATE UNIQUE INDEX tab5_a_idx ON tab5 (a);
800+
ALTER TABLE tab5 REPLICA IDENTITY USING INDEX tab5_a_idx;});
801+
802+
$node_subscriber2->safe_psql(
803+
'postgres', q{
804+
CREATE TABLE tab5 (a int NOT NULL, b int, c int) PARTITION BY LIST (a);
805+
CREATE TABLE tab5_1 PARTITION OF tab5 DEFAULT;
806+
CREATE UNIQUE INDEX tab5_a_idx ON tab5 (a);
807+
ALTER TABLE tab5 REPLICA IDENTITY USING INDEX tab5_a_idx;
808+
ALTER TABLE tab5_1 REPLICA IDENTITY USING INDEX tab5_1_a_idx;});
809+
810+
$node_subscriber2->safe_psql('postgres',
811+
"ALTER SUBSCRIPTION sub2 REFRESH PUBLICATION");
812+
813+
$node_subscriber2->poll_query_until('postgres', $synced_query)
814+
or die "Timed out while waiting for subscriber to synchronize data";
815+
816+
# Make partition map cache
817+
$node_publisher->safe_psql('postgres', "INSERT INTO tab5 VALUES (1, 1)");
818+
$node_publisher->safe_psql('postgres', "UPDATE tab5 SET a = 2 WHERE a = 1");
819+
820+
$node_publisher->wait_for_catchup('sub2');
821+
822+
$result = $node_subscriber2->safe_psql('postgres',
823+
"SELECT a, b FROM tab5 ORDER BY 1");
824+
is($result, qq(2|1), 'updates of tab5 replicated correctly');
825+
826+
# Change the column order of partition on subscriber
827+
$node_subscriber2->safe_psql(
828+
'postgres', q{
829+
ALTER TABLE tab5 DETACH PARTITION tab5_1;
830+
ALTER TABLE tab5_1 DROP COLUMN b;
831+
ALTER TABLE tab5_1 ADD COLUMN b int;
832+
ALTER TABLE tab5 ATTACH PARTITION tab5_1 DEFAULT});
833+
834+
$node_publisher->safe_psql('postgres', "UPDATE tab5 SET a = 3 WHERE a = 2");
835+
836+
$node_publisher->wait_for_catchup('sub2');
837+
838+
$result = $node_subscriber2->safe_psql('postgres',
839+
"SELECT a, b, c FROM tab5 ORDER BY 1");
840+
is($result, qq(3|1|), 'updates of tab5 replicated correctly after altering table on subscriber');

0 commit comments

Comments
 (0)