Skip to content

Commit 32df1c9

Browse files
committed
Add subtransaction handling for table synchronization workers.
Since the old logic was completely unaware of subtransactions, a change made in a subsequently-aborted subtransaction would still cause workers to be stopped at toplevel transaction commit. Fix that by managing a stack of worker lists rather than just one. Amit Khandekar and Robert Haas Discussion: http://postgr.es/m/CAJ3gD9eaG_mWqiOTA2LfAug-VRNn1hrhf50Xi1YroxL37QkZNg@mail.gmail.com
1 parent f7cb284 commit 32df1c9

File tree

4 files changed

+112
-8
lines changed

4 files changed

+112
-8
lines changed

src/backend/access/transam/xact.c

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4637,6 +4637,7 @@ CommitSubTransaction(void)
46374637
AtEOSubXact_HashTables(true, s->nestingLevel);
46384638
AtEOSubXact_PgStat(true, s->nestingLevel);
46394639
AtSubCommit_Snapshot(s->nestingLevel);
4640+
AtEOSubXact_ApplyLauncher(true, s->nestingLevel);
46404641

46414642
/*
46424643
* We need to restore the upper transaction's read-only state, in case the
@@ -4790,6 +4791,7 @@ AbortSubTransaction(void)
47904791
AtEOSubXact_HashTables(false, s->nestingLevel);
47914792
AtEOSubXact_PgStat(false, s->nestingLevel);
47924793
AtSubAbort_Snapshot(s->nestingLevel);
4794+
AtEOSubXact_ApplyLauncher(false, s->nestingLevel);
47934795
}
47944796

47954797
/*

src/backend/replication/logical/launcher.c

Lines changed: 108 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,19 @@ typedef struct LogicalRepWorkerId
7979
Oid relid;
8080
} LogicalRepWorkerId;
8181

82-
static List *on_commit_stop_workers = NIL;
82+
typedef struct StopWorkersData
83+
{
84+
int nestDepth; /* Sub-transaction nest level */
85+
List *workers; /* List of LogicalRepWorkerId */
86+
struct StopWorkersData *parent; /* This need not be an immediate
87+
* subtransaction parent */
88+
} StopWorkersData;
89+
90+
/*
91+
* Stack of StopWorkersData elements. Each stack element contains the workers
92+
* to be stopped for that subtransaction.
93+
*/
94+
static StopWorkersData *on_commit_stop_workers = NULL;
8395

8496
static void ApplyLauncherWakeup(void);
8597
static void logicalrep_launcher_onexit(int code, Datum arg);
@@ -559,17 +571,41 @@ logicalrep_worker_stop(Oid subid, Oid relid)
559571
void
560572
logicalrep_worker_stop_at_commit(Oid subid, Oid relid)
561573
{
574+
int nestDepth = GetCurrentTransactionNestLevel();
562575
LogicalRepWorkerId *wid;
563576
MemoryContext oldctx;
564577

565578
/* Make sure we store the info in context that survives until commit. */
566579
oldctx = MemoryContextSwitchTo(TopTransactionContext);
567580

581+
/* Check that previous transactions were properly cleaned up. */
582+
Assert(on_commit_stop_workers == NULL ||
583+
nestDepth >= on_commit_stop_workers->nestDepth);
584+
585+
/*
586+
* Push a new stack element if we don't already have one for the current
587+
* nestDepth.
588+
*/
589+
if (on_commit_stop_workers == NULL ||
590+
nestDepth > on_commit_stop_workers->nestDepth)
591+
{
592+
StopWorkersData *newdata = palloc(sizeof(StopWorkersData));
593+
594+
newdata->nestDepth = nestDepth;
595+
newdata->workers = NIL;
596+
newdata->parent = on_commit_stop_workers;
597+
on_commit_stop_workers = newdata;
598+
}
599+
600+
/*
601+
* Finally add a new worker into the worker list of the current
602+
* subtransaction.
603+
*/
568604
wid = palloc(sizeof(LogicalRepWorkerId));
569605
wid->subid = subid;
570606
wid->relid = relid;
571-
572-
on_commit_stop_workers = lappend(on_commit_stop_workers, wid);
607+
on_commit_stop_workers->workers =
608+
lappend(on_commit_stop_workers->workers, wid);
573609

574610
MemoryContextSwitchTo(oldctx);
575611
}
@@ -823,7 +859,7 @@ ApplyLauncherShmemInit(void)
823859
bool
824860
XactManipulatesLogicalReplicationWorkers(void)
825861
{
826-
return (on_commit_stop_workers != NIL);
862+
return (on_commit_stop_workers != NULL);
827863
}
828864

829865
/*
@@ -832,15 +868,25 @@ XactManipulatesLogicalReplicationWorkers(void)
832868
void
833869
AtEOXact_ApplyLauncher(bool isCommit)
834870
{
871+
872+
Assert(on_commit_stop_workers == NULL ||
873+
(on_commit_stop_workers->nestDepth == 1 &&
874+
on_commit_stop_workers->parent == NULL));
875+
835876
if (isCommit)
836877
{
837878
ListCell *lc;
838879

839-
foreach(lc, on_commit_stop_workers)
880+
if (on_commit_stop_workers != NULL)
840881
{
841-
LogicalRepWorkerId *wid = lfirst(lc);
882+
List *workers = on_commit_stop_workers->workers;
883+
884+
foreach(lc, workers)
885+
{
886+
LogicalRepWorkerId *wid = lfirst(lc);
842887

843-
logicalrep_worker_stop(wid->subid, wid->relid);
888+
logicalrep_worker_stop(wid->subid, wid->relid);
889+
}
844890
}
845891

846892
if (on_commit_launcher_wakeup)
@@ -851,10 +897,64 @@ AtEOXact_ApplyLauncher(bool isCommit)
851897
* No need to pfree on_commit_stop_workers. It was allocated in
852898
* transaction memory context, which is going to be cleaned soon.
853899
*/
854-
on_commit_stop_workers = NIL;
900+
on_commit_stop_workers = NULL;
855901
on_commit_launcher_wakeup = false;
856902
}
857903

904+
/*
905+
* On commit, merge the current on_commit_stop_workers list into the
906+
* immediate parent, if present.
907+
* On rollback, discard the current on_commit_stop_workers list.
908+
* Pop out the stack.
909+
*/
910+
void
911+
AtEOSubXact_ApplyLauncher(bool isCommit, int nestDepth)
912+
{
913+
StopWorkersData *parent;
914+
915+
/* Exit immediately if there's no work to do at this level. */
916+
if (on_commit_stop_workers == NULL ||
917+
on_commit_stop_workers->nestDepth < nestDepth)
918+
return;
919+
920+
Assert(on_commit_stop_workers->nestDepth == nestDepth);
921+
922+
parent = on_commit_stop_workers->parent;
923+
924+
if (isCommit)
925+
{
926+
/*
927+
* If the upper stack element is not an immediate parent
928+
* subtransaction, just decrement the notional nesting depth without
929+
* doing any real work. Else, we need to merge the current workers
930+
* list into the parent.
931+
*/
932+
if (!parent || parent->nestDepth < nestDepth - 1)
933+
{
934+
on_commit_stop_workers->nestDepth--;
935+
return;
936+
}
937+
938+
parent->workers =
939+
list_concat(parent->workers, on_commit_stop_workers->workers);
940+
}
941+
else
942+
{
943+
/*
944+
* Abandon everything that was done at this nesting level. Explicitly
945+
* free memory to avoid a transaction-lifespan leak.
946+
*/
947+
list_free_deep(on_commit_stop_workers->workers);
948+
}
949+
950+
/*
951+
* We have taken care of the current subtransaction workers list for both
952+
* abort or commit. So we are ready to pop the stack.
953+
*/
954+
pfree(on_commit_stop_workers);
955+
on_commit_stop_workers = parent;
956+
}
957+
858958
/*
859959
* Request wakeup of the launcher on commit of the transaction.
860960
*

src/include/replication/logicallauncher.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ extern void ApplyLauncherShmemInit(void);
2424
extern void ApplyLauncherWakeupAtCommit(void);
2525
extern bool XactManipulatesLogicalReplicationWorkers(void);
2626
extern void AtEOXact_ApplyLauncher(bool isCommit);
27+
extern void AtEOSubXact_ApplyLauncher(bool isCommit, int nestDepth);
2728

2829
extern bool IsLogicalLauncher(void);
2930

src/tools/pgindent/typedefs.list

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2227,6 +2227,7 @@ StdAnalyzeData
22272227
StdRdOptions
22282228
Step
22292229
StopList
2230+
StopWorkersData
22302231
StrategyNumber
22312232
StreamCtl
22322233
StringInfo

0 commit comments

Comments
 (0)