Skip to content

Commit 4beb25c

Browse files
committed
Add subtransaction handling for table synchronization workers.
Since the old logic was completely unaware of subtransactions, a change made in a subsequently-aborted subtransaction would still cause workers to be stopped at toplevel transaction commit. Fix that by managing a stack of worker lists rather than just one. Amit Khandekar and Robert Haas Discussion: http://postgr.es/m/CAJ3gD9eaG_mWqiOTA2LfAug-VRNn1hrhf50Xi1YroxL37QkZNg@mail.gmail.com
1 parent 0bb28ca commit 4beb25c

File tree

4 files changed

+112
-8
lines changed

4 files changed

+112
-8
lines changed

src/backend/access/transam/xact.c

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4542,6 +4542,7 @@ CommitSubTransaction(void)
45424542
AtEOSubXact_HashTables(true, s->nestingLevel);
45434543
AtEOSubXact_PgStat(true, s->nestingLevel);
45444544
AtSubCommit_Snapshot(s->nestingLevel);
4545+
AtEOSubXact_ApplyLauncher(true, s->nestingLevel);
45454546

45464547
/*
45474548
* We need to restore the upper transaction's read-only state, in case the
@@ -4695,6 +4696,7 @@ AbortSubTransaction(void)
46954696
AtEOSubXact_HashTables(false, s->nestingLevel);
46964697
AtEOSubXact_PgStat(false, s->nestingLevel);
46974698
AtSubAbort_Snapshot(s->nestingLevel);
4699+
AtEOSubXact_ApplyLauncher(false, s->nestingLevel);
46984700
}
46994701

47004702
/*

src/backend/replication/logical/launcher.c

Lines changed: 108 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,19 @@ typedef struct LogicalRepWorkerId
7979
Oid relid;
8080
} LogicalRepWorkerId;
8181

82-
static List *on_commit_stop_workers = NIL;
82+
typedef struct StopWorkersData
83+
{
84+
int nestDepth; /* Sub-transaction nest level */
85+
List *workers; /* List of LogicalRepWorkerId */
86+
struct StopWorkersData *parent; /* This need not be an immediate
87+
* subtransaction parent */
88+
} StopWorkersData;
89+
90+
/*
91+
* Stack of StopWorkersData elements. Each stack element contains the workers
92+
* to be stopped for that subtransaction.
93+
*/
94+
static StopWorkersData *on_commit_stop_workers = NULL;
8395

8496
static void ApplyLauncherWakeup(void);
8597
static void logicalrep_launcher_onexit(int code, Datum arg);
@@ -558,17 +570,41 @@ logicalrep_worker_stop(Oid subid, Oid relid)
558570
void
559571
logicalrep_worker_stop_at_commit(Oid subid, Oid relid)
560572
{
573+
int nestDepth = GetCurrentTransactionNestLevel();
561574
LogicalRepWorkerId *wid;
562575
MemoryContext oldctx;
563576

564577
/* Make sure we store the info in context that survives until commit. */
565578
oldctx = MemoryContextSwitchTo(TopTransactionContext);
566579

580+
/* Check that previous transactions were properly cleaned up. */
581+
Assert(on_commit_stop_workers == NULL ||
582+
nestDepth >= on_commit_stop_workers->nestDepth);
583+
584+
/*
585+
* Push a new stack element if we don't already have one for the current
586+
* nestDepth.
587+
*/
588+
if (on_commit_stop_workers == NULL ||
589+
nestDepth > on_commit_stop_workers->nestDepth)
590+
{
591+
StopWorkersData *newdata = palloc(sizeof(StopWorkersData));
592+
593+
newdata->nestDepth = nestDepth;
594+
newdata->workers = NIL;
595+
newdata->parent = on_commit_stop_workers;
596+
on_commit_stop_workers = newdata;
597+
}
598+
599+
/*
600+
* Finally add a new worker into the worker list of the current
601+
* subtransaction.
602+
*/
567603
wid = palloc(sizeof(LogicalRepWorkerId));
568604
wid->subid = subid;
569605
wid->relid = relid;
570-
571-
on_commit_stop_workers = lappend(on_commit_stop_workers, wid);
606+
on_commit_stop_workers->workers =
607+
lappend(on_commit_stop_workers->workers, wid);
572608

573609
MemoryContextSwitchTo(oldctx);
574610
}
@@ -820,7 +856,7 @@ ApplyLauncherShmemInit(void)
820856
bool
821857
XactManipulatesLogicalReplicationWorkers(void)
822858
{
823-
return (on_commit_stop_workers != NIL);
859+
return (on_commit_stop_workers != NULL);
824860
}
825861

826862
/*
@@ -829,15 +865,25 @@ XactManipulatesLogicalReplicationWorkers(void)
829865
void
830866
AtEOXact_ApplyLauncher(bool isCommit)
831867
{
868+
869+
Assert(on_commit_stop_workers == NULL ||
870+
(on_commit_stop_workers->nestDepth == 1 &&
871+
on_commit_stop_workers->parent == NULL));
872+
832873
if (isCommit)
833874
{
834875
ListCell *lc;
835876

836-
foreach(lc, on_commit_stop_workers)
877+
if (on_commit_stop_workers != NULL)
837878
{
838-
LogicalRepWorkerId *wid = lfirst(lc);
879+
List *workers = on_commit_stop_workers->workers;
880+
881+
foreach(lc, workers)
882+
{
883+
LogicalRepWorkerId *wid = lfirst(lc);
839884

840-
logicalrep_worker_stop(wid->subid, wid->relid);
885+
logicalrep_worker_stop(wid->subid, wid->relid);
886+
}
841887
}
842888

843889
if (on_commit_launcher_wakeup)
@@ -848,10 +894,64 @@ AtEOXact_ApplyLauncher(bool isCommit)
848894
* No need to pfree on_commit_stop_workers. It was allocated in
849895
* transaction memory context, which is going to be cleaned soon.
850896
*/
851-
on_commit_stop_workers = NIL;
897+
on_commit_stop_workers = NULL;
852898
on_commit_launcher_wakeup = false;
853899
}
854900

901+
/*
902+
* On commit, merge the current on_commit_stop_workers list into the
903+
* immediate parent, if present.
904+
* On rollback, discard the current on_commit_stop_workers list.
905+
* Pop out the stack.
906+
*/
907+
void
908+
AtEOSubXact_ApplyLauncher(bool isCommit, int nestDepth)
909+
{
910+
StopWorkersData *parent;
911+
912+
/* Exit immediately if there's no work to do at this level. */
913+
if (on_commit_stop_workers == NULL ||
914+
on_commit_stop_workers->nestDepth < nestDepth)
915+
return;
916+
917+
Assert(on_commit_stop_workers->nestDepth == nestDepth);
918+
919+
parent = on_commit_stop_workers->parent;
920+
921+
if (isCommit)
922+
{
923+
/*
924+
* If the upper stack element is not an immediate parent
925+
* subtransaction, just decrement the notional nesting depth without
926+
* doing any real work. Else, we need to merge the current workers
927+
* list into the parent.
928+
*/
929+
if (!parent || parent->nestDepth < nestDepth - 1)
930+
{
931+
on_commit_stop_workers->nestDepth--;
932+
return;
933+
}
934+
935+
parent->workers =
936+
list_concat(parent->workers, on_commit_stop_workers->workers);
937+
}
938+
else
939+
{
940+
/*
941+
* Abandon everything that was done at this nesting level. Explicitly
942+
* free memory to avoid a transaction-lifespan leak.
943+
*/
944+
list_free_deep(on_commit_stop_workers->workers);
945+
}
946+
947+
/*
948+
* We have taken care of the current subtransaction workers list for both
949+
* abort or commit. So we are ready to pop the stack.
950+
*/
951+
pfree(on_commit_stop_workers);
952+
on_commit_stop_workers = parent;
953+
}
954+
855955
/*
856956
* Request wakeup of the launcher on commit of the transaction.
857957
*

src/include/replication/logicallauncher.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ extern void ApplyLauncherShmemInit(void);
2424
extern void ApplyLauncherWakeupAtCommit(void);
2525
extern bool XactManipulatesLogicalReplicationWorkers(void);
2626
extern void AtEOXact_ApplyLauncher(bool isCommit);
27+
extern void AtEOSubXact_ApplyLauncher(bool isCommit, int nestDepth);
2728

2829
extern bool IsLogicalLauncher(void);
2930

src/tools/pgindent/typedefs.list

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2112,6 +2112,7 @@ StdAnalyzeData
21122112
StdRdOptions
21132113
Step
21142114
StopList
2115+
StopWorkersData
21152116
StrategyNumber
21162117
StreamCtl
21172118
StringInfo

0 commit comments

Comments
 (0)