Skip to content

Commit 82ed774

Browse files
committed
ALTER SUBSCRIPTION ... ADD/DROP PUBLICATION
At present, if we want to update publications in a subscription, we can use SET PUBLICATION. However, it requires supplying all publications that exists and the new publications. If we want to add new publications, it's inconvenient. The new syntax only supplies the new publications. When the refresh is true, it only refreshes the new publications. Author: Japin Li <japinli@hotmail.com> Author: Bharath Rupireddy <bharath.rupireddyforpostgres@gmail.com> Discussion: https://www.postgresql.org/message-id/flat/MEYP282MB166939D0D6C480B7FBE7EFFBB6BC0@MEYP282MB1669.AUSP282.PROD.OUTLOOK.COM
1 parent 266b567 commit 82ed774

File tree

7 files changed

+267
-37
lines changed

7 files changed

+267
-37
lines changed

doc/src/sgml/ref/alter_subscription.sgml

+14-5
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@ PostgreSQL documentation
2323
<synopsis>
2424
ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> CONNECTION '<replaceable>conninfo</replaceable>'
2525
ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> SET PUBLICATION <replaceable class="parameter">publication_name</replaceable> [, ...] [ WITH ( <replaceable class="parameter">set_publication_option</replaceable> [= <replaceable class="parameter">value</replaceable>] [, ... ] ) ]
26+
ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> ADD PUBLICATION <replaceable class="parameter">publication_name</replaceable> [, ...] [ WITH ( <replaceable class="parameter">set_publication_option</replaceable> [= <replaceable class="parameter">value</replaceable>] [, ... ] ) ]
27+
ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> DROP PUBLICATION <replaceable class="parameter">publication_name</replaceable> [, ...] [ WITH ( <replaceable class="parameter">set_publication_option</replaceable> [= <replaceable class="parameter">value</replaceable>] [, ... ] ) ]
2628
ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> REFRESH PUBLICATION [ WITH ( <replaceable class="parameter">refresh_option</replaceable> [= <replaceable class="parameter">value</replaceable>] [, ... ] ) ]
2729
ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> ENABLE
2830
ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> DISABLE
@@ -63,7 +65,7 @@ ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> RENAME TO <
6365

6466
<para>
6567
Commands <command>ALTER SUBSCRIPTION ... REFRESH PUBLICATION</command> and
66-
<command>ALTER SUBSCRIPTION ... SET PUBLICATION ...</command> with refresh
68+
<command>ALTER SUBSCRIPTION ... {SET|ADD|DROP} PUBLICATION ...</command> with refresh
6769
option as true cannot be executed inside a transaction block.
6870
</para>
6971
</refsect1>
@@ -94,12 +96,19 @@ ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> RENAME TO <
9496

9597
<varlistentry>
9698
<term><literal>SET PUBLICATION <replaceable class="parameter">publication_name</replaceable></literal></term>
99+
<term><literal>ADD PUBLICATION <replaceable class="parameter">publication_name</replaceable></literal></term>
100+
<term><literal>DROP PUBLICATION <replaceable class="parameter">publication_name</replaceable></literal></term>
97101
<listitem>
98102
<para>
99-
Changes list of subscribed publications. See
100-
<xref linkend="sql-createsubscription"/> for more information.
101-
By default this command will also act like <literal>REFRESH
102-
PUBLICATION</literal>.
103+
Changes the list of subscribed publications. <literal>SET</literal>
104+
replaces the entire list of publications with a new list,
105+
<literal>ADD</literal> adds additional publications,
106+
<literal>DROP</literal> removes publications from the list of
107+
publications. See <xref linkend="sql-createsubscription"/> for more
108+
information. By default, this command will also act like
109+
<literal>REFRESH PUBLICATION</literal>, except that in case of
110+
<literal>ADD</literal> or <literal>DROP</literal>, only the added or
111+
dropped publications are refreshed.
103112
</para>
104113

105114
<para>

src/backend/commands/subscriptioncmds.c

+152-25
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,8 @@
4747
#include "utils/syscache.h"
4848

4949
static List *fetch_table_list(WalReceiverConn *wrconn, List *publications);
50+
static void check_duplicates_in_publist(List *publist, Datum *datums);
51+
static List *merge_publications(List *oldpublist, List *newpublist, bool addpub, const char *subname);
5052
static void ReportSlotConnectionError(List *rstates, Oid subid, char *slotname, char *err);
5153

5254

@@ -293,8 +295,6 @@ publicationListToArray(List *publist)
293295
{
294296
ArrayType *arr;
295297
Datum *datums;
296-
int j = 0;
297-
ListCell *cell;
298298
MemoryContext memcxt;
299299
MemoryContext oldcxt;
300300

@@ -306,28 +306,7 @@ publicationListToArray(List *publist)
306306

307307
datums = (Datum *) palloc(sizeof(Datum) * list_length(publist));
308308

309-
foreach(cell, publist)
310-
{
311-
char *name = strVal(lfirst(cell));
312-
ListCell *pcell;
313-
314-
/* Check for duplicates. */
315-
foreach(pcell, publist)
316-
{
317-
char *pname = strVal(lfirst(pcell));
318-
319-
if (pcell == cell)
320-
break;
321-
322-
if (strcmp(name, pname) == 0)
323-
ereport(ERROR,
324-
(errcode(ERRCODE_SYNTAX_ERROR),
325-
errmsg("publication name \"%s\" used more than once",
326-
pname)));
327-
}
328-
329-
datums[j++] = CStringGetTextDatum(name);
330-
}
309+
check_duplicates_in_publist(publist, datums);
331310

332311
MemoryContextSwitchTo(oldcxt);
333312

@@ -923,7 +902,7 @@ AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel)
923902
update_tuple = true;
924903
break;
925904

926-
case ALTER_SUBSCRIPTION_PUBLICATION:
905+
case ALTER_SUBSCRIPTION_SET_PUBLICATION:
927906
{
928907
bool copy_data;
929908
bool refresh;
@@ -964,6 +943,54 @@ AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel)
964943
break;
965944
}
966945

946+
case ALTER_SUBSCRIPTION_ADD_PUBLICATION:
947+
case ALTER_SUBSCRIPTION_DROP_PUBLICATION:
948+
{
949+
bool isadd = stmt->kind == ALTER_SUBSCRIPTION_ADD_PUBLICATION;
950+
bool copy_data;
951+
bool refresh;
952+
List *publist;
953+
954+
publist = merge_publications(sub->publications, stmt->publication, isadd, stmt->subname);
955+
956+
parse_subscription_options(stmt->options,
957+
NULL, /* no "connect" */
958+
NULL, NULL, /* no "enabled" */
959+
NULL, /* no "create_slot" */
960+
NULL, NULL, /* no "slot_name" */
961+
isadd ? &copy_data : NULL, /* for drop, no
962+
* "copy_data" */
963+
NULL, /* no "synchronous_commit" */
964+
&refresh,
965+
NULL, NULL, /* no "binary" */
966+
NULL, NULL); /* no "streaming" */
967+
968+
values[Anum_pg_subscription_subpublications - 1] =
969+
publicationListToArray(publist);
970+
replaces[Anum_pg_subscription_subpublications - 1] = true;
971+
972+
update_tuple = true;
973+
974+
/* Refresh if user asked us to. */
975+
if (refresh)
976+
{
977+
if (!sub->enabled)
978+
ereport(ERROR,
979+
(errcode(ERRCODE_SYNTAX_ERROR),
980+
errmsg("ALTER SUBSCRIPTION with refresh is not allowed for disabled subscriptions"),
981+
errhint("Use ALTER SUBSCRIPTION ... SET PUBLICATION ... WITH (refresh = false).")));
982+
983+
PreventInTransactionBlock(isTopLevel, "ALTER SUBSCRIPTION with refresh");
984+
985+
/* Only refresh the added/dropped list of publications. */
986+
sub->publications = stmt->publication;
987+
988+
AlterSubscription_refresh(sub, copy_data);
989+
}
990+
991+
break;
992+
}
993+
967994
case ALTER_SUBSCRIPTION_REFRESH:
968995
{
969996
bool copy_data;
@@ -1548,3 +1575,103 @@ ReportSlotConnectionError(List *rstates, Oid subid, char *slotname, char *err)
15481575
errhint("Use %s to disassociate the subscription from the slot.",
15491576
"ALTER SUBSCRIPTION ... SET (slot_name = NONE)")));
15501577
}
1578+
1579+
/*
1580+
* Check for duplicates in the given list of publications and error out if
1581+
* found one. Add publications to datums as text datums, if datums is not
1582+
* NULL.
1583+
*/
1584+
static void
1585+
check_duplicates_in_publist(List *publist, Datum *datums)
1586+
{
1587+
ListCell *cell;
1588+
int j = 0;
1589+
1590+
foreach(cell, publist)
1591+
{
1592+
char *name = strVal(lfirst(cell));
1593+
ListCell *pcell;
1594+
1595+
foreach(pcell, publist)
1596+
{
1597+
char *pname = strVal(lfirst(pcell));
1598+
1599+
if (pcell == cell)
1600+
break;
1601+
1602+
if (strcmp(name, pname) == 0)
1603+
ereport(ERROR,
1604+
(errcode(ERRCODE_SYNTAX_ERROR),
1605+
errmsg("publication name \"%s\" used more than once",
1606+
pname)));
1607+
}
1608+
1609+
if (datums)
1610+
datums[j++] = CStringGetTextDatum(name);
1611+
}
1612+
}
1613+
1614+
/*
1615+
* Merge current subscription's publications and user-specified publications
1616+
* from ADD/DROP PUBLICATIONS.
1617+
*
1618+
* If addpub is true, we will add the list of publications into oldpublist.
1619+
* Otherwise, we will delete the list of publications from oldpublist. The
1620+
* returned list is a copy, oldpublist itself is not changed.
1621+
*
1622+
* subname is the subscription name, for error messages.
1623+
*/
1624+
static List *
1625+
merge_publications(List *oldpublist, List *newpublist, bool addpub, const char *subname)
1626+
{
1627+
ListCell *lc;
1628+
1629+
oldpublist = list_copy(oldpublist);
1630+
1631+
check_duplicates_in_publist(newpublist, NULL);
1632+
1633+
foreach(lc, newpublist)
1634+
{
1635+
char *name = strVal(lfirst(lc));
1636+
ListCell *lc2;
1637+
bool found = false;
1638+
1639+
foreach(lc2, oldpublist)
1640+
{
1641+
char *pubname = strVal(lfirst(lc2));
1642+
1643+
if (strcmp(name, pubname) == 0)
1644+
{
1645+
found = true;
1646+
if (addpub)
1647+
ereport(ERROR,
1648+
(errcode(ERRCODE_DUPLICATE_OBJECT),
1649+
errmsg("publication \"%s\" is already in subscription \"%s\"",
1650+
name, subname)));
1651+
else
1652+
oldpublist = foreach_delete_current(oldpublist, lc2);
1653+
1654+
break;
1655+
}
1656+
}
1657+
1658+
if (addpub && !found)
1659+
oldpublist = lappend(oldpublist, makeString(name));
1660+
else if (!addpub && !found)
1661+
ereport(ERROR,
1662+
(errcode(ERRCODE_SYNTAX_ERROR),
1663+
errmsg("publication \"%s\" is not in subscription \"%s\"",
1664+
name, subname)));
1665+
}
1666+
1667+
/*
1668+
* XXX Probably no strong reason for this, but for now it's to make ALTER
1669+
* SUBSCRIPTION ... DROP PUBLICATION consistent with SET PUBLICATION.
1670+
*/
1671+
if (!oldpublist)
1672+
ereport(ERROR,
1673+
(errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
1674+
errmsg("subscription must contain at least one publication")));
1675+
1676+
return oldpublist;
1677+
}

src/backend/parser/gram.y

+21-1
Original file line numberDiff line numberDiff line change
@@ -9687,11 +9687,31 @@ AlterSubscriptionStmt:
96879687
n->options = $6;
96889688
$$ = (Node *)n;
96899689
}
9690+
| ALTER SUBSCRIPTION name ADD_P PUBLICATION name_list opt_definition
9691+
{
9692+
AlterSubscriptionStmt *n =
9693+
makeNode(AlterSubscriptionStmt);
9694+
n->kind = ALTER_SUBSCRIPTION_ADD_PUBLICATION;
9695+
n->subname = $3;
9696+
n->publication = $6;
9697+
n->options = $7;
9698+
$$ = (Node *)n;
9699+
}
9700+
| ALTER SUBSCRIPTION name DROP PUBLICATION name_list opt_definition
9701+
{
9702+
AlterSubscriptionStmt *n =
9703+
makeNode(AlterSubscriptionStmt);
9704+
n->kind = ALTER_SUBSCRIPTION_DROP_PUBLICATION;
9705+
n->subname = $3;
9706+
n->publication = $6;
9707+
n->options = $7;
9708+
$$ = (Node *)n;
9709+
}
96909710
| ALTER SUBSCRIPTION name SET PUBLICATION name_list opt_definition
96919711
{
96929712
AlterSubscriptionStmt *n =
96939713
makeNode(AlterSubscriptionStmt);
9694-
n->kind = ALTER_SUBSCRIPTION_PUBLICATION;
9714+
n->kind = ALTER_SUBSCRIPTION_SET_PUBLICATION;
96959715
n->subname = $3;
96969716
n->publication = $6;
96979717
n->options = $7;

src/bin/psql/tab-complete.c

+7-5
Original file line numberDiff line numberDiff line change
@@ -1652,7 +1652,8 @@ psql_completion(const char *text, int start, int end)
16521652
/* ALTER SUBSCRIPTION <name> */
16531653
else if (Matches("ALTER", "SUBSCRIPTION", MatchAny))
16541654
COMPLETE_WITH("CONNECTION", "ENABLE", "DISABLE", "OWNER TO",
1655-
"RENAME TO", "REFRESH PUBLICATION", "SET");
1655+
"RENAME TO", "REFRESH PUBLICATION", "SET",
1656+
"ADD PUBLICATION", "DROP PUBLICATION");
16561657
/* ALTER SUBSCRIPTION <name> REFRESH PUBLICATION */
16571658
else if (HeadMatches("ALTER", "SUBSCRIPTION", MatchAny) &&
16581659
TailMatches("REFRESH", "PUBLICATION"))
@@ -1672,14 +1673,15 @@ psql_completion(const char *text, int start, int end)
16721673
{
16731674
/* complete with nothing here as this refers to remote publications */
16741675
}
1675-
/* ALTER SUBSCRIPTION <name> SET PUBLICATION <name> */
1676+
/* ALTER SUBSCRIPTION <name> ADD|DROP|SET PUBLICATION <name> */
16761677
else if (HeadMatches("ALTER", "SUBSCRIPTION", MatchAny) &&
1677-
TailMatches("SET", "PUBLICATION", MatchAny))
1678+
TailMatches("ADD|DROP|SET", "PUBLICATION", MatchAny))
16781679
COMPLETE_WITH("WITH (");
1679-
/* ALTER SUBSCRIPTION <name> SET PUBLICATION <name> WITH ( */
1680+
/* ALTER SUBSCRIPTION <name> ADD|DROP|SET PUBLICATION <name> WITH ( */
16801681
else if (HeadMatches("ALTER", "SUBSCRIPTION", MatchAny) &&
1681-
TailMatches("SET", "PUBLICATION", MatchAny, "WITH", "("))
1682+
TailMatches("ADD|DROP|SET", "PUBLICATION", MatchAny, "WITH", "("))
16821683
COMPLETE_WITH("copy_data", "refresh");
1684+
16831685
/* ALTER SCHEMA <name> */
16841686
else if (Matches("ALTER", "SCHEMA", MatchAny))
16851687
COMPLETE_WITH("OWNER TO", "RENAME TO");

src/include/nodes/parsenodes.h

+3-1
Original file line numberDiff line numberDiff line change
@@ -3618,7 +3618,9 @@ typedef enum AlterSubscriptionType
36183618
{
36193619
ALTER_SUBSCRIPTION_OPTIONS,
36203620
ALTER_SUBSCRIPTION_CONNECTION,
3621-
ALTER_SUBSCRIPTION_PUBLICATION,
3621+
ALTER_SUBSCRIPTION_SET_PUBLICATION,
3622+
ALTER_SUBSCRIPTION_ADD_PUBLICATION,
3623+
ALTER_SUBSCRIPTION_DROP_PUBLICATION,
36223624
ALTER_SUBSCRIPTION_REFRESH,
36233625
ALTER_SUBSCRIPTION_ENABLED
36243626
} AlterSubscriptionType;

src/test/regress/expected/subscription.out

+39
Original file line numberDiff line numberDiff line change
@@ -200,6 +200,45 @@ ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
200200
regress_testsub | regress_subscription_user | f | {testpub} | f | f | off | dbname=regress_doesnotexist
201201
(1 row)
202202

203+
-- fail - publication already exists
204+
ALTER SUBSCRIPTION regress_testsub ADD PUBLICATION testpub WITH (refresh = false);
205+
ERROR: publication "testpub" is already in subscription "regress_testsub"
206+
-- fail - publication used more than once
207+
ALTER SUBSCRIPTION regress_testsub ADD PUBLICATION testpub1, testpub1 WITH (refresh = false);
208+
ERROR: publication name "testpub1" used more than once
209+
-- ok - add two publications into subscription
210+
ALTER SUBSCRIPTION regress_testsub ADD PUBLICATION testpub1, testpub2 WITH (refresh = false);
211+
-- fail - publications already exist
212+
ALTER SUBSCRIPTION regress_testsub ADD PUBLICATION testpub1, testpub2 WITH (refresh = false);
213+
ERROR: publication "testpub1" is already in subscription "regress_testsub"
214+
\dRs+
215+
List of subscriptions
216+
Name | Owner | Enabled | Publication | Binary | Streaming | Synchronous commit | Conninfo
217+
-----------------+---------------------------+---------+-----------------------------+--------+-----------+--------------------+-----------------------------
218+
regress_testsub | regress_subscription_user | f | {testpub,testpub1,testpub2} | f | f | off | dbname=regress_doesnotexist
219+
(1 row)
220+
221+
-- fail - publication used more then once
222+
ALTER SUBSCRIPTION regress_testsub DROP PUBLICATION testpub1, testpub1 WITH (refresh = false);
223+
ERROR: publication name "testpub1" used more than once
224+
-- fail - all publications are deleted
225+
ALTER SUBSCRIPTION regress_testsub DROP PUBLICATION testpub, testpub1, testpub2 WITH (refresh = false);
226+
ERROR: subscription must contain at least one publication
227+
-- fail - publication does not exist in subscription
228+
ALTER SUBSCRIPTION regress_testsub DROP PUBLICATION testpub3 WITH (refresh = false);
229+
ERROR: publication "testpub3" is not in subscription "regress_testsub"
230+
-- fail - do not support copy_data option
231+
ALTER SUBSCRIPTION regress_testsub DROP PUBLICATION testpub1 WITH (refresh = false, copy_data = true);
232+
ERROR: unrecognized subscription parameter: "copy_data"
233+
-- ok - delete publications
234+
ALTER SUBSCRIPTION regress_testsub DROP PUBLICATION testpub1, testpub2 WITH (refresh = false);
235+
\dRs+
236+
List of subscriptions
237+
Name | Owner | Enabled | Publication | Binary | Streaming | Synchronous commit | Conninfo
238+
-----------------+---------------------------+---------+-------------+--------+-----------+--------------------+-----------------------------
239+
regress_testsub | regress_subscription_user | f | {testpub} | f | f | off | dbname=regress_doesnotexist
240+
(1 row)
241+
203242
DROP SUBSCRIPTION regress_testsub;
204243
CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION mypub
205244
WITH (connect = false, create_slot = false, copy_data = false);

0 commit comments

Comments
 (0)