Skip to content

Commit e5aeed4

Browse files
author
Amit Kapila
committed
pg_createsubscriber: Add -R publications option.
This patch introduces a new '-R'/'--remove' option in the 'pg_createsubscriber' utility to specify the object types to be removed from the subscriber. Currently, we add support to specify 'publications' as an object type. In the future, other object types like failover-slots could be added. This feature allows optionally to remove publications on the subscriber that were replicated from the primary server (before running this tool) during physical replication. Users may want to retain these publications in case they want some pre-existing subscribers to point to the newly created subscriber. Author: Shubham Khanna <khannashubham1197@gmail.com> Reviewed-by: Peter Smith <smithpb2250@gmail.com> Reviewed-by: David G. Johnston <david.g.johnston@gmail.com> Reviewed-by: Euler Taveira <euler@eulerto.com> Reviewed-by: Zhijie Hou <houzj.fnst@fujitsu.com> Reviewed-by: vignesh C <vignesh21@gmail.com> Reviewed-by: Nisha Moond <nisha.moond412@gmail.com> Reviewed-by: Amit Kapila <amit.kapila16@gmail.com> Discussion: https://postgr.es/m/CAHv8RjL4OvoYafofTb_U_JD5HuyoNowBoGpMfnEbhDSENA74Kg@mail.gmail.com
1 parent 5941946 commit e5aeed4

File tree

3 files changed

+136
-17
lines changed

3 files changed

+136
-17
lines changed

doc/src/sgml/ref/pg_createsubscriber.sgml

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -146,6 +146,29 @@ PostgreSQL documentation
146146
</listitem>
147147
</varlistentry>
148148

149+
<varlistentry>
150+
<term><option>-R</option></term>
151+
<term><option>--remove</option></term>
152+
<listitem>
153+
<para>
154+
Remove all objects of the specified type from specified databases on the
155+
target server.
156+
</para>
157+
<para>
158+
publications: The "all tables" publications established for this
159+
subscriber are always removed; specifying this object type causes all
160+
other publications replicated from the source server to be dropped as
161+
well.
162+
</para>
163+
<para>
164+
The objects selected to be dropped are individually logged and do show
165+
up in a --dry-run. There is no opportunity to affect or stop the
166+
dropping of the selected objects so consider taking a backup of them
167+
using pg_dump.
168+
</para>
169+
</listitem>
170+
</varlistentry>
171+
149172
<varlistentry>
150173
<term><option>-s <replaceable class="parameter">dir</replaceable></option></term>
151174
<term><option>--socketdir=<replaceable class="parameter">dir</replaceable></option></term>

src/bin/pg_basebackup/pg_createsubscriber.c

Lines changed: 90 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
#include "getopt_long.h"
3030

3131
#define DEFAULT_SUB_PORT "50432"
32+
#define OBJECTTYPE_PUBLICATIONS 0x0001
3233

3334
/* Command-line options */
3435
struct CreateSubscriberOptions
@@ -44,6 +45,7 @@ struct CreateSubscriberOptions
4445
SimpleStringList sub_names; /* list of subscription names */
4546
SimpleStringList replslot_names; /* list of replication slot names */
4647
int recovery_timeout; /* stop recovery after this time */
48+
SimpleStringList objecttypes_to_remove; /* list of object types to remove */
4749
};
4850

4951
/* per-database publication/subscription info */
@@ -68,6 +70,8 @@ struct LogicalRepInfos
6870
{
6971
struct LogicalRepInfo *dbinfo;
7072
bool two_phase; /* enable-two-phase option */
73+
bits32 objecttypes_to_remove; /* flags indicating which object types
74+
* to remove on subscriber */
7175
};
7276

7377
static void cleanup_objects_atexit(void);
@@ -109,7 +113,9 @@ static void stop_standby_server(const char *datadir);
109113
static void wait_for_end_recovery(const char *conninfo,
110114
const struct CreateSubscriberOptions *opt);
111115
static void create_publication(PGconn *conn, struct LogicalRepInfo *dbinfo);
112-
static void drop_publication(PGconn *conn, struct LogicalRepInfo *dbinfo);
116+
static void drop_publication(PGconn *conn, const char *pubname,
117+
const char *dbname, bool *made_publication);
118+
static void check_and_drop_publications(PGconn *conn, struct LogicalRepInfo *dbinfo);
113119
static void create_subscription(PGconn *conn, const struct LogicalRepInfo *dbinfo);
114120
static void set_replication_progress(PGconn *conn, const struct LogicalRepInfo *dbinfo,
115121
const char *lsn);
@@ -194,7 +200,8 @@ cleanup_objects_atexit(void)
194200
if (conn != NULL)
195201
{
196202
if (dbinfo->made_publication)
197-
drop_publication(conn, dbinfo);
203+
drop_publication(conn, dbinfo->pubname, dbinfo->dbname,
204+
&dbinfo->made_publication);
198205
if (dbinfo->made_replslot)
199206
drop_replication_slot(conn, dbinfo, dbinfo->replslotname);
200207
disconnect_database(conn, false);
@@ -241,6 +248,8 @@ usage(void)
241248
printf(_(" -n, --dry-run dry run, just show what would be done\n"));
242249
printf(_(" -p, --subscriber-port=PORT subscriber port number (default %s)\n"), DEFAULT_SUB_PORT);
243250
printf(_(" -P, --publisher-server=CONNSTR publisher connection string\n"));
251+
printf(_(" -R, --remove=OBJECTTYPE remove all objects of the specified type from specified\n"
252+
" databases on the subscriber; accepts: publications\n"));
244253
printf(_(" -s, --socketdir=DIR socket directory to use (default current dir.)\n"));
245254
printf(_(" -t, --recovery-timeout=SECS seconds to wait for recovery to end\n"));
246255
printf(_(" -T, --enable-two-phase enable two-phase commit for all subscriptions\n"));
@@ -1193,12 +1202,8 @@ setup_subscriber(struct LogicalRepInfo *dbinfo, const char *consistent_lsn)
11931202
*/
11941203
check_and_drop_existing_subscriptions(conn, &dbinfo[i]);
11951204

1196-
/*
1197-
* Since the publication was created before the consistent LSN, it is
1198-
* available on the subscriber when the physical replica is promoted.
1199-
* Remove publications from the subscriber because it has no use.
1200-
*/
1201-
drop_publication(conn, &dbinfo[i]);
1205+
/* Check and drop the required publications in the given database. */
1206+
check_and_drop_publications(conn, &dbinfo[i]);
12021207

12031208
create_subscription(conn, &dbinfo[i]);
12041209

@@ -1663,21 +1668,22 @@ create_publication(PGconn *conn, struct LogicalRepInfo *dbinfo)
16631668
}
16641669

16651670
/*
1666-
* Remove publication if it couldn't finish all steps.
1671+
* Drop the specified publication in the given database.
16671672
*/
16681673
static void
1669-
drop_publication(PGconn *conn, struct LogicalRepInfo *dbinfo)
1674+
drop_publication(PGconn *conn, const char *pubname, const char *dbname,
1675+
bool *made_publication)
16701676
{
16711677
PQExpBuffer str = createPQExpBuffer();
16721678
PGresult *res;
16731679
char *pubname_esc;
16741680

16751681
Assert(conn != NULL);
16761682

1677-
pubname_esc = PQescapeIdentifier(conn, dbinfo->pubname, strlen(dbinfo->pubname));
1683+
pubname_esc = PQescapeIdentifier(conn, pubname, strlen(pubname));
16781684

16791685
pg_log_info("dropping publication \"%s\" in database \"%s\"",
1680-
dbinfo->pubname, dbinfo->dbname);
1686+
pubname, dbname);
16811687

16821688
appendPQExpBuffer(str, "DROP PUBLICATION %s", pubname_esc);
16831689

@@ -1691,8 +1697,8 @@ drop_publication(PGconn *conn, struct LogicalRepInfo *dbinfo)
16911697
if (PQresultStatus(res) != PGRES_COMMAND_OK)
16921698
{
16931699
pg_log_error("could not drop publication \"%s\" in database \"%s\": %s",
1694-
dbinfo->pubname, dbinfo->dbname, PQresultErrorMessage(res));
1695-
dbinfo->made_publication = false; /* don't try again. */
1700+
pubname, dbname, PQresultErrorMessage(res));
1701+
*made_publication = false; /* don't try again. */
16961702

16971703
/*
16981704
* Don't disconnect and exit here. This routine is used by primary
@@ -1708,6 +1714,55 @@ drop_publication(PGconn *conn, struct LogicalRepInfo *dbinfo)
17081714
destroyPQExpBuffer(str);
17091715
}
17101716

1717+
/*
1718+
* Retrieve and drop the publications.
1719+
*
1720+
* Since the publications were created before the consistent LSN, they
1721+
* remain on the subscriber even after the physical replica is
1722+
* promoted. Remove these publications from the subscriber because
1723+
* they have no use. Additionally, if requested, drop all pre-existing
1724+
* publications.
1725+
*/
1726+
static void
1727+
check_and_drop_publications(PGconn *conn, struct LogicalRepInfo *dbinfo)
1728+
{
1729+
PGresult *res;
1730+
bool drop_all_pubs = dbinfos.objecttypes_to_remove & OBJECTTYPE_PUBLICATIONS;
1731+
1732+
Assert(conn != NULL);
1733+
1734+
if (drop_all_pubs)
1735+
{
1736+
pg_log_info("dropping all existing publications in database \"%s\"",
1737+
dbinfo->dbname);
1738+
1739+
/* Fetch all publication names */
1740+
res = PQexec(conn, "SELECT pubname FROM pg_catalog.pg_publication;");
1741+
if (PQresultStatus(res) != PGRES_TUPLES_OK)
1742+
{
1743+
pg_log_error("could not obtain publication information: %s",
1744+
PQresultErrorMessage(res));
1745+
PQclear(res);
1746+
disconnect_database(conn, true);
1747+
}
1748+
1749+
/* Drop each publication */
1750+
for (int i = 0; i < PQntuples(res); i++)
1751+
drop_publication(conn, PQgetvalue(res, i, 0), dbinfo->dbname,
1752+
&dbinfo->made_publication);
1753+
1754+
PQclear(res);
1755+
}
1756+
1757+
/*
1758+
* In dry-run mode, we don't create publications, but we still try to drop
1759+
* those to provide necessary information to the user.
1760+
*/
1761+
if (!drop_all_pubs || dry_run)
1762+
drop_publication(conn, dbinfo->pubname, dbinfo->dbname,
1763+
&dbinfo->made_publication);
1764+
}
1765+
17111766
/*
17121767
* Create a subscription with some predefined options.
17131768
*
@@ -1914,6 +1969,7 @@ main(int argc, char **argv)
19141969
{"dry-run", no_argument, NULL, 'n'},
19151970
{"subscriber-port", required_argument, NULL, 'p'},
19161971
{"publisher-server", required_argument, NULL, 'P'},
1972+
{"remove", required_argument, NULL, 'R'},
19171973
{"socketdir", required_argument, NULL, 's'},
19181974
{"recovery-timeout", required_argument, NULL, 't'},
19191975
{"enable-two-phase", no_argument, NULL, 'T'},
@@ -1995,7 +2051,7 @@ main(int argc, char **argv)
19952051

19962052
get_restricted_token();
19972053

1998-
while ((c = getopt_long(argc, argv, "d:D:np:P:s:t:TU:v",
2054+
while ((c = getopt_long(argc, argv, "d:D:np:P:R:s:t:TU:v",
19992055
long_options, &option_index)) != -1)
20002056
{
20012057
switch (c)
@@ -2025,6 +2081,12 @@ main(int argc, char **argv)
20252081
case 'P':
20262082
opt.pub_conninfo_str = pg_strdup(optarg);
20272083
break;
2084+
case 'R':
2085+
if (!simple_string_list_member(&opt.objecttypes_to_remove, optarg))
2086+
simple_string_list_append(&opt.objecttypes_to_remove, optarg);
2087+
else
2088+
pg_fatal("object type \"%s\" is specified more than once for --remove", optarg);
2089+
break;
20282090
case 's':
20292091
opt.socket_dir = pg_strdup(optarg);
20302092
canonicalize_path(opt.socket_dir);
@@ -2189,6 +2251,19 @@ main(int argc, char **argv)
21892251
exit(1);
21902252
}
21912253

2254+
/* Verify the object types specified for removal from the subscriber */
2255+
for (SimpleStringListCell *cell = opt.objecttypes_to_remove.head; cell; cell = cell->next)
2256+
{
2257+
if (pg_strcasecmp(cell->val, "publications") == 0)
2258+
dbinfos.objecttypes_to_remove |= OBJECTTYPE_PUBLICATIONS;
2259+
else
2260+
{
2261+
pg_log_error("invalid object type \"%s\" specified for --remove", cell->val);
2262+
pg_log_error_hint("The valid option is: \"publications\"");
2263+
exit(1);
2264+
}
2265+
}
2266+
21922267
/* Get the absolute path of pg_ctl and pg_resetwal on the subscriber */
21932268
pg_ctl_path = get_exec_path(argv[0], "pg_ctl");
21942269
pg_resetwal_path = get_exec_path(argv[0], "pg_resetwal");

src/bin/pg_basebackup/t/040_pg_createsubscriber.pl

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -329,6 +329,21 @@ sub generate_db
329329
"CREATE SUBSCRIPTION $dummy_sub CONNECTION 'dbname=dummy' PUBLICATION pub_dummy WITH (connect=false)"
330330
);
331331
$node_p->wait_for_replay_catchup($node_s);
332+
333+
# Create user-defined publications, wait for streaming replication to sync them
334+
# to the standby, then verify that '--remove'
335+
# removes them.
336+
$node_p->safe_psql(
337+
$db1, qq(
338+
CREATE PUBLICATION test_pub1 FOR ALL TABLES;
339+
CREATE PUBLICATION test_pub2 FOR ALL TABLES;
340+
));
341+
342+
$node_p->wait_for_replay_catchup($node_s);
343+
344+
ok($node_s->safe_psql($db1, "SELECT COUNT(*) = 2 FROM pg_publication"),
345+
'two pre-existing publications on subscriber');
346+
332347
$node_s->stop;
333348

334349
# dry run mode on node S
@@ -373,7 +388,8 @@ sub generate_db
373388

374389
# Run pg_createsubscriber on node S. --verbose is used twice
375390
# to show more information.
376-
# In passing, also test the --enable-two-phase option
391+
# In passing, also test the --enable-two-phase option and
392+
# --remove option
377393
command_ok(
378394
[
379395
'pg_createsubscriber',
@@ -389,7 +405,8 @@ sub generate_db
389405
'--replication-slot' => 'replslot2',
390406
'--database' => $db1,
391407
'--database' => $db2,
392-
'--enable-two-phase'
408+
'--enable-two-phase',
409+
'--remove' => 'publications',
393410
],
394411
'run pg_createsubscriber on node S');
395412

@@ -408,6 +425,10 @@ sub generate_db
408425
# Start subscriber
409426
$node_s->start;
410427

428+
# Confirm publications are removed from the subscriber node
429+
is($node_s->safe_psql($db1, "SELECT COUNT(*) FROM pg_publication;"),
430+
'0', 'all publications on subscriber have been removed');
431+
411432
# Verify that all subtwophase states are pending or enabled,
412433
# e.g. there are no subscriptions where subtwophase is disabled ('d')
413434
is( $node_s->safe_psql(

0 commit comments

Comments
 (0)