29
29
#include "getopt_long.h"
30
30
31
31
#define DEFAULT_SUB_PORT "50432"
32
+ #define OBJECTTYPE_PUBLICATIONS 0x0001
32
33
33
34
/* Command-line options */
34
35
struct CreateSubscriberOptions
@@ -44,6 +45,7 @@ struct CreateSubscriberOptions
44
45
SimpleStringList sub_names ; /* list of subscription names */
45
46
SimpleStringList replslot_names ; /* list of replication slot names */
46
47
int recovery_timeout ; /* stop recovery after this time */
48
+ SimpleStringList objecttypes_to_remove ; /* list of object types to remove */
47
49
};
48
50
49
51
/* per-database publication/subscription info */
@@ -68,6 +70,8 @@ struct LogicalRepInfos
68
70
{
69
71
struct LogicalRepInfo * dbinfo ;
70
72
bool two_phase ; /* enable-two-phase option */
73
+ bits32 objecttypes_to_remove ; /* flags indicating which object types
74
+ * to remove on subscriber */
71
75
};
72
76
73
77
static void cleanup_objects_atexit (void );
@@ -109,7 +113,9 @@ static void stop_standby_server(const char *datadir);
109
113
static void wait_for_end_recovery (const char * conninfo ,
110
114
const struct CreateSubscriberOptions * opt );
111
115
static void create_publication (PGconn * conn , struct LogicalRepInfo * dbinfo );
112
- static void drop_publication (PGconn * conn , struct LogicalRepInfo * dbinfo );
116
+ static void drop_publication (PGconn * conn , const char * pubname ,
117
+ const char * dbname , bool * made_publication );
118
+ static void check_and_drop_publications (PGconn * conn , struct LogicalRepInfo * dbinfo );
113
119
static void create_subscription (PGconn * conn , const struct LogicalRepInfo * dbinfo );
114
120
static void set_replication_progress (PGconn * conn , const struct LogicalRepInfo * dbinfo ,
115
121
const char * lsn );
@@ -194,7 +200,8 @@ cleanup_objects_atexit(void)
194
200
if (conn != NULL )
195
201
{
196
202
if (dbinfo -> made_publication )
197
- drop_publication (conn , dbinfo );
203
+ drop_publication (conn , dbinfo -> pubname , dbinfo -> dbname ,
204
+ & dbinfo -> made_publication );
198
205
if (dbinfo -> made_replslot )
199
206
drop_replication_slot (conn , dbinfo , dbinfo -> replslotname );
200
207
disconnect_database (conn , false);
@@ -241,6 +248,8 @@ usage(void)
241
248
printf (_ (" -n, --dry-run dry run, just show what would be done\n" ));
242
249
printf (_ (" -p, --subscriber-port=PORT subscriber port number (default %s)\n" ), DEFAULT_SUB_PORT );
243
250
printf (_ (" -P, --publisher-server=CONNSTR publisher connection string\n" ));
251
+ printf (_ (" -R, --remove=OBJECTTYPE remove all objects of the specified type from specified\n"
252
+ " databases on the subscriber; accepts: publications\n" ));
244
253
printf (_ (" -s, --socketdir=DIR socket directory to use (default current dir.)\n" ));
245
254
printf (_ (" -t, --recovery-timeout=SECS seconds to wait for recovery to end\n" ));
246
255
printf (_ (" -T, --enable-two-phase enable two-phase commit for all subscriptions\n" ));
@@ -1193,12 +1202,8 @@ setup_subscriber(struct LogicalRepInfo *dbinfo, const char *consistent_lsn)
1193
1202
*/
1194
1203
check_and_drop_existing_subscriptions (conn , & dbinfo [i ]);
1195
1204
1196
- /*
1197
- * Since the publication was created before the consistent LSN, it is
1198
- * available on the subscriber when the physical replica is promoted.
1199
- * Remove publications from the subscriber because it has no use.
1200
- */
1201
- drop_publication (conn , & dbinfo [i ]);
1205
+ /* Check and drop the required publications in the given database. */
1206
+ check_and_drop_publications (conn , & dbinfo [i ]);
1202
1207
1203
1208
create_subscription (conn , & dbinfo [i ]);
1204
1209
@@ -1663,21 +1668,22 @@ create_publication(PGconn *conn, struct LogicalRepInfo *dbinfo)
1663
1668
}
1664
1669
1665
1670
/*
1666
- * Remove publication if it couldn't finish all steps .
1671
+ * Drop the specified publication in the given database .
1667
1672
*/
1668
1673
static void
1669
- drop_publication (PGconn * conn , struct LogicalRepInfo * dbinfo )
1674
+ drop_publication (PGconn * conn , const char * pubname , const char * dbname ,
1675
+ bool * made_publication )
1670
1676
{
1671
1677
PQExpBuffer str = createPQExpBuffer ();
1672
1678
PGresult * res ;
1673
1679
char * pubname_esc ;
1674
1680
1675
1681
Assert (conn != NULL );
1676
1682
1677
- pubname_esc = PQescapeIdentifier (conn , dbinfo -> pubname , strlen (dbinfo -> pubname ));
1683
+ pubname_esc = PQescapeIdentifier (conn , pubname , strlen (pubname ));
1678
1684
1679
1685
pg_log_info ("dropping publication \"%s\" in database \"%s\"" ,
1680
- dbinfo -> pubname , dbinfo -> dbname );
1686
+ pubname , dbname );
1681
1687
1682
1688
appendPQExpBuffer (str , "DROP PUBLICATION %s" , pubname_esc );
1683
1689
@@ -1691,8 +1697,8 @@ drop_publication(PGconn *conn, struct LogicalRepInfo *dbinfo)
1691
1697
if (PQresultStatus (res ) != PGRES_COMMAND_OK )
1692
1698
{
1693
1699
pg_log_error ("could not drop publication \"%s\" in database \"%s\": %s" ,
1694
- dbinfo -> pubname , dbinfo -> dbname , PQresultErrorMessage (res ));
1695
- dbinfo -> made_publication = false; /* don't try again. */
1700
+ pubname , dbname , PQresultErrorMessage (res ));
1701
+ * made_publication = false; /* don't try again. */
1696
1702
1697
1703
/*
1698
1704
* Don't disconnect and exit here. This routine is used by primary
@@ -1708,6 +1714,55 @@ drop_publication(PGconn *conn, struct LogicalRepInfo *dbinfo)
1708
1714
destroyPQExpBuffer (str );
1709
1715
}
1710
1716
1717
+ /*
1718
+ * Retrieve and drop the publications.
1719
+ *
1720
+ * Since the publications were created before the consistent LSN, they
1721
+ * remain on the subscriber even after the physical replica is
1722
+ * promoted. Remove these publications from the subscriber because
1723
+ * they have no use. Additionally, if requested, drop all pre-existing
1724
+ * publications.
1725
+ */
1726
+ static void
1727
+ check_and_drop_publications (PGconn * conn , struct LogicalRepInfo * dbinfo )
1728
+ {
1729
+ PGresult * res ;
1730
+ bool drop_all_pubs = dbinfos .objecttypes_to_remove & OBJECTTYPE_PUBLICATIONS ;
1731
+
1732
+ Assert (conn != NULL );
1733
+
1734
+ if (drop_all_pubs )
1735
+ {
1736
+ pg_log_info ("dropping all existing publications in database \"%s\"" ,
1737
+ dbinfo -> dbname );
1738
+
1739
+ /* Fetch all publication names */
1740
+ res = PQexec (conn , "SELECT pubname FROM pg_catalog.pg_publication;" );
1741
+ if (PQresultStatus (res ) != PGRES_TUPLES_OK )
1742
+ {
1743
+ pg_log_error ("could not obtain publication information: %s" ,
1744
+ PQresultErrorMessage (res ));
1745
+ PQclear (res );
1746
+ disconnect_database (conn , true);
1747
+ }
1748
+
1749
+ /* Drop each publication */
1750
+ for (int i = 0 ; i < PQntuples (res ); i ++ )
1751
+ drop_publication (conn , PQgetvalue (res , i , 0 ), dbinfo -> dbname ,
1752
+ & dbinfo -> made_publication );
1753
+
1754
+ PQclear (res );
1755
+ }
1756
+
1757
+ /*
1758
+ * In dry-run mode, we don't create publications, but we still try to drop
1759
+ * those to provide necessary information to the user.
1760
+ */
1761
+ if (!drop_all_pubs || dry_run )
1762
+ drop_publication (conn , dbinfo -> pubname , dbinfo -> dbname ,
1763
+ & dbinfo -> made_publication );
1764
+ }
1765
+
1711
1766
/*
1712
1767
* Create a subscription with some predefined options.
1713
1768
*
@@ -1914,6 +1969,7 @@ main(int argc, char **argv)
1914
1969
{"dry-run" , no_argument , NULL , 'n' },
1915
1970
{"subscriber-port" , required_argument , NULL , 'p' },
1916
1971
{"publisher-server" , required_argument , NULL , 'P' },
1972
+ {"remove" , required_argument , NULL , 'R' },
1917
1973
{"socketdir" , required_argument , NULL , 's' },
1918
1974
{"recovery-timeout" , required_argument , NULL , 't' },
1919
1975
{"enable-two-phase" , no_argument , NULL , 'T' },
@@ -1995,7 +2051,7 @@ main(int argc, char **argv)
1995
2051
1996
2052
get_restricted_token ();
1997
2053
1998
- while ((c = getopt_long (argc , argv , "d:D:np:P:s:t:TU:v" ,
2054
+ while ((c = getopt_long (argc , argv , "d:D:np:P:R: s:t:TU:v" ,
1999
2055
long_options , & option_index )) != -1 )
2000
2056
{
2001
2057
switch (c )
@@ -2025,6 +2081,12 @@ main(int argc, char **argv)
2025
2081
case 'P' :
2026
2082
opt .pub_conninfo_str = pg_strdup (optarg );
2027
2083
break ;
2084
+ case 'R' :
2085
+ if (!simple_string_list_member (& opt .objecttypes_to_remove , optarg ))
2086
+ simple_string_list_append (& opt .objecttypes_to_remove , optarg );
2087
+ else
2088
+ pg_fatal ("object type \"%s\" is specified more than once for --remove" , optarg );
2089
+ break ;
2028
2090
case 's' :
2029
2091
opt .socket_dir = pg_strdup (optarg );
2030
2092
canonicalize_path (opt .socket_dir );
@@ -2189,6 +2251,19 @@ main(int argc, char **argv)
2189
2251
exit (1 );
2190
2252
}
2191
2253
2254
+ /* Verify the object types specified for removal from the subscriber */
2255
+ for (SimpleStringListCell * cell = opt .objecttypes_to_remove .head ; cell ; cell = cell -> next )
2256
+ {
2257
+ if (pg_strcasecmp (cell -> val , "publications" ) == 0 )
2258
+ dbinfos .objecttypes_to_remove |= OBJECTTYPE_PUBLICATIONS ;
2259
+ else
2260
+ {
2261
+ pg_log_error ("invalid object type \"%s\" specified for --remove" , cell -> val );
2262
+ pg_log_error_hint ("The valid option is: \"publications\"" );
2263
+ exit (1 );
2264
+ }
2265
+ }
2266
+
2192
2267
/* Get the absolute path of pg_ctl and pg_resetwal on the subscriber */
2193
2268
pg_ctl_path = get_exec_path (argv [0 ], "pg_ctl" );
2194
2269
pg_resetwal_path = get_exec_path (argv [0 ], "pg_resetwal" );
0 commit comments