Skip to content

Commit 6fcd885

Browse files
committed
Allow pg_create_physical_replication_slot() to reserve WAL.
When creating a physical slot it's often useful to immediately reserve the current WAL position instead of only doing so after the first feedback message arrives. That e.g. allows slots to guarantee that all the WAL for a base backup will be available afterwards. Logical slots already have to reserve WAL during creation, so generalize that logic into being usable for both physical and logical slots. Catversion bump because of the new parameter. Author: Gurjeet Singh Reviewed-By: Andres Freund Discussion: CABwTF4Wh_dBCzTU=49pFXR6coR4NW1ynb+vBqT+Po=7fuq5iCw@mail.gmail.com
1 parent 093d0c8 commit 6fcd885

File tree

8 files changed

+107
-52
lines changed

8 files changed

+107
-52
lines changed

doc/src/sgml/func.sgml

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17211,7 +17211,7 @@ postgres=# SELECT * FROM pg_xlogfile_name_offset(pg_stop_backup());
1721117211
<indexterm>
1721217212
<primary>pg_create_physical_replication_slot</primary>
1721317213
</indexterm>
17214-
<literal><function>pg_create_physical_replication_slot(<parameter>slot_name</parameter> <type>name</type>)</function></literal>
17214+
<literal><function>pg_create_physical_replication_slot(<parameter>slot_name</parameter> <type>name</type><optional>, <parameter>immediately_reserve</> <type>boolean</> </optional>)</function></literal>
1721517215
</entry>
1721617216
<entry>
1721717217
(<parameter>slot_name</parameter> <type>name</type>, <parameter>xlog_position</parameter> <type>pg_lsn</type>)
@@ -17221,7 +17221,11 @@ postgres=# SELECT * FROM pg_xlogfile_name_offset(pg_stop_backup());
1722117221
<parameter>slot_name</parameter>. Streaming changes from a physical slot
1722217222
is only possible with the streaming-replication protocol - see <xref
1722317223
linkend="protocol-replication">. Corresponds to the replication protocol
17224-
command <literal>CREATE_REPLICATION_SLOT ... PHYSICAL</literal>.
17224+
command <literal>CREATE_REPLICATION_SLOT ... PHYSICAL</literal>. The optional
17225+
second parameter, when <literal>true</>, specifies that the <acronym>LSN</>
17226+
for this replication slot be reserved immediately; the <acronym>LSN</>
17227+
is otherwise reserved on first connection from a streaming replication
17228+
client.
1722517229
</entry>
1722617230
</row>
1722717231
<row>

src/backend/catalog/system_views.sql

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -917,6 +917,13 @@ LANGUAGE INTERNAL
917917
VOLATILE ROWS 1000 COST 1000
918918
AS 'pg_logical_slot_peek_binary_changes';
919919

920+
CREATE OR REPLACE FUNCTION pg_create_physical_replication_slot(
921+
IN slot_name name, IN immediately_reserve boolean DEFAULT false,
922+
OUT slot_name name, OUT xlog_position pg_lsn)
923+
RETURNS RECORD
924+
LANGUAGE INTERNAL
925+
AS 'pg_create_physical_replication_slot';
926+
920927
CREATE OR REPLACE FUNCTION
921928
make_interval(years int4 DEFAULT 0, months int4 DEFAULT 0, weeks int4 DEFAULT 0,
922929
days int4 DEFAULT 0, hours int4 DEFAULT 0, mins int4 DEFAULT 0,

src/backend/replication/logical/logical.c

Lines changed: 1 addition & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -250,52 +250,7 @@ CreateInitDecodingContext(char *plugin,
250250
StrNCpy(NameStr(slot->data.plugin), plugin, NAMEDATALEN);
251251
SpinLockRelease(&slot->mutex);
252252

253-
/*
254-
* The replication slot mechanism is used to prevent removal of required
255-
* WAL. As there is no interlock between this and checkpoints required WAL
256-
* could be removed before ReplicationSlotsComputeRequiredLSN() has been
257-
* called to prevent that. In the very unlikely case that this happens
258-
* we'll just retry.
259-
*/
260-
while (true)
261-
{
262-
XLogSegNo segno;
263-
264-
/*
265-
* Let's start with enough information if we can, so log a standby
266-
* snapshot and start decoding at exactly that position.
267-
*/
268-
if (!RecoveryInProgress())
269-
{
270-
XLogRecPtr flushptr;
271-
272-
/* start at current insert position */
273-
slot->data.restart_lsn = GetXLogInsertRecPtr();
274-
275-
/* make sure we have enough information to start */
276-
flushptr = LogStandbySnapshot();
277-
278-
/* and make sure it's fsynced to disk */
279-
XLogFlush(flushptr);
280-
}
281-
else
282-
slot->data.restart_lsn = GetRedoRecPtr();
283-
284-
/* prevent WAL removal as fast as possible */
285-
ReplicationSlotsComputeRequiredLSN();
286-
287-
/*
288-
* If all required WAL is still there, great, otherwise retry. The
289-
* slot should prevent further removal of WAL, unless there's a
290-
* concurrent ReplicationSlotsComputeRequiredLSN() after we've written
291-
* the new restart_lsn above, so normally we should never need to loop
292-
* more than twice.
293-
*/
294-
XLByteToSeg(slot->data.restart_lsn, segno);
295-
if (XLogGetLastRemovedSegno() < segno)
296-
break;
297-
}
298-
253+
ReplicationSlotReserveWal();
299254

300255
/* ----
301256
* This is a bit tricky: We need to determine a safe xmin horizon to start

src/backend/replication/slot.c

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
#include <sys/stat.h>
4141

4242
#include "access/transam.h"
43+
#include "access/xlog_internal.h"
4344
#include "common/string.h"
4445
#include "miscadmin.h"
4546
#include "replication/slot.h"
@@ -781,6 +782,76 @@ CheckSlotRequirements(void)
781782
errmsg("replication slots can only be used if wal_level >= archive")));
782783
}
783784

785+
/*
786+
* Reserve WAL for the currently active slot.
787+
*
788+
* Compute and set restart_lsn in a manner that's appropriate for the type of
789+
* the slot and concurrency safe.
790+
*/
791+
void
792+
ReplicationSlotReserveWal(void)
793+
{
794+
ReplicationSlot *slot = MyReplicationSlot;
795+
796+
Assert(slot != NULL);
797+
Assert(slot->data.restart_lsn == InvalidXLogRecPtr);
798+
799+
/*
800+
* The replication slot mechanism is used to prevent removal of required
801+
* WAL. As there is no interlock between this routine and checkpoints, WAL
802+
* segments could concurrently be removed when a now stale return value of
803+
* ReplicationSlotsComputeRequiredLSN() is used. In the unlikely case that
804+
* this happens we'll just retry.
805+
*/
806+
while (true)
807+
{
808+
XLogSegNo segno;
809+
810+
/*
811+
* For logical slots log a standby snapshot and start logical decoding
812+
* at exactly that position. That allows the slot to start up more
813+
* quickly.
814+
*
815+
* That's not needed (or indeed helpful) for physical slots as they'll
816+
* start replay at the last logged checkpoint anyway. Instead return
817+
* the location of the last redo LSN. While that slightly increases
818+
* the chance that we have to retry, it's where a base backup has to
819+
* start replay at.
820+
*/
821+
if (!RecoveryInProgress() && SlotIsLogical(slot))
822+
{
823+
XLogRecPtr flushptr;
824+
825+
/* start at current insert position */
826+
slot->data.restart_lsn = GetXLogInsertRecPtr();
827+
828+
/* make sure we have enough information to start */
829+
flushptr = LogStandbySnapshot();
830+
831+
/* and make sure it's fsynced to disk */
832+
XLogFlush(flushptr);
833+
}
834+
else
835+
{
836+
slot->data.restart_lsn = GetRedoRecPtr();
837+
}
838+
839+
/* prevent WAL removal as fast as possible */
840+
ReplicationSlotsComputeRequiredLSN();
841+
842+
/*
843+
* If all required WAL is still there, great, otherwise retry. The
844+
* slot should prevent further removal of WAL, unless there's a
845+
* concurrent ReplicationSlotsComputeRequiredLSN() after we've written
846+
* the new restart_lsn above, so normally we should never need to loop
847+
* more than twice.
848+
*/
849+
XLByteToSeg(slot->data.restart_lsn, segno);
850+
if (XLogGetLastRemovedSegno() < segno)
851+
break;
852+
}
853+
}
854+
784855
/*
785856
* Flush all replication slots to disk.
786857
*

src/backend/replication/slotfuncs.c

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ Datum
4040
pg_create_physical_replication_slot(PG_FUNCTION_ARGS)
4141
{
4242
Name name = PG_GETARG_NAME(0);
43+
bool immediately_reserve = PG_GETARG_BOOL(1);
4344
Datum values[2];
4445
bool nulls[2];
4546
TupleDesc tupdesc;
@@ -59,9 +60,25 @@ pg_create_physical_replication_slot(PG_FUNCTION_ARGS)
5960
ReplicationSlotCreate(NameStr(*name), false, RS_PERSISTENT);
6061

6162
values[0] = NameGetDatum(&MyReplicationSlot->data.name);
62-
6363
nulls[0] = false;
64-
nulls[1] = true;
64+
65+
if (immediately_reserve)
66+
{
67+
/* Reserve WAL as the user asked for it */
68+
ReplicationSlotReserveWal();
69+
70+
/* Write this slot to disk */
71+
ReplicationSlotMarkDirty();
72+
ReplicationSlotSave();
73+
74+
values[1] = LSNGetDatum(MyReplicationSlot->data.restart_lsn);
75+
nulls[1] = false;
76+
}
77+
else
78+
{
79+
values[0] = NameGetDatum(&MyReplicationSlot->data.name);
80+
nulls[1] = true;
81+
}
6582

6683
tuple = heap_form_tuple(tupdesc, values, nulls);
6784
result = HeapTupleGetDatum(tuple);

src/include/catalog/catversion.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,6 @@
5353
*/
5454

5555
/* yyyymmddN */
56-
#define CATALOG_VERSION_NO 201508101
56+
#define CATALOG_VERSION_NO 201508111
5757

5858
#endif

src/include/catalog/pg_proc.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5193,7 +5193,7 @@ DATA(insert OID = 3473 ( spg_range_quad_leaf_consistent PGNSP PGUID 12 1 0 0 0
51935193
DESCR("SP-GiST support for quad tree over range");
51945194

51955195
/* replication slots */
5196-
DATA(insert OID = 3779 ( pg_create_physical_replication_slot PGNSP PGUID 12 1 0 0 0 f f f f f f v 1 0 2249 "19" "{19,19,3220}" "{i,o,o}" "{slot_name,slot_name,xlog_position}" _null_ _null_ pg_create_physical_replication_slot _null_ _null_ _null_ ));
5196+
DATA(insert OID = 3779 ( pg_create_physical_replication_slot PGNSP PGUID 12 1 0 0 0 f f f f f f v 2 0 2249 "19 16" "{19,16,19,3220}" "{i,i,o,o}" "{slot_name,immediately_reserve,slot_name,xlog_position}" _null_ _null_ pg_create_physical_replication_slot _null_ _null_ _null_ ));
51975197
DESCR("create a physical replication slot");
51985198
DATA(insert OID = 3780 ( pg_drop_replication_slot PGNSP PGUID 12 1 0 0 0 f f f f f f v 1 0 2278 "19" _null_ _null_ _null_ _null_ _null_ pg_drop_replication_slot _null_ _null_ _null_ ));
51995199
DESCR("drop a replication slot");

src/include/replication/slot.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -166,6 +166,7 @@ extern void ReplicationSlotMarkDirty(void);
166166

167167
/* misc stuff */
168168
extern bool ReplicationSlotValidateName(const char *name, int elevel);
169+
extern void ReplicationSlotReserveWal(void);
169170
extern void ReplicationSlotsComputeRequiredXmin(bool already_locked);
170171
extern void ReplicationSlotsComputeRequiredLSN(void);
171172
extern XLogRecPtr ReplicationSlotsComputeLogicalRestartLSN(void);

0 commit comments

Comments
 (0)