@@ -92,7 +92,8 @@ static void drop_replication_slot(PGconn *conn, struct LogicalRepInfo *dbinfo,
92
92
const char * slot_name );
93
93
static void pg_ctl_status (const char * pg_ctl_cmd , int rc );
94
94
static void start_standby_server (const struct CreateSubscriberOptions * opt ,
95
- bool restricted_access );
95
+ bool restricted_access ,
96
+ bool restrict_logical_worker );
96
97
static void stop_standby_server (const char * datadir );
97
98
static void wait_for_end_recovery (const char * conninfo ,
98
99
const struct CreateSubscriberOptions * opt );
@@ -102,6 +103,10 @@ static void create_subscription(PGconn *conn, const struct LogicalRepInfo *dbinf
102
103
static void set_replication_progress (PGconn * conn , const struct LogicalRepInfo * dbinfo ,
103
104
const char * lsn );
104
105
static void enable_subscription (PGconn * conn , const struct LogicalRepInfo * dbinfo );
106
+ static void check_and_drop_existing_subscriptions (PGconn * conn ,
107
+ const struct LogicalRepInfo * dbinfo );
108
+ static void drop_existing_subscriptions (PGconn * conn , const char * subname ,
109
+ const char * dbname );
105
110
106
111
#define USEC_PER_SEC 1000000
107
112
#define WAIT_INTERVAL 1 /* 1 second */
@@ -1025,6 +1030,87 @@ check_subscriber(const struct LogicalRepInfo *dbinfo)
1025
1030
exit (1 );
1026
1031
}
1027
1032
1033
+ /*
1034
+ * Drop a specified subscription. This is to avoid duplicate subscriptions on
1035
+ * the primary (publisher node) and the newly created subscriber. We
1036
+ * shouldn't drop the associated slot as that would be used by the publisher
1037
+ * node.
1038
+ */
1039
+ static void
1040
+ drop_existing_subscriptions (PGconn * conn , const char * subname , const char * dbname )
1041
+ {
1042
+ PQExpBuffer query = createPQExpBuffer ();
1043
+ PGresult * res ;
1044
+
1045
+ Assert (conn != NULL );
1046
+
1047
+ /*
1048
+ * Construct a query string. These commands are allowed to be executed
1049
+ * within a transaction.
1050
+ */
1051
+ appendPQExpBuffer (query , "ALTER SUBSCRIPTION %s DISABLE;" ,
1052
+ subname );
1053
+ appendPQExpBuffer (query , " ALTER SUBSCRIPTION %s SET (slot_name = NONE);" ,
1054
+ subname );
1055
+ appendPQExpBuffer (query , " DROP SUBSCRIPTION %s;" , subname );
1056
+
1057
+ pg_log_info ("dropping subscription \"%s\" on database \"%s\"" ,
1058
+ subname , dbname );
1059
+
1060
+ if (!dry_run )
1061
+ {
1062
+ res = PQexec (conn , query -> data );
1063
+
1064
+ if (PQresultStatus (res ) != PGRES_COMMAND_OK )
1065
+ {
1066
+ pg_log_error ("could not drop a subscription \"%s\" settings: %s" ,
1067
+ subname , PQresultErrorMessage (res ));
1068
+ disconnect_database (conn , true);
1069
+ }
1070
+
1071
+ PQclear (res );
1072
+ }
1073
+
1074
+ destroyPQExpBuffer (query );
1075
+ }
1076
+
1077
+ /*
1078
+ * Retrieve and drop the pre-existing subscriptions.
1079
+ */
1080
+ static void
1081
+ check_and_drop_existing_subscriptions (PGconn * conn ,
1082
+ const struct LogicalRepInfo * dbinfo )
1083
+ {
1084
+ PQExpBuffer query = createPQExpBuffer ();
1085
+ char * dbname ;
1086
+ PGresult * res ;
1087
+
1088
+ Assert (conn != NULL );
1089
+
1090
+ dbname = PQescapeLiteral (conn , dbinfo -> dbname , strlen (dbinfo -> dbname ));
1091
+
1092
+ appendPQExpBuffer (query ,
1093
+ "SELECT s.subname FROM pg_catalog.pg_subscription s "
1094
+ "INNER JOIN pg_catalog.pg_database d ON (s.subdbid = d.oid) "
1095
+ "WHERE d.datname = %s" ,
1096
+ dbname );
1097
+ res = PQexec (conn , query -> data );
1098
+
1099
+ if (PQresultStatus (res ) != PGRES_TUPLES_OK )
1100
+ {
1101
+ pg_log_error ("could not obtain pre-existing subscriptions: %s" ,
1102
+ PQresultErrorMessage (res ));
1103
+ disconnect_database (conn , true);
1104
+ }
1105
+
1106
+ for (int i = 0 ; i < PQntuples (res ); i ++ )
1107
+ drop_existing_subscriptions (conn , PQgetvalue (res , i , 0 ),
1108
+ dbinfo -> dbname );
1109
+
1110
+ PQclear (res );
1111
+ destroyPQExpBuffer (query );
1112
+ }
1113
+
1028
1114
/*
1029
1115
* Create the subscriptions, adjust the initial location for logical
1030
1116
* replication and enable the subscriptions. That's the last step for logical
@@ -1040,6 +1126,14 @@ setup_subscriber(struct LogicalRepInfo *dbinfo, const char *consistent_lsn)
1040
1126
/* Connect to subscriber. */
1041
1127
conn = connect_database (dbinfo [i ].subconninfo , true);
1042
1128
1129
+ /*
1130
+ * We don't need the pre-existing subscriptions on the newly formed
1131
+ * subscriber. They can connect to other publisher nodes and either
1132
+ * get some unwarranted data or can lead to ERRORs in connecting to
1133
+ * such nodes.
1134
+ */
1135
+ check_and_drop_existing_subscriptions (conn , & dbinfo [i ]);
1136
+
1043
1137
/*
1044
1138
* Since the publication was created before the consistent LSN, it is
1045
1139
* available on the subscriber when the physical replica is promoted.
@@ -1314,7 +1408,8 @@ pg_ctl_status(const char *pg_ctl_cmd, int rc)
1314
1408
}
1315
1409
1316
1410
static void
1317
- start_standby_server (const struct CreateSubscriberOptions * opt , bool restricted_access )
1411
+ start_standby_server (const struct CreateSubscriberOptions * opt , bool restricted_access ,
1412
+ bool restrict_logical_worker )
1318
1413
{
1319
1414
PQExpBuffer pg_ctl_cmd = createPQExpBuffer ();
1320
1415
int rc ;
@@ -1343,6 +1438,11 @@ start_standby_server(const struct CreateSubscriberOptions *opt, bool restricted_
1343
1438
if (opt -> config_file != NULL )
1344
1439
appendPQExpBuffer (pg_ctl_cmd , " -o \"-c config_file=%s\"" ,
1345
1440
opt -> config_file );
1441
+
1442
+ /* Suppress to start logical replication if requested */
1443
+ if (restrict_logical_worker )
1444
+ appendPQExpBuffer (pg_ctl_cmd , " -o \"-c max_logical_replication_workers=0\"" );
1445
+
1346
1446
pg_log_debug ("pg_ctl command is: %s" , pg_ctl_cmd -> data );
1347
1447
rc = system (pg_ctl_cmd -> data );
1348
1448
pg_ctl_status (pg_ctl_cmd -> data , rc );
@@ -2067,7 +2167,7 @@ main(int argc, char **argv)
2067
2167
* transformation steps.
2068
2168
*/
2069
2169
pg_log_info ("starting the standby with command-line options" );
2070
- start_standby_server (& opt , true);
2170
+ start_standby_server (& opt , true, false );
2071
2171
2072
2172
/* Check if the standby server is ready for logical replication */
2073
2173
check_subscriber (dbinfo );
@@ -2098,10 +2198,11 @@ main(int argc, char **argv)
2098
2198
2099
2199
/*
2100
2200
* Start subscriber so the recovery parameters will take effect. Wait
2101
- * until accepting connections.
2201
+ * until accepting connections. We don't want to start logical replication
2202
+ * during setup.
2102
2203
*/
2103
2204
pg_log_info ("starting the subscriber" );
2104
- start_standby_server (& opt , true);
2205
+ start_standby_server (& opt , true, true );
2105
2206
2106
2207
/* Waiting the subscriber to be promoted */
2107
2208
wait_for_end_recovery (dbinfo [0 ].subconninfo , & opt );
0 commit comments