Commit 776621a

Amit Kapila committed
Add a failover option to subscriptions.
This commit introduces a new subscription option named 'failover', which
provides users with the ability to set the failover property of the
replication slot on the publisher when creating or altering a subscription.

This uses the replication commands introduced by commit 7329240 to enable the
failover option for a logical replication slot.

If the failover option is set to true, the associated replication slots (i.e.
the main slot and the table sync slots) in the upstream database are enabled
to be synchronized to the standbys. Note that the capability to sync the
replication slots will be added in subsequent commits.

Thanks to Masahiko Sawada for the design inputs.

Author: Shveta Malik, Hou Zhijie, Ajin Cherian
Reviewed-by: Peter Smith, Bertrand Drouvot, Dilip Kumar, Masahiko Sawada, Nisha Moond, Kuroda Hayato, Amit Kapila
Discussion: https://postgr.es/m/514f6f2f-6833-4539-39f1-96cd1e011f23@enterprisedb.com
1 parent b527ebc commit 776621a

21 files changed: 412 additions and 108 deletions

doc/src/sgml/catalogs.sgml

Lines changed: 11 additions & 0 deletions
@@ -8000,6 +8000,17 @@ SCRAM-SHA-256$<replaceable>&lt;iteration count&gt;</replaceable>:<replaceable>&l
 </para></entry>
 </row>

+<row>
+<entry role="catalog_table_entry"><para role="column_definition">
+<structfield>subfailover</structfield> <type>bool</type>
+</para>
+<para>
+If true, the associated replication slots (i.e. the main slot and the
+table sync slots) in the upstream database are enabled to be
+synchronized to the standbys
+</para></entry>
+</row>
+
 <row>
 <entry role="catalog_table_entry"><para role="column_definition">
 <structfield>subconninfo</structfield> <type>text</type>

doc/src/sgml/ref/alter_subscription.sgml

Lines changed: 23 additions & 2 deletions
@@ -226,10 +226,31 @@ ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> RENAME TO <
 <link linkend="sql-createsubscription-params-with-streaming"><literal>streaming</literal></link>,
 <link linkend="sql-createsubscription-params-with-disable-on-error"><literal>disable_on_error</literal></link>,
 <link linkend="sql-createsubscription-params-with-password-required"><literal>password_required</literal></link>,
-<link linkend="sql-createsubscription-params-with-run-as-owner"><literal>run_as_owner</literal></link>, and
-<link linkend="sql-createsubscription-params-with-origin"><literal>origin</literal></link>.
+<link linkend="sql-createsubscription-params-with-run-as-owner"><literal>run_as_owner</literal></link>,
+<link linkend="sql-createsubscription-params-with-origin"><literal>origin</literal></link>, and
+<link linkend="sql-createsubscription-params-with-failover"><literal>failover</literal></link>.
 Only a superuser can set <literal>password_required = false</literal>.
 </para>
+
+<para>
+When altering the
+<link linkend="sql-createsubscription-params-with-slot-name"><literal>slot_name</literal></link>,
+the <literal>failover</literal> property value of the named slot may differ from the
+<link linkend="sql-createsubscription-params-with-failover"><literal>failover</literal></link>
+parameter specified in the subscription. When creating the slot,
+ensure the slot <literal>failover</literal> property matches the
+<link linkend="sql-createsubscription-params-with-failover"><literal>failover</literal></link>
+parameter value of the subscription. Otherwise, the slot on the
+publisher may behave differently from what subscription's
+<link linkend="sql-createsubscription-params-with-failover"><literal>failover</literal></link>
+option says. The slot on the publisher could either be
+synced to the standbys even when the subscription's
+<link linkend="sql-createsubscription-params-with-failover"><literal>failover</literal></link>
+option is disabled or could be disabled for sync
+even when the subscription's
+<link linkend="sql-createsubscription-params-with-failover"><literal>failover</literal></link>
+option is enabled.
+</para>
 </listitem>
 </varlistentry>
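The two mismatch cases described in the added paragraph can be sketched as a small classifier. This is only an illustration: `DemoFailoverState` and `demo_compare_failover` are hypothetical names that do not exist in PostgreSQL, and the sketch assumes the caller already knows both the subscription's `failover` setting and the slot's `failover` property.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical classification of subscription vs. slot failover settings. */
typedef enum DemoFailoverState
{
	DEMO_FAILOVER_CONSISTENT,		/* both settings agree */
	DEMO_SLOT_SYNCED_SUB_DISABLED,	/* slot is synced although the option is off */
	DEMO_SLOT_UNSYNCED_SUB_ENABLED	/* slot is not synced although the option is on */
} DemoFailoverState;

/* Compare the subscription's failover option with the slot's property. */
DemoFailoverState
demo_compare_failover(bool sub_failover, bool slot_failover)
{
	if (sub_failover == slot_failover)
		return DEMO_FAILOVER_CONSISTENT;

	return slot_failover ? DEMO_SLOT_SYNCED_SUB_DISABLED
		: DEMO_SLOT_UNSYNCED_SUB_ENABLED;
}
```

Either non-consistent result corresponds to one of the two situations the documentation warns about after changing `slot_name`.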

doc/src/sgml/ref/create_subscription.sgml

Lines changed: 19 additions & 4 deletions
@@ -117,19 +117,22 @@ CREATE SUBSCRIPTION <replaceable class="parameter">subscription_name</replaceabl
 command should connect to the publisher at all. The default
 is <literal>true</literal>. Setting this to
 <literal>false</literal> will force the values of
-<literal>create_slot</literal>, <literal>enabled</literal> and
-<literal>copy_data</literal> to <literal>false</literal>.
+<literal>create_slot</literal>, <literal>enabled</literal>,
+<literal>copy_data</literal>, and <literal>failover</literal>
+to <literal>false</literal>.
 (You cannot combine setting <literal>connect</literal>
 to <literal>false</literal> with
 setting <literal>create_slot</literal>, <literal>enabled</literal>,
-or <literal>copy_data</literal> to <literal>true</literal>.)
+<literal>copy_data</literal>, or <literal>failover</literal> to
+<literal>true</literal>.)
 </para>

 <para>
 Since no connection is made when this option is
 <literal>false</literal>, no tables are subscribed. To initiate
 replication, you must manually create the replication slot, enable
-the subscription, and refresh the subscription. See
+the failover if required, enable the subscription, and refresh the
+subscription. See
 <xref linkend="logical-replication-subscription-examples-deferred-slot"/>
 for examples.
 </para>
@@ -400,6 +403,18 @@ CREATE SUBSCRIPTION <replaceable class="parameter">subscription_name</replaceabl
 </para>
 </listitem>
 </varlistentry>
+
+<varlistentry id="sql-createsubscription-params-with-failover">
+<term><literal>failover</literal> (<type>boolean</type>)</term>
+<listitem>
+<para>
+Specifies whether the replication slots associated with the subscription
+are enabled to be synced to the standbys so that logical
+replication can be resumed from the new primary after failover.
+The default is <literal>false</literal>.
+</para>
+</listitem>
+</varlistentry>
 </variablelist></para>

 </listitem>

doc/src/sgml/ref/pg_dump.sgml

Lines changed: 7 additions & 1 deletion
@@ -1588,7 +1588,13 @@ CREATE DATABASE foo WITH TEMPLATE template0;
 dump can be restored without requiring network access to the remote
 servers. It is then up to the user to reactivate the subscriptions in a
 suitable way. If the involved hosts have changed, the connection
-information might have to be changed. It might also be appropriate to
+information might have to be changed. If the subscription needs to
+be enabled for
+<link linkend="sql-createsubscription-params-with-failover"><literal>failover</literal></link>,
+then same needs to be done by executing
+<link linkend="sql-altersubscription-params-set">
+<literal>ALTER SUBSCRIPTION ... SET (failover = true)</literal></link>
+after the slot has been created. It might also be appropriate to
 truncate the target tables before initiating a new full table copy. If users
 intend to copy initial data during refresh they must create the slot with
 <literal>two_phase = false</literal>. After the initial sync, the

src/backend/catalog/pg_subscription.c

Lines changed: 1 addition & 0 deletions
@@ -73,6 +73,7 @@ GetSubscription(Oid subid, bool missing_ok)
 sub->disableonerr = subform->subdisableonerr;
 sub->passwordrequired = subform->subpasswordrequired;
 sub->runasowner = subform->subrunasowner;
+sub->failover = subform->subfailover;

 /* Get conninfo */
 datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID,

src/backend/catalog/system_views.sql

Lines changed: 1 addition & 1 deletion
@@ -1358,7 +1358,7 @@ REVOKE ALL ON pg_replication_origin_status FROM public;
 REVOKE ALL ON pg_subscription FROM public;
 GRANT SELECT (oid, subdbid, subskiplsn, subname, subowner, subenabled,
 subbinary, substream, subtwophasestate, subdisableonerr,
-subpasswordrequired, subrunasowner,
+subpasswordrequired, subrunasowner, subfailover,
 subslotname, subsynccommit, subpublications, suborigin)
 ON pg_subscription TO public;

src/backend/commands/subscriptioncmds.c

Lines changed: 111 additions & 5 deletions
@@ -69,8 +69,10 @@
 #define SUBOPT_DISABLE_ON_ERR 0x00000400
 #define SUBOPT_PASSWORD_REQUIRED 0x00000800
 #define SUBOPT_RUN_AS_OWNER 0x00001000
-#define SUBOPT_LSN 0x00002000
-#define SUBOPT_ORIGIN 0x00004000
+#define SUBOPT_FAILOVER 0x00002000
+#define SUBOPT_LSN 0x00004000
+#define SUBOPT_ORIGIN 0x00008000
+

 /* check if the 'val' has 'bits' set */
 #define IsSet(val, bits) (((val) & (bits)) == (bits))
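The renumbering in this hunk works because each option occupies its own bit, so inserting `SUBOPT_FAILOVER` only requires shifting the later flags up. A minimal standalone sketch follows; the flag values and the `IsSet` macro are copied from the hunk, while `accepts_failover` and `flags_are_disjoint` are hypothetical helpers added purely for illustration.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Values as they appear after this commit. */
#define SUBOPT_RUN_AS_OWNER 0x00001000
#define SUBOPT_FAILOVER     0x00002000
#define SUBOPT_LSN          0x00004000
#define SUBOPT_ORIGIN       0x00008000

/* True only when every bit in 'bits' is also set in 'val'. */
#define IsSet(val, bits) (((val) & (bits)) == (bits))

/* Hypothetical helper: does a supported-options mask accept "failover"? */
bool
accepts_failover(uint32_t supported_opts)
{
	return IsSet(supported_opts, SUBOPT_FAILOVER);
}

/* Hypothetical helper: OR equals sum only if no two flags share a bit. */
bool
flags_are_disjoint(void)
{
	uint32_t	ored = SUBOPT_RUN_AS_OWNER | SUBOPT_FAILOVER |
		SUBOPT_LSN | SUBOPT_ORIGIN;
	uint32_t	sum = SUBOPT_RUN_AS_OWNER + SUBOPT_FAILOVER +
		SUBOPT_LSN + SUBOPT_ORIGIN;

	return ored == sum;
}
```

Note that `IsSet` requires all requested bits, so testing a combined mask such as `SUBOPT_FAILOVER | SUBOPT_LSN` against a value holding only one of them yields false.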
@@ -95,6 +97,7 @@ typedef struct SubOpts
 bool disableonerr;
 bool passwordrequired;
 bool runasowner;
+bool failover;
 char *origin;
 XLogRecPtr lsn;
 } SubOpts;
@@ -155,6 +158,8 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
 opts->passwordrequired = true;
 if (IsSet(supported_opts, SUBOPT_RUN_AS_OWNER))
 opts->runasowner = false;
+if (IsSet(supported_opts, SUBOPT_FAILOVER))
+opts->failover = false;
 if (IsSet(supported_opts, SUBOPT_ORIGIN))
 opts->origin = pstrdup(LOGICALREP_ORIGIN_ANY);

@@ -303,6 +308,15 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
 opts->specified_opts |= SUBOPT_RUN_AS_OWNER;
 opts->runasowner = defGetBoolean(defel);
 }
+else if (IsSet(supported_opts, SUBOPT_FAILOVER) &&
+strcmp(defel->defname, "failover") == 0)
+{
+if (IsSet(opts->specified_opts, SUBOPT_FAILOVER))
+errorConflictingDefElem(defel, pstate);
+
+opts->specified_opts |= SUBOPT_FAILOVER;
+opts->failover = defGetBoolean(defel);
+}
 else if (IsSet(supported_opts, SUBOPT_ORIGIN) &&
 strcmp(defel->defname, "origin") == 0)
 {
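The parsing branch added above follows a record-then-reject pattern: the first occurrence of `failover` sets a bit in `specified_opts`, and a second occurrence is treated as a conflict. The sketch below models that with return codes instead of `ereport`/`errorConflictingDefElem`. `DemoOpts`, `DemoParseResult`, and `demo_apply_option` are hypothetical names, and the boolean value is passed in directly rather than decoded by `defGetBoolean`.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>
#include <string.h>

#define SUBOPT_FAILOVER 0x00002000
#define IsSet(val, bits) (((val) & (bits)) == (bits))

/* Hypothetical, reduced stand-in for SubOpts. */
typedef struct DemoOpts
{
	uint32_t	specified_opts;	/* bitmask of options seen so far */
	bool		failover;
} DemoOpts;

typedef enum DemoParseResult
{
	DEMO_PARSE_OK,
	DEMO_PARSE_CONFLICT,		/* option given more than once */
	DEMO_PARSE_UNRECOGNIZED		/* option not supported by this command */
} DemoParseResult;

/* Apply one "name = value" pair, rejecting duplicates of "failover". */
DemoParseResult
demo_apply_option(DemoOpts *opts, uint32_t supported_opts,
				  const char *name, bool value)
{
	assert(opts != NULL && name != NULL);

	if (IsSet(supported_opts, SUBOPT_FAILOVER) &&
		strcmp(name, "failover") == 0)
	{
		if (IsSet(opts->specified_opts, SUBOPT_FAILOVER))
			return DEMO_PARSE_CONFLICT;

		opts->specified_opts |= SUBOPT_FAILOVER;
		opts->failover = value;
		return DEMO_PARSE_OK;
	}

	return DEMO_PARSE_UNRECOGNIZED;
}
```

Keeping `specified_opts` separate from the value lets later checks distinguish "defaulted to false" from "explicitly set to false".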
@@ -388,6 +402,13 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
 errmsg("%s and %s are mutually exclusive options",
 "connect = false", "copy_data = true")));

+if (opts->failover &&
+IsSet(opts->specified_opts, SUBOPT_FAILOVER))
+ereport(ERROR,
+(errcode(ERRCODE_SYNTAX_ERROR),
+errmsg("%s and %s are mutually exclusive options",
+"connect = false", "failover = true")));
+
 /* Change the defaults of other options. */
 opts->enabled = false;
 opts->create_slot = false;
@@ -591,7 +612,7 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
 SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY |
 SUBOPT_STREAMING | SUBOPT_TWOPHASE_COMMIT |
 SUBOPT_DISABLE_ON_ERR | SUBOPT_PASSWORD_REQUIRED |
-SUBOPT_RUN_AS_OWNER | SUBOPT_ORIGIN);
+SUBOPT_RUN_AS_OWNER | SUBOPT_FAILOVER | SUBOPT_ORIGIN);
 parse_subscription_options(pstate, stmt->options, supported_opts, &opts);

 /*
@@ -697,6 +718,7 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
 values[Anum_pg_subscription_subdisableonerr - 1] = BoolGetDatum(opts.disableonerr);
 values[Anum_pg_subscription_subpasswordrequired - 1] = BoolGetDatum(opts.passwordrequired);
 values[Anum_pg_subscription_subrunasowner - 1] = BoolGetDatum(opts.runasowner);
+values[Anum_pg_subscription_subfailover - 1] = BoolGetDatum(opts.failover);
 values[Anum_pg_subscription_subconninfo - 1] =
 CStringGetTextDatum(conninfo);
 if (opts.slot_name)
@@ -807,7 +829,7 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
 twophase_enabled = true;

 walrcv_create_slot(wrconn, opts.slot_name, false, twophase_enabled,
-false, CRS_NOEXPORT_SNAPSHOT, NULL);
+opts.failover, CRS_NOEXPORT_SNAPSHOT, NULL);

 if (twophase_enabled)
 UpdateTwoPhaseState(subid, LOGICALREP_TWOPHASE_STATE_ENABLED);
@@ -816,6 +838,24 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
 (errmsg("created replication slot \"%s\" on publisher",
 opts.slot_name)));
 }
+
+/*
+ * If the slot_name is specified without the create_slot option,
+ * it is possible that the user intends to use an existing slot on
+ * the publisher, so here we alter the failover property of the
+ * slot to match the failover value in subscription.
+ *
+ * We do not need to change the failover to false if the server
+ * does not support failover (e.g. pre-PG17).
+ */
+else if (opts.slot_name &&
+(opts.failover || walrcv_server_version(wrconn) >= 170000))
+{
+walrcv_alter_slot(wrconn, opts.slot_name, opts.failover);
+ereport(NOTICE,
+(errmsg("changed the failover state of replication slot \"%s\" on publisher to %s",
+opts.slot_name, opts.failover ? "true" : "false")));
+}
 }
 PG_FINALLY();
 {
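The create-versus-alter decision in `CreateSubscription` can be summarised as a pure function. This is a sketch under one assumption: the `if` that precedes the new `else if` is not visible in the hunk, and it is taken here to test `create_slot`, as the added comment suggests. `DemoSlotAction` and `demo_choose_slot_action` are hypothetical names.

```c
#include <assert.h>
#include <stdbool.h>

typedef enum DemoSlotAction
{
	DEMO_SLOT_NONE,				/* leave the publisher untouched */
	DEMO_SLOT_CREATE,			/* create the slot with the failover value */
	DEMO_SLOT_ALTER				/* alter an existing slot's failover property */
} DemoSlotAction;

/*
 * Decide what to do with the publisher slot.  server_version uses the
 * PostgreSQL numeric form, e.g. 170000 for version 17.
 */
DemoSlotAction
demo_choose_slot_action(bool create_slot, bool has_slot_name,
						bool failover, int server_version)
{
	if (create_slot)
		return DEMO_SLOT_CREATE;	/* assumed condition, see lead-in */

	/*
	 * Existing slot: skip the ALTER only when failover is false and the
	 * publisher predates failover support, where false is already implied.
	 */
	if (has_slot_name && (failover || server_version >= 170000))
		return DEMO_SLOT_ALTER;

	return DEMO_SLOT_NONE;
}
```

The version gate only suppresses the redundant `failover = false` case; a request for `failover = true` is still sent to an older publisher, which is then left to reject it.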
@@ -1132,7 +1172,8 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY |
 SUBOPT_STREAMING | SUBOPT_DISABLE_ON_ERR |
 SUBOPT_PASSWORD_REQUIRED |
-SUBOPT_RUN_AS_OWNER | SUBOPT_ORIGIN);
+SUBOPT_RUN_AS_OWNER | SUBOPT_FAILOVER |
+SUBOPT_ORIGIN);

 parse_subscription_options(pstate, stmt->options,
 supported_opts, &opts);
@@ -1211,6 +1252,31 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 replaces[Anum_pg_subscription_subrunasowner - 1] = true;
 }

+if (IsSet(opts.specified_opts, SUBOPT_FAILOVER))
+{
+if (!sub->slotname)
+ereport(ERROR,
+(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+errmsg("cannot set %s for a subscription that does not have a slot name",
+"failover")));
+
+/*
+ * Do not allow changing the failover state if the
+ * subscription is enabled. This is because the failover
+ * state of the slot on the publisher cannot be modified
+ * if the slot is currently acquired by the apply worker.
+ */
+if (sub->enabled)
+ereport(ERROR,
+(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+errmsg("cannot set %s for enabled subscription",
+"failover")));
+
+values[Anum_pg_subscription_subfailover - 1] =
+BoolGetDatum(opts.failover);
+replaces[Anum_pg_subscription_subfailover - 1] = true;
+}
+
 if (IsSet(opts.specified_opts, SUBOPT_ORIGIN))
 {
 values[Anum_pg_subscription_suborigin - 1] =
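The two preconditions that `ALTER SUBSCRIPTION ... SET (failover = ...)` enforces can be expressed as a small validation function. The order of the checks follows the hunk; `DemoAlterCheck` and `demo_check_alter_failover` are hypothetical names, and the errors are modelled as return codes rather than `ereport(ERROR, ...)`.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

typedef enum DemoAlterCheck
{
	DEMO_ALTER_OK,
	DEMO_ALTER_NO_SLOT_NAME,	/* subscription has no slot name */
	DEMO_ALTER_SUB_ENABLED		/* slot may be held by the apply worker */
} DemoAlterCheck;

/* Validate that the failover property may be changed right now. */
DemoAlterCheck
demo_check_alter_failover(const char *slotname, bool enabled)
{
	/* Without a slot name there is nothing on the publisher to alter. */
	if (slotname == NULL)
		return DEMO_ALTER_NO_SLOT_NAME;

	/* An enabled subscription may have the slot acquired. */
	if (enabled)
		return DEMO_ALTER_SUB_ENABLED;

	return DEMO_ALTER_OK;
}
```

In practice this means a subscription has to be disabled before its `failover` setting is changed, and can be enabled again afterwards.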
@@ -1453,6 +1519,46 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 heap_freetuple(tup);
 }

+/*
+ * Try to acquire the connection necessary for altering slot.
+ *
+ * This has to be at the end because otherwise if there is an error while
+ * doing the database operations we won't be able to rollback altered
+ * slot.
+ */
+if (replaces[Anum_pg_subscription_subfailover - 1])
+{
+bool must_use_password;
+char *err;
+WalReceiverConn *wrconn;
+
+/* Load the library providing us libpq calls. */
+load_file("libpqwalreceiver", false);
+
+/* Try to connect to the publisher. */
+must_use_password = sub->passwordrequired && !sub->ownersuperuser;
+wrconn = walrcv_connect(sub->conninfo, true, must_use_password,
+sub->name, &err);
+if (!wrconn)
+ereport(ERROR,
+(errcode(ERRCODE_CONNECTION_FAILURE),
+errmsg("could not connect to the publisher: %s", err)));
+
+PG_TRY();
+{
+walrcv_alter_slot(wrconn, sub->slotname, opts.failover);
+
+ereport(NOTICE,
+(errmsg("changed the failover state of replication slot \"%s\" on publisher to %s",
+sub->slotname, opts.failover ? "true" : "false")));
+}
+PG_FINALLY();
+{
+walrcv_disconnect(wrconn);
+}
+PG_END_TRY();
+}
+
 table_close(rel, RowExclusiveLock);

 ObjectAddressSet(myself, SubscriptionRelationId, subid);

src/backend/replication/logical/tablesync.c

Lines changed: 1 addition & 1 deletion
@@ -1430,7 +1430,7 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
 */
 walrcv_create_slot(LogRepWorkerWalRcvConn,
 slotname, false /* permanent */ , false /* two_phase */ ,
-false,
+MySubscription->failover,
 CRS_USE_SNAPSHOT, origin_startpos);

 /*

src/backend/replication/logical/worker.c

Lines changed: 7 additions & 0 deletions
@@ -132,6 +132,13 @@
  * avoid such deadlocks, we generate a unique GID (consisting of the
  * subscription oid and the xid of the prepared transaction) for each prepare
  * transaction on the subscriber.
+ *
+ * FAILOVER
+ * ----------------------
+ * The logical slot on the primary can be synced to the standby by specifying
+ * failover = true when creating the subscription. Enabling failover allows us
+ * to smoothly transition to the promoted standby, ensuring that we can
+ * subscribe to the new primary without losing any data.
  *-------------------------------------------------------------------------
  */
