Skip to content

Commit cf2655a

Browse files
pg_recvlogical: Add --failover option.
This new option instructs pg_recvlogical to create the logical replication slot with the failover option enabled. It can be used in conjunction with the --create-slot option. Author: Hayato Kuroda <kuroda.hayato@fujitsu.com> Reviewed-by: Michael Banck <mbanck@gmx.net> Reviewed-by: Masahiko Sawada <sawada.mshk@gmail.com> Discussion: https://postgr.es/m/OSCPR01MB14966C54097FC83AF19F3516BF5AC2@OSCPR01MB14966.jpnprd01.prod.outlook.com
1 parent 3556c89 commit cf2655a

File tree

7 files changed

+61
-11
lines changed

7 files changed

+61
-11
lines changed

doc/src/sgml/ref/pg_recvlogical.sgml

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -79,8 +79,8 @@ PostgreSQL documentation
7979
</para>
8080

8181
<para>
82-
The <option>--two-phase</option> can be specified with
83-
<option>--create-slot</option> to enable decoding of prepared transactions.
82+
The <option>--two-phase</option> and <option>--falover</option> options
83+
can be specified with <option>--create-slot</option>.
8484
</para>
8585
</listitem>
8686
</varlistentry>
@@ -165,6 +165,16 @@ PostgreSQL documentation
165165
</listitem>
166166
</varlistentry>
167167

168+
<varlistentry>
169+
<term><option>--failover</option></term>
170+
<listitem>
171+
<para>
172+
Enables the slot to be synchronized to the standbys. This option may
173+
only be specified with <option>--create-slot</option>.
174+
</para>
175+
</listitem>
176+
</varlistentry>
177+
168178
<varlistentry>
169179
<term><option>-f <replaceable>filename</replaceable></option></term>
170180
<term><option>--file=<replaceable>filename</replaceable></option></term>

src/bin/pg_basebackup/pg_basebackup.c

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -667,7 +667,8 @@ StartLogStreamer(char *startpos, uint32 timeline, char *sysidentifier,
667667
if (temp_replication_slot || create_slot)
668668
{
669669
if (!CreateReplicationSlot(param->bgconn, replication_slot, NULL,
670-
temp_replication_slot, true, true, false, false))
670+
temp_replication_slot, true, true, false,
671+
false, false))
671672
exit(1);
672673

673674
if (verbose)

src/bin/pg_basebackup/pg_receivewal.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -889,7 +889,7 @@ main(int argc, char **argv)
889889
pg_log_info("creating replication slot \"%s\"", replication_slot);
890890

891891
if (!CreateReplicationSlot(conn, replication_slot, NULL, false, true, false,
892-
slot_exists_ok, false))
892+
slot_exists_ok, false, false))
893893
exit(1);
894894
exit(0);
895895
}

src/bin/pg_basebackup/pg_recvlogical.c

Lines changed: 23 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ typedef enum
4242
static char *outfile = NULL;
4343
static int verbose = 0;
4444
static bool two_phase = false;
45+
static bool failover = false;
4546
static int noloop = 0;
4647
static int standby_message_timeout = 10 * 1000; /* 10 sec = default */
4748
static int fsync_interval = 10 * 1000; /* 10 sec = default */
@@ -89,6 +90,8 @@ usage(void)
8990
printf(_(" --start start streaming in a replication slot (for the slot's name see --slot)\n"));
9091
printf(_("\nOptions:\n"));
9192
printf(_(" -E, --endpos=LSN exit after receiving the specified LSN\n"));
93+
printf(_(" --failover enable replication slot synchronization to standby servers when\n"
94+
" creating a slot\n"));
9295
printf(_(" -f, --file=FILE receive log into this file, - for stdout\n"));
9396
printf(_(" -F --fsync-interval=SECS\n"
9497
" time between fsyncs to the output file (default: %d)\n"), (fsync_interval / 1000));
@@ -695,6 +698,7 @@ main(int argc, char **argv)
695698
{"file", required_argument, NULL, 'f'},
696699
{"fsync-interval", required_argument, NULL, 'F'},
697700
{"no-loop", no_argument, NULL, 'n'},
701+
{"failover", no_argument, NULL, 5},
698702
{"verbose", no_argument, NULL, 'v'},
699703
{"two-phase", no_argument, NULL, 't'},
700704
{"version", no_argument, NULL, 'V'},
@@ -770,6 +774,9 @@ main(int argc, char **argv)
770774
case 'v':
771775
verbose++;
772776
break;
777+
case 5:
778+
failover = true;
779+
break;
773780
/* connection options */
774781
case 'd':
775782
dbname = pg_strdup(optarg);
@@ -917,11 +924,21 @@ main(int argc, char **argv)
917924
exit(1);
918925
}
919926

920-
if (two_phase && !do_create_slot)
927+
if (!do_create_slot)
921928
{
922-
pg_log_error("--two-phase may only be specified with --create-slot");
923-
pg_log_error_hint("Try \"%s --help\" for more information.", progname);
924-
exit(1);
929+
if (two_phase)
930+
{
931+
pg_log_error("--two-phase may only be specified with --create-slot");
932+
pg_log_error_hint("Try \"%s --help\" for more information.", progname);
933+
exit(1);
934+
}
935+
936+
if (failover)
937+
{
938+
pg_log_error("--failover may only be specified with --create-slot");
939+
pg_log_error_hint("Try \"%s --help\" for more information.", progname);
940+
exit(1);
941+
}
925942
}
926943

927944
/*
@@ -984,7 +1001,8 @@ main(int argc, char **argv)
9841001
pg_log_info("creating replication slot \"%s\"", replication_slot);
9851002

9861003
if (!CreateReplicationSlot(conn, replication_slot, plugin, false,
987-
false, false, slot_exists_ok, two_phase))
1004+
false, false, slot_exists_ok, two_phase,
1005+
failover))
9881006
exit(1);
9891007
startpos = InvalidXLogRecPtr;
9901008
}

src/bin/pg_basebackup/streamutil.c

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -583,7 +583,7 @@ GetSlotInformation(PGconn *conn, const char *slot_name,
583583
bool
584584
CreateReplicationSlot(PGconn *conn, const char *slot_name, const char *plugin,
585585
bool is_temporary, bool is_physical, bool reserve_wal,
586-
bool slot_exists_ok, bool two_phase)
586+
bool slot_exists_ok, bool two_phase, bool failover)
587587
{
588588
PQExpBuffer query;
589589
PGresult *res;
@@ -594,6 +594,7 @@ CreateReplicationSlot(PGconn *conn, const char *slot_name, const char *plugin,
594594
Assert((is_physical && plugin == NULL) ||
595595
(!is_physical && plugin != NULL));
596596
Assert(!(two_phase && is_physical));
597+
Assert(!(failover && is_physical));
597598
Assert(slot_name != NULL);
598599

599600
/* Build base portion of query */
@@ -616,6 +617,10 @@ CreateReplicationSlot(PGconn *conn, const char *slot_name, const char *plugin,
616617
}
617618
else
618619
{
620+
if (failover && PQserverVersion(conn) >= 170000)
621+
AppendPlainCommandOption(query, use_new_option_syntax,
622+
"FAILOVER");
623+
619624
if (two_phase && PQserverVersion(conn) >= 150000)
620625
AppendPlainCommandOption(query, use_new_option_syntax,
621626
"TWO_PHASE");

src/bin/pg_basebackup/streamutil.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,8 @@ extern PGconn *GetConnection(void);
3535
extern bool CreateReplicationSlot(PGconn *conn, const char *slot_name,
3636
const char *plugin, bool is_temporary,
3737
bool is_physical, bool reserve_wal,
38-
bool slot_exists_ok, bool two_phase);
38+
bool slot_exists_ok, bool two_phase,
39+
bool failover);
3940
extern bool DropReplicationSlot(PGconn *conn, const char *slot_name);
4041
extern bool RunIdentifySystem(PGconn *conn, char **sysid,
4142
TimeLineID *starttli,

src/bin/pg_basebackup/t/030_pg_recvlogical.pl

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -135,4 +135,19 @@
135135
],
136136
'drop could work without dbname');
137137

138+
# test with failover option enabled
139+
$node->command_ok(
140+
[
141+
'pg_recvlogical',
142+
'--slot' => 'test',
143+
'--dbname' => $node->connstr('postgres'),
144+
'--create-slot',
145+
'--failover',
146+
],
147+
'slot with failover created');
148+
149+
my $result = $node->safe_psql('postgres',
150+
"SELECT failover FROM pg_catalog.pg_replication_slots WHERE slot_name = 'test'");
151+
is($result, 't', "failover is enabled for the new slot");
152+
138153
done_testing();

0 commit comments

Comments
 (0)