@@ -338,16 +338,14 @@ replorigin_create(const char *roname)
338
338
* Helper function to drop a replication origin.
339
339
*/
340
340
static void
341
- replorigin_drop_guts ( Relation rel , RepOriginId roident , bool nowait )
341
+ replorigin_state_clear ( RepOriginId roident , bool nowait )
342
342
{
343
- HeapTuple tuple ;
344
343
int i ;
345
344
346
345
/*
347
- * First, clean up the slot state info, if there is any matching slot.
346
+ * Clean up the slot state info, if there is any matching slot.
348
347
*/
349
348
restart :
350
- tuple = NULL ;
351
349
LWLockAcquire (ReplicationOriginLock , LW_EXCLUSIVE );
352
350
353
351
for (i = 0 ; i < max_replication_slots ; i ++ )
@@ -402,19 +400,6 @@ replorigin_drop_guts(Relation rel, RepOriginId roident, bool nowait)
402
400
}
403
401
LWLockRelease (ReplicationOriginLock );
404
402
ConditionVariableCancelSleep ();
405
-
406
- /*
407
- * Now, we can delete the catalog entry.
408
- */
409
- tuple = SearchSysCache1 (REPLORIGIDENT , ObjectIdGetDatum (roident ));
410
- if (!HeapTupleIsValid (tuple ))
411
- elog (ERROR , "cache lookup failed for replication origin with ID %d" ,
412
- roident );
413
-
414
- CatalogTupleDelete (rel , & tuple -> t_self );
415
- ReleaseSysCache (tuple );
416
-
417
- CommandCounterIncrement ();
418
403
}
419
404
420
405
/*
@@ -427,24 +412,43 @@ replorigin_drop_by_name(const char *name, bool missing_ok, bool nowait)
427
412
{
428
413
RepOriginId roident ;
429
414
Relation rel ;
415
+ HeapTuple tuple ;
430
416
431
417
Assert (IsTransactionState ());
432
418
433
- /*
434
- * To interlock against concurrent drops, we hold ExclusiveLock on
435
- * pg_replication_origin till xact commit.
436
- *
437
- * XXX We can optimize this by acquiring the lock on a specific origin by
438
- * using LockSharedObject if required. However, for that, we first to
439
- * acquire a lock on ReplicationOriginRelationId, get the origin_id, lock
440
- * the specific origin and then re-check if the origin still exists.
441
- */
442
- rel = table_open (ReplicationOriginRelationId , ExclusiveLock );
419
+ rel = table_open (ReplicationOriginRelationId , RowExclusiveLock );
443
420
444
421
roident = replorigin_by_name (name , missing_ok );
445
422
446
- if (OidIsValid (roident ))
447
- replorigin_drop_guts (rel , roident , nowait );
423
+ /* Lock the origin to prevent concurrent drops. */
424
+ LockSharedObject (ReplicationOriginRelationId , roident , 0 ,
425
+ AccessExclusiveLock );
426
+
427
+ tuple = SearchSysCache1 (REPLORIGIDENT , ObjectIdGetDatum (roident ));
428
+ if (!HeapTupleIsValid (tuple ))
429
+ {
430
+ if (!missing_ok )
431
+ elog (ERROR , "cache lookup failed for replication origin with ID %d" ,
432
+ roident );
433
+
434
+ /*
435
+ * We don't need to retain the locks if the origin is already dropped.
436
+ */
437
+ UnlockSharedObject (ReplicationOriginRelationId , roident , 0 ,
438
+ AccessExclusiveLock );
439
+ table_close (rel , RowExclusiveLock );
440
+ return ;
441
+ }
442
+
443
+ replorigin_state_clear (roident , nowait );
444
+
445
+ /*
446
+ * Now, we can delete the catalog entry.
447
+ */
448
+ CatalogTupleDelete (rel , & tuple -> t_self );
449
+ ReleaseSysCache (tuple );
450
+
451
+ CommandCounterIncrement ();
448
452
449
453
/* We keep the lock on pg_replication_origin until commit */
450
454
table_close (rel , NoLock );
0 commit comments