Skip to content

Commit bcf79b5

Browse files
committed
Split the SetSubscriptionRelState function into two
We don't actually need the insert-or-update logic, so it's clearer to have separate functions for the inserting and updating. Author: Petr Jelinek <petr.jelinek@2ndquadrant.com> Reviewed-by: Masahiko Sawada <sawada.mshk@gmail.com>
1 parent c25304a commit bcf79b5

File tree

4 files changed

+96
-81
lines changed

4 files changed

+96
-81
lines changed

src/backend/catalog/pg_subscription.c

Lines changed: 72 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -227,24 +227,15 @@ textarray_to_stringlist(ArrayType *textarray)
227227
}
228228

229229
/*
230-
* Set the state of a subscription table.
231-
*
232-
* If update_only is true and the record for given table doesn't exist, do
233-
* nothing. This can be used to avoid inserting a new record that was deleted
234-
* by someone else. Generally, subscription DDL commands should use false,
235-
* workers should use true.
236-
*
237-
* The insert-or-update logic in this function is not concurrency safe so it
238-
* might raise an error in rare circumstances. But if we took a stronger lock
239-
* such as ShareRowExclusiveLock, we would risk more deadlocks.
230+
* Add new state record for a subscription table.
240231
*/
241232
Oid
242-
SetSubscriptionRelState(Oid subid, Oid relid, char state,
243-
XLogRecPtr sublsn, bool update_only)
233+
AddSubscriptionRelState(Oid subid, Oid relid, char state,
234+
XLogRecPtr sublsn)
244235
{
245236
Relation rel;
246237
HeapTuple tup;
247-
Oid subrelid = InvalidOid;
238+
Oid subrelid;
248239
bool nulls[Natts_pg_subscription_rel];
249240
Datum values[Natts_pg_subscription_rel];
250241

@@ -256,57 +247,81 @@ SetSubscriptionRelState(Oid subid, Oid relid, char state,
256247
tup = SearchSysCacheCopy2(SUBSCRIPTIONRELMAP,
257248
ObjectIdGetDatum(relid),
258249
ObjectIdGetDatum(subid));
250+
if (HeapTupleIsValid(tup))
251+
elog(ERROR, "subscription table %u in subscription %u already exists",
252+
relid, subid);
259253

260-
/*
261-
* If the record for given table does not exist yet create new record,
262-
* otherwise update the existing one.
263-
*/
264-
if (!HeapTupleIsValid(tup) && !update_only)
265-
{
266-
/* Form the tuple. */
267-
memset(values, 0, sizeof(values));
268-
memset(nulls, false, sizeof(nulls));
269-
values[Anum_pg_subscription_rel_srsubid - 1] = ObjectIdGetDatum(subid);
270-
values[Anum_pg_subscription_rel_srrelid - 1] = ObjectIdGetDatum(relid);
271-
values[Anum_pg_subscription_rel_srsubstate - 1] = CharGetDatum(state);
272-
if (sublsn != InvalidXLogRecPtr)
273-
values[Anum_pg_subscription_rel_srsublsn - 1] = LSNGetDatum(sublsn);
274-
else
275-
nulls[Anum_pg_subscription_rel_srsublsn - 1] = true;
276-
277-
tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
278-
279-
/* Insert tuple into catalog. */
280-
subrelid = CatalogTupleInsert(rel, tup);
281-
282-
heap_freetuple(tup);
283-
}
284-
else if (HeapTupleIsValid(tup))
285-
{
286-
bool replaces[Natts_pg_subscription_rel];
254+
/* Form the tuple. */
255+
memset(values, 0, sizeof(values));
256+
memset(nulls, false, sizeof(nulls));
257+
values[Anum_pg_subscription_rel_srsubid - 1] = ObjectIdGetDatum(subid);
258+
values[Anum_pg_subscription_rel_srrelid - 1] = ObjectIdGetDatum(relid);
259+
values[Anum_pg_subscription_rel_srsubstate - 1] = CharGetDatum(state);
260+
if (sublsn != InvalidXLogRecPtr)
261+
values[Anum_pg_subscription_rel_srsublsn - 1] = LSNGetDatum(sublsn);
262+
else
263+
nulls[Anum_pg_subscription_rel_srsublsn - 1] = true;
287264

288-
/* Update the tuple. */
289-
memset(values, 0, sizeof(values));
290-
memset(nulls, false, sizeof(nulls));
291-
memset(replaces, false, sizeof(replaces));
265+
tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
292266

293-
replaces[Anum_pg_subscription_rel_srsubstate - 1] = true;
294-
values[Anum_pg_subscription_rel_srsubstate - 1] = CharGetDatum(state);
267+
/* Insert tuple into catalog. */
268+
subrelid = CatalogTupleInsert(rel, tup);
295269

296-
replaces[Anum_pg_subscription_rel_srsublsn - 1] = true;
297-
if (sublsn != InvalidXLogRecPtr)
298-
values[Anum_pg_subscription_rel_srsublsn - 1] = LSNGetDatum(sublsn);
299-
else
300-
nulls[Anum_pg_subscription_rel_srsublsn - 1] = true;
270+
heap_freetuple(tup);
301271

302-
tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
303-
replaces);
272+
/* Cleanup. */
273+
heap_close(rel, NoLock);
304274

305-
/* Update the catalog. */
306-
CatalogTupleUpdate(rel, &tup->t_self, tup);
275+
return subrelid;
276+
}
307277

308-
subrelid = HeapTupleGetOid(tup);
309-
}
278+
/*
279+
* Update the state of a subscription table.
280+
*/
281+
Oid
282+
UpdateSubscriptionRelState(Oid subid, Oid relid, char state,
283+
XLogRecPtr sublsn)
284+
{
285+
Relation rel;
286+
HeapTuple tup;
287+
Oid subrelid;
288+
bool nulls[Natts_pg_subscription_rel];
289+
Datum values[Natts_pg_subscription_rel];
290+
bool replaces[Natts_pg_subscription_rel];
291+
292+
LockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
293+
294+
rel = heap_open(SubscriptionRelRelationId, RowExclusiveLock);
295+
296+
/* Try finding existing mapping. */
297+
tup = SearchSysCacheCopy2(SUBSCRIPTIONRELMAP,
298+
ObjectIdGetDatum(relid),
299+
ObjectIdGetDatum(subid));
300+
if (!HeapTupleIsValid(tup))
301+
elog(ERROR, "subscription table %u in subscription %u does not exist",
302+
relid, subid);
303+
304+
/* Update the tuple. */
305+
memset(values, 0, sizeof(values));
306+
memset(nulls, false, sizeof(nulls));
307+
memset(replaces, false, sizeof(replaces));
308+
309+
replaces[Anum_pg_subscription_rel_srsubstate - 1] = true;
310+
values[Anum_pg_subscription_rel_srsubstate - 1] = CharGetDatum(state);
311+
312+
replaces[Anum_pg_subscription_rel_srsublsn - 1] = true;
313+
if (sublsn != InvalidXLogRecPtr)
314+
values[Anum_pg_subscription_rel_srsublsn - 1] = LSNGetDatum(sublsn);
315+
else
316+
nulls[Anum_pg_subscription_rel_srsublsn - 1] = true;
317+
318+
tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
319+
replaces);
320+
321+
/* Update the catalog. */
322+
CatalogTupleUpdate(rel, &tup->t_self, tup);
323+
324+
subrelid = HeapTupleGetOid(tup);
310325

311326
/* Cleanup. */
312327
heap_close(rel, NoLock);

src/backend/commands/subscriptioncmds.c

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -450,8 +450,8 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
450450
CheckSubscriptionRelkind(get_rel_relkind(relid),
451451
rv->schemaname, rv->relname);
452452

453-
SetSubscriptionRelState(subid, relid, table_state,
454-
InvalidXLogRecPtr, false);
453+
AddSubscriptionRelState(subid, relid, table_state,
454+
InvalidXLogRecPtr);
455455
}
456456

457457
/*
@@ -569,9 +569,9 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data)
569569
if (!bsearch(&relid, subrel_local_oids,
570570
list_length(subrel_states), sizeof(Oid), oid_cmp))
571571
{
572-
SetSubscriptionRelState(sub->oid, relid,
572+
AddSubscriptionRelState(sub->oid, relid,
573573
copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY,
574-
InvalidXLogRecPtr, false);
574+
InvalidXLogRecPtr);
575575
ereport(DEBUG1,
576576
(errmsg("table \"%s.%s\" added to subscription \"%s\"",
577577
rv->schemaname, rv->relname, sub->name)));

src/backend/replication/logical/tablesync.c

Lines changed: 16 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -298,11 +298,10 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn)
298298

299299
SpinLockRelease(&MyLogicalRepWorker->relmutex);
300300

301-
SetSubscriptionRelState(MyLogicalRepWorker->subid,
302-
MyLogicalRepWorker->relid,
303-
MyLogicalRepWorker->relstate,
304-
MyLogicalRepWorker->relstate_lsn,
305-
true);
301+
UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
302+
MyLogicalRepWorker->relid,
303+
MyLogicalRepWorker->relstate,
304+
MyLogicalRepWorker->relstate_lsn);
306305

307306
walrcv_endstreaming(wrconn, &tli);
308307
finish_sync_worker();
@@ -427,9 +426,10 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
427426
StartTransactionCommand();
428427
started_tx = true;
429428
}
430-
SetSubscriptionRelState(MyLogicalRepWorker->subid,
431-
rstate->relid, rstate->state,
432-
rstate->lsn, true);
429+
430+
UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
431+
rstate->relid, rstate->state,
432+
rstate->lsn);
433433
}
434434
}
435435
else
@@ -870,11 +870,10 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
870870

871871
/* Update the state and make it visible to others. */
872872
StartTransactionCommand();
873-
SetSubscriptionRelState(MyLogicalRepWorker->subid,
874-
MyLogicalRepWorker->relid,
875-
MyLogicalRepWorker->relstate,
876-
MyLogicalRepWorker->relstate_lsn,
877-
true);
873+
UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
874+
MyLogicalRepWorker->relid,
875+
MyLogicalRepWorker->relstate,
876+
MyLogicalRepWorker->relstate_lsn);
878877
CommitTransactionCommand();
879878
pgstat_report_stat(false);
880879

@@ -961,11 +960,10 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
961960
* Update the new state in catalog. No need to bother
962961
* with the shmem state as we are exiting for good.
963962
*/
964-
SetSubscriptionRelState(MyLogicalRepWorker->subid,
965-
MyLogicalRepWorker->relid,
966-
SUBREL_STATE_SYNCDONE,
967-
*origin_startpos,
968-
true);
963+
UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
964+
MyLogicalRepWorker->relid,
965+
SUBREL_STATE_SYNCDONE,
966+
*origin_startpos);
969967
finish_sync_worker();
970968
}
971969
break;

src/include/catalog/pg_subscription_rel.h

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -67,8 +67,10 @@ typedef struct SubscriptionRelState
6767
char state;
6868
} SubscriptionRelState;
6969

70-
extern Oid SetSubscriptionRelState(Oid subid, Oid relid, char state,
71-
XLogRecPtr sublsn, bool update_only);
70+
extern Oid AddSubscriptionRelState(Oid subid, Oid relid, char state,
71+
XLogRecPtr sublsn);
72+
extern Oid UpdateSubscriptionRelState(Oid subid, Oid relid, char state,
73+
XLogRecPtr sublsn);
7274
extern char GetSubscriptionRelState(Oid subid, Oid relid,
7375
XLogRecPtr *sublsn, bool missing_ok);
7476
extern void RemoveSubscriptionRel(Oid subid, Oid relid);

0 commit comments

Comments
 (0)