Skip to content

Commit b4ada4e

Browse files
committed
Add replication command READ_REPLICATION_SLOT
The command is supported for physical slots for now, and returns the type of slot, its restart_lsn and its restart_tli. This will be useful for an upcoming patch related to pg_receivewal, to allow the tool to be able to stream from the position of a slot, rather than the last WAL position flushed by the backend (as reported by IDENTIFY_SYSTEM) if the archive directory is found as empty, which would be an advantage in the case of switching to a different archive locations with the same slot used to avoid holes in WAL segment archives. Author: Ronan Dunklau Reviewed-by: Kyotaro Horiguchi, Michael Paquier, Bharath Rupireddy Discussion: https://postgr.es/m/18708360.4lzOvYHigE@aivenronan
1 parent 70bef49 commit b4ada4e

File tree

9 files changed

+224
-3
lines changed

9 files changed

+224
-3
lines changed

doc/src/sgml/protocol.sgml

+48
Original file line numberDiff line numberDiff line change
@@ -2067,6 +2067,54 @@ The commands accepted in replication mode are:
20672067
</listitem>
20682068
</varlistentry>
20692069

2070+
<varlistentry>
2071+
<term><literal>READ_REPLICATION_SLOT</literal> <replaceable class="parameter">slot_name</replaceable>
2072+
<indexterm><primary>READ_REPLICATION_SLOT</primary></indexterm>
2073+
</term>
2074+
<listitem>
2075+
<para>
2076+
Read some information associated to a replication slot. Returns a tuple
2077+
with <literal>NULL</literal> values if the replication slot does not
2078+
exist. This command is currently only supported for physical replication
2079+
slots.
2080+
</para>
2081+
<para>
2082+
In response to this command, the server will return a one-row result set,
2083+
containing the following fields:
2084+
<variablelist>
2085+
<varlistentry>
2086+
<term><literal>slot_type</literal> (<type>text</type>)</term>
2087+
<listitem>
2088+
<para>
2089+
The replication slot's type, either <literal>physical</literal> or
2090+
<literal>NULL</literal>.
2091+
</para>
2092+
</listitem>
2093+
</varlistentry>
2094+
2095+
<varlistentry>
2096+
<term><literal>restart_lsn</literal> (<type>text</type>)</term>
2097+
<listitem>
2098+
<para>
2099+
The replication slot's <literal>restart_lsn</literal>.
2100+
</para>
2101+
</listitem>
2102+
</varlistentry>
2103+
2104+
<varlistentry>
2105+
<term><literal>restart_tli</literal> (<type>int8</type>)</term>
2106+
<listitem>
2107+
<para>
2108+
The timeline ID associated to <literal>restart_lsn</literal>,
2109+
following the current timeline history.
2110+
</para>
2111+
</listitem>
2112+
</varlistentry>
2113+
</variablelist>
2114+
</para>
2115+
</listitem>
2116+
</varlistentry>
2117+
20702118
<varlistentry>
20712119
<term><literal>START_REPLICATION</literal> [ <literal>SLOT</literal> <replaceable class="parameter">slot_name</replaceable> ] [ <literal>PHYSICAL</literal> ] <replaceable class="parameter">XXX/XXX</replaceable> [ <literal>TIMELINE</literal> <replaceable class="parameter">tli</replaceable> ]
20722120
<indexterm><primary>START_REPLICATION</primary></indexterm>

src/backend/replication/repl_gram.y

+15-1
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@ static SQLCmd *make_sqlcmd(void);
6464
/* Keyword tokens. */
6565
%token K_BASE_BACKUP
6666
%token K_IDENTIFY_SYSTEM
67+
%token K_READ_REPLICATION_SLOT
6768
%token K_SHOW
6869
%token K_START_REPLICATION
6970
%token K_CREATE_REPLICATION_SLOT
@@ -94,7 +95,7 @@ static SQLCmd *make_sqlcmd(void);
9495
%type <node> command
9596
%type <node> base_backup start_replication start_logical_replication
9697
create_replication_slot drop_replication_slot identify_system
97-
timeline_history show sql_cmd
98+
read_replication_slot timeline_history show sql_cmd
9899
%type <list> base_backup_legacy_opt_list generic_option_list
99100
%type <defelt> base_backup_legacy_opt generic_option
100101
%type <uintval> opt_timeline
@@ -125,6 +126,7 @@ command:
125126
| start_logical_replication
126127
| create_replication_slot
127128
| drop_replication_slot
129+
| read_replication_slot
128130
| timeline_history
129131
| show
130132
| sql_cmd
@@ -140,6 +142,18 @@ identify_system:
140142
}
141143
;
142144

145+
/*
146+
* READ_REPLICATION_SLOT %s
147+
*/
148+
read_replication_slot:
149+
K_READ_REPLICATION_SLOT var_name
150+
{
151+
ReadReplicationSlotCmd *n = makeNode(ReadReplicationSlotCmd);
152+
n->slotname = $2;
153+
$$ = (Node *) n;
154+
}
155+
;
156+
143157
/*
144158
* SHOW setting
145159
*/

src/backend/replication/repl_scanner.l

+1
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,7 @@ identifier {ident_start}{ident_cont}*
8585
BASE_BACKUP { return K_BASE_BACKUP; }
8686
FAST { return K_FAST; }
8787
IDENTIFY_SYSTEM { return K_IDENTIFY_SYSTEM; }
88+
READ_REPLICATION_SLOT { return K_READ_REPLICATION_SLOT; }
8889
SHOW { return K_SHOW; }
8990
LABEL { return K_LABEL; }
9091
NOWAIT { return K_NOWAIT; }

src/backend/replication/walsender.c

+106
Original file line numberDiff line numberDiff line change
@@ -232,6 +232,7 @@ static void XLogSendLogical(void);
232232
static void WalSndDone(WalSndSendDataCallback send_data);
233233
static XLogRecPtr GetStandbyFlushRecPtr(void);
234234
static void IdentifySystem(void);
235+
static void ReadReplicationSlot(ReadReplicationSlotCmd *cmd);
235236
static void CreateReplicationSlot(CreateReplicationSlotCmd *cmd);
236237
static void DropReplicationSlot(DropReplicationSlotCmd *cmd);
237238
static void StartReplication(StartReplicationCmd *cmd);
@@ -457,6 +458,104 @@ IdentifySystem(void)
457458
end_tup_output(tstate);
458459
}
459460

461+
/* Handle READ_REPLICATION_SLOT command */
462+
static void
463+
ReadReplicationSlot(ReadReplicationSlotCmd *cmd)
464+
{
465+
#define READ_REPLICATION_SLOT_COLS 3
466+
ReplicationSlot *slot;
467+
DestReceiver *dest;
468+
TupOutputState *tstate;
469+
TupleDesc tupdesc;
470+
Datum values[READ_REPLICATION_SLOT_COLS];
471+
bool nulls[READ_REPLICATION_SLOT_COLS];
472+
473+
tupdesc = CreateTemplateTupleDesc(READ_REPLICATION_SLOT_COLS);
474+
TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "slot_type",
475+
TEXTOID, -1, 0);
476+
TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "restart_lsn",
477+
TEXTOID, -1, 0);
478+
/* TimeLineID is unsigned, so int4 is not wide enough. */
479+
TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 3, "restart_tli",
480+
INT8OID, -1, 0);
481+
482+
MemSet(values, 0, READ_REPLICATION_SLOT_COLS * sizeof(Datum));
483+
MemSet(nulls, true, READ_REPLICATION_SLOT_COLS * sizeof(bool));
484+
485+
LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
486+
slot = SearchNamedReplicationSlot(cmd->slotname, false);
487+
if (slot == NULL || !slot->in_use)
488+
{
489+
LWLockRelease(ReplicationSlotControlLock);
490+
}
491+
else
492+
{
493+
ReplicationSlot slot_contents;
494+
int i = 0;
495+
496+
/* Copy slot contents while holding spinlock */
497+
SpinLockAcquire(&slot->mutex);
498+
slot_contents = *slot;
499+
SpinLockRelease(&slot->mutex);
500+
LWLockRelease(ReplicationSlotControlLock);
501+
502+
if (OidIsValid(slot_contents.data.database))
503+
ereport(ERROR,
504+
errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
505+
errmsg("cannot use \"%s\" with logical replication slot \"%s\"",
506+
"READ_REPLICATION_SLOT",
507+
NameStr(slot_contents.data.name)));
508+
509+
/* slot type */
510+
values[i] = CStringGetTextDatum("physical");
511+
nulls[i] = false;
512+
i++;
513+
514+
/* start LSN */
515+
if (!XLogRecPtrIsInvalid(slot_contents.data.restart_lsn))
516+
{
517+
char xloc[64];
518+
519+
snprintf(xloc, sizeof(xloc), "%X/%X",
520+
LSN_FORMAT_ARGS(slot_contents.data.restart_lsn));
521+
values[i] = CStringGetTextDatum(xloc);
522+
nulls[i] = false;
523+
}
524+
i++;
525+
526+
/* timeline this WAL was produced on */
527+
if (!XLogRecPtrIsInvalid(slot_contents.data.restart_lsn))
528+
{
529+
TimeLineID slots_position_timeline;
530+
TimeLineID current_timeline;
531+
List *timeline_history = NIL;
532+
533+
/*
534+
* While in recovery, use as timeline the currently-replaying one
535+
* to get the LSN position's history.
536+
*/
537+
if (RecoveryInProgress())
538+
(void) GetXLogReplayRecPtr(&current_timeline);
539+
else
540+
current_timeline = ThisTimeLineID;
541+
542+
timeline_history = readTimeLineHistory(current_timeline);
543+
slots_position_timeline = tliOfPointInHistory(slot_contents.data.restart_lsn,
544+
timeline_history);
545+
values[i] = Int64GetDatum((int64) slots_position_timeline);
546+
nulls[i] = false;
547+
}
548+
i++;
549+
550+
Assert(i == READ_REPLICATION_SLOT_COLS);
551+
}
552+
553+
dest = CreateDestReceiver(DestRemoteSimple);
554+
tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsVirtual);
555+
do_tup_output(tstate, values, nulls);
556+
end_tup_output(tstate);
557+
}
558+
460559

461560
/*
462561
* Handle TIMELINE_HISTORY command.
@@ -1622,6 +1721,13 @@ exec_replication_command(const char *cmd_string)
16221721
EndReplicationCommand(cmdtag);
16231722
break;
16241723

1724+
case T_ReadReplicationSlotCmd:
1725+
cmdtag = "READ_REPLICATION_SLOT";
1726+
set_ps_display(cmdtag);
1727+
ReadReplicationSlot((ReadReplicationSlotCmd *) cmd_node);
1728+
EndReplicationCommand(cmdtag);
1729+
break;
1730+
16251731
case T_BaseBackupCmd:
16261732
cmdtag = "BASE_BACKUP";
16271733
set_ps_display(cmdtag);

src/include/nodes/nodes.h

+1
Original file line numberDiff line numberDiff line change
@@ -496,6 +496,7 @@ typedef enum NodeTag
496496
T_BaseBackupCmd,
497497
T_CreateReplicationSlotCmd,
498498
T_DropReplicationSlotCmd,
499+
T_ReadReplicationSlotCmd,
499500
T_StartReplicationCmd,
500501
T_TimeLineHistoryCmd,
501502
T_SQLCmd,

src/include/nodes/replnodes.h

+11
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,17 @@ typedef struct StartReplicationCmd
8787
} StartReplicationCmd;
8888

8989

90+
/* ----------------------
91+
* READ_REPLICATION_SLOT command
92+
* ----------------------
93+
*/
94+
typedef struct ReadReplicationSlotCmd
95+
{
96+
NodeTag type;
97+
char *slotname;
98+
} ReadReplicationSlotCmd;
99+
100+
90101
/* ----------------------
91102
* TIMELINE_HISTORY command
92103
* ----------------------

src/test/recovery/t/001_stream_rep.pl

+31-1
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
use warnings;
77
use PostgreSQL::Test::Cluster;
88
use PostgreSQL::Test::Utils;
9-
use Test::More tests => 49;
9+
use Test::More tests => 53;
1010

1111
# Initialize primary node
1212
my $node_primary = PostgreSQL::Test::Cluster->new('primary');
@@ -254,6 +254,36 @@ sub test_target_session_attrs
254254
"SHOW with superuser-settable parameter, replication role and logical replication"
255255
);
256256

257+
note "testing READ_REPLICATION_SLOT command for replication connection";
258+
259+
my $slotname = 'test_read_replication_slot_physical';
260+
261+
($ret, $stdout, $stderr) = $node_primary->psql(
262+
'postgres',
263+
'READ_REPLICATION_SLOT non_existent_slot;',
264+
extra_params => [ '-d', $connstr_rep ]);
265+
ok($ret == 0, "READ_REPLICATION_SLOT exit code 0 on success");
266+
like($stdout, qr/^||$/,
267+
"READ_REPLICATION_SLOT returns NULL values if slot does not exist");
268+
269+
$node_primary->psql(
270+
'postgres',
271+
"CREATE_REPLICATION_SLOT $slotname PHYSICAL RESERVE_WAL;",
272+
extra_params => [ '-d', $connstr_rep ]);
273+
274+
($ret, $stdout, $stderr) = $node_primary->psql(
275+
'postgres',
276+
"READ_REPLICATION_SLOT $slotname;",
277+
extra_params => [ '-d', $connstr_rep ]);
278+
ok($ret == 0, "READ_REPLICATION_SLOT success with existing slot");
279+
like($stdout, qr/^physical\|[^|]*\|1$/,
280+
"READ_REPLICATION_SLOT returns tuple with slot information");
281+
282+
$node_primary->psql(
283+
'postgres',
284+
"DROP_REPLICATION_SLOT $slotname;",
285+
extra_params => [ '-d', $connstr_rep ]);
286+
257287
note "switching to physical replication slot";
258288

259289
# Switch to using a physical replication slot. We can do this without a new

src/test/recovery/t/006_logical_decoding.pl

+10-1
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
use warnings;
1111
use PostgreSQL::Test::Cluster;
1212
use PostgreSQL::Test::Utils;
13-
use Test::More tests => 14;
13+
use Test::More tests => 15;
1414
use Config;
1515

1616
# Initialize primary node
@@ -39,6 +39,15 @@
3939
m/replication slot "test_slot" was not created in this database/,
4040
"Logical decoding correctly fails to start");
4141

42+
($result, $stdout, $stderr) = $node_primary->psql(
43+
'template1',
44+
qq[READ_REPLICATION_SLOT test_slot;],
45+
replication => 'database');
46+
like(
47+
$stderr,
48+
qr/cannot use "READ_REPLICATION_SLOT" with logical replication slot "test_slot"/,
49+
'READ_REPLICATION_SLOT not supported for logical slots');
50+
4251
# Check case of walsender not using a database connection. Logical
4352
# decoding should not be allowed.
4453
($result, $stdout, $stderr) = $node_primary->psql(

src/tools/pgindent/typedefs.list

+1
Original file line numberDiff line numberDiff line change
@@ -2129,6 +2129,7 @@ ReadBufferMode
21292129
ReadBytePtrType
21302130
ReadExtraTocPtrType
21312131
ReadFunc
2132+
ReadReplicationSlotCmd
21322133
ReassignOwnedStmt
21332134
RecheckForeignScan_function
21342135
RecordCacheEntry

0 commit comments

Comments
 (0)