47
47
#include "utils/syscache.h"
48
48
49
49
static List * fetch_table_list (WalReceiverConn * wrconn , List * publications );
50
+ static void check_duplicates_in_publist (List * publist , Datum * datums );
51
+ static List * merge_publications (List * oldpublist , List * newpublist , bool addpub , const char * subname );
50
52
static void ReportSlotConnectionError (List * rstates , Oid subid , char * slotname , char * err );
51
53
52
54
@@ -293,8 +295,6 @@ publicationListToArray(List *publist)
293
295
{
294
296
ArrayType * arr ;
295
297
Datum * datums ;
296
- int j = 0 ;
297
- ListCell * cell ;
298
298
MemoryContext memcxt ;
299
299
MemoryContext oldcxt ;
300
300
@@ -306,28 +306,7 @@ publicationListToArray(List *publist)
306
306
307
307
datums = (Datum * ) palloc (sizeof (Datum ) * list_length (publist ));
308
308
309
- foreach (cell , publist )
310
- {
311
- char * name = strVal (lfirst (cell ));
312
- ListCell * pcell ;
313
-
314
- /* Check for duplicates. */
315
- foreach (pcell , publist )
316
- {
317
- char * pname = strVal (lfirst (pcell ));
318
-
319
- if (pcell == cell )
320
- break ;
321
-
322
- if (strcmp (name , pname ) == 0 )
323
- ereport (ERROR ,
324
- (errcode (ERRCODE_SYNTAX_ERROR ),
325
- errmsg ("publication name \"%s\" used more than once" ,
326
- pname )));
327
- }
328
-
329
- datums [j ++ ] = CStringGetTextDatum (name );
330
- }
309
+ check_duplicates_in_publist (publist , datums );
331
310
332
311
MemoryContextSwitchTo (oldcxt );
333
312
@@ -923,7 +902,7 @@ AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel)
923
902
update_tuple = true;
924
903
break ;
925
904
926
- case ALTER_SUBSCRIPTION_PUBLICATION :
905
+ case ALTER_SUBSCRIPTION_SET_PUBLICATION :
927
906
{
928
907
bool copy_data ;
929
908
bool refresh ;
@@ -964,6 +943,54 @@ AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel)
964
943
break ;
965
944
}
966
945
946
+ case ALTER_SUBSCRIPTION_ADD_PUBLICATION :
947
+ case ALTER_SUBSCRIPTION_DROP_PUBLICATION :
948
+ {
949
+ bool isadd = stmt -> kind == ALTER_SUBSCRIPTION_ADD_PUBLICATION ;
950
+ bool copy_data ;
951
+ bool refresh ;
952
+ List * publist ;
953
+
954
+ publist = merge_publications (sub -> publications , stmt -> publication , isadd , stmt -> subname );
955
+
956
+ parse_subscription_options (stmt -> options ,
957
+ NULL , /* no "connect" */
958
+ NULL , NULL , /* no "enabled" */
959
+ NULL , /* no "create_slot" */
960
+ NULL , NULL , /* no "slot_name" */
961
+ isadd ? & copy_data : NULL , /* for drop, no
962
+ * "copy_data" */
963
+ NULL , /* no "synchronous_commit" */
964
+ & refresh ,
965
+ NULL , NULL , /* no "binary" */
966
+ NULL , NULL ); /* no "streaming" */
967
+
968
+ values [Anum_pg_subscription_subpublications - 1 ] =
969
+ publicationListToArray (publist );
970
+ replaces [Anum_pg_subscription_subpublications - 1 ] = true;
971
+
972
+ update_tuple = true;
973
+
974
+ /* Refresh if user asked us to. */
975
+ if (refresh )
976
+ {
977
+ if (!sub -> enabled )
978
+ ereport (ERROR ,
979
+ (errcode (ERRCODE_SYNTAX_ERROR ),
980
+ errmsg ("ALTER SUBSCRIPTION with refresh is not allowed for disabled subscriptions" ),
981
+ errhint ("Use ALTER SUBSCRIPTION ... SET PUBLICATION ... WITH (refresh = false)." )));
982
+
983
+ PreventInTransactionBlock (isTopLevel , "ALTER SUBSCRIPTION with refresh" );
984
+
985
+ /* Only refresh the added/dropped list of publications. */
986
+ sub -> publications = stmt -> publication ;
987
+
988
+ AlterSubscription_refresh (sub , copy_data );
989
+ }
990
+
991
+ break ;
992
+ }
993
+
967
994
case ALTER_SUBSCRIPTION_REFRESH :
968
995
{
969
996
bool copy_data ;
@@ -1548,3 +1575,103 @@ ReportSlotConnectionError(List *rstates, Oid subid, char *slotname, char *err)
1548
1575
errhint ("Use %s to disassociate the subscription from the slot." ,
1549
1576
"ALTER SUBSCRIPTION ... SET (slot_name = NONE)" )));
1550
1577
}
1578
+
1579
+ /*
1580
+ * Check for duplicates in the given list of publications and error out if
1581
+ * found one. Add publications to datums as text datums, if datums is not
1582
+ * NULL.
1583
+ */
1584
+ static void
1585
+ check_duplicates_in_publist (List * publist , Datum * datums )
1586
+ {
1587
+ ListCell * cell ;
1588
+ int j = 0 ;
1589
+
1590
+ foreach (cell , publist )
1591
+ {
1592
+ char * name = strVal (lfirst (cell ));
1593
+ ListCell * pcell ;
1594
+
1595
+ foreach (pcell , publist )
1596
+ {
1597
+ char * pname = strVal (lfirst (pcell ));
1598
+
1599
+ if (pcell == cell )
1600
+ break ;
1601
+
1602
+ if (strcmp (name , pname ) == 0 )
1603
+ ereport (ERROR ,
1604
+ (errcode (ERRCODE_SYNTAX_ERROR ),
1605
+ errmsg ("publication name \"%s\" used more than once" ,
1606
+ pname )));
1607
+ }
1608
+
1609
+ if (datums )
1610
+ datums [j ++ ] = CStringGetTextDatum (name );
1611
+ }
1612
+ }
1613
+
1614
+ /*
1615
+ * Merge current subscription's publications and user-specified publications
1616
+ * from ADD/DROP PUBLICATIONS.
1617
+ *
1618
+ * If addpub is true, we will add the list of publications into oldpublist.
1619
+ * Otherwise, we will delete the list of publications from oldpublist. The
1620
+ * returned list is a copy, oldpublist itself is not changed.
1621
+ *
1622
+ * subname is the subscription name, for error messages.
1623
+ */
1624
+ static List *
1625
+ merge_publications (List * oldpublist , List * newpublist , bool addpub , const char * subname )
1626
+ {
1627
+ ListCell * lc ;
1628
+
1629
+ oldpublist = list_copy (oldpublist );
1630
+
1631
+ check_duplicates_in_publist (newpublist , NULL );
1632
+
1633
+ foreach (lc , newpublist )
1634
+ {
1635
+ char * name = strVal (lfirst (lc ));
1636
+ ListCell * lc2 ;
1637
+ bool found = false;
1638
+
1639
+ foreach (lc2 , oldpublist )
1640
+ {
1641
+ char * pubname = strVal (lfirst (lc2 ));
1642
+
1643
+ if (strcmp (name , pubname ) == 0 )
1644
+ {
1645
+ found = true;
1646
+ if (addpub )
1647
+ ereport (ERROR ,
1648
+ (errcode (ERRCODE_DUPLICATE_OBJECT ),
1649
+ errmsg ("publication \"%s\" is already in subscription \"%s\"" ,
1650
+ name , subname )));
1651
+ else
1652
+ oldpublist = foreach_delete_current (oldpublist , lc2 );
1653
+
1654
+ break ;
1655
+ }
1656
+ }
1657
+
1658
+ if (addpub && !found )
1659
+ oldpublist = lappend (oldpublist , makeString (name ));
1660
+ else if (!addpub && !found )
1661
+ ereport (ERROR ,
1662
+ (errcode (ERRCODE_SYNTAX_ERROR ),
1663
+ errmsg ("publication \"%s\" is not in subscription \"%s\"" ,
1664
+ name , subname )));
1665
+ }
1666
+
1667
+ /*
1668
+ * XXX Probably no strong reason for this, but for now it's to make ALTER
1669
+ * SUBSCRIPTION ... DROP PUBLICATION consistent with SET PUBLICATION.
1670
+ */
1671
+ if (!oldpublist )
1672
+ ereport (ERROR ,
1673
+ (errcode (ERRCODE_INVALID_OBJECT_DEFINITION ),
1674
+ errmsg ("subscription must contain at least one publication" )));
1675
+
1676
+ return oldpublist ;
1677
+ }
0 commit comments