18
18
19
19
#include "access/heapam.h"
20
20
#include "access/htup_details.h"
21
+ #include "access/xact.h"
21
22
22
23
#include "catalog/indexing.h"
23
24
#include "catalog/objectaccess.h"
@@ -204,7 +205,7 @@ publicationListToArray(List *publist)
204
205
* Create new subscription.
205
206
*/
206
207
ObjectAddress
207
- CreateSubscription (CreateSubscriptionStmt * stmt )
208
+ CreateSubscription (CreateSubscriptionStmt * stmt , bool isTopLevel )
208
209
{
209
210
Relation rel ;
210
211
ObjectAddress myself ;
@@ -221,6 +222,23 @@ CreateSubscription(CreateSubscriptionStmt *stmt)
221
222
bool create_slot ;
222
223
List * publications ;
223
224
225
+ /*
226
+ * Parse and check options.
227
+ * Connection and publication should not be specified here.
228
+ */
229
+ parse_subscription_options (stmt -> options , NULL , NULL ,
230
+ & enabled_given , & enabled ,
231
+ & create_slot , & slotname );
232
+
233
+ /*
234
+ * Since creating a replication slot is not transactional, rolling back
235
+ * the transaction leaves the created replication slot. So we cannot run
236
+ * CREATE SUBSCRIPTION inside a transaction block if creating a
237
+ * replication slot.
238
+ */
239
+ if (create_slot )
240
+ PreventTransactionChain (isTopLevel , "CREATE SUBSCRIPTION ... CREATE SLOT" );
241
+
224
242
if (!superuser ())
225
243
ereport (ERROR ,
226
244
(errcode (ERRCODE_INSUFFICIENT_PRIVILEGE ),
@@ -239,13 +257,6 @@ CreateSubscription(CreateSubscriptionStmt *stmt)
239
257
stmt -> subname )));
240
258
}
241
259
242
- /*
243
- * Parse and check options.
244
- * Connection and publication should not be specified here.
245
- */
246
- parse_subscription_options (stmt -> options , NULL , NULL ,
247
- & enabled_given , & enabled ,
248
- & create_slot , & slotname );
249
260
if (slotname == NULL )
250
261
slotname = stmt -> subname ;
251
262
@@ -424,7 +435,7 @@ AlterSubscription(AlterSubscriptionStmt *stmt)
424
435
* Drop a subscription
425
436
*/
426
437
void
427
- DropSubscription (DropSubscriptionStmt * stmt )
438
+ DropSubscription (DropSubscriptionStmt * stmt , bool isTopLevel )
428
439
{
429
440
Relation rel ;
430
441
ObjectAddress myself ;
@@ -441,6 +452,15 @@ DropSubscription(DropSubscriptionStmt *stmt)
441
452
WalReceiverConn * wrconn = NULL ;
442
453
StringInfoData cmd ;
443
454
455
+ /*
456
+ * Since dropping a replication slot is not transactional, the replication
457
+ * slot stays dropped even if the transaction rolls back. So we cannot
458
+ * run DROP SUBSCRIPTION inside a transaction block if dropping the
459
+ * replication slot.
460
+ */
461
+ if (stmt -> drop_slot )
462
+ PreventTransactionChain (isTopLevel , "DROP SUBSCRIPTION ... DROP SLOT" );
463
+
444
464
rel = heap_open (SubscriptionRelationId , RowExclusiveLock );
445
465
446
466
tup = SearchSysCache2 (SUBSCRIPTIONNAME , MyDatabaseId ,
0 commit comments