Skip to content

Commit f09a81f

Browse files
author
Amit Kapila
committed
Invalidate all partitions for a partitioned table in publication.
Updates/Deletes on a partition were allowed even without replica identity after the parent table was added to a publication. This would later lead to an error on subscribers. The reason was that we were not invalidating the partition's relcache and the publication information for partitions was not getting rebuilt. Similarly, we were not invalidating the partitions' relcache after dropping a partitioned table from a publication which will prohibit Updates/Deletes on its partition without replica identity even without any publication. Reported-by: Haiying Tang Author: Hou Zhijie and Vignesh C Reviewed-by: Vignesh C and Amit Kapila Backpatch-through: 13 Discussion: https://postgr.es/m/OS0PR01MB6113D77F583C922F1CEAA1C3FBD29@OS0PR01MB6113.jpnprd01.prod.outlook.com
1 parent 583e15a commit f09a81f

File tree

6 files changed

+116
-55
lines changed

6 files changed

+116
-55
lines changed

src/backend/catalog/pg_publication.c

Lines changed: 53 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
#include "catalog/pg_publication.h"
3232
#include "catalog/pg_publication_rel.h"
3333
#include "catalog/pg_type.h"
34+
#include "commands/publicationcmds.h"
3435
#include "funcapi.h"
3536
#include "miscadmin.h"
3637
#include "utils/array.h"
@@ -136,6 +137,42 @@ pg_relation_is_publishable(PG_FUNCTION_ARGS)
136137
PG_RETURN_BOOL(result);
137138
}
138139

140+
/*
141+
* Gets the relations based on the publication partition option for a specified
142+
* relation.
143+
*/
144+
List *
145+
GetPubPartitionOptionRelations(List *result, PublicationPartOpt pub_partopt,
146+
Oid relid)
147+
{
148+
if (get_rel_relkind(relid) == RELKIND_PARTITIONED_TABLE &&
149+
pub_partopt != PUBLICATION_PART_ROOT)
150+
{
151+
List *all_parts = find_all_inheritors(relid, NoLock,
152+
NULL);
153+
154+
if (pub_partopt == PUBLICATION_PART_ALL)
155+
result = list_concat(result, all_parts);
156+
else if (pub_partopt == PUBLICATION_PART_LEAF)
157+
{
158+
ListCell *lc;
159+
160+
foreach(lc, all_parts)
161+
{
162+
Oid partOid = lfirst_oid(lc);
163+
164+
if (get_rel_relkind(partOid) != RELKIND_PARTITIONED_TABLE)
165+
result = lappend_oid(result, partOid);
166+
}
167+
}
168+
else
169+
Assert(false);
170+
}
171+
else
172+
result = lappend_oid(result, relid);
173+
174+
return result;
175+
}
139176

140177
/*
141178
* Insert new publication / relation mapping.
@@ -153,6 +190,7 @@ publication_add_relation(Oid pubid, Relation targetrel,
153190
Publication *pub = GetPublication(pubid);
154191
ObjectAddress myself,
155192
referenced;
193+
List *relids = NIL;
156194

157195
rel = table_open(PublicationRelRelationId, RowExclusiveLock);
158196

@@ -208,8 +246,18 @@ publication_add_relation(Oid pubid, Relation targetrel,
208246
/* Close the table. */
209247
table_close(rel, RowExclusiveLock);
210248

211-
/* Invalidate relcache so that publication info is rebuilt. */
212-
CacheInvalidateRelcache(targetrel);
249+
/*
250+
* Invalidate relcache so that publication info is rebuilt.
251+
*
252+
* For the partitioned tables, we must invalidate all partitions contained
253+
* in the respective partition hierarchies, not just the one explicitly
254+
* mentioned in the publication. This is required because we implicitly
255+
* publish the child tables when the parent table is published.
256+
*/
257+
relids = GetPubPartitionOptionRelations(relids, PUBLICATION_PART_ALL,
258+
relid);
259+
260+
InvalidatePublicationRels(relids);
213261

214262
return myself;
215263
}
@@ -241,7 +289,7 @@ GetRelationPublications(Oid relid)
241289
/*
242290
* Gets list of relation oids for a publication.
243291
*
244-
* This should only be used for normal publications, the FOR ALL TABLES
292+
* This should only be used FOR TABLE publications, the FOR ALL TABLES
245293
* should use GetAllTablesPublicationRelations().
246294
*/
247295
List *
@@ -270,32 +318,8 @@ GetPublicationRelations(Oid pubid, PublicationPartOpt pub_partopt)
270318
Form_pg_publication_rel pubrel;
271319

272320
pubrel = (Form_pg_publication_rel) GETSTRUCT(tup);
273-
274-
if (get_rel_relkind(pubrel->prrelid) == RELKIND_PARTITIONED_TABLE &&
275-
pub_partopt != PUBLICATION_PART_ROOT)
276-
{
277-
List *all_parts = find_all_inheritors(pubrel->prrelid, NoLock,
278-
NULL);
279-
280-
if (pub_partopt == PUBLICATION_PART_ALL)
281-
result = list_concat(result, all_parts);
282-
else if (pub_partopt == PUBLICATION_PART_LEAF)
283-
{
284-
ListCell *lc;
285-
286-
foreach(lc, all_parts)
287-
{
288-
Oid partOid = lfirst_oid(lc);
289-
290-
if (get_rel_relkind(partOid) != RELKIND_PARTITIONED_TABLE)
291-
result = lappend_oid(result, partOid);
292-
}
293-
}
294-
else
295-
Assert(false);
296-
}
297-
else
298-
result = lappend_oid(result, pubrel->prrelid);
321+
result = GetPubPartitionOptionRelations(result, pub_partopt,
322+
pubrel->prrelid);
299323
}
300324

301325
systable_endscan(scan);

src/backend/commands/publicationcmds.c

Lines changed: 35 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -45,9 +45,6 @@
4545
#include "utils/syscache.h"
4646
#include "utils/varlena.h"
4747

48-
/* Same as MAXNUMMESSAGES in sinvaladt.c */
49-
#define MAX_RELCACHE_INVAL_MSGS 4096
50-
5148
static List *OpenTableList(List *tables);
5249
static void CloseTableList(List *rels);
5350
static void PublicationAddTables(Oid pubid, List *rels, bool if_not_exists,
@@ -330,23 +327,7 @@ AlterPublicationOptions(AlterPublicationStmt *stmt, Relation rel,
330327
List *relids = GetPublicationRelations(pubform->oid,
331328
PUBLICATION_PART_ALL);
332329

333-
/*
334-
* We don't want to send too many individual messages, at some point
335-
* it's cheaper to just reset whole relcache.
336-
*/
337-
if (list_length(relids) < MAX_RELCACHE_INVAL_MSGS)
338-
{
339-
ListCell *lc;
340-
341-
foreach(lc, relids)
342-
{
343-
Oid relid = lfirst_oid(lc);
344-
345-
CacheInvalidateRelcacheByRelid(relid);
346-
}
347-
}
348-
else
349-
CacheInvalidateRelcacheAll();
330+
InvalidatePublicationRels(relids);
350331
}
351332

352333
ObjectAddressSet(obj, PublicationRelationId, pubform->oid);
@@ -356,6 +337,27 @@ AlterPublicationOptions(AlterPublicationStmt *stmt, Relation rel,
356337
InvokeObjectPostAlterHook(PublicationRelationId, pubform->oid, 0);
357338
}
358339

340+
/*
341+
* Invalidate the relations.
342+
*/
343+
void
344+
InvalidatePublicationRels(List *relids)
345+
{
346+
/*
347+
* We don't want to send too many individual messages, at some point it's
348+
* cheaper to just reset whole relcache.
349+
*/
350+
if (list_length(relids) < MAX_RELCACHE_INVAL_MSGS)
351+
{
352+
ListCell *lc;
353+
354+
foreach(lc, relids)
355+
CacheInvalidateRelcacheByRelid(lfirst_oid(lc));
356+
}
357+
else
358+
CacheInvalidateRelcacheAll();
359+
}
360+
359361
/*
360362
* Add or remove table to/from publication.
361363
*/
@@ -512,6 +514,7 @@ RemovePublicationRelById(Oid proid)
512514
Relation rel;
513515
HeapTuple tup;
514516
Form_pg_publication_rel pubrel;
517+
List *relids = NIL;
515518

516519
rel = table_open(PublicationRelRelationId, RowExclusiveLock);
517520

@@ -523,8 +526,18 @@ RemovePublicationRelById(Oid proid)
523526

524527
pubrel = (Form_pg_publication_rel) GETSTRUCT(tup);
525528

526-
/* Invalidate relcache so that publication info is rebuilt. */
527-
CacheInvalidateRelcacheByRelid(pubrel->prrelid);
529+
/*
530+
* Invalidate relcache so that publication info is rebuilt.
531+
*
532+
* For the partitioned tables, we must invalidate all partitions contained
533+
* in the respective partition hierarchies, not just the one explicitly
534+
* mentioned in the publication. This is required because we implicitly
535+
* publish the child tables when the parent table is published.
536+
*/
537+
relids = GetPubPartitionOptionRelations(relids, PUBLICATION_PART_ALL,
538+
pubrel->prrelid);
539+
540+
InvalidatePublicationRels(relids);
528541

529542
CatalogTupleDelete(rel, &tup->t_self);
530543

src/include/catalog/pg_publication.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,9 @@ extern List *GetAllTablesPublicationRelations(bool pubviaroot);
107107
extern bool is_publishable_relation(Relation rel);
108108
extern ObjectAddress publication_add_relation(Oid pubid, Relation targetrel,
109109
bool if_not_exists);
110+
extern List *GetPubPartitionOptionRelations(List *result,
111+
PublicationPartOpt pub_partopt,
112+
Oid relid);
110113

111114
extern Oid get_publication_oid(const char *pubname, bool missing_ok);
112115
extern char *get_publication_name(Oid pubid, bool missing_ok);

src/include/commands/publicationcmds.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,10 @@
1717

1818
#include "catalog/objectaddress.h"
1919
#include "nodes/parsenodes.h"
20+
#include "utils/inval.h"
21+
22+
/* Same as MAXNUMMESSAGES in sinvaladt.c */
23+
#define MAX_RELCACHE_INVAL_MSGS 4096
2024

2125
extern ObjectAddress CreatePublication(CreatePublicationStmt *stmt);
2226
extern void AlterPublication(AlterPublicationStmt *stmt);
@@ -25,5 +29,6 @@ extern void RemovePublicationRelById(Oid proid);
2529

2630
extern ObjectAddress AlterPublicationOwner(const char *name, Oid newOwnerId);
2731
extern void AlterPublicationOwner_oid(Oid pubid, Oid newOwnerId);
32+
extern void InvalidatePublicationRels(List *relids);
2833

2934
#endif /* PUBLICATIONCMDS_H */

src/test/regress/expected/publication.out

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -124,10 +124,12 @@ CREATE PUBLICATION testpub_forparted;
124124
CREATE PUBLICATION testpub_forparted1;
125125
RESET client_min_messages;
126126
CREATE TABLE testpub_parted1 (LIKE testpub_parted);
127+
CREATE TABLE testpub_parted2 (LIKE testpub_parted);
127128
ALTER PUBLICATION testpub_forparted1 SET (publish='insert');
129+
ALTER TABLE testpub_parted ATTACH PARTITION testpub_parted1 FOR VALUES IN (1);
130+
ALTER TABLE testpub_parted ATTACH PARTITION testpub_parted2 FOR VALUES IN (2);
128131
-- works despite missing REPLICA IDENTITY, because updates are not replicated
129132
UPDATE testpub_parted1 SET a = 1;
130-
ALTER TABLE testpub_parted ATTACH PARTITION testpub_parted1 FOR VALUES IN (1);
131133
-- only parent is listed as being in publication, not the partition
132134
ALTER PUBLICATION testpub_forparted ADD TABLE testpub_parted;
133135
\dRp+ testpub_forparted
@@ -154,7 +156,14 @@ ALTER PUBLICATION testpub_forparted SET (publish_via_partition_root = true);
154156
Tables:
155157
"public.testpub_parted"
156158

157-
DROP TABLE testpub_parted1;
159+
-- still fail, because parent's publication replicates updates
160+
UPDATE testpub_parted2 SET a = 2;
161+
ERROR: cannot update table "testpub_parted2" because it does not have a replica identity and publishes updates
162+
HINT: To enable updating the table, set REPLICA IDENTITY using ALTER TABLE.
163+
ALTER PUBLICATION testpub_forparted DROP TABLE testpub_parted;
164+
-- works again, because update is no longer replicated
165+
UPDATE testpub_parted2 SET a = 2;
166+
DROP TABLE testpub_parted1, testpub_parted2;
158167
DROP PUBLICATION testpub_forparted, testpub_forparted1;
159168
-- Test cache invalidation FOR ALL TABLES publication
160169
SET client_min_messages = 'ERROR';

src/test/regress/sql/publication.sql

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -76,10 +76,12 @@ CREATE PUBLICATION testpub_forparted;
7676
CREATE PUBLICATION testpub_forparted1;
7777
RESET client_min_messages;
7878
CREATE TABLE testpub_parted1 (LIKE testpub_parted);
79+
CREATE TABLE testpub_parted2 (LIKE testpub_parted);
7980
ALTER PUBLICATION testpub_forparted1 SET (publish='insert');
81+
ALTER TABLE testpub_parted ATTACH PARTITION testpub_parted1 FOR VALUES IN (1);
82+
ALTER TABLE testpub_parted ATTACH PARTITION testpub_parted2 FOR VALUES IN (2);
8083
-- works despite missing REPLICA IDENTITY, because updates are not replicated
8184
UPDATE testpub_parted1 SET a = 1;
82-
ALTER TABLE testpub_parted ATTACH PARTITION testpub_parted1 FOR VALUES IN (1);
8385
-- only parent is listed as being in publication, not the partition
8486
ALTER PUBLICATION testpub_forparted ADD TABLE testpub_parted;
8587
\dRp+ testpub_forparted
@@ -90,7 +92,12 @@ ALTER TABLE testpub_parted DETACH PARTITION testpub_parted1;
9092
UPDATE testpub_parted1 SET a = 1;
9193
ALTER PUBLICATION testpub_forparted SET (publish_via_partition_root = true);
9294
\dRp+ testpub_forparted
93-
DROP TABLE testpub_parted1;
95+
-- still fail, because parent's publication replicates updates
96+
UPDATE testpub_parted2 SET a = 2;
97+
ALTER PUBLICATION testpub_forparted DROP TABLE testpub_parted;
98+
-- works again, because update is no longer replicated
99+
UPDATE testpub_parted2 SET a = 2;
100+
DROP TABLE testpub_parted1, testpub_parted2;
94101
DROP PUBLICATION testpub_forparted, testpub_forparted1;
95102

96103
-- Test cache invalidation FOR ALL TABLES publication

0 commit comments

Comments
 (0)