Skip to content

Commit 4867f8a

Browse files
author
Amit Kapila
committed
Drop pre-existing subscriptions from the converted subscriber.
We don't need the pre-existing subscriptions on the newly formed subscriber by using pg_createsubscriber. The apply workers corresponding to these subscriptions can connect to other publisher nodes and either get some unwarranted data or can lead to ERRORs in connecting to such nodes. Author: Kuroda Hayato Reviewed-by: Amit Kapila, Shlok Kyal, Vignesh C Backpatch-through: 17 Discussion: https://postgr.es/m/OSBPR01MB25526A30A1FBF863ACCDDA3AF5C92@OSBPR01MB2552.jpnprd01.prod.outlook.com
1 parent 8f8bcb8 commit 4867f8a

File tree

2 files changed

+120
-5
lines changed

2 files changed

+120
-5
lines changed

src/bin/pg_basebackup/pg_createsubscriber.c

+106-5
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,8 @@ static void drop_replication_slot(PGconn *conn, struct LogicalRepInfo *dbinfo,
9292
const char *slot_name);
9393
static void pg_ctl_status(const char *pg_ctl_cmd, int rc);
9494
static void start_standby_server(const struct CreateSubscriberOptions *opt,
95-
bool restricted_access);
95+
bool restricted_access,
96+
bool restrict_logical_worker);
9697
static void stop_standby_server(const char *datadir);
9798
static void wait_for_end_recovery(const char *conninfo,
9899
const struct CreateSubscriberOptions *opt);
@@ -102,6 +103,10 @@ static void create_subscription(PGconn *conn, const struct LogicalRepInfo *dbinf
102103
static void set_replication_progress(PGconn *conn, const struct LogicalRepInfo *dbinfo,
103104
const char *lsn);
104105
static void enable_subscription(PGconn *conn, const struct LogicalRepInfo *dbinfo);
106+
static void check_and_drop_existing_subscriptions(PGconn *conn,
107+
const struct LogicalRepInfo *dbinfo);
108+
static void drop_existing_subscriptions(PGconn *conn, const char *subname,
109+
const char *dbname);
105110

106111
#define USEC_PER_SEC 1000000
107112
#define WAIT_INTERVAL 1 /* 1 second */
@@ -1025,6 +1030,87 @@ check_subscriber(const struct LogicalRepInfo *dbinfo)
10251030
exit(1);
10261031
}
10271032

1033+
/*
1034+
* Drop a specified subscription. This is to avoid duplicate subscriptions on
1035+
* the primary (publisher node) and the newly created subscriber. We
1036+
* shouldn't drop the associated slot as that would be used by the publisher
1037+
* node.
1038+
*/
1039+
static void
1040+
drop_existing_subscriptions(PGconn *conn, const char *subname, const char *dbname)
1041+
{
1042+
PQExpBuffer query = createPQExpBuffer();
1043+
PGresult *res;
1044+
1045+
Assert(conn != NULL);
1046+
1047+
/*
1048+
* Construct a query string. These commands are allowed to be executed
1049+
* within a transaction.
1050+
*/
1051+
appendPQExpBuffer(query, "ALTER SUBSCRIPTION %s DISABLE;",
1052+
subname);
1053+
appendPQExpBuffer(query, " ALTER SUBSCRIPTION %s SET (slot_name = NONE);",
1054+
subname);
1055+
appendPQExpBuffer(query, " DROP SUBSCRIPTION %s;", subname);
1056+
1057+
pg_log_info("dropping subscription \"%s\" on database \"%s\"",
1058+
subname, dbname);
1059+
1060+
if (!dry_run)
1061+
{
1062+
res = PQexec(conn, query->data);
1063+
1064+
if (PQresultStatus(res) != PGRES_COMMAND_OK)
1065+
{
1066+
pg_log_error("could not drop a subscription \"%s\" settings: %s",
1067+
subname, PQresultErrorMessage(res));
1068+
disconnect_database(conn, true);
1069+
}
1070+
1071+
PQclear(res);
1072+
}
1073+
1074+
destroyPQExpBuffer(query);
1075+
}
1076+
1077+
/*
1078+
* Retrieve and drop the pre-existing subscriptions.
1079+
*/
1080+
static void
1081+
check_and_drop_existing_subscriptions(PGconn *conn,
1082+
const struct LogicalRepInfo *dbinfo)
1083+
{
1084+
PQExpBuffer query = createPQExpBuffer();
1085+
char *dbname;
1086+
PGresult *res;
1087+
1088+
Assert(conn != NULL);
1089+
1090+
dbname = PQescapeLiteral(conn, dbinfo->dbname, strlen(dbinfo->dbname));
1091+
1092+
appendPQExpBuffer(query,
1093+
"SELECT s.subname FROM pg_catalog.pg_subscription s "
1094+
"INNER JOIN pg_catalog.pg_database d ON (s.subdbid = d.oid) "
1095+
"WHERE d.datname = %s",
1096+
dbname);
1097+
res = PQexec(conn, query->data);
1098+
1099+
if (PQresultStatus(res) != PGRES_TUPLES_OK)
1100+
{
1101+
pg_log_error("could not obtain pre-existing subscriptions: %s",
1102+
PQresultErrorMessage(res));
1103+
disconnect_database(conn, true);
1104+
}
1105+
1106+
for (int i = 0; i < PQntuples(res); i++)
1107+
drop_existing_subscriptions(conn, PQgetvalue(res, i, 0),
1108+
dbinfo->dbname);
1109+
1110+
PQclear(res);
1111+
destroyPQExpBuffer(query);
1112+
}
1113+
10281114
/*
10291115
* Create the subscriptions, adjust the initial location for logical
10301116
* replication and enable the subscriptions. That's the last step for logical
@@ -1040,6 +1126,14 @@ setup_subscriber(struct LogicalRepInfo *dbinfo, const char *consistent_lsn)
10401126
/* Connect to subscriber. */
10411127
conn = connect_database(dbinfo[i].subconninfo, true);
10421128

1129+
/*
1130+
* We don't need the pre-existing subscriptions on the newly formed
1131+
* subscriber. They can connect to other publisher nodes and either
1132+
* get some unwarranted data or can lead to ERRORs in connecting to
1133+
* such nodes.
1134+
*/
1135+
check_and_drop_existing_subscriptions(conn, &dbinfo[i]);
1136+
10431137
/*
10441138
* Since the publication was created before the consistent LSN, it is
10451139
* available on the subscriber when the physical replica is promoted.
@@ -1314,7 +1408,8 @@ pg_ctl_status(const char *pg_ctl_cmd, int rc)
13141408
}
13151409

13161410
static void
1317-
start_standby_server(const struct CreateSubscriberOptions *opt, bool restricted_access)
1411+
start_standby_server(const struct CreateSubscriberOptions *opt, bool restricted_access,
1412+
bool restrict_logical_worker)
13181413
{
13191414
PQExpBuffer pg_ctl_cmd = createPQExpBuffer();
13201415
int rc;
@@ -1343,6 +1438,11 @@ start_standby_server(const struct CreateSubscriberOptions *opt, bool restricted_
13431438
if (opt->config_file != NULL)
13441439
appendPQExpBuffer(pg_ctl_cmd, " -o \"-c config_file=%s\"",
13451440
opt->config_file);
1441+
1442+
/* Suppress to start logical replication if requested */
1443+
if (restrict_logical_worker)
1444+
appendPQExpBuffer(pg_ctl_cmd, " -o \"-c max_logical_replication_workers=0\"");
1445+
13461446
pg_log_debug("pg_ctl command is: %s", pg_ctl_cmd->data);
13471447
rc = system(pg_ctl_cmd->data);
13481448
pg_ctl_status(pg_ctl_cmd->data, rc);
@@ -2067,7 +2167,7 @@ main(int argc, char **argv)
20672167
* transformation steps.
20682168
*/
20692169
pg_log_info("starting the standby with command-line options");
2070-
start_standby_server(&opt, true);
2170+
start_standby_server(&opt, true, false);
20712171

20722172
/* Check if the standby server is ready for logical replication */
20732173
check_subscriber(dbinfo);
@@ -2098,10 +2198,11 @@ main(int argc, char **argv)
20982198

20992199
/*
21002200
* Start subscriber so the recovery parameters will take effect. Wait
2101-
* until accepting connections.
2201+
* until accepting connections. We don't want to start logical replication
2202+
* during setup.
21022203
*/
21032204
pg_log_info("starting the subscriber");
2104-
start_standby_server(&opt, true);
2205+
start_standby_server(&opt, true, true);
21052206

21062207
/* Waiting the subscriber to be promoted */
21072208
wait_for_end_recovery(dbinfo[0].subconninfo, &opt);

src/bin/pg_basebackup/t/040_pg_createsubscriber.pl

+14
Original file line numberDiff line numberDiff line change
@@ -298,6 +298,13 @@ sub generate_db
298298
"SELECT slot_name FROM pg_replication_slots WHERE slot_name = '$fslotname' AND synced AND NOT temporary"
299299
);
300300
is($result, 'failover_slot', 'failover slot is synced');
301+
302+
# Create subscription to test its removal
303+
my $dummy_sub = 'regress_sub_dummy';
304+
$node_p->safe_psql($db1,
305+
"CREATE SUBSCRIPTION $dummy_sub CONNECTION 'dbname=dummy' PUBLICATION pub_dummy WITH (connect=false)"
306+
);
307+
$node_p->wait_for_replay_catchup($node_s);
301308
$node_s->stop;
302309

303310
# dry run mode on node S
@@ -372,6 +379,13 @@ sub generate_db
372379
# Start subscriber
373380
$node_s->start;
374381

382+
# Confirm the pre-existing subscription has been removed
383+
$result = $node_s->safe_psql(
384+
'postgres', qq(
385+
SELECT count(*) FROM pg_subscription WHERE subname = '$dummy_sub'
386+
));
387+
is($result, qq(0), 'pre-existing subscription was dropped');
388+
375389
# Get subscription names
376390
$result = $node_s->safe_psql(
377391
'postgres', qq(

0 commit comments

Comments
 (0)