Skip to content

Commit 15f8203

Browse files
committed
Replace replication slot's invalidated_at LSN with an enum
This is mainly useful because the upcoming logical-decoding-on-standby feature adds further reasons for invalidating slots, and we don't want to end up with multiple invalidated_* fields, or check different attributes. Eventually we should consider not resetting restart_lsn when invalidating a slot due to max_slot_wal_keep_size. But that's a user visible change, so left for later. Increases SLOT_VERSION, due to the changed field (with a different alignment, no less). Reviewed-by: "Drouvot, Bertrand" <bertranddrouvot.pg@gmail.com> Reviewed-by: Alvaro Herrera <alvherre@alvh.no-ip.org> Reviewed-by: Melanie Plageman <melanieplageman@gmail.com> Discussion: https://postgr.es/m/20230407075009.igg7be27ha2htkbt@awork3.anarazel.de
1 parent d4e71df commit 15f8203

File tree

4 files changed

+41
-11
lines changed

4 files changed

+41
-11
lines changed

src/backend/replication/slot.c

+24-4
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,7 @@ typedef struct ReplicationSlotOnDisk
8989
sizeof(ReplicationSlotOnDisk) - ReplicationSlotOnDiskConstantSize
9090

9191
#define SLOT_MAGIC 0x1051CA1 /* format identifier */
92-
#define SLOT_VERSION 2 /* version for new files */
92+
#define SLOT_VERSION 3 /* version for new files */
9393

9494
/* Control array for replication slot management */
9595
ReplicationSlotCtlData *ReplicationSlotCtl = NULL;
@@ -855,8 +855,7 @@ ReplicationSlotsComputeRequiredXmin(bool already_locked)
855855
SpinLockAcquire(&s->mutex);
856856
effective_xmin = s->effective_xmin;
857857
effective_catalog_xmin = s->effective_catalog_xmin;
858-
invalidated = (!XLogRecPtrIsInvalid(s->data.invalidated_at) &&
859-
XLogRecPtrIsInvalid(s->data.restart_lsn));
858+
invalidated = s->data.invalidated != RS_INVAL_NONE;
860859
SpinLockRelease(&s->mutex);
861860

862861
/* invalidated slots need not apply */
@@ -901,14 +900,20 @@ ReplicationSlotsComputeRequiredLSN(void)
901900
{
902901
ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
903902
XLogRecPtr restart_lsn;
903+
bool invalidated;
904904

905905
if (!s->in_use)
906906
continue;
907907

908908
SpinLockAcquire(&s->mutex);
909909
restart_lsn = s->data.restart_lsn;
910+
invalidated = s->data.invalidated != RS_INVAL_NONE;
910911
SpinLockRelease(&s->mutex);
911912

913+
/* invalidated slots need not apply */
914+
if (invalidated)
915+
continue;
916+
912917
if (restart_lsn != InvalidXLogRecPtr &&
913918
(min_required == InvalidXLogRecPtr ||
914919
restart_lsn < min_required))
@@ -946,6 +951,7 @@ ReplicationSlotsComputeLogicalRestartLSN(void)
946951
{
947952
ReplicationSlot *s;
948953
XLogRecPtr restart_lsn;
954+
bool invalidated;
949955

950956
s = &ReplicationSlotCtl->replication_slots[i];
951957

@@ -960,8 +966,13 @@ ReplicationSlotsComputeLogicalRestartLSN(void)
960966
/* read once, it's ok if it increases while we're checking */
961967
SpinLockAcquire(&s->mutex);
962968
restart_lsn = s->data.restart_lsn;
969+
invalidated = s->data.invalidated != RS_INVAL_NONE;
963970
SpinLockRelease(&s->mutex);
964971

972+
/* invalidated slots need not apply */
973+
if (invalidated)
974+
continue;
975+
965976
if (restart_lsn == InvalidXLogRecPtr)
966977
continue;
967978

@@ -1012,6 +1023,8 @@ ReplicationSlotsCountDBSlots(Oid dboid, int *nslots, int *nactive)
10121023
if (s->data.database != dboid)
10131024
continue;
10141025

1026+
/* NB: intentionally counting invalidated slots */
1027+
10151028
/* count slots with spinlock held */
10161029
SpinLockAcquire(&s->mutex);
10171030
(*nslots)++;
@@ -1069,6 +1082,8 @@ ReplicationSlotsDropDBSlots(Oid dboid)
10691082
if (s->data.database != dboid)
10701083
continue;
10711084

1085+
/* NB: intentionally including invalidated slots */
1086+
10721087
/* acquire slot, so ReplicationSlotDropAcquired can be reused */
10731088
SpinLockAcquire(&s->mutex);
10741089
/* can't change while ReplicationSlotControlLock is held */
@@ -1294,7 +1309,12 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, XLogRecPtr oldestLSN,
12941309
{
12951310
MyReplicationSlot = s;
12961311
s->active_pid = MyProcPid;
1297-
s->data.invalidated_at = restart_lsn;
1312+
s->data.invalidated = RS_INVAL_WAL_REMOVED;
1313+
1314+
/*
1315+
* XXX: We should consider not overwriting restart_lsn and instead
1316+
* just rely on .invalidated.
1317+
*/
12981318
s->data.restart_lsn = InvalidXLogRecPtr;
12991319

13001320
/* Let caller know */

src/backend/replication/slotfuncs.c

+3-5
Original file line numberDiff line numberDiff line change
@@ -315,12 +315,10 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
315315
nulls[i++] = true;
316316

317317
/*
318-
* If invalidated_at is valid and restart_lsn is invalid, we know for
319-
* certain that the slot has been invalidated. Otherwise, test
320-
* availability from restart_lsn.
318+
* If the slot has not been invalidated, test availability from
319+
* restart_lsn.
321320
*/
322-
if (XLogRecPtrIsInvalid(slot_contents.data.restart_lsn) &&
323-
!XLogRecPtrIsInvalid(slot_contents.data.invalidated_at))
321+
if (slot_contents.data.invalidated != RS_INVAL_NONE)
324322
walstate = WALAVAIL_REMOVED;
325323
else
326324
walstate = GetWALAvailability(slot_contents.data.restart_lsn);

src/include/replication/slot.h

+13-2
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,17 @@ typedef enum ReplicationSlotPersistency
3737
RS_TEMPORARY
3838
} ReplicationSlotPersistency;
3939

40+
/*
41+
* Slots can be invalidated, e.g. due to max_slot_wal_keep_size. If so, the
42+
* 'invalidated' field is set to a value other than _NONE.
43+
*/
44+
typedef enum ReplicationSlotInvalidationCause
45+
{
46+
RS_INVAL_NONE,
47+
/* required WAL has been removed */
48+
RS_INVAL_WAL_REMOVED,
49+
} ReplicationSlotInvalidationCause;
50+
4051
/*
4152
* On-Disk data of a replication slot, preserved across restarts.
4253
*/
@@ -72,8 +83,8 @@ typedef struct ReplicationSlotPersistentData
7283
/* oldest LSN that might be required by this replication slot */
7384
XLogRecPtr restart_lsn;
7485

75-
/* restart_lsn is copied here when the slot is invalidated */
76-
XLogRecPtr invalidated_at;
86+
/* RS_INVAL_NONE if valid, or the reason for having been invalidated */
87+
ReplicationSlotInvalidationCause invalidated;
7788

7889
/*
7990
* Oldest LSN that the client has acked receipt for. This is used as the

src/tools/pgindent/typedefs.list

+1
Original file line numberDiff line numberDiff line change
@@ -2339,6 +2339,7 @@ ReplicaIdentityStmt
23392339
ReplicationKind
23402340
ReplicationSlot
23412341
ReplicationSlotCtlData
2342+
ReplicationSlotInvalidationCause
23422343
ReplicationSlotOnDisk
23432344
ReplicationSlotPersistency
23442345
ReplicationSlotPersistentData

0 commit comments

Comments
 (0)