@@ -169,30 +169,41 @@ should_apply_changes_for_rel(LogicalRepRelMapEntry *rel)
169
169
}
170
170
171
171
/*
172
- * Make sure that we started local transaction.
172
+ * Begin one step (one INSERT, UPDATE, etc) of a replication transaction.
173
173
*
174
- * Also switches to ApplyMessageContext as necessary.
174
+ * Start a transaction, if this is the first step (else we keep using the
175
+ * existing transaction).
176
+ * Also provide a global snapshot and ensure we run in ApplyMessageContext.
175
177
*/
176
- static bool
177
- ensure_transaction (void )
178
+ static void
179
+ begin_replication_step (void )
178
180
{
179
- if (IsTransactionState ())
180
- {
181
- SetCurrentStatementStartTimestamp ();
182
-
183
- if (CurrentMemoryContext != ApplyMessageContext )
184
- MemoryContextSwitchTo (ApplyMessageContext );
181
+ SetCurrentStatementStartTimestamp ();
185
182
186
- return false;
183
+ if (!IsTransactionState ())
184
+ {
185
+ StartTransactionCommand ();
186
+ maybe_reread_subscription ();
187
187
}
188
188
189
- SetCurrentStatementStartTimestamp ();
190
- StartTransactionCommand ();
191
-
192
- maybe_reread_subscription ();
189
+ PushActiveSnapshot (GetTransactionSnapshot ());
193
190
194
191
MemoryContextSwitchTo (ApplyMessageContext );
195
- return true;
192
+ }
193
+
194
+ /*
195
+ * Finish up one step of a replication transaction.
196
+ * Callers of begin_replication_step() must also call this.
197
+ *
198
+ * We don't close out the transaction here, but we should increment
199
+ * the command counter to make the effects of this step visible.
200
+ */
201
+ static void
202
+ end_replication_step (void )
203
+ {
204
+ PopActiveSnapshot ();
205
+
206
+ CommandCounterIncrement ();
196
207
}
197
208
198
209
@@ -210,13 +221,6 @@ create_edata_for_relation(LogicalRepRelMapEntry *rel)
210
221
ResultRelInfo * resultRelInfo ;
211
222
RangeTblEntry * rte ;
212
223
213
- /*
214
- * Input functions may need an active snapshot, as may AFTER triggers
215
- * invoked during finish_edata. For safety, ensure an active snapshot
216
- * exists throughout all our usage of the executor.
217
- */
218
- PushActiveSnapshot (GetTransactionSnapshot ());
219
-
220
224
edata = (ApplyExecutionData * ) palloc0 (sizeof (ApplyExecutionData ));
221
225
edata -> targetRel = rel ;
222
226
@@ -277,8 +281,6 @@ finish_edata(ApplyExecutionData *edata)
277
281
ExecResetTupleTable (estate -> es_tupleTable , false);
278
282
FreeExecutorState (estate );
279
283
pfree (edata );
280
-
281
- PopActiveSnapshot ();
282
284
}
283
285
284
286
/*
@@ -673,7 +675,7 @@ apply_handle_insert(StringInfo s)
673
675
TupleTableSlot * remoteslot ;
674
676
MemoryContext oldctx ;
675
677
676
- ensure_transaction ();
678
+ begin_replication_step ();
677
679
678
680
relid = logicalrep_read_insert (s , & newtup );
679
681
rel = logicalrep_rel_open (relid , RowExclusiveLock );
@@ -684,6 +686,7 @@ apply_handle_insert(StringInfo s)
684
686
* transaction so it's safe to unlock it.
685
687
*/
686
688
logicalrep_rel_close (rel , RowExclusiveLock );
689
+ end_replication_step ();
687
690
return ;
688
691
}
689
692
@@ -712,7 +715,7 @@ apply_handle_insert(StringInfo s)
712
715
713
716
logicalrep_rel_close (rel , NoLock );
714
717
715
- CommandCounterIncrement ();
718
+ end_replication_step ();
716
719
}
717
720
718
721
/* Workhorse for apply_handle_insert() */
@@ -781,7 +784,7 @@ apply_handle_update(StringInfo s)
781
784
RangeTblEntry * target_rte ;
782
785
MemoryContext oldctx ;
783
786
784
- ensure_transaction ();
787
+ begin_replication_step ();
785
788
786
789
relid = logicalrep_read_update (s , & has_oldtup , & oldtup ,
787
790
& newtup );
@@ -793,6 +796,7 @@ apply_handle_update(StringInfo s)
793
796
* transaction so it's safe to unlock it.
794
797
*/
795
798
logicalrep_rel_close (rel , RowExclusiveLock );
799
+ end_replication_step ();
796
800
return ;
797
801
}
798
802
@@ -849,7 +853,7 @@ apply_handle_update(StringInfo s)
849
853
850
854
logicalrep_rel_close (rel , NoLock );
851
855
852
- CommandCounterIncrement ();
856
+ end_replication_step ();
853
857
}
854
858
855
859
/* Workhorse for apply_handle_update() */
@@ -925,7 +929,7 @@ apply_handle_delete(StringInfo s)
925
929
TupleTableSlot * remoteslot ;
926
930
MemoryContext oldctx ;
927
931
928
- ensure_transaction ();
932
+ begin_replication_step ();
929
933
930
934
relid = logicalrep_read_delete (s , & oldtup );
931
935
rel = logicalrep_rel_open (relid , RowExclusiveLock );
@@ -936,6 +940,7 @@ apply_handle_delete(StringInfo s)
936
940
* transaction so it's safe to unlock it.
937
941
*/
938
942
logicalrep_rel_close (rel , RowExclusiveLock );
943
+ end_replication_step ();
939
944
return ;
940
945
}
941
946
@@ -966,7 +971,7 @@ apply_handle_delete(StringInfo s)
966
971
967
972
logicalrep_rel_close (rel , NoLock );
968
973
969
- CommandCounterIncrement ();
974
+ end_replication_step ();
970
975
}
971
976
972
977
/* Workhorse for apply_handle_delete() */
@@ -1291,7 +1296,7 @@ apply_handle_truncate(StringInfo s)
1291
1296
ListCell * lc ;
1292
1297
LOCKMODE lockmode = AccessExclusiveLock ;
1293
1298
1294
- ensure_transaction ();
1299
+ begin_replication_step ();
1295
1300
1296
1301
remote_relids = logicalrep_read_truncate (s , & cascade , & restart_seqs );
1297
1302
@@ -1379,7 +1384,7 @@ apply_handle_truncate(StringInfo s)
1379
1384
table_close (rel , NoLock );
1380
1385
}
1381
1386
1382
- CommandCounterIncrement ();
1387
+ end_replication_step ();
1383
1388
}
1384
1389
1385
1390
0 commit comments