Skip to content

Commit 88f4883

Browse files
author
Amit Kapila
committed
Make the tablesync worker's replication origin drop logic robust.
In commit f6c5edb, we started to drop the replication origin slots before the tablesync worker exits, to avoid consuming more slots than required. We were dropping the replication origin in the same transaction in which we were marking the tablesync state as SYNCDONE. Now, if there is any error after we have dropped the origin but before we commit the containing transaction, the in-memory state of replication progress won't be rolled back. Due to this, after the restart, the tablesync worker can start streaming from the wrong location and can re-apply already processed transactions. To fix this, we need to opportunistically drop the origin after marking the tablesync state as SYNCDONE. Even if the tablesync worker fails to remove the replication origin before exit, the apply worker makes sure to clean it up afterward. Reported by Tom Lane as per buildfarm. Diagnosed-by: Masahiko Sawada Author: Hou Zhijie Reviewed-By: Masahiko Sawada, Amit Kapila Discussion: https://postgr.es/m/20220714115155.GA5439@depesz.com Discussion: https://postgr.es/m/CAD21AoAw0Oofi4kiDpJBOwpYyBBBkJj=sLUOn4Gd2GjUAKG-fw@mail.gmail.com
1 parent 5015e1e commit 88f4883

File tree

2 files changed

+75
-41
lines changed

2 files changed

+75
-41
lines changed

src/backend/commands/subscriptioncmds.c

Lines changed: 11 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -931,19 +931,22 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data,
931931
logicalrep_worker_stop(sub->oid, relid);
932932

933933
/*
934-
* For READY state and SYNCDONE state, we would have already
935-
* dropped the tablesync origin.
934+
* For READY state, we would have already dropped the
935+
* tablesync origin.
936936
*/
937-
if (state != SUBREL_STATE_READY && state != SUBREL_STATE_SYNCDONE)
937+
if (state != SUBREL_STATE_READY)
938938
{
939939
char originname[NAMEDATALEN];
940940

941941
/*
942942
* Drop the tablesync's origin tracking if exists.
943943
*
944944
* It is possible that the origin is not yet created for
945-
* tablesync worker so passing missing_ok = true. This can
946-
* happen for the states before SUBREL_STATE_FINISHEDCOPY.
945+
* tablesync worker, this can happen for the states before
946+
* SUBREL_STATE_FINISHEDCOPY. The tablesync worker or
947+
* apply worker can also concurrently try to drop the
948+
* origin and by this time the origin might be already
949+
* removed. For these reasons, passing missing_ok = true.
947950
*/
948951
ReplicationOriginNameForTablesync(sub->oid, relid, originname,
949952
sizeof(originname));
@@ -1516,19 +1519,13 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
15161519
/*
15171520
* Drop the tablesync's origin tracking if exists.
15181521
*
1519-
* For SYNCDONE/READY states, the tablesync origin tracking is known
1520-
* to have already been dropped by the tablesync worker.
1521-
*
15221522
* It is possible that the origin is not yet created for tablesync
15231523
* worker so passing missing_ok = true. This can happen for the states
15241524
* before SUBREL_STATE_FINISHEDCOPY.
15251525
*/
1526-
if (rstate->state != SUBREL_STATE_SYNCDONE)
1527-
{
1528-
ReplicationOriginNameForTablesync(subid, relid, originname,
1529-
sizeof(originname));
1530-
replorigin_drop_by_name(originname, true, false);
1531-
}
1526+
ReplicationOriginNameForTablesync(subid, relid, originname,
1527+
sizeof(originname));
1528+
replorigin_drop_by_name(originname, true, false);
15321529
}
15331530

15341531
/* Clean up dependencies */

src/backend/replication/logical/tablesync.c

Lines changed: 64 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -300,7 +300,6 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn)
300300

301301
/*
302302
* UpdateSubscriptionRelState must be called within a transaction.
303-
* That transaction will be ended within the finish_sync_worker().
304303
*/
305304
if (!IsTransactionState())
306305
StartTransactionCommand();
@@ -310,30 +309,6 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn)
310309
MyLogicalRepWorker->relstate,
311310
MyLogicalRepWorker->relstate_lsn);
312311

313-
/*
314-
* Cleanup the tablesync origin tracking.
315-
*
316-
* Resetting the origin session removes the ownership of the slot.
317-
* This is needed to allow the origin to be dropped.
318-
*/
319-
ReplicationOriginNameForTablesync(MyLogicalRepWorker->subid,
320-
MyLogicalRepWorker->relid,
321-
originname,
322-
sizeof(originname));
323-
replorigin_session_reset();
324-
replorigin_session_origin = InvalidRepOriginId;
325-
replorigin_session_origin_lsn = InvalidXLogRecPtr;
326-
replorigin_session_origin_timestamp = 0;
327-
328-
/*
329-
* We expect that origin must be present. The concurrent operations
330-
* that remove origin like a refresh for the subscription take an
331-
* access exclusive lock on pg_subscription which prevent the previous
332-
* operation to update the rel state to SUBREL_STATE_SYNCDONE to
333-
* succeed.
334-
*/
335-
replorigin_drop_by_name(originname, false, false);
336-
337312
/*
338313
* End streaming so that LogRepWorkerWalRcvConn can be used to drop
339314
* the slot.
@@ -343,7 +318,7 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn)
343318
/*
344319
* Cleanup the tablesync slot.
345320
*
346-
* This has to be done after the data changes because otherwise if
321+
* This has to be done after updating the state because otherwise if
347322
* there is an error while doing the database operations we won't be
348323
* able to rollback dropped slot.
349324
*/
@@ -359,6 +334,49 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn)
359334
*/
360335
ReplicationSlotDropAtPubNode(LogRepWorkerWalRcvConn, syncslotname, false);
361336

337+
CommitTransactionCommand();
338+
pgstat_report_stat(false);
339+
340+
/*
341+
* Start a new transaction to clean up the tablesync origin tracking.
342+
* This transaction will be ended within the finish_sync_worker().
343+
* Even if we fail to remove it here, the apply worker will make
344+
* sure to clean it up afterward.
345+
*
346+
* We need to do this after the table state is set to SYNCDONE.
347+
* Otherwise, if an error occurs while performing the database
348+
* operation, the worker will be restarted and the in-memory state of
349+
* replication progress (remote_lsn) won't be rolled-back which would
350+
* have been cleared before restart. So, the restarted worker will use
351+
* invalid replication progress state resulting in replay of
352+
* transactions that have already been applied.
353+
*/
354+
StartTransactionCommand();
355+
356+
ReplicationOriginNameForTablesync(MyLogicalRepWorker->subid,
357+
MyLogicalRepWorker->relid,
358+
originname,
359+
sizeof(originname));
360+
361+
/*
362+
* Resetting the origin session removes the ownership of the slot.
363+
* This is needed to allow the origin to be dropped.
364+
*/
365+
replorigin_session_reset();
366+
replorigin_session_origin = InvalidRepOriginId;
367+
replorigin_session_origin_lsn = InvalidXLogRecPtr;
368+
replorigin_session_origin_timestamp = 0;
369+
370+
/*
371+
* Drop the tablesync's origin tracking if exists.
372+
*
373+
* There is a chance that the user is concurrently performing refresh
374+
* for the subscription where we remove the table state and its origin
375+
* or the apply worker would have removed this origin. So passing
376+
* missing_ok = true.
377+
*/
378+
replorigin_drop_by_name(originname, true, false);
379+
362380
finish_sync_worker();
363381
}
364382
else
@@ -466,6 +484,8 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
466484
*/
467485
if (current_lsn >= rstate->lsn)
468486
{
487+
char originname[NAMEDATALEN];
488+
469489
rstate->state = SUBREL_STATE_READY;
470490
rstate->lsn = current_lsn;
471491
if (!started_tx)
@@ -475,7 +495,24 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
475495
}
476496

477497
/*
478-
* Update the state to READY.
498+
* Remove the tablesync origin tracking if exists.
499+
*
500+
* There is a chance that the user is concurrently performing
501+
* refresh for the subscription where we remove the table
502+
* state and its origin or the tablesync worker would have
503+
* already removed this origin. We can't rely on tablesync
504+
* worker to remove the origin tracking as if there is any
505+
* error while dropping we won't restart it to drop the
506+
* origin. So passing missing_ok = true.
507+
*/
508+
ReplicationOriginNameForTablesync(MyLogicalRepWorker->subid,
509+
rstate->relid,
510+
originname,
511+
sizeof(originname));
512+
replorigin_drop_by_name(originname, true, false);
513+
514+
/*
515+
* Update the state to READY only after the origin cleanup.
479516
*/
480517
UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
481518
rstate->relid, rstate->state,

0 commit comments

Comments
 (0)