@@ -79,7 +79,19 @@ typedef struct LogicalRepWorkerId
79
79
Oid relid ;
80
80
} LogicalRepWorkerId ;
81
81
82
- static List * on_commit_stop_workers = NIL ;
82
+ typedef struct StopWorkersData
83
+ {
84
+ int nestDepth ; /* Sub-transaction nest level */
85
+ List * workers ; /* List of LogicalRepWorkerId */
86
+ struct StopWorkersData * parent ; /* This need not be an immediate
87
+ * subtransaction parent */
88
+ } StopWorkersData ;
89
+
90
+ /*
91
+ * Stack of StopWorkersData elements. Each stack element contains the workers
92
+ * to be stopped for that subtransaction.
93
+ */
94
+ static StopWorkersData * on_commit_stop_workers = NULL ;
83
95
84
96
static void ApplyLauncherWakeup (void );
85
97
static void logicalrep_launcher_onexit (int code , Datum arg );
@@ -558,17 +570,41 @@ logicalrep_worker_stop(Oid subid, Oid relid)
558
570
void
559
571
logicalrep_worker_stop_at_commit (Oid subid , Oid relid )
560
572
{
573
+ int nestDepth = GetCurrentTransactionNestLevel ();
561
574
LogicalRepWorkerId * wid ;
562
575
MemoryContext oldctx ;
563
576
564
577
/* Make sure we store the info in context that survives until commit. */
565
578
oldctx = MemoryContextSwitchTo (TopTransactionContext );
566
579
580
+ /* Check that previous transactions were properly cleaned up. */
581
+ Assert (on_commit_stop_workers == NULL ||
582
+ nestDepth >= on_commit_stop_workers -> nestDepth );
583
+
584
+ /*
585
+ * Push a new stack element if we don't already have one for the current
586
+ * nestDepth.
587
+ */
588
+ if (on_commit_stop_workers == NULL ||
589
+ nestDepth > on_commit_stop_workers -> nestDepth )
590
+ {
591
+ StopWorkersData * newdata = palloc (sizeof (StopWorkersData ));
592
+
593
+ newdata -> nestDepth = nestDepth ;
594
+ newdata -> workers = NIL ;
595
+ newdata -> parent = on_commit_stop_workers ;
596
+ on_commit_stop_workers = newdata ;
597
+ }
598
+
599
+ /*
600
+ * Finally add a new worker into the worker list of the current
601
+ * subtransaction.
602
+ */
567
603
wid = palloc (sizeof (LogicalRepWorkerId ));
568
604
wid -> subid = subid ;
569
605
wid -> relid = relid ;
570
-
571
- on_commit_stop_workers = lappend (on_commit_stop_workers , wid );
606
+ on_commit_stop_workers -> workers =
607
+ lappend (on_commit_stop_workers -> workers , wid );
572
608
573
609
MemoryContextSwitchTo (oldctx );
574
610
}
@@ -820,7 +856,7 @@ ApplyLauncherShmemInit(void)
820
856
bool
821
857
XactManipulatesLogicalReplicationWorkers (void )
822
858
{
823
- return (on_commit_stop_workers != NIL );
859
+ return (on_commit_stop_workers != NULL );
824
860
}
825
861
826
862
/*
@@ -829,15 +865,25 @@ XactManipulatesLogicalReplicationWorkers(void)
829
865
void
830
866
AtEOXact_ApplyLauncher (bool isCommit )
831
867
{
868
+
869
+ Assert (on_commit_stop_workers == NULL ||
870
+ (on_commit_stop_workers -> nestDepth == 1 &&
871
+ on_commit_stop_workers -> parent == NULL ));
872
+
832
873
if (isCommit )
833
874
{
834
875
ListCell * lc ;
835
876
836
- foreach ( lc , on_commit_stop_workers )
877
+ if ( on_commit_stop_workers != NULL )
837
878
{
838
- LogicalRepWorkerId * wid = lfirst (lc );
879
+ List * workers = on_commit_stop_workers -> workers ;
880
+
881
+ foreach (lc , workers )
882
+ {
883
+ LogicalRepWorkerId * wid = lfirst (lc );
839
884
840
- logicalrep_worker_stop (wid -> subid , wid -> relid );
885
+ logicalrep_worker_stop (wid -> subid , wid -> relid );
886
+ }
841
887
}
842
888
843
889
if (on_commit_launcher_wakeup )
@@ -848,10 +894,64 @@ AtEOXact_ApplyLauncher(bool isCommit)
848
894
* No need to pfree on_commit_stop_workers. It was allocated in
849
895
* transaction memory context, which is going to be cleaned soon.
850
896
*/
851
- on_commit_stop_workers = NIL ;
897
+ on_commit_stop_workers = NULL ;
852
898
on_commit_launcher_wakeup = false;
853
899
}
854
900
901
+ /*
902
+ * On commit, merge the current on_commit_stop_workers list into the
903
+ * immediate parent, if present.
904
+ * On rollback, discard the current on_commit_stop_workers list.
905
+ * Pop out the stack.
906
+ */
907
+ void
908
+ AtEOSubXact_ApplyLauncher (bool isCommit , int nestDepth )
909
+ {
910
+ StopWorkersData * parent ;
911
+
912
+ /* Exit immediately if there's no work to do at this level. */
913
+ if (on_commit_stop_workers == NULL ||
914
+ on_commit_stop_workers -> nestDepth < nestDepth )
915
+ return ;
916
+
917
+ Assert (on_commit_stop_workers -> nestDepth == nestDepth );
918
+
919
+ parent = on_commit_stop_workers -> parent ;
920
+
921
+ if (isCommit )
922
+ {
923
+ /*
924
+ * If the upper stack element is not an immediate parent
925
+ * subtransaction, just decrement the notional nesting depth without
926
+ * doing any real work. Else, we need to merge the current workers
927
+ * list into the parent.
928
+ */
929
+ if (!parent || parent -> nestDepth < nestDepth - 1 )
930
+ {
931
+ on_commit_stop_workers -> nestDepth -- ;
932
+ return ;
933
+ }
934
+
935
+ parent -> workers =
936
+ list_concat (parent -> workers , on_commit_stop_workers -> workers );
937
+ }
938
+ else
939
+ {
940
+ /*
941
+ * Abandon everything that was done at this nesting level. Explicitly
942
+ * free memory to avoid a transaction-lifespan leak.
943
+ */
944
+ list_free_deep (on_commit_stop_workers -> workers );
945
+ }
946
+
947
+ /*
948
+ * We have taken care of the current subtransaction workers list for both
949
+ * abort or commit. So we are ready to pop the stack.
950
+ */
951
+ pfree (on_commit_stop_workers );
952
+ on_commit_stop_workers = parent ;
953
+ }
954
+
855
955
/*
856
956
* Request wakeup of the launcher on commit of the transaction.
857
957
*
0 commit comments