Skip to content

Commit 3e577ff

Browse files
author
Amit Kapila
committed
Optimize the origin drop functionality.
To interlock against concurrent drops, we use to hold ExclusiveLock on pg_replication_origin till xact commit. This blocks even concurrent drops of different origins by tablesync workers. So, instead, lock the specific origin to interlock against concurrent drops. This reduces the test time variability in src/test/subscription where multiple tables are being synced. Author: Vignesh C Reviewed-by: Hou Zhijie, Amit Kapila Discussion: https://postgr.es/m/1412708.1674417574@sss.pgh.pa.us
1 parent 2e9f120 commit 3e577ff

File tree

1 file changed

+33
-29
lines changed

1 file changed

+33
-29
lines changed

src/backend/replication/logical/origin.c

Lines changed: 33 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -338,16 +338,14 @@ replorigin_create(const char *roname)
338338
* Helper function to drop a replication origin.
339339
*/
340340
static void
341-
replorigin_drop_guts(Relation rel, RepOriginId roident, bool nowait)
341+
replorigin_state_clear(RepOriginId roident, bool nowait)
342342
{
343-
HeapTuple tuple;
344343
int i;
345344

346345
/*
347-
* First, clean up the slot state info, if there is any matching slot.
346+
* Clean up the slot state info, if there is any matching slot.
348347
*/
349348
restart:
350-
tuple = NULL;
351349
LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
352350

353351
for (i = 0; i < max_replication_slots; i++)
@@ -402,19 +400,6 @@ replorigin_drop_guts(Relation rel, RepOriginId roident, bool nowait)
402400
}
403401
LWLockRelease(ReplicationOriginLock);
404402
ConditionVariableCancelSleep();
405-
406-
/*
407-
* Now, we can delete the catalog entry.
408-
*/
409-
tuple = SearchSysCache1(REPLORIGIDENT, ObjectIdGetDatum(roident));
410-
if (!HeapTupleIsValid(tuple))
411-
elog(ERROR, "cache lookup failed for replication origin with ID %d",
412-
roident);
413-
414-
CatalogTupleDelete(rel, &tuple->t_self);
415-
ReleaseSysCache(tuple);
416-
417-
CommandCounterIncrement();
418403
}
419404

420405
/*
@@ -427,24 +412,43 @@ replorigin_drop_by_name(const char *name, bool missing_ok, bool nowait)
427412
{
428413
RepOriginId roident;
429414
Relation rel;
415+
HeapTuple tuple;
430416

431417
Assert(IsTransactionState());
432418

433-
/*
434-
* To interlock against concurrent drops, we hold ExclusiveLock on
435-
* pg_replication_origin till xact commit.
436-
*
437-
* XXX We can optimize this by acquiring the lock on a specific origin by
438-
* using LockSharedObject if required. However, for that, we first to
439-
* acquire a lock on ReplicationOriginRelationId, get the origin_id, lock
440-
* the specific origin and then re-check if the origin still exists.
441-
*/
442-
rel = table_open(ReplicationOriginRelationId, ExclusiveLock);
419+
rel = table_open(ReplicationOriginRelationId, RowExclusiveLock);
443420

444421
roident = replorigin_by_name(name, missing_ok);
445422

446-
if (OidIsValid(roident))
447-
replorigin_drop_guts(rel, roident, nowait);
423+
/* Lock the origin to prevent concurrent drops. */
424+
LockSharedObject(ReplicationOriginRelationId, roident, 0,
425+
AccessExclusiveLock);
426+
427+
tuple = SearchSysCache1(REPLORIGIDENT, ObjectIdGetDatum(roident));
428+
if (!HeapTupleIsValid(tuple))
429+
{
430+
if (!missing_ok)
431+
elog(ERROR, "cache lookup failed for replication origin with ID %d",
432+
roident);
433+
434+
/*
435+
* We don't need to retain the locks if the origin is already dropped.
436+
*/
437+
UnlockSharedObject(ReplicationOriginRelationId, roident, 0,
438+
AccessExclusiveLock);
439+
table_close(rel, RowExclusiveLock);
440+
return;
441+
}
442+
443+
replorigin_state_clear(roident, nowait);
444+
445+
/*
446+
* Now, we can delete the catalog entry.
447+
*/
448+
CatalogTupleDelete(rel, &tuple->t_self);
449+
ReleaseSysCache(tuple);
450+
451+
CommandCounterIncrement();
448452

449453
/* We keep the lock on pg_replication_origin until commit */
450454
table_close(rel, NoLock);

0 commit comments

Comments
 (0)