Skip to content

Commit d811c03

Browse files
committed
Add 'active_in' column to pg_replication_slots.
Right now it is visible whether a replication slot is active in any session, but not in which. Adding the active_in column, containing the pid of the backend having acquired the slot, makes it much easier to associate pg_replication_slots entries with the corresponding pg_stat_replication/pg_stat_activity row. This should have been done from the start, but I (Andres) dropped the ball there somehow. Author: Craig Ringer, revised by me Discussion: CAMsr+YFKgZca5_7_ouaMWxA5PneJC9LNViPzpDHusaPhU9pA7g@mail.gmail.com
1 parent 528c2e4 commit d811c03

File tree

9 files changed

+47
-28
lines changed

9 files changed

+47
-28
lines changed

contrib/test_decoding/expected/ddl.out

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -603,7 +603,7 @@ SELECT pg_drop_replication_slot('regression_slot');
603603

604604
/* check that the slot is gone */
605605
SELECT * FROM pg_replication_slots;
606-
slot_name | plugin | slot_type | datoid | database | active | xmin | catalog_xmin | restart_lsn
607-
-----------+--------+-----------+--------+----------+--------+------+--------------+-------------
606+
slot_name | plugin | slot_type | datoid | database | active | active_in | xmin | catalog_xmin | restart_lsn
607+
-----------+--------+-----------+--------+----------+--------+-----------+------+--------------+-------------
608608
(0 rows)
609609

doc/src/sgml/catalogs.sgml

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5400,6 +5400,16 @@
54005400
<entry>True if this slot is currently actively being used</entry>
54015401
</row>
54025402

5403+
<row>
5404+
<entry><structfield>active_in</structfield></entry>
5405+
<entry><type>integer</type></entry>
5406+
<entry></entry>
5407+
<entry>The process ID of the session using this slot if the slot
5408+
is currently actively being used. <literal>NULL</literal> if
5409+
inactive.
5410+
</entry>
5411+
</row>
5412+
54035413
<row>
54045414
<entry><structfield>xmin</structfield></entry>
54055415
<entry><type>xid</type></entry>

doc/src/sgml/logicaldecoding.sgml

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -62,10 +62,10 @@ postgres=# SELECT * FROM pg_create_logical_replication_slot('regression_slot', '
6262
regression_slot | 0/16B1970
6363
(1 row)
6464

65-
postgres=# SELECT * FROM pg_replication_slots;
66-
slot_name | plugin | slot_type | datoid | database | active | xmin | catalog_xmin | restart_lsn
67-
-----------------+---------------+-----------+--------+----------+--------+--------+--------------+-------------
68-
regression_slot | test_decoding | logical | 12052 | postgres | f | | 684 | 0/16A4408
65+
postgres=# SELECT slot_name, plugin, slot_type, database, active, restart_lsn FROM pg_replication_slots;
66+
slot_name | plugin | slot_type | database | active | restart_lsn
67+
-----------------+---------------+-----------+----------+--------+-------------
68+
regression_slot | test_decoding | logical | postgres | f | 0/16A4408
6969
(1 row)
7070

7171
postgres=# -- There are no changes to see yet

src/backend/catalog/system_views.sql

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -665,6 +665,7 @@ CREATE VIEW pg_replication_slots AS
665665
L.datoid,
666666
D.datname AS database,
667667
L.active,
668+
L.active_in,
668669
L.xmin,
669670
L.catalog_xmin,
670671
L.restart_lsn

src/backend/replication/slot.c

Lines changed: 16 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -262,7 +262,7 @@ ReplicationSlotCreate(const char *name, bool db_specific,
262262
* be doing that. So it's safe to initialize the slot.
263263
*/
264264
Assert(!slot->in_use);
265-
Assert(!slot->active);
265+
Assert(slot->active_pid == 0);
266266
slot->data.persistency = persistency;
267267
slot->data.xmin = InvalidTransactionId;
268268
slot->effective_xmin = InvalidTransactionId;
@@ -291,8 +291,8 @@ ReplicationSlotCreate(const char *name, bool db_specific,
291291
volatile ReplicationSlot *vslot = slot;
292292

293293
SpinLockAcquire(&slot->mutex);
294-
Assert(!vslot->active);
295-
vslot->active = true;
294+
Assert(vslot->active_pid == 0);
295+
vslot->active_pid = MyProcPid;
296296
SpinLockRelease(&slot->mutex);
297297
MyReplicationSlot = slot;
298298
}
@@ -314,7 +314,7 @@ ReplicationSlotAcquire(const char *name)
314314
{
315315
ReplicationSlot *slot = NULL;
316316
int i;
317-
bool active = false;
317+
int active_pid = 0;
318318

319319
Assert(MyReplicationSlot == NULL);
320320

@@ -331,8 +331,9 @@ ReplicationSlotAcquire(const char *name)
331331
volatile ReplicationSlot *vslot = s;
332332

333333
SpinLockAcquire(&s->mutex);
334-
active = vslot->active;
335-
vslot->active = true;
334+
active_pid = vslot->active_pid;
335+
if (active_pid == 0)
336+
vslot->active_pid = MyProcPid;
336337
SpinLockRelease(&s->mutex);
337338
slot = s;
338339
break;
@@ -345,10 +346,11 @@ ReplicationSlotAcquire(const char *name)
345346
ereport(ERROR,
346347
(errcode(ERRCODE_UNDEFINED_OBJECT),
347348
errmsg("replication slot \"%s\" does not exist", name)));
348-
if (active)
349+
if (active_pid != 0)
349350
ereport(ERROR,
350351
(errcode(ERRCODE_OBJECT_IN_USE),
351-
errmsg("replication slot \"%s\" is already active", name)));
352+
errmsg("replication slot \"%s\" is already active for pid %d",
353+
name, active_pid)));
352354

353355
/* We made this slot active, so it's ours now. */
354356
MyReplicationSlot = slot;
@@ -363,7 +365,7 @@ ReplicationSlotRelease(void)
363365
{
364366
ReplicationSlot *slot = MyReplicationSlot;
365367

366-
Assert(slot != NULL && slot->active);
368+
Assert(slot != NULL && slot->active_pid != 0);
367369

368370
if (slot->data.persistency == RS_EPHEMERAL)
369371
{
@@ -380,7 +382,7 @@ ReplicationSlotRelease(void)
380382
volatile ReplicationSlot *vslot = slot;
381383

382384
SpinLockAcquire(&slot->mutex);
383-
vslot->active = false;
385+
vslot->active_pid = 0;
384386
SpinLockRelease(&slot->mutex);
385387
}
386388

@@ -460,7 +462,7 @@ ReplicationSlotDropAcquired(void)
460462
bool fail_softly = slot->data.persistency == RS_EPHEMERAL;
461463

462464
SpinLockAcquire(&slot->mutex);
463-
vslot->active = false;
465+
vslot->active_pid = 0;
464466
SpinLockRelease(&slot->mutex);
465467

466468
ereport(fail_softly ? WARNING : ERROR,
@@ -477,7 +479,7 @@ ReplicationSlotDropAcquired(void)
477479
* scanning the array.
478480
*/
479481
LWLockAcquire(ReplicationSlotControlLock, LW_EXCLUSIVE);
480-
slot->active = false;
482+
slot->active_pid = 0;
481483
slot->in_use = false;
482484
LWLockRelease(ReplicationSlotControlLock);
483485

@@ -749,7 +751,7 @@ ReplicationSlotsCountDBSlots(Oid dboid, int *nslots, int *nactive)
749751
/* count slots with spinlock held */
750752
SpinLockAcquire(&s->mutex);
751753
(*nslots)++;
752-
if (s->active)
754+
if (s->active_pid != 0)
753755
(*nactive)++;
754756
SpinLockRelease(&s->mutex);
755757
}
@@ -1227,7 +1229,7 @@ RestoreSlotFromDisk(const char *name)
12271229
slot->candidate_restart_valid = InvalidXLogRecPtr;
12281230

12291231
slot->in_use = true;
1230-
slot->active = false;
1232+
slot->active_pid = 0;
12311233

12321234
restored = true;
12331235
break;

src/backend/replication/slotfuncs.c

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -158,7 +158,7 @@ pg_drop_replication_slot(PG_FUNCTION_ARGS)
158158
Datum
159159
pg_get_replication_slots(PG_FUNCTION_ARGS)
160160
{
161-
#define PG_GET_REPLICATION_SLOTS_COLS 8
161+
#define PG_GET_REPLICATION_SLOTS_COLS 9
162162
ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
163163
TupleDesc tupdesc;
164164
Tuplestorestate *tupstore;
@@ -206,7 +206,7 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
206206
TransactionId xmin;
207207
TransactionId catalog_xmin;
208208
XLogRecPtr restart_lsn;
209-
bool active;
209+
pid_t active_pid;
210210
Oid database;
211211
NameData slot_name;
212212
NameData plugin;
@@ -227,7 +227,7 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
227227
namecpy(&slot_name, &slot->data.name);
228228
namecpy(&plugin, &slot->data.plugin);
229229

230-
active = slot->active;
230+
active_pid = slot->active_pid;
231231
}
232232
SpinLockRelease(&slot->mutex);
233233

@@ -251,7 +251,12 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
251251
else
252252
values[i++] = database;
253253

254-
values[i++] = BoolGetDatum(active);
254+
values[i++] = BoolGetDatum(active_pid != 0);
255+
256+
if (active_pid != 0)
257+
values[i++] = Int32GetDatum(active_pid);
258+
else
259+
nulls[i++] = true;
255260

256261
if (xmin != InvalidTransactionId)
257262
values[i++] = TransactionIdGetDatum(xmin);

src/include/catalog/pg_proc.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5106,7 +5106,7 @@ DATA(insert OID = 3779 ( pg_create_physical_replication_slot PGNSP PGUID 12 1 0
51065106
DESCR("create a physical replication slot");
51075107
DATA(insert OID = 3780 ( pg_drop_replication_slot PGNSP PGUID 12 1 0 0 0 f f f f f f v 1 0 2278 "19" _null_ _null_ _null_ _null_ pg_drop_replication_slot _null_ _null_ _null_ ));
51085108
DESCR("drop a replication slot");
5109-
DATA(insert OID = 3781 ( pg_get_replication_slots PGNSP PGUID 12 1 10 0 0 f f f f f t s 0 0 2249 "" "{19,19,25,26,16,28,28,3220}" "{o,o,o,o,o,o,o,o}" "{slot_name,plugin,slot_type,datoid,active,xmin,catalog_xmin,restart_lsn}" _null_ pg_get_replication_slots _null_ _null_ _null_ ));
5109+
DATA(insert OID = 3781 ( pg_get_replication_slots PGNSP PGUID 12 1 10 0 0 f f f f f t s 0 0 2249 "" "{19,19,25,26,16,23,28,28,3220}" "{o,o,o,o,o,o,o,o,o}" "{slot_name,plugin,slot_type,datoid,active,active_in,xmin,catalog_xmin,restart_lsn}" _null_ pg_get_replication_slots _null_ _null_ _null_ ));
51105110
DESCR("information about replication slots currently in use");
51115111
DATA(insert OID = 3786 ( pg_create_logical_replication_slot PGNSP PGUID 12 1 0 0 0 f f f f f f v 2 0 2249 "19 19" "{19,19,25,3220}" "{i,i,o,o}" "{slot_name,plugin,slot_name,xlog_position}" _null_ pg_create_logical_replication_slot _null_ _null_ _null_ ));
51125112
DESCR("set up a logical replication slot");

src/include/replication/slot.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -84,8 +84,8 @@ typedef struct ReplicationSlot
8484
/* is this slot defined */
8585
bool in_use;
8686

87-
/* is somebody streaming out changes for this slot */
88-
bool active;
87+
/* Who is streaming out changes for this slot? 0 in unused slots. */
88+
pid_t active_pid;
8989

9090
/* any outstanding modifications? */
9191
bool just_dirtied;

src/test/regress/expected/rules.out

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1396,10 +1396,11 @@ pg_replication_slots| SELECT l.slot_name,
13961396
l.datoid,
13971397
d.datname AS database,
13981398
l.active,
1399+
l.active_in,
13991400
l.xmin,
14001401
l.catalog_xmin,
14011402
l.restart_lsn
1402-
FROM (pg_get_replication_slots() l(slot_name, plugin, slot_type, datoid, active, xmin, catalog_xmin, restart_lsn)
1403+
FROM (pg_get_replication_slots() l(slot_name, plugin, slot_type, datoid, active, active_in, xmin, catalog_xmin, restart_lsn)
14031404
LEFT JOIN pg_database d ON ((l.datoid = d.oid)));
14041405
pg_roles| SELECT pg_authid.rolname,
14051406
pg_authid.rolsuper,

0 commit comments

Comments
 (0)