Skip to content

Commit 7329240

Browse files
author
Amit Kapila
committed
Allow setting failover property in the replication command.
This commit implements a new replication command called ALTER_REPLICATION_SLOT and a corresponding walreceiver API function named walrcv_alter_slot. Additionally, the CREATE_REPLICATION_SLOT command has been extended to support the failover option. These new additions allow the modification of the failover property of a replication slot on the publisher. A subsequent commit will make use of these commands in subscription commands and will add the tests as well to cover the functionality added/changed by this commit. Author: Hou Zhijie, Shveta Malik Reviewed-by: Peter Smith, Bertrand Drouvot, Dilip Kumar, Masahiko Sawada, Nisha Moond, Kuroda, Hayato, Amit Kapila Discussion: https://postgr.es/m/514f6f2f-6833-4539-39f1-96cd1e011f23@enterprisedb.com
1 parent 08e6344 commit 7329240

File tree

13 files changed

+230
-11
lines changed

13 files changed

+230
-11
lines changed

doc/src/sgml/protocol.sgml

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2060,6 +2060,16 @@ psql "dbname=postgres replication=database" -c "IDENTIFY_SYSTEM;"
20602060
</para>
20612061
</listitem>
20622062
</varlistentry>
2063+
2064+
<varlistentry>
2065+
<term><literal>FAILOVER [ <replaceable class="parameter">boolean</replaceable> ]</literal></term>
2066+
<listitem>
2067+
<para>
2068+
If true, the slot is enabled to be synced to the standbys.
2069+
The default is false.
2070+
</para>
2071+
</listitem>
2072+
</varlistentry>
20632073
</variablelist>
20642074

20652075
<para>
@@ -2124,6 +2134,46 @@ psql "dbname=postgres replication=database" -c "IDENTIFY_SYSTEM;"
21242134
</listitem>
21252135
</varlistentry>
21262136

2137+
<varlistentry id="protocol-replication-alter-replication-slot" xreflabel="ALTER_REPLICATION_SLOT">
2138+
<term><literal>ALTER_REPLICATION_SLOT</literal> <replaceable class="parameter">slot_name</replaceable> ( <replaceable class="parameter">option</replaceable> [, ...] )
2139+
<indexterm><primary>ALTER_REPLICATION_SLOT</primary></indexterm>
2140+
</term>
2141+
<listitem>
2142+
<para>
2143+
Change the definition of a replication slot.
2144+
See <xref linkend="streaming-replication-slots"/> for more about
2145+
replication slots. This command is currently only supported for logical
2146+
replication slots.
2147+
</para>
2148+
2149+
<variablelist>
2150+
<varlistentry>
2151+
<term><replaceable class="parameter">slot_name</replaceable></term>
2152+
<listitem>
2153+
<para>
2154+
The name of the slot to alter. Must be a valid replication slot
2155+
name (see <xref linkend="streaming-replication-slots-manipulation"/>).
2156+
</para>
2157+
</listitem>
2158+
</varlistentry>
2159+
</variablelist>
2160+
2161+
<para>The following option is supported:</para>
2162+
2163+
<variablelist>
2164+
<varlistentry>
2165+
<term><literal>FAILOVER [ <replaceable class="parameter">boolean</replaceable> ]</literal></term>
2166+
<listitem>
2167+
<para>
2168+
If true, the slot is enabled to be synced to the standbys.
2169+
</para>
2170+
</listitem>
2171+
</varlistentry>
2172+
</variablelist>
2173+
2174+
</listitem>
2175+
</varlistentry>
2176+
21272177
<varlistentry id="protocol-replication-read-replication-slot">
21282178
<term><literal>READ_REPLICATION_SLOT</literal> <replaceable class="parameter">slot_name</replaceable>
21292179
<indexterm><primary>READ_REPLICATION_SLOT</primary></indexterm>

src/backend/commands/subscriptioncmds.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -807,7 +807,7 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
807807
twophase_enabled = true;
808808

809809
walrcv_create_slot(wrconn, opts.slot_name, false, twophase_enabled,
810-
CRS_NOEXPORT_SNAPSHOT, NULL);
810+
false, CRS_NOEXPORT_SNAPSHOT, NULL);
811811

812812
if (twophase_enabled)
813813
UpdateTwoPhaseState(subid, LOGICALREP_TWOPHASE_STATE_ENABLED);

src/backend/replication/libpqwalreceiver/libpqwalreceiver.c

Lines changed: 42 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -73,8 +73,11 @@ static char *libpqrcv_create_slot(WalReceiverConn *conn,
7373
const char *slotname,
7474
bool temporary,
7575
bool two_phase,
76+
bool failover,
7677
CRSSnapshotAction snapshot_action,
7778
XLogRecPtr *lsn);
79+
static void libpqrcv_alter_slot(WalReceiverConn *conn, const char *slotname,
80+
bool failover);
7881
static pid_t libpqrcv_get_backend_pid(WalReceiverConn *conn);
7982
static WalRcvExecResult *libpqrcv_exec(WalReceiverConn *conn,
8083
const char *query,
@@ -95,6 +98,7 @@ static WalReceiverFunctionsType PQWalReceiverFunctions = {
9598
.walrcv_receive = libpqrcv_receive,
9699
.walrcv_send = libpqrcv_send,
97100
.walrcv_create_slot = libpqrcv_create_slot,
101+
.walrcv_alter_slot = libpqrcv_alter_slot,
98102
.walrcv_get_backend_pid = libpqrcv_get_backend_pid,
99103
.walrcv_exec = libpqrcv_exec,
100104
.walrcv_disconnect = libpqrcv_disconnect
@@ -938,8 +942,8 @@ libpqrcv_send(WalReceiverConn *conn, const char *buffer, int nbytes)
938942
*/
939943
static char *
940944
libpqrcv_create_slot(WalReceiverConn *conn, const char *slotname,
941-
bool temporary, bool two_phase, CRSSnapshotAction snapshot_action,
942-
XLogRecPtr *lsn)
945+
bool temporary, bool two_phase, bool failover,
946+
CRSSnapshotAction snapshot_action, XLogRecPtr *lsn)
943947
{
944948
PGresult *res;
945949
StringInfoData cmd;
@@ -969,6 +973,15 @@ libpqrcv_create_slot(WalReceiverConn *conn, const char *slotname,
969973
appendStringInfoChar(&cmd, ' ');
970974
}
971975

976+
if (failover)
977+
{
978+
appendStringInfoString(&cmd, "FAILOVER");
979+
if (use_new_options_syntax)
980+
appendStringInfoString(&cmd, ", ");
981+
else
982+
appendStringInfoChar(&cmd, ' ');
983+
}
984+
972985
if (use_new_options_syntax)
973986
{
974987
switch (snapshot_action)
@@ -1037,6 +1050,33 @@ libpqrcv_create_slot(WalReceiverConn *conn, const char *slotname,
10371050
return snapshot;
10381051
}
10391052

1053+
/*
1054+
* Change the definition of the replication slot.
1055+
*/
1056+
static void
1057+
libpqrcv_alter_slot(WalReceiverConn *conn, const char *slotname,
1058+
bool failover)
1059+
{
1060+
StringInfoData cmd;
1061+
PGresult *res;
1062+
1063+
initStringInfo(&cmd);
1064+
appendStringInfo(&cmd, "ALTER_REPLICATION_SLOT %s ( FAILOVER %s )",
1065+
quote_identifier(slotname),
1066+
failover ? "true" : "false");
1067+
1068+
res = libpqrcv_PQexec(conn->streamConn, cmd.data);
1069+
pfree(cmd.data);
1070+
1071+
if (PQresultStatus(res) != PGRES_COMMAND_OK)
1072+
ereport(ERROR,
1073+
(errcode(ERRCODE_PROTOCOL_VIOLATION),
1074+
errmsg("could not alter replication slot \"%s\": %s",
1075+
slotname, pchomp(PQerrorMessage(conn->streamConn)))));
1076+
1077+
PQclear(res);
1078+
}
1079+
10401080
/*
10411081
* Return PID of remote backend process.
10421082
*/

src/backend/replication/logical/tablesync.c

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1430,6 +1430,7 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
14301430
*/
14311431
walrcv_create_slot(LogRepWorkerWalRcvConn,
14321432
slotname, false /* permanent */ , false /* two_phase */ ,
1433+
false,
14331434
CRS_USE_SNAPSHOT, origin_startpos);
14341435

14351436
/*

src/backend/replication/repl_gram.y

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@ Node *replication_parse_result;
6464
%token K_START_REPLICATION
6565
%token K_CREATE_REPLICATION_SLOT
6666
%token K_DROP_REPLICATION_SLOT
67+
%token K_ALTER_REPLICATION_SLOT
6768
%token K_TIMELINE_HISTORY
6869
%token K_WAIT
6970
%token K_TIMELINE
@@ -80,8 +81,9 @@ Node *replication_parse_result;
8081

8182
%type <node> command
8283
%type <node> base_backup start_replication start_logical_replication
83-
create_replication_slot drop_replication_slot identify_system
84-
read_replication_slot timeline_history show upload_manifest
84+
create_replication_slot drop_replication_slot
85+
alter_replication_slot identify_system read_replication_slot
86+
timeline_history show upload_manifest
8587
%type <list> generic_option_list
8688
%type <defelt> generic_option
8789
%type <uintval> opt_timeline
@@ -112,6 +114,7 @@ command:
112114
| start_logical_replication
113115
| create_replication_slot
114116
| drop_replication_slot
117+
| alter_replication_slot
115118
| read_replication_slot
116119
| timeline_history
117120
| show
@@ -259,6 +262,18 @@ drop_replication_slot:
259262
}
260263
;
261264

265+
/* ALTER_REPLICATION_SLOT slot (options) */
266+
alter_replication_slot:
267+
K_ALTER_REPLICATION_SLOT IDENT '(' generic_option_list ')'
268+
{
269+
AlterReplicationSlotCmd *cmd;
270+
cmd = makeNode(AlterReplicationSlotCmd);
271+
cmd->slotname = $2;
272+
cmd->options = $4;
273+
$$ = (Node *) cmd;
274+
}
275+
;
276+
262277
/*
263278
* START_REPLICATION [SLOT slot] [PHYSICAL] %X/%X [TIMELINE %d]
264279
*/
@@ -410,6 +425,7 @@ ident_or_keyword:
410425
| K_START_REPLICATION { $$ = "start_replication"; }
411426
| K_CREATE_REPLICATION_SLOT { $$ = "create_replication_slot"; }
412427
| K_DROP_REPLICATION_SLOT { $$ = "drop_replication_slot"; }
428+
| K_ALTER_REPLICATION_SLOT { $$ = "alter_replication_slot"; }
413429
| K_TIMELINE_HISTORY { $$ = "timeline_history"; }
414430
| K_WAIT { $$ = "wait"; }
415431
| K_TIMELINE { $$ = "timeline"; }

src/backend/replication/repl_scanner.l

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,7 @@ TIMELINE { return K_TIMELINE; }
125125
START_REPLICATION { return K_START_REPLICATION; }
126126
CREATE_REPLICATION_SLOT { return K_CREATE_REPLICATION_SLOT; }
127127
DROP_REPLICATION_SLOT { return K_DROP_REPLICATION_SLOT; }
128+
ALTER_REPLICATION_SLOT { return K_ALTER_REPLICATION_SLOT; }
128129
TIMELINE_HISTORY { return K_TIMELINE_HISTORY; }
129130
PHYSICAL { return K_PHYSICAL; }
130131
RESERVE_WAL { return K_RESERVE_WAL; }
@@ -302,6 +303,7 @@ replication_scanner_is_replication_command(void)
302303
case K_START_REPLICATION:
303304
case K_CREATE_REPLICATION_SLOT:
304305
case K_DROP_REPLICATION_SLOT:
306+
case K_ALTER_REPLICATION_SLOT:
305307
case K_READ_REPLICATION_SLOT:
306308
case K_TIMELINE_HISTORY:
307309
case K_UPLOAD_MANIFEST:

src/backend/replication/slot.c

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -683,6 +683,31 @@ ReplicationSlotDrop(const char *name, bool nowait)
683683
ReplicationSlotDropAcquired();
684684
}
685685

686+
/*
687+
* Change the definition of the slot identified by the specified name.
688+
*/
689+
void
690+
ReplicationSlotAlter(const char *name, bool failover)
691+
{
692+
Assert(MyReplicationSlot == NULL);
693+
694+
ReplicationSlotAcquire(name, false);
695+
696+
if (SlotIsPhysical(MyReplicationSlot))
697+
ereport(ERROR,
698+
errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
699+
errmsg("cannot use %s with a physical replication slot",
700+
"ALTER_REPLICATION_SLOT"));
701+
702+
SpinLockAcquire(&MyReplicationSlot->mutex);
703+
MyReplicationSlot->data.failover = failover;
704+
SpinLockRelease(&MyReplicationSlot->mutex);
705+
706+
ReplicationSlotMarkDirty();
707+
ReplicationSlotSave();
708+
ReplicationSlotRelease();
709+
}
710+
686711
/*
687712
* Permanently drop the currently acquired replication slot.
688713
*/

src/backend/replication/walreceiver.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -387,7 +387,7 @@ WalReceiverMain(void)
387387
"pg_walreceiver_%lld",
388388
(long long int) walrcv_get_backend_pid(wrconn));
389389

390-
walrcv_create_slot(wrconn, slotname, true, false, 0, NULL);
390+
walrcv_create_slot(wrconn, slotname, true, false, false, 0, NULL);
391391

392392
SpinLockAcquire(&walrcv->mutex);
393393
strlcpy(walrcv->slotname, slotname, NAMEDATALEN);

src/backend/replication/walsender.c

Lines changed: 59 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1126,12 +1126,13 @@ static void
11261126
parseCreateReplSlotOptions(CreateReplicationSlotCmd *cmd,
11271127
bool *reserve_wal,
11281128
CRSSnapshotAction *snapshot_action,
1129-
bool *two_phase)
1129+
bool *two_phase, bool *failover)
11301130
{
11311131
ListCell *lc;
11321132
bool snapshot_action_given = false;
11331133
bool reserve_wal_given = false;
11341134
bool two_phase_given = false;
1135+
bool failover_given = false;
11351136

11361137
/* Parse options */
11371138
foreach(lc, cmd->options)
@@ -1181,6 +1182,15 @@ parseCreateReplSlotOptions(CreateReplicationSlotCmd *cmd,
11811182
two_phase_given = true;
11821183
*two_phase = defGetBoolean(defel);
11831184
}
1185+
else if (strcmp(defel->defname, "failover") == 0)
1186+
{
1187+
if (failover_given || cmd->kind != REPLICATION_KIND_LOGICAL)
1188+
ereport(ERROR,
1189+
(errcode(ERRCODE_SYNTAX_ERROR),
1190+
errmsg("conflicting or redundant options")));
1191+
failover_given = true;
1192+
*failover = defGetBoolean(defel);
1193+
}
11841194
else
11851195
elog(ERROR, "unrecognized option: %s", defel->defname);
11861196
}
@@ -1197,6 +1207,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
11971207
char *slot_name;
11981208
bool reserve_wal = false;
11991209
bool two_phase = false;
1210+
bool failover = false;
12001211
CRSSnapshotAction snapshot_action = CRS_EXPORT_SNAPSHOT;
12011212
DestReceiver *dest;
12021213
TupOutputState *tstate;
@@ -1206,7 +1217,8 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
12061217

12071218
Assert(!MyReplicationSlot);
12081219

1209-
parseCreateReplSlotOptions(cmd, &reserve_wal, &snapshot_action, &two_phase);
1220+
parseCreateReplSlotOptions(cmd, &reserve_wal, &snapshot_action, &two_phase,
1221+
&failover);
12101222

12111223
if (cmd->kind == REPLICATION_KIND_PHYSICAL)
12121224
{
@@ -1243,7 +1255,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
12431255
*/
12441256
ReplicationSlotCreate(cmd->slotname, true,
12451257
cmd->temporary ? RS_TEMPORARY : RS_EPHEMERAL,
1246-
two_phase, false);
1258+
two_phase, failover);
12471259

12481260
/*
12491261
* Do options check early so that we can bail before calling the
@@ -1398,6 +1410,43 @@ DropReplicationSlot(DropReplicationSlotCmd *cmd)
13981410
ReplicationSlotDrop(cmd->slotname, !cmd->wait);
13991411
}
14001412

1413+
/*
1414+
* Process extra options given to ALTER_REPLICATION_SLOT.
1415+
*/
1416+
static void
1417+
ParseAlterReplSlotOptions(AlterReplicationSlotCmd *cmd, bool *failover)
1418+
{
1419+
bool failover_given = false;
1420+
1421+
/* Parse options */
1422+
foreach_ptr(DefElem, defel, cmd->options)
1423+
{
1424+
if (strcmp(defel->defname, "failover") == 0)
1425+
{
1426+
if (failover_given)
1427+
ereport(ERROR,
1428+
(errcode(ERRCODE_SYNTAX_ERROR),
1429+
errmsg("conflicting or redundant options")));
1430+
failover_given = true;
1431+
*failover = defGetBoolean(defel);
1432+
}
1433+
else
1434+
elog(ERROR, "unrecognized option: %s", defel->defname);
1435+
}
1436+
}
1437+
1438+
/*
1439+
* Change the definition of a replication slot.
1440+
*/
1441+
static void
1442+
AlterReplicationSlot(AlterReplicationSlotCmd *cmd)
1443+
{
1444+
bool failover = false;
1445+
1446+
ParseAlterReplSlotOptions(cmd, &failover);
1447+
ReplicationSlotAlter(cmd->slotname, failover);
1448+
}
1449+
14011450
/*
14021451
* Load previously initiated logical slot and prepare for sending data (via
14031452
* WalSndLoop).
@@ -1971,6 +2020,13 @@ exec_replication_command(const char *cmd_string)
19712020
EndReplicationCommand(cmdtag);
19722021
break;
19732022

2023+
case T_AlterReplicationSlotCmd:
2024+
cmdtag = "ALTER_REPLICATION_SLOT";
2025+
set_ps_display(cmdtag);
2026+
AlterReplicationSlot((AlterReplicationSlotCmd *) cmd_node);
2027+
EndReplicationCommand(cmdtag);
2028+
break;
2029+
19742030
case T_StartReplicationCmd:
19752031
{
19762032
StartReplicationCmd *cmd = (StartReplicationCmd *) cmd_node;

0 commit comments

Comments
 (0)