Skip to content

Commit 185279d

Browse files
committed
Fix crash when logical decoding is invoked from a PL function.
The logical decoding functions do BeginInternalSubTransaction and RollbackAndReleaseCurrentSubTransaction to clean up after themselves. It turns out that AtEOSubXact_SPI has an unrecognized assumption that we always need to cancel the active SPI operation in the SPI context that surrounds the subtransaction (if there is one). That's true when the RollbackAndReleaseCurrentSubTransaction call is coming from the SPI-using function itself, but not when it's happening inside some unrelated function invoked by a SPI query. In practice the affected callers are the various PLs. To fix, record the current subtransaction ID when we begin a SPI operation, and clean up only if that ID is the subtransaction being canceled. Also, remove AtEOSubXact_SPI's assertion that it must have cleaned up the surrounding SPI context's active tuptable. That's proven wrong by the same test case. Also clarify (or, if you prefer, reinterpret) the calling conventions for _SPI_begin_call and _SPI_end_call. The memory context cleanup in the latter means that these have always had the flavor of a matched resource-management pair, but they weren't documented that way before. Per report from Ben Chobot. Back-patch to 9.4 where logical decoding came in. In principle, the SPI changes should go all the way back, since the problem dates back to commit 7ec1c5a. But given the lack of field complaints it seems few people are using internal subtransactions in this way. So I don't feel a need to take any risks in 9.2/9.3. Discussion: https://postgr.es/m/73FBA179-C68C-4540-9473-71E865408B15@silentmedia.com
1 parent 69e931f commit 185279d

File tree

4 files changed

+70
-11
lines changed

4 files changed

+70
-11
lines changed

contrib/test_decoding/expected/decoding_into_rel.out

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,31 @@ SELECT * FROM changeresult;
5959

6060
DROP TABLE changeresult;
6161
DROP TABLE somechange;
62+
-- check calling logical decoding from pl/pgsql
63+
CREATE FUNCTION slot_changes_wrapper(slot_name name) RETURNS SETOF TEXT AS $$
64+
BEGIN
65+
RETURN QUERY
66+
SELECT data FROM pg_logical_slot_peek_changes(slot_name, NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
67+
END$$ LANGUAGE plpgsql;
68+
SELECT * FROM slot_changes_wrapper('regression_slot');
69+
slot_changes_wrapper
70+
--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
71+
BEGIN
72+
table public.changeresult: INSERT: data[text]:'BEGIN'
73+
table public.changeresult: INSERT: data[text]:'table public.changeresult: INSERT: data[text]:''BEGIN'''
74+
table public.changeresult: INSERT: data[text]:'table public.changeresult: INSERT: data[text]:''table public.somechange: INSERT: id[integer]:1'''
75+
table public.changeresult: INSERT: data[text]:'table public.changeresult: INSERT: data[text]:''COMMIT'''
76+
table public.changeresult: INSERT: data[text]:'COMMIT'
77+
table public.changeresult: INSERT: data[text]:'BEGIN'
78+
table public.changeresult: INSERT: data[text]:'table public.changeresult: INSERT: data[text]:''BEGIN'''
79+
table public.changeresult: INSERT: data[text]:'table public.changeresult: INSERT: data[text]:''table public.changeresult: INSERT: data[text]:''''BEGIN'''''''
80+
table public.changeresult: INSERT: data[text]:'table public.changeresult: INSERT: data[text]:''table public.changeresult: INSERT: data[text]:''''table public.somechange: INSERT: id[integer]:1'''''''
81+
table public.changeresult: INSERT: data[text]:'table public.changeresult: INSERT: data[text]:''table public.changeresult: INSERT: data[text]:''''COMMIT'''''''
82+
table public.changeresult: INSERT: data[text]:'table public.changeresult: INSERT: data[text]:''COMMIT'''
83+
table public.changeresult: INSERT: data[text]:'COMMIT'
84+
COMMIT
85+
(14 rows)
86+
6287
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
6388
data
6489
--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

contrib/test_decoding/sql/decoding_into_rel.sql

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,5 +27,16 @@ INSERT INTO changeresult
2727
SELECT * FROM changeresult;
2828
DROP TABLE changeresult;
2929
DROP TABLE somechange;
30+
31+
-- check calling logical decoding from pl/pgsql
32+
CREATE FUNCTION slot_changes_wrapper(slot_name name) RETURNS SETOF TEXT AS $$
33+
BEGIN
34+
RETURN QUERY
35+
SELECT data FROM pg_logical_slot_peek_changes(slot_name, NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
36+
END$$ LANGUAGE plpgsql;
37+
38+
SELECT * FROM slot_changes_wrapper('regression_slot');
39+
3040
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
41+
3142
SELECT 'stop' FROM pg_drop_replication_slot('regression_slot');

src/backend/executor/spi.c

Lines changed: 31 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -72,8 +72,8 @@ static void _SPI_cursor_operation(Portal portal,
7272
static SPIPlanPtr _SPI_make_plan_non_temp(SPIPlanPtr plan);
7373
static SPIPlanPtr _SPI_save_plan(SPIPlanPtr plan);
7474

75-
static int _SPI_begin_call(bool execmem);
76-
static int _SPI_end_call(bool procmem);
75+
static int _SPI_begin_call(bool use_exec);
76+
static int _SPI_end_call(bool use_exec);
7777
static MemoryContext _SPI_execmem(void);
7878
static MemoryContext _SPI_procmem(void);
7979
static bool _SPI_checktuples(void);
@@ -127,6 +127,7 @@ SPI_connect(void)
127127
_SPI_current->processed = 0;
128128
_SPI_current->lastoid = InvalidOid;
129129
_SPI_current->tuptable = NULL;
130+
_SPI_current->execSubid = InvalidSubTransactionId;
130131
slist_init(&_SPI_current->tuptables);
131132
_SPI_current->procCxt = NULL; /* in case we fail to create 'em */
132133
_SPI_current->execCxt = NULL;
@@ -157,7 +158,7 @@ SPI_finish(void)
157158
{
158159
int res;
159160

160-
res = _SPI_begin_call(false); /* live in procedure memory */
161+
res = _SPI_begin_call(false); /* just check we're connected */
161162
if (res < 0)
162163
return res;
163164

@@ -282,8 +283,15 @@ AtEOSubXact_SPI(bool isCommit, SubTransactionId mySubid)
282283
{
283284
slist_mutable_iter siter;
284285

285-
/* free Executor memory the same as _SPI_end_call would do */
286-
MemoryContextResetAndDeleteChildren(_SPI_current->execCxt);
286+
/*
287+
* Throw away executor state if current executor operation was started
288+
* within current subxact (essentially, force a _SPI_end_call(true)).
289+
*/
290+
if (_SPI_current->execSubid >= mySubid)
291+
{
292+
_SPI_current->execSubid = InvalidSubTransactionId;
293+
MemoryContextResetAndDeleteChildren(_SPI_current->execCxt);
294+
}
287295

288296
/* throw away any tuple tables created within current subxact */
289297
slist_foreach_modify(siter, &_SPI_current->tuptables)
@@ -307,8 +315,6 @@ AtEOSubXact_SPI(bool isCommit, SubTransactionId mySubid)
307315
MemoryContextDelete(tuptable->tuptabcxt);
308316
}
309317
}
310-
/* in particular we should have gotten rid of any in-progress table */
311-
Assert(_SPI_current->tuptable == NULL);
312318
}
313319
}
314320

@@ -2529,38 +2535,52 @@ _SPI_procmem(void)
25292535

25302536
/*
25312537
* _SPI_begin_call: begin a SPI operation within a connected procedure
2538+
*
2539+
* use_exec is true if we intend to make use of the procedure's execCxt
2540+
* during this SPI operation. We'll switch into that context, and arrange
2541+
* for it to be cleaned up at _SPI_end_call or if an error occurs.
25322542
*/
25332543
static int
2534-
_SPI_begin_call(bool execmem)
2544+
_SPI_begin_call(bool use_exec)
25352545
{
25362546
if (_SPI_curid + 1 != _SPI_connected)
25372547
return SPI_ERROR_UNCONNECTED;
25382548
_SPI_curid++;
25392549
if (_SPI_current != &(_SPI_stack[_SPI_curid]))
25402550
elog(ERROR, "SPI stack corrupted");
25412551

2542-
if (execmem) /* switch to the Executor memory context */
2552+
if (use_exec)
2553+
{
2554+
/* remember when the Executor operation started */
2555+
_SPI_current->execSubid = GetCurrentSubTransactionId();
2556+
/* switch to the Executor memory context */
25432557
_SPI_execmem();
2558+
}
25442559

25452560
return 0;
25462561
}
25472562

25482563
/*
25492564
* _SPI_end_call: end a SPI operation within a connected procedure
25502565
*
2566+
* use_exec must be the same as in the previous _SPI_begin_call
2567+
*
25512568
* Note: this currently has no failure return cases, so callers don't check
25522569
*/
25532570
static int
2554-
_SPI_end_call(bool procmem)
2571+
_SPI_end_call(bool use_exec)
25552572
{
25562573
/*
25572574
* We're returning to procedure where _SPI_curid == _SPI_connected - 1
25582575
*/
25592576
_SPI_curid--;
25602577

2561-
if (procmem) /* switch to the procedure memory context */
2578+
if (use_exec)
25622579
{
2580+
/* switch to the procedure memory context */
25632581
_SPI_procmem();
2582+
/* mark Executor context no longer in use */
2583+
_SPI_current->execSubid = InvalidSubTransactionId;
25642584
/* and free Executor memory */
25652585
MemoryContextResetAndDeleteChildren(_SPI_current->execCxt);
25662586
}

src/include/executor/spi_priv.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,9 @@ typedef struct
3131
MemoryContext execCxt; /* executor context */
3232
MemoryContext savedcxt; /* context of SPI_connect's caller */
3333
SubTransactionId connectSubid; /* ID of connecting subtransaction */
34+
35+
/* subtransaction in which current Executor call was started */
36+
SubTransactionId execSubid;
3437
} _SPI_connection;
3538

3639
/*

0 commit comments

Comments
 (0)