Skip to content

Commit 1f9a773

Browse files
author
Amit Kapila
committed
Fix data inconsistency between publisher and subscriber.
We were not updating the partition map cache in the subscriber even when the corresponding remote rel is changed. Due to this data was getting incorrectly replicated for partition tables after the publisher has changed the table schema. Fix it by resetting the required entries in the partition map cache after receiving a new relation mapping from the publisher. Reported-by: Shi Yu Author: Shi Yu, Hou Zhijie Reviewed-by: Amit Langote, Amit Kapila Backpatch-through: 13, where it was introduced Discussion: https://postgr.es/m/OSZPR01MB6310F46CD425A967E4AEF736FDA49@OSZPR01MB6310.jpnprd01.prod.outlook.com
1 parent 16f5a8d commit 1f9a773

File tree

4 files changed

+54
-1
lines changed

4 files changed

+54
-1
lines changed

src/backend/replication/logical/relation.c

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -453,6 +453,40 @@ logicalrep_partmap_invalidate_cb(Datum arg, Oid reloid)
453453
}
454454
}
455455

456+
/*
457+
* Reset the entries in the partition map that refer to remoterel.
458+
*
459+
* Called when new relation mapping is sent by the publisher to update our
460+
* expected view of incoming data from said publisher.
461+
*
462+
* Note that we don't update the remoterel information in the entry here,
463+
* we will update the information in logicalrep_partition_open to avoid
464+
* unnecessary work.
465+
*/
466+
void
467+
logicalrep_partmap_reset_relmap(LogicalRepRelation *remoterel)
468+
{
469+
HASH_SEQ_STATUS status;
470+
LogicalRepPartMapEntry *part_entry;
471+
LogicalRepRelMapEntry *entry;
472+
473+
if (LogicalRepPartMap == NULL)
474+
return;
475+
476+
hash_seq_init(&status, LogicalRepPartMap);
477+
while ((part_entry = (LogicalRepPartMapEntry *) hash_seq_search(&status)) != NULL)
478+
{
479+
entry = &part_entry->relmapentry;
480+
481+
if (entry->remoterel.remoteid != remoterel->remoteid)
482+
continue;
483+
484+
logicalrep_relmap_free_entry(entry);
485+
486+
memset(entry, 0, sizeof(LogicalRepRelMapEntry));
487+
}
488+
}
489+
456490
/*
457491
* Initialize the partition map cache.
458492
*/

src/backend/replication/logical/worker.c

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -612,6 +612,9 @@ apply_handle_relation(StringInfo s)
612612

613613
rel = logicalrep_read_rel(s);
614614
logicalrep_relmap_update(rel);
615+
616+
/* Also reset all entries in the partition map that refer to remoterel. */
617+
logicalrep_partmap_reset_relmap(rel);
615618
}
616619

617620
/*

src/include/replication/logicalrelation.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ typedef struct LogicalRepRelMapEntry
3333
} LogicalRepRelMapEntry;
3434

3535
extern void logicalrep_relmap_update(LogicalRepRelation *remoterel);
36+
extern void logicalrep_partmap_reset_relmap(LogicalRepRelation *remoterel);
3637

3738
extern LogicalRepRelMapEntry *logicalrep_rel_open(LogicalRepRelId remoteid,
3839
LOCKMODE lockmode);

src/test/subscription/t/013_partition.pl

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
use warnings;
44
use PostgresNode;
55
use TestLib;
6-
use Test::More tests => 69;
6+
use Test::More tests => 70;
77

88
# setup
99

@@ -838,3 +838,18 @@ BEGIN
838838
$result = $node_subscriber2->safe_psql('postgres',
839839
"SELECT a, b, c FROM tab5 ORDER BY 1");
840840
is($result, qq(3|1|), 'updates of tab5 replicated correctly after altering table on subscriber');
841+
842+
# Test that replication into the partitioned target table continues to
843+
# work correctly when the published table is altered.
844+
$node_publisher->safe_psql(
845+
'postgres', q{
846+
ALTER TABLE tab5 DROP COLUMN b, ADD COLUMN c INT;
847+
ALTER TABLE tab5 ADD COLUMN b INT;});
848+
849+
$node_publisher->safe_psql('postgres', "UPDATE tab5 SET c = 1 WHERE a = 3");
850+
851+
$node_publisher->wait_for_catchup('sub2');
852+
853+
$result = $node_subscriber2->safe_psql('postgres',
854+
"SELECT a, b, c FROM tab5 ORDER BY 1");
855+
is($result, qq(3||1), 'updates of tab5 replicated correctly after altering table on publisher');

0 commit comments

Comments
 (0)