Skip to content

Commit 6e43f1c

Browse files
committed
Rearrange logrep worker's snapshot handling some more.
It turns out that worker.c's code path for TRUNCATE was also careless about establishing a snapshot while executing user-defined code, allowing the checks added by commit 84f5c29 to fail when a trigger is fired in that context. We could just wrap Push/PopActiveSnapshot around the truncate call, but it seems better to establish a policy of holding a snapshot throughout execution of a replication step. To help with that and possible future requirements, replace the previous ensure_transaction calls with pairs of begin/end_replication_step calls. Per report from Mark Dilger. Back-patch to v11, like the previous changes. Discussion: https://postgr.es/m/B4A3AF82-79ED-4F4C-A4E5-CD2622098972@enterprisedb.com
1 parent 3465328 commit 6e43f1c

File tree

1 file changed

+38
-33
lines changed

1 file changed

+38
-33
lines changed

src/backend/replication/logical/worker.c

Lines changed: 38 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -169,30 +169,41 @@ should_apply_changes_for_rel(LogicalRepRelMapEntry *rel)
169169
}
170170

171171
/*
172-
* Make sure that we started local transaction.
172+
* Begin one step (one INSERT, UPDATE, etc) of a replication transaction.
173173
*
174-
* Also switches to ApplyMessageContext as necessary.
174+
* Start a transaction, if this is the first step (else we keep using the
175+
* existing transaction).
176+
* Also provide a global snapshot and ensure we run in ApplyMessageContext.
175177
*/
176-
static bool
177-
ensure_transaction(void)
178+
static void
179+
begin_replication_step(void)
178180
{
179-
if (IsTransactionState())
180-
{
181-
SetCurrentStatementStartTimestamp();
182-
183-
if (CurrentMemoryContext != ApplyMessageContext)
184-
MemoryContextSwitchTo(ApplyMessageContext);
181+
SetCurrentStatementStartTimestamp();
185182

186-
return false;
183+
if (!IsTransactionState())
184+
{
185+
StartTransactionCommand();
186+
maybe_reread_subscription();
187187
}
188188

189-
SetCurrentStatementStartTimestamp();
190-
StartTransactionCommand();
191-
192-
maybe_reread_subscription();
189+
PushActiveSnapshot(GetTransactionSnapshot());
193190

194191
MemoryContextSwitchTo(ApplyMessageContext);
195-
return true;
192+
}
193+
194+
/*
195+
* Finish up one step of a replication transaction.
196+
* Callers of begin_replication_step() must also call this.
197+
*
198+
* We don't close out the transaction here, but we should increment
199+
* the command counter to make the effects of this step visible.
200+
*/
201+
static void
202+
end_replication_step(void)
203+
{
204+
PopActiveSnapshot();
205+
206+
CommandCounterIncrement();
196207
}
197208

198209

@@ -210,13 +221,6 @@ create_edata_for_relation(LogicalRepRelMapEntry *rel)
210221
ResultRelInfo *resultRelInfo;
211222
RangeTblEntry *rte;
212223

213-
/*
214-
* Input functions may need an active snapshot, as may AFTER triggers
215-
* invoked during finish_edata. For safety, ensure an active snapshot
216-
* exists throughout all our usage of the executor.
217-
*/
218-
PushActiveSnapshot(GetTransactionSnapshot());
219-
220224
edata = (ApplyExecutionData *) palloc0(sizeof(ApplyExecutionData));
221225
edata->targetRel = rel;
222226

@@ -277,8 +281,6 @@ finish_edata(ApplyExecutionData *edata)
277281
ExecResetTupleTable(estate->es_tupleTable, false);
278282
FreeExecutorState(estate);
279283
pfree(edata);
280-
281-
PopActiveSnapshot();
282284
}
283285

284286
/*
@@ -673,7 +675,7 @@ apply_handle_insert(StringInfo s)
673675
TupleTableSlot *remoteslot;
674676
MemoryContext oldctx;
675677

676-
ensure_transaction();
678+
begin_replication_step();
677679

678680
relid = logicalrep_read_insert(s, &newtup);
679681
rel = logicalrep_rel_open(relid, RowExclusiveLock);
@@ -684,6 +686,7 @@ apply_handle_insert(StringInfo s)
684686
* transaction so it's safe to unlock it.
685687
*/
686688
logicalrep_rel_close(rel, RowExclusiveLock);
689+
end_replication_step();
687690
return;
688691
}
689692

@@ -712,7 +715,7 @@ apply_handle_insert(StringInfo s)
712715

713716
logicalrep_rel_close(rel, NoLock);
714717

715-
CommandCounterIncrement();
718+
end_replication_step();
716719
}
717720

718721
/* Workhorse for apply_handle_insert() */
@@ -781,7 +784,7 @@ apply_handle_update(StringInfo s)
781784
RangeTblEntry *target_rte;
782785
MemoryContext oldctx;
783786

784-
ensure_transaction();
787+
begin_replication_step();
785788

786789
relid = logicalrep_read_update(s, &has_oldtup, &oldtup,
787790
&newtup);
@@ -793,6 +796,7 @@ apply_handle_update(StringInfo s)
793796
* transaction so it's safe to unlock it.
794797
*/
795798
logicalrep_rel_close(rel, RowExclusiveLock);
799+
end_replication_step();
796800
return;
797801
}
798802

@@ -849,7 +853,7 @@ apply_handle_update(StringInfo s)
849853

850854
logicalrep_rel_close(rel, NoLock);
851855

852-
CommandCounterIncrement();
856+
end_replication_step();
853857
}
854858

855859
/* Workhorse for apply_handle_update() */
@@ -925,7 +929,7 @@ apply_handle_delete(StringInfo s)
925929
TupleTableSlot *remoteslot;
926930
MemoryContext oldctx;
927931

928-
ensure_transaction();
932+
begin_replication_step();
929933

930934
relid = logicalrep_read_delete(s, &oldtup);
931935
rel = logicalrep_rel_open(relid, RowExclusiveLock);
@@ -936,6 +940,7 @@ apply_handle_delete(StringInfo s)
936940
* transaction so it's safe to unlock it.
937941
*/
938942
logicalrep_rel_close(rel, RowExclusiveLock);
943+
end_replication_step();
939944
return;
940945
}
941946

@@ -966,7 +971,7 @@ apply_handle_delete(StringInfo s)
966971

967972
logicalrep_rel_close(rel, NoLock);
968973

969-
CommandCounterIncrement();
974+
end_replication_step();
970975
}
971976

972977
/* Workhorse for apply_handle_delete() */
@@ -1291,7 +1296,7 @@ apply_handle_truncate(StringInfo s)
12911296
ListCell *lc;
12921297
LOCKMODE lockmode = AccessExclusiveLock;
12931298

1294-
ensure_transaction();
1299+
begin_replication_step();
12951300

12961301
remote_relids = logicalrep_read_truncate(s, &cascade, &restart_seqs);
12971302

@@ -1379,7 +1384,7 @@ apply_handle_truncate(StringInfo s)
13791384
table_close(rel, NoLock);
13801385
}
13811386

1382-
CommandCounterIncrement();
1387+
end_replication_step();
13831388
}
13841389

13851390

0 commit comments

Comments
 (0)