Skip to content

Commit 0266e98

Browse files
committed
Flexible options for CREATE_REPLICATION_SLOT.
Like BASE_BACKUP, CREATE_REPLICATION_SLOT has historically used a hard-coded syntax. To improve future extensibility, adopt a flexible options syntax here, too. In the new syntax, instead of three mutually exclusive options EXPORT_SNAPSHOT, USE_SNAPSHOT, and NOEXPORT_SNAPSHOT, there is now a single SNAPSHOT option with three possible values: 'export', 'use', and 'nothing'. This commit does not remove support for the old syntax. It just adds the new one as an additional option, makes pg_receivewal, pg_recvlogical, and walreceiver processes use it. Patch by me, reviewed by Fabien Coelho, Sergei Kornilov, and Fujii Masao. Discussion: http://postgr.es/m/CA+TgmobAczXDRO_Gr2euo_TxgzaH1JxbNxvFx=HYvBinefNH8Q@mail.gmail.com Discussion: http://postgr.es/m/CA+TgmoZGwR=ZVWFeecncubEyPdwghnvfkkdBe9BLccLSiqdf9Q@mail.gmail.com
1 parent 0ba281c commit 0266e98

File tree

5 files changed

+150
-63
lines changed

5 files changed

+150
-63
lines changed

doc/src/sgml/protocol.sgml

Lines changed: 26 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1914,7 +1914,7 @@ The commands accepted in replication mode are:
19141914
</varlistentry>
19151915

19161916
<varlistentry id="protocol-replication-create-slot" xreflabel="CREATE_REPLICATION_SLOT">
1917-
<term><literal>CREATE_REPLICATION_SLOT</literal> <replaceable class="parameter">slot_name</replaceable> [ <literal>TEMPORARY</literal> ] { <literal>PHYSICAL</literal> [ <literal>RESERVE_WAL</literal> ] | <literal>LOGICAL</literal> <replaceable class="parameter">output_plugin</replaceable> [ <literal>EXPORT_SNAPSHOT</literal> | <literal>NOEXPORT_SNAPSHOT</literal> | <literal>USE_SNAPSHOT</literal> | <literal>TWO_PHASE</literal> ] }
1917+
<term><literal>CREATE_REPLICATION_SLOT</literal> <replaceable class="parameter">slot_name</replaceable> [ <literal>TEMPORARY</literal> ] { <literal>PHYSICAL</literal> | <literal>LOGICAL</literal> } [ ( <replaceable class="parameter">option</replaceable> [, ...] ) ]
19181918
<indexterm><primary>CREATE_REPLICATION_SLOT</primary></indexterm>
19191919
</term>
19201920
<listitem>
@@ -1954,46 +1954,50 @@ The commands accepted in replication mode are:
19541954
</para>
19551955
</listitem>
19561956
</varlistentry>
1957+
</variablelist>
1958+
1959+
<para>The following options are supported:</para>
19571960

1961+
<variablelist>
19581962
<varlistentry>
1959-
<term><literal>TWO_PHASE</literal></term>
1963+
<term><literal>TWO_PHASE [ <replaceable class="parameter">boolean</replaceable> ]</literal></term>
19601964
<listitem>
19611965
<para>
1962-
Specify that this logical replication slot supports decoding of two-phase
1966+
If true, this logical replication slot supports decoding of two-phase
19631967
transactions. With this option, two-phase commands like
19641968
<literal>PREPARE TRANSACTION</literal>, <literal>COMMIT PREPARED</literal>
19651969
and <literal>ROLLBACK PREPARED</literal> are decoded and transmitted.
19661970
The transaction will be decoded and transmitted at
19671971
<literal>PREPARE TRANSACTION</literal> time.
1972+
The default is false.
19681973
</para>
19691974
</listitem>
19701975
</varlistentry>
19711976

19721977
<varlistentry>
1973-
<term><literal>RESERVE_WAL</literal></term>
1978+
<term><literal>RESERVE_WAL [ <replaceable class="parameter">boolean</replaceable> ]</literal></term>
19741979
<listitem>
19751980
<para>
1976-
Specify that this physical replication slot reserves <acronym>WAL</acronym>
1981+
If true, this physical replication slot reserves <acronym>WAL</acronym>
19771982
immediately. Otherwise, <acronym>WAL</acronym> is only reserved upon
19781983
connection from a streaming replication client.
1984+
The default is false.
19791985
</para>
19801986
</listitem>
19811987
</varlistentry>
19821988

19831989
<varlistentry>
1984-
<term><literal>EXPORT_SNAPSHOT</literal></term>
1985-
<term><literal>NOEXPORT_SNAPSHOT</literal></term>
1986-
<term><literal>USE_SNAPSHOT</literal></term>
1990+
<term><literal>SNAPSHOT { 'export' | 'use' | 'nothing' }</literal></term>
19871991
<listitem>
19881992
<para>
19891993
Decides what to do with the snapshot created during logical slot
1990-
initialization. <literal>EXPORT_SNAPSHOT</literal>, which is the default,
1994+
initialization. <literal>'export'</literal>, which is the default,
19911995
will export the snapshot for use in other sessions. This option can't
1992-
be used inside a transaction. <literal>USE_SNAPSHOT</literal> will use the
1996+
be used inside a transaction. <literal>'use'</literal> will use the
19931997
snapshot for the current transaction executing the command. This
19941998
option must be used in a transaction, and
19951999
<literal>CREATE_REPLICATION_SLOT</literal> must be the first command
1996-
run in that transaction. Finally, <literal>NOEXPORT_SNAPSHOT</literal> will
2000+
run in that transaction. Finally, <literal>'nothing'</literal> will
19972001
just use the snapshot for logical decoding as normal but won't do
19982002
anything else with it.
19992003
</para>
@@ -2052,6 +2056,17 @@ The commands accepted in replication mode are:
20522056
</listitem>
20532057
</varlistentry>
20542058

2059+
<varlistentry>
2060+
<term><literal>CREATE_REPLICATION_SLOT</literal> <replaceable class="parameter">slot_name</replaceable> [ <literal>TEMPORARY</literal> ] { <literal>PHYSICAL</literal> [ <literal>RESERVE_WAL</literal> ] | <literal>LOGICAL</literal> <replaceable class="parameter">output_plugin</replaceable> [ <literal>EXPORT_SNAPSHOT</literal> | <literal>NOEXPORT_SNAPSHOT</literal> | <literal>USE_SNAPSHOT</literal> | <literal>TWO_PHASE</literal> ] }
2061+
</term>
2062+
<listitem>
2063+
<para>
2064+
For compatibility with older releases, this alternative syntax for
2065+
the <literal>CREATE_REPLICATION_SLOT</literal> command is still supported.
2066+
</para>
2067+
</listitem>
2068+
</varlistentry>
2069+
20552070
<varlistentry>
20562071
<term><literal>START_REPLICATION</literal> [ <literal>SLOT</literal> <replaceable class="parameter">slot_name</replaceable> ] [ <literal>PHYSICAL</literal> ] <replaceable class="parameter">XXX/XXX</replaceable> [ <literal>TIMELINE</literal> <replaceable class="parameter">tli</replaceable> ]
20572072
<indexterm><primary>START_REPLICATION</primary></indexterm>

src/backend/replication/libpqwalreceiver/libpqwalreceiver.c

Lines changed: 48 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -862,6 +862,9 @@ libpqrcv_create_slot(WalReceiverConn *conn, const char *slotname,
862862
PGresult *res;
863863
StringInfoData cmd;
864864
char *snapshot;
865+
int use_new_options_syntax;
866+
867+
use_new_options_syntax = (PQserverVersion(conn->streamConn) >= 150000);
865868

866869
initStringInfo(&cmd);
867870

@@ -872,26 +875,58 @@ libpqrcv_create_slot(WalReceiverConn *conn, const char *slotname,
872875

873876
if (conn->logical)
874877
{
875-
appendStringInfoString(&cmd, " LOGICAL pgoutput");
878+
appendStringInfoString(&cmd, " LOGICAL pgoutput ");
879+
if (use_new_options_syntax)
880+
appendStringInfoChar(&cmd, '(');
876881
if (two_phase)
877-
appendStringInfoString(&cmd, " TWO_PHASE");
882+
{
883+
appendStringInfoString(&cmd, "TWO_PHASE");
884+
if (use_new_options_syntax)
885+
appendStringInfoString(&cmd, ", ");
886+
else
887+
appendStringInfoChar(&cmd, ' ');
888+
}
878889

879-
switch (snapshot_action)
890+
if (use_new_options_syntax)
880891
{
881-
case CRS_EXPORT_SNAPSHOT:
882-
appendStringInfoString(&cmd, " EXPORT_SNAPSHOT");
883-
break;
884-
case CRS_NOEXPORT_SNAPSHOT:
885-
appendStringInfoString(&cmd, " NOEXPORT_SNAPSHOT");
886-
break;
887-
case CRS_USE_SNAPSHOT:
888-
appendStringInfoString(&cmd, " USE_SNAPSHOT");
889-
break;
892+
switch (snapshot_action)
893+
{
894+
case CRS_EXPORT_SNAPSHOT:
895+
appendStringInfoString(&cmd, "SNAPSHOT 'export'");
896+
break;
897+
case CRS_NOEXPORT_SNAPSHOT:
898+
appendStringInfoString(&cmd, "SNAPSHOT 'nothing'");
899+
break;
900+
case CRS_USE_SNAPSHOT:
901+
appendStringInfoString(&cmd, "SNAPSHOT 'use'");
902+
break;
903+
}
890904
}
905+
else
906+
{
907+
switch (snapshot_action)
908+
{
909+
case CRS_EXPORT_SNAPSHOT:
910+
appendStringInfoString(&cmd, "EXPORT_SNAPSHOT");
911+
break;
912+
case CRS_NOEXPORT_SNAPSHOT:
913+
appendStringInfoString(&cmd, "NOEXPORT_SNAPSHOT");
914+
break;
915+
case CRS_USE_SNAPSHOT:
916+
appendStringInfoString(&cmd, "USE_SNAPSHOT");
917+
break;
918+
}
919+
}
920+
921+
if (use_new_options_syntax)
922+
appendStringInfoChar(&cmd, ')');
891923
}
892924
else
893925
{
894-
appendStringInfoString(&cmd, " PHYSICAL RESERVE_WAL");
926+
if (use_new_options_syntax)
927+
appendStringInfoString(&cmd, " PHYSICAL (RESERVE_WAL)");
928+
else
929+
appendStringInfoString(&cmd, " PHYSICAL RESERVE_WAL");
895930
}
896931

897932
res = libpqrcv_PQexec(conn->streamConn, cmd.data);

src/backend/replication/repl_gram.y

Lines changed: 20 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -103,8 +103,8 @@ static SQLCmd *make_sqlcmd(void);
103103
%type <node> plugin_opt_arg
104104
%type <str> opt_slot var_name ident_or_keyword
105105
%type <boolval> opt_temporary
106-
%type <list> create_slot_opt_list
107-
%type <defelt> create_slot_opt
106+
%type <list> create_slot_options create_slot_legacy_opt_list
107+
%type <defelt> create_slot_legacy_opt
108108

109109
%%
110110

@@ -243,8 +243,8 @@ base_backup_legacy_opt:
243243
;
244244

245245
create_replication_slot:
246-
/* CREATE_REPLICATION_SLOT slot TEMPORARY PHYSICAL RESERVE_WAL */
247-
K_CREATE_REPLICATION_SLOT IDENT opt_temporary K_PHYSICAL create_slot_opt_list
246+
/* CREATE_REPLICATION_SLOT slot TEMPORARY PHYSICAL [options] */
247+
K_CREATE_REPLICATION_SLOT IDENT opt_temporary K_PHYSICAL create_slot_options
248248
{
249249
CreateReplicationSlotCmd *cmd;
250250
cmd = makeNode(CreateReplicationSlotCmd);
@@ -254,8 +254,8 @@ create_replication_slot:
254254
cmd->options = $5;
255255
$$ = (Node *) cmd;
256256
}
257-
/* CREATE_REPLICATION_SLOT slot TEMPORARY LOGICAL plugin */
258-
| K_CREATE_REPLICATION_SLOT IDENT opt_temporary K_LOGICAL IDENT create_slot_opt_list
257+
/* CREATE_REPLICATION_SLOT slot TEMPORARY LOGICAL plugin [options] */
258+
| K_CREATE_REPLICATION_SLOT IDENT opt_temporary K_LOGICAL IDENT create_slot_options
259259
{
260260
CreateReplicationSlotCmd *cmd;
261261
cmd = makeNode(CreateReplicationSlotCmd);
@@ -268,28 +268,33 @@ create_replication_slot:
268268
}
269269
;
270270

271-
create_slot_opt_list:
272-
create_slot_opt_list create_slot_opt
271+
create_slot_options:
272+
'(' generic_option_list ')' { $$ = $2; }
273+
| create_slot_legacy_opt_list { $$ = $1; }
274+
;
275+
276+
create_slot_legacy_opt_list:
277+
create_slot_legacy_opt_list create_slot_legacy_opt
273278
{ $$ = lappend($1, $2); }
274279
| /* EMPTY */
275280
{ $$ = NIL; }
276281
;
277282

278-
create_slot_opt:
283+
create_slot_legacy_opt:
279284
K_EXPORT_SNAPSHOT
280285
{
281-
$$ = makeDefElem("export_snapshot",
282-
(Node *)makeInteger(true), -1);
286+
$$ = makeDefElem("snapshot",
287+
(Node *)makeString("export"), -1);
283288
}
284289
| K_NOEXPORT_SNAPSHOT
285290
{
286-
$$ = makeDefElem("export_snapshot",
287-
(Node *)makeInteger(false), -1);
291+
$$ = makeDefElem("snapshot",
292+
(Node *)makeString("nothing"), -1);
288293
}
289294
| K_USE_SNAPSHOT
290295
{
291-
$$ = makeDefElem("use_snapshot",
292-
(Node *)makeInteger(true), -1);
296+
$$ = makeDefElem("snapshot",
297+
(Node *)makeString("use"), -1);
293298
}
294299
| K_RESERVE_WAL
295300
{

src/backend/replication/walsender.c

Lines changed: 22 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -872,26 +872,30 @@ parseCreateReplSlotOptions(CreateReplicationSlotCmd *cmd,
872872
{
873873
DefElem *defel = (DefElem *) lfirst(lc);
874874

875-
if (strcmp(defel->defname, "export_snapshot") == 0)
875+
if (strcmp(defel->defname, "snapshot") == 0)
876876
{
877+
char *action;
878+
877879
if (snapshot_action_given || cmd->kind != REPLICATION_KIND_LOGICAL)
878880
ereport(ERROR,
879881
(errcode(ERRCODE_SYNTAX_ERROR),
880882
errmsg("conflicting or redundant options")));
881883

884+
action = defGetString(defel);
882885
snapshot_action_given = true;
883-
*snapshot_action = defGetBoolean(defel) ? CRS_EXPORT_SNAPSHOT :
884-
CRS_NOEXPORT_SNAPSHOT;
885-
}
886-
else if (strcmp(defel->defname, "use_snapshot") == 0)
887-
{
888-
if (snapshot_action_given || cmd->kind != REPLICATION_KIND_LOGICAL)
886+
887+
if (strcmp(action, "export") == 0)
888+
*snapshot_action = CRS_EXPORT_SNAPSHOT;
889+
else if (strcmp(action, "nothing") == 0)
890+
*snapshot_action = CRS_NOEXPORT_SNAPSHOT;
891+
else if (strcmp(action, "use") == 0)
892+
*snapshot_action = CRS_USE_SNAPSHOT;
893+
else
889894
ereport(ERROR,
890-
(errcode(ERRCODE_SYNTAX_ERROR),
891-
errmsg("conflicting or redundant options")));
895+
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
896+
errmsg("unrecognized value for CREATE_REPLICATION_SLOT option \"%s\": \"%s\"",
897+
defel->defname, action)));
892898

893-
snapshot_action_given = true;
894-
*snapshot_action = CRS_USE_SNAPSHOT;
895899
}
896900
else if (strcmp(defel->defname, "reserve_wal") == 0)
897901
{
@@ -901,7 +905,7 @@ parseCreateReplSlotOptions(CreateReplicationSlotCmd *cmd,
901905
errmsg("conflicting or redundant options")));
902906

903907
reserve_wal_given = true;
904-
*reserve_wal = true;
908+
*reserve_wal = defGetBoolean(defel);
905909
}
906910
else if (strcmp(defel->defname, "two_phase") == 0)
907911
{
@@ -910,7 +914,7 @@ parseCreateReplSlotOptions(CreateReplicationSlotCmd *cmd,
910914
(errcode(ERRCODE_SYNTAX_ERROR),
911915
errmsg("conflicting or redundant options")));
912916
two_phase_given = true;
913-
*two_phase = true;
917+
*two_phase = defGetBoolean(defel);
914918
}
915919
else
916920
elog(ERROR, "unrecognized option: %s", defel->defname);
@@ -980,7 +984,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
980984
ereport(ERROR,
981985
/*- translator: %s is a CREATE_REPLICATION_SLOT statement */
982986
(errmsg("%s must not be called inside a transaction",
983-
"CREATE_REPLICATION_SLOT ... EXPORT_SNAPSHOT")));
987+
"CREATE_REPLICATION_SLOT ... (SNAPSHOT 'export')")));
984988

985989
need_full_snapshot = true;
986990
}
@@ -990,25 +994,25 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
990994
ereport(ERROR,
991995
/*- translator: %s is a CREATE_REPLICATION_SLOT statement */
992996
(errmsg("%s must be called inside a transaction",
993-
"CREATE_REPLICATION_SLOT ... USE_SNAPSHOT")));
997+
"CREATE_REPLICATION_SLOT ... (SNAPSHOT 'use')")));
994998

995999
if (XactIsoLevel != XACT_REPEATABLE_READ)
9961000
ereport(ERROR,
9971001
/*- translator: %s is a CREATE_REPLICATION_SLOT statement */
9981002
(errmsg("%s must be called in REPEATABLE READ isolation mode transaction",
999-
"CREATE_REPLICATION_SLOT ... USE_SNAPSHOT")));
1003+
"CREATE_REPLICATION_SLOT ... (SNAPSHOT 'use')")));
10001004

10011005
if (FirstSnapshotSet)
10021006
ereport(ERROR,
10031007
/*- translator: %s is a CREATE_REPLICATION_SLOT statement */
10041008
(errmsg("%s must be called before any query",
1005-
"CREATE_REPLICATION_SLOT ... USE_SNAPSHOT")));
1009+
"CREATE_REPLICATION_SLOT ... (SNAPSHOT 'use')")));
10061010

10071011
if (IsSubTransaction())
10081012
ereport(ERROR,
10091013
/*- translator: %s is a CREATE_REPLICATION_SLOT statement */
10101014
(errmsg("%s must not be called in a subtransaction",
1011-
"CREATE_REPLICATION_SLOT ... USE_SNAPSHOT")));
1015+
"CREATE_REPLICATION_SLOT ... (SNAPSHOT 'use')")));
10121016

10131017
need_full_snapshot = true;
10141018
}

0 commit comments

Comments
 (0)