@@ -227,24 +227,15 @@ textarray_to_stringlist(ArrayType *textarray)
227
227
}
228
228
229
229
/*
230
- * Set the state of a subscription table.
231
- *
232
- * If update_only is true and the record for given table doesn't exist, do
233
- * nothing. This can be used to avoid inserting a new record that was deleted
234
- * by someone else. Generally, subscription DDL commands should use false,
235
- * workers should use true.
236
- *
237
- * The insert-or-update logic in this function is not concurrency safe so it
238
- * might raise an error in rare circumstances. But if we took a stronger lock
239
- * such as ShareRowExclusiveLock, we would risk more deadlocks.
230
+ * Add new state record for a subscription table.
240
231
*/
241
232
Oid
242
- SetSubscriptionRelState (Oid subid , Oid relid , char state ,
243
- XLogRecPtr sublsn , bool update_only )
233
+ AddSubscriptionRelState (Oid subid , Oid relid , char state ,
234
+ XLogRecPtr sublsn )
244
235
{
245
236
Relation rel ;
246
237
HeapTuple tup ;
247
- Oid subrelid = InvalidOid ;
238
+ Oid subrelid ;
248
239
bool nulls [Natts_pg_subscription_rel ];
249
240
Datum values [Natts_pg_subscription_rel ];
250
241
@@ -256,57 +247,81 @@ SetSubscriptionRelState(Oid subid, Oid relid, char state,
256
247
tup = SearchSysCacheCopy2 (SUBSCRIPTIONRELMAP ,
257
248
ObjectIdGetDatum (relid ),
258
249
ObjectIdGetDatum (subid ));
250
+ if (HeapTupleIsValid (tup ))
251
+ elog (ERROR , "subscription table %u in subscription %u already exists" ,
252
+ relid , subid );
259
253
260
- /*
261
- * If the record for given table does not exist yet create new record,
262
- * otherwise update the existing one.
263
- */
264
- if (!HeapTupleIsValid (tup ) && !update_only )
265
- {
266
- /* Form the tuple. */
267
- memset (values , 0 , sizeof (values ));
268
- memset (nulls , false, sizeof (nulls ));
269
- values [Anum_pg_subscription_rel_srsubid - 1 ] = ObjectIdGetDatum (subid );
270
- values [Anum_pg_subscription_rel_srrelid - 1 ] = ObjectIdGetDatum (relid );
271
- values [Anum_pg_subscription_rel_srsubstate - 1 ] = CharGetDatum (state );
272
- if (sublsn != InvalidXLogRecPtr )
273
- values [Anum_pg_subscription_rel_srsublsn - 1 ] = LSNGetDatum (sublsn );
274
- else
275
- nulls [Anum_pg_subscription_rel_srsublsn - 1 ] = true;
276
-
277
- tup = heap_form_tuple (RelationGetDescr (rel ), values , nulls );
278
-
279
- /* Insert tuple into catalog. */
280
- subrelid = CatalogTupleInsert (rel , tup );
281
-
282
- heap_freetuple (tup );
283
- }
284
- else if (HeapTupleIsValid (tup ))
285
- {
286
- bool replaces [Natts_pg_subscription_rel ];
254
+ /* Form the tuple. */
255
+ memset (values , 0 , sizeof (values ));
256
+ memset (nulls , false, sizeof (nulls ));
257
+ values [Anum_pg_subscription_rel_srsubid - 1 ] = ObjectIdGetDatum (subid );
258
+ values [Anum_pg_subscription_rel_srrelid - 1 ] = ObjectIdGetDatum (relid );
259
+ values [Anum_pg_subscription_rel_srsubstate - 1 ] = CharGetDatum (state );
260
+ if (sublsn != InvalidXLogRecPtr )
261
+ values [Anum_pg_subscription_rel_srsublsn - 1 ] = LSNGetDatum (sublsn );
262
+ else
263
+ nulls [Anum_pg_subscription_rel_srsublsn - 1 ] = true;
287
264
288
- /* Update the tuple. */
289
- memset (values , 0 , sizeof (values ));
290
- memset (nulls , false, sizeof (nulls ));
291
- memset (replaces , false, sizeof (replaces ));
265
+ tup = heap_form_tuple (RelationGetDescr (rel ), values , nulls );
292
266
293
- replaces [ Anum_pg_subscription_rel_srsubstate - 1 ] = true;
294
- values [ Anum_pg_subscription_rel_srsubstate - 1 ] = CharGetDatum ( state );
267
+ /* Insert tuple into catalog. */
268
+ subrelid = CatalogTupleInsert ( rel , tup );
295
269
296
- replaces [Anum_pg_subscription_rel_srsublsn - 1 ] = true;
297
- if (sublsn != InvalidXLogRecPtr )
298
- values [Anum_pg_subscription_rel_srsublsn - 1 ] = LSNGetDatum (sublsn );
299
- else
300
- nulls [Anum_pg_subscription_rel_srsublsn - 1 ] = true;
270
+ heap_freetuple (tup );
301
271
302
- tup = heap_modify_tuple ( tup , RelationGetDescr ( rel ), values , nulls ,
303
- replaces );
272
+ /* Cleanup. */
273
+ heap_close ( rel , NoLock );
304
274
305
- /* Update the catalog. */
306
- CatalogTupleUpdate ( rel , & tup -> t_self , tup );
275
+ return subrelid ;
276
+ }
307
277
308
- subrelid = HeapTupleGetOid (tup );
309
- }
278
+ /*
279
+ * Update the state of a subscription table.
280
+ */
281
+ Oid
282
+ UpdateSubscriptionRelState (Oid subid , Oid relid , char state ,
283
+ XLogRecPtr sublsn )
284
+ {
285
+ Relation rel ;
286
+ HeapTuple tup ;
287
+ Oid subrelid ;
288
+ bool nulls [Natts_pg_subscription_rel ];
289
+ Datum values [Natts_pg_subscription_rel ];
290
+ bool replaces [Natts_pg_subscription_rel ];
291
+
292
+ LockSharedObject (SubscriptionRelationId , subid , 0 , AccessShareLock );
293
+
294
+ rel = heap_open (SubscriptionRelRelationId , RowExclusiveLock );
295
+
296
+ /* Try finding existing mapping. */
297
+ tup = SearchSysCacheCopy2 (SUBSCRIPTIONRELMAP ,
298
+ ObjectIdGetDatum (relid ),
299
+ ObjectIdGetDatum (subid ));
300
+ if (!HeapTupleIsValid (tup ))
301
+ elog (ERROR , "subscription table %u in subscription %u does not exist" ,
302
+ relid , subid );
303
+
304
+ /* Update the tuple. */
305
+ memset (values , 0 , sizeof (values ));
306
+ memset (nulls , false, sizeof (nulls ));
307
+ memset (replaces , false, sizeof (replaces ));
308
+
309
+ replaces [Anum_pg_subscription_rel_srsubstate - 1 ] = true;
310
+ values [Anum_pg_subscription_rel_srsubstate - 1 ] = CharGetDatum (state );
311
+
312
+ replaces [Anum_pg_subscription_rel_srsublsn - 1 ] = true;
313
+ if (sublsn != InvalidXLogRecPtr )
314
+ values [Anum_pg_subscription_rel_srsublsn - 1 ] = LSNGetDatum (sublsn );
315
+ else
316
+ nulls [Anum_pg_subscription_rel_srsublsn - 1 ] = true;
317
+
318
+ tup = heap_modify_tuple (tup , RelationGetDescr (rel ), values , nulls ,
319
+ replaces );
320
+
321
+ /* Update the catalog. */
322
+ CatalogTupleUpdate (rel , & tup -> t_self , tup );
323
+
324
+ subrelid = HeapTupleGetOid (tup );
310
325
311
326
/* Cleanup. */
312
327
heap_close (rel , NoLock );
0 commit comments