Skip to content

Commit 3d474a0

Browse files
committed
Don't call palloc() while holding a spinlock, either.
Fix some more violations of the "only straight-line code inside a spinlock" rule. These are hazardous not only because they risk holding the lock for an excessively long time, but because it's possible for palloc to throw elog(ERROR), leaving a stuck spinlock behind. copy_replication_slot() had two separate places that did pallocs while holding a spinlock. We can make the code simpler and safer by copying the whole ReplicationSlot struct into a local variable while holding the spinlock, and then referencing that copy. (While that's arguably more cycles than we really need to spend holding the lock, the struct isn't all that big, and this way seems far more maintainable than copying fields piecemeal. Anyway this is surely much cheaper than a palloc.) That bug goes back to v12. InvalidateObsoleteReplicationSlots() not only did a palloc while holding a spinlock, but for extra sloppiness then leaked the memory --- probably for the lifetime of the checkpointer process, though I didn't try to verify that. Fortunately that silliness is new in HEAD. pg_get_replication_slots() had a cosmetic violation of the rule, in that it only assumed it's safe to call namecpy() while holding a spinlock. Still, that's a hazard waiting to bite somebody, and there were some other cosmetic coding-rule violations in the same function, so clean it up. I back-patched this as far as v10; the code exists before that but it looks different, and this didn't seem important enough to adapt the patch further back. Discussion: https://postgr.es/m/20200602.161518.1399689010416646074.horikyota.ntt@gmail.com
1 parent 53f32ea commit 3d474a0

File tree

2 files changed

+47
-55
lines changed

2 files changed

+47
-55
lines changed

src/backend/replication/slotfuncs.c

Lines changed: 45 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -277,87 +277,73 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
277277
for (slotno = 0; slotno < max_replication_slots; slotno++)
278278
{
279279
ReplicationSlot *slot = &ReplicationSlotCtl->replication_slots[slotno];
280+
ReplicationSlot slot_contents;
280281
Datum values[PG_GET_REPLICATION_SLOTS_COLS];
281282
bool nulls[PG_GET_REPLICATION_SLOTS_COLS];
282-
283-
ReplicationSlotPersistency persistency;
284-
TransactionId xmin;
285-
TransactionId catalog_xmin;
286-
XLogRecPtr restart_lsn;
287-
XLogRecPtr confirmed_flush_lsn;
288-
pid_t active_pid;
289-
Oid database;
290-
NameData slot_name;
291-
NameData plugin;
292283
int i;
293284

294285
if (!slot->in_use)
295286
continue;
296287

288+
/* Copy slot contents while holding spinlock, then examine at leisure */
297289
SpinLockAcquire(&slot->mutex);
298-
299-
xmin = slot->data.xmin;
300-
catalog_xmin = slot->data.catalog_xmin;
301-
database = slot->data.database;
302-
restart_lsn = slot->data.restart_lsn;
303-
confirmed_flush_lsn = slot->data.confirmed_flush;
304-
namecpy(&slot_name, &slot->data.name);
305-
namecpy(&plugin, &slot->data.plugin);
306-
active_pid = slot->active_pid;
307-
persistency = slot->data.persistency;
308-
290+
slot_contents = *slot;
309291
SpinLockRelease(&slot->mutex);
310292

293+
memset(values, 0, sizeof(values));
311294
memset(nulls, 0, sizeof(nulls));
312295

313296
i = 0;
314-
values[i++] = NameGetDatum(&slot_name);
297+
values[i++] = NameGetDatum(&slot_contents.data.name);
315298

316-
if (database == InvalidOid)
299+
if (slot_contents.data.database == InvalidOid)
317300
nulls[i++] = true;
318301
else
319-
values[i++] = NameGetDatum(&plugin);
302+
values[i++] = NameGetDatum(&slot_contents.data.plugin);
320303

321-
if (database == InvalidOid)
304+
if (slot_contents.data.database == InvalidOid)
322305
values[i++] = CStringGetTextDatum("physical");
323306
else
324307
values[i++] = CStringGetTextDatum("logical");
325308

326-
if (database == InvalidOid)
309+
if (slot_contents.data.database == InvalidOid)
327310
nulls[i++] = true;
328311
else
329-
values[i++] = database;
312+
values[i++] = ObjectIdGetDatum(slot_contents.data.database);
330313

331-
values[i++] = BoolGetDatum(persistency == RS_TEMPORARY);
332-
values[i++] = BoolGetDatum(active_pid != 0);
314+
values[i++] = BoolGetDatum(slot_contents.data.persistency == RS_TEMPORARY);
315+
values[i++] = BoolGetDatum(slot_contents.active_pid != 0);
333316

334-
if (active_pid != 0)
335-
values[i++] = Int32GetDatum(active_pid);
317+
if (slot_contents.active_pid != 0)
318+
values[i++] = Int32GetDatum(slot_contents.active_pid);
336319
else
337320
nulls[i++] = true;
338321

339-
if (xmin != InvalidTransactionId)
340-
values[i++] = TransactionIdGetDatum(xmin);
322+
if (slot_contents.data.xmin != InvalidTransactionId)
323+
values[i++] = TransactionIdGetDatum(slot_contents.data.xmin);
341324
else
342325
nulls[i++] = true;
343326

344-
if (catalog_xmin != InvalidTransactionId)
345-
values[i++] = TransactionIdGetDatum(catalog_xmin);
327+
if (slot_contents.data.catalog_xmin != InvalidTransactionId)
328+
values[i++] = TransactionIdGetDatum(slot_contents.data.catalog_xmin);
346329
else
347330
nulls[i++] = true;
348331

349-
if (restart_lsn != InvalidXLogRecPtr)
350-
values[i++] = LSNGetDatum(restart_lsn);
332+
if (slot_contents.data.restart_lsn != InvalidXLogRecPtr)
333+
values[i++] = LSNGetDatum(slot_contents.data.restart_lsn);
351334
else
352335
nulls[i++] = true;
353336

354-
if (confirmed_flush_lsn != InvalidXLogRecPtr)
355-
values[i++] = LSNGetDatum(confirmed_flush_lsn);
337+
if (slot_contents.data.confirmed_flush != InvalidXLogRecPtr)
338+
values[i++] = LSNGetDatum(slot_contents.data.confirmed_flush);
356339
else
357340
nulls[i++] = true;
358341

342+
Assert(i == PG_GET_REPLICATION_SLOTS_COLS);
343+
359344
tuplestore_putvalues(tupstore, tupdesc, values, nulls);
360345
}
346+
361347
LWLockRelease(ReplicationSlotControlLock);
362348

363349
tuplestore_donestoring(tupstore);
@@ -616,6 +602,8 @@ copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot)
616602
Name src_name = PG_GETARG_NAME(0);
617603
Name dst_name = PG_GETARG_NAME(1);
618604
ReplicationSlot *src = NULL;
605+
ReplicationSlot first_slot_contents;
606+
ReplicationSlot second_slot_contents;
619607
XLogRecPtr src_restart_lsn;
620608
bool src_islogical;
621609
bool temporary;
@@ -655,13 +643,10 @@ copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot)
655643

656644
if (s->in_use && strcmp(NameStr(s->data.name), NameStr(*src_name)) == 0)
657645
{
646+
/* Copy the slot contents while holding spinlock */
658647
SpinLockAcquire(&s->mutex);
659-
src_islogical = SlotIsLogical(s);
660-
src_restart_lsn = s->data.restart_lsn;
661-
temporary = s->data.persistency == RS_TEMPORARY;
662-
plugin = logical_slot ? pstrdup(NameStr(s->data.plugin)) : NULL;
648+
first_slot_contents = *s;
663649
SpinLockRelease(&s->mutex);
664-
665650
src = s;
666651
break;
667652
}
@@ -674,6 +659,11 @@ copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot)
674659
(errcode(ERRCODE_UNDEFINED_OBJECT),
675660
errmsg("replication slot \"%s\" does not exist", NameStr(*src_name))));
676661

662+
src_islogical = SlotIsLogical(&first_slot_contents);
663+
src_restart_lsn = first_slot_contents.data.restart_lsn;
664+
temporary = (first_slot_contents.data.persistency == RS_TEMPORARY);
665+
plugin = logical_slot ? NameStr(first_slot_contents.data.plugin) : NULL;
666+
677667
/* Check type of replication slot */
678668
if (src_islogical != logical_slot)
679669
ereport(ERROR,
@@ -738,18 +728,20 @@ copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot)
738728

739729
/* Copy data of source slot again */
740730
SpinLockAcquire(&src->mutex);
741-
copy_effective_xmin = src->effective_xmin;
742-
copy_effective_catalog_xmin = src->effective_catalog_xmin;
731+
second_slot_contents = *src;
732+
SpinLockRelease(&src->mutex);
743733

744-
copy_xmin = src->data.xmin;
745-
copy_catalog_xmin = src->data.catalog_xmin;
746-
copy_restart_lsn = src->data.restart_lsn;
747-
copy_confirmed_flush = src->data.confirmed_flush;
734+
copy_effective_xmin = second_slot_contents.effective_xmin;
735+
copy_effective_catalog_xmin = second_slot_contents.effective_catalog_xmin;
736+
737+
copy_xmin = second_slot_contents.data.xmin;
738+
copy_catalog_xmin = second_slot_contents.data.catalog_xmin;
739+
copy_restart_lsn = second_slot_contents.data.restart_lsn;
740+
copy_confirmed_flush = second_slot_contents.data.confirmed_flush;
748741

749742
/* for existence check */
750-
copy_name = pstrdup(NameStr(src->data.name));
751-
copy_islogical = SlotIsLogical(src);
752-
SpinLockRelease(&src->mutex);
743+
copy_name = NameStr(second_slot_contents.data.name);
744+
copy_islogical = SlotIsLogical(&second_slot_contents);
753745

754746
/*
755747
* Check if the source slot still exists and is valid. We regard it as

src/include/replication/slot.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -151,8 +151,8 @@ typedef struct ReplicationSlot
151151
XLogRecPtr candidate_restart_lsn;
152152
} ReplicationSlot;
153153

154-
#define SlotIsPhysical(slot) (slot->data.database == InvalidOid)
155-
#define SlotIsLogical(slot) (slot->data.database != InvalidOid)
154+
#define SlotIsPhysical(slot) ((slot)->data.database == InvalidOid)
155+
#define SlotIsLogical(slot) ((slot)->data.database != InvalidOid)
156156

157157
/*
158158
* Shared memory control area for all of replication slots.

0 commit comments

Comments
 (0)