Skip to content

Commit 13d2ed9

Browse files
committed
Fix crash when logical decoding is invoked from a PL function.
The logical decoding functions do BeginInternalSubTransaction and RollbackAndReleaseCurrentSubTransaction to clean up after themselves. It turns out that AtEOSubXact_SPI has an unrecognized assumption that we always need to cancel the active SPI operation in the SPI context that surrounds the subtransaction (if there is one). That's true when the RollbackAndReleaseCurrentSubTransaction call is coming from the SPI-using function itself, but not when it's happening inside some unrelated function invoked by a SPI query. In practice the affected callers are the various PLs. To fix, record the current subtransaction ID when we begin a SPI operation, and clean up only if that ID is the subtransaction being canceled. Also, remove AtEOSubXact_SPI's assertion that it must have cleaned up the surrounding SPI context's active tuptable. That's proven wrong by the same test case. Also clarify (or, if you prefer, reinterpret) the calling conventions for _SPI_begin_call and _SPI_end_call. The memory context cleanup in the latter means that these have always had the flavor of a matched resource-management pair, but they weren't documented that way before. Per report from Ben Chobot. Back-patch to 9.4 where logical decoding came in. In principle, the SPI changes should go all the way back, since the problem dates back to commit 7ec1c5a. But given the lack of field complaints it seems few people are using internal subtransactions in this way. So I don't feel a need to take any risks in 9.2/9.3. Discussion: https://postgr.es/m/73FBA179-C68C-4540-9473-71E865408B15@silentmedia.com
1 parent c7c93dd commit 13d2ed9

File tree

4 files changed

+70
-11
lines changed

4 files changed

+70
-11
lines changed

contrib/test_decoding/expected/decoding_into_rel.out

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,31 @@ SELECT * FROM changeresult;
5959

6060
DROP TABLE changeresult;
6161
DROP TABLE somechange;
62+
-- check calling logical decoding from pl/pgsql
63+
CREATE FUNCTION slot_changes_wrapper(slot_name name) RETURNS SETOF TEXT AS $$
64+
BEGIN
65+
RETURN QUERY
66+
SELECT data FROM pg_logical_slot_peek_changes(slot_name, NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
67+
END$$ LANGUAGE plpgsql;
68+
SELECT * FROM slot_changes_wrapper('regression_slot');
69+
slot_changes_wrapper
70+
--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
71+
BEGIN
72+
table public.changeresult: INSERT: data[text]:'BEGIN'
73+
table public.changeresult: INSERT: data[text]:'table public.changeresult: INSERT: data[text]:''BEGIN'''
74+
table public.changeresult: INSERT: data[text]:'table public.changeresult: INSERT: data[text]:''table public.somechange: INSERT: id[integer]:1'''
75+
table public.changeresult: INSERT: data[text]:'table public.changeresult: INSERT: data[text]:''COMMIT'''
76+
table public.changeresult: INSERT: data[text]:'COMMIT'
77+
table public.changeresult: INSERT: data[text]:'BEGIN'
78+
table public.changeresult: INSERT: data[text]:'table public.changeresult: INSERT: data[text]:''BEGIN'''
79+
table public.changeresult: INSERT: data[text]:'table public.changeresult: INSERT: data[text]:''table public.changeresult: INSERT: data[text]:''''BEGIN'''''''
80+
table public.changeresult: INSERT: data[text]:'table public.changeresult: INSERT: data[text]:''table public.changeresult: INSERT: data[text]:''''table public.somechange: INSERT: id[integer]:1'''''''
81+
table public.changeresult: INSERT: data[text]:'table public.changeresult: INSERT: data[text]:''table public.changeresult: INSERT: data[text]:''''COMMIT'''''''
82+
table public.changeresult: INSERT: data[text]:'table public.changeresult: INSERT: data[text]:''COMMIT'''
83+
table public.changeresult: INSERT: data[text]:'COMMIT'
84+
COMMIT
85+
(14 rows)
86+
6287
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
6388
data
6489
--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

contrib/test_decoding/sql/decoding_into_rel.sql

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,5 +27,16 @@ INSERT INTO changeresult
2727
SELECT * FROM changeresult;
2828
DROP TABLE changeresult;
2929
DROP TABLE somechange;
30+
31+
-- check calling logical decoding from pl/pgsql
32+
CREATE FUNCTION slot_changes_wrapper(slot_name name) RETURNS SETOF TEXT AS $$
33+
BEGIN
34+
RETURN QUERY
35+
SELECT data FROM pg_logical_slot_peek_changes(slot_name, NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
36+
END$$ LANGUAGE plpgsql;
37+
38+
SELECT * FROM slot_changes_wrapper('regression_slot');
39+
3040
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
41+
3142
SELECT 'stop' FROM pg_drop_replication_slot('regression_slot');

src/backend/executor/spi.c

Lines changed: 31 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -72,8 +72,8 @@ static void _SPI_cursor_operation(Portal portal,
7272
static SPIPlanPtr _SPI_make_plan_non_temp(SPIPlanPtr plan);
7373
static SPIPlanPtr _SPI_save_plan(SPIPlanPtr plan);
7474

75-
static int _SPI_begin_call(bool execmem);
76-
static int _SPI_end_call(bool procmem);
75+
static int _SPI_begin_call(bool use_exec);
76+
static int _SPI_end_call(bool use_exec);
7777
static MemoryContext _SPI_execmem(void);
7878
static MemoryContext _SPI_procmem(void);
7979
static bool _SPI_checktuples(void);
@@ -127,6 +127,7 @@ SPI_connect(void)
127127
_SPI_current->processed = 0;
128128
_SPI_current->lastoid = InvalidOid;
129129
_SPI_current->tuptable = NULL;
130+
_SPI_current->execSubid = InvalidSubTransactionId;
130131
slist_init(&_SPI_current->tuptables);
131132
_SPI_current->procCxt = NULL; /* in case we fail to create 'em */
132133
_SPI_current->execCxt = NULL;
@@ -161,7 +162,7 @@ SPI_finish(void)
161162
{
162163
int res;
163164

164-
res = _SPI_begin_call(false); /* live in procedure memory */
165+
res = _SPI_begin_call(false); /* just check we're connected */
165166
if (res < 0)
166167
return res;
167168

@@ -286,8 +287,15 @@ AtEOSubXact_SPI(bool isCommit, SubTransactionId mySubid)
286287
{
287288
slist_mutable_iter siter;
288289

289-
/* free Executor memory the same as _SPI_end_call would do */
290-
MemoryContextResetAndDeleteChildren(_SPI_current->execCxt);
290+
/*
291+
* Throw away executor state if current executor operation was started
292+
* within current subxact (essentially, force a _SPI_end_call(true)).
293+
*/
294+
if (_SPI_current->execSubid >= mySubid)
295+
{
296+
_SPI_current->execSubid = InvalidSubTransactionId;
297+
MemoryContextResetAndDeleteChildren(_SPI_current->execCxt);
298+
}
291299

292300
/* throw away any tuple tables created within current subxact */
293301
slist_foreach_modify(siter, &_SPI_current->tuptables)
@@ -311,8 +319,6 @@ AtEOSubXact_SPI(bool isCommit, SubTransactionId mySubid)
311319
MemoryContextDelete(tuptable->tuptabcxt);
312320
}
313321
}
314-
/* in particular we should have gotten rid of any in-progress table */
315-
Assert(_SPI_current->tuptable == NULL);
316322
}
317323
}
318324

@@ -2532,38 +2538,52 @@ _SPI_procmem(void)
25322538

25332539
/*
25342540
* _SPI_begin_call: begin a SPI operation within a connected procedure
2541+
*
2542+
* use_exec is true if we intend to make use of the procedure's execCxt
2543+
* during this SPI operation. We'll switch into that context, and arrange
2544+
* for it to be cleaned up at _SPI_end_call or if an error occurs.
25352545
*/
25362546
static int
2537-
_SPI_begin_call(bool execmem)
2547+
_SPI_begin_call(bool use_exec)
25382548
{
25392549
if (_SPI_curid + 1 != _SPI_connected)
25402550
return SPI_ERROR_UNCONNECTED;
25412551
_SPI_curid++;
25422552
if (_SPI_current != &(_SPI_stack[_SPI_curid]))
25432553
elog(ERROR, "SPI stack corrupted");
25442554

2545-
if (execmem) /* switch to the Executor memory context */
2555+
if (use_exec)
2556+
{
2557+
/* remember when the Executor operation started */
2558+
_SPI_current->execSubid = GetCurrentSubTransactionId();
2559+
/* switch to the Executor memory context */
25462560
_SPI_execmem();
2561+
}
25472562

25482563
return 0;
25492564
}
25502565

25512566
/*
25522567
* _SPI_end_call: end a SPI operation within a connected procedure
25532568
*
2569+
* use_exec must be the same as in the previous _SPI_begin_call
2570+
*
25542571
* Note: this currently has no failure return cases, so callers don't check
25552572
*/
25562573
static int
2557-
_SPI_end_call(bool procmem)
2574+
_SPI_end_call(bool use_exec)
25582575
{
25592576
/*
25602577
* We're returning to procedure where _SPI_curid == _SPI_connected - 1
25612578
*/
25622579
_SPI_curid--;
25632580

2564-
if (procmem) /* switch to the procedure memory context */
2581+
if (use_exec)
25652582
{
2583+
/* switch to the procedure memory context */
25662584
_SPI_procmem();
2585+
/* mark Executor context no longer in use */
2586+
_SPI_current->execSubid = InvalidSubTransactionId;
25672587
/* and free Executor memory */
25682588
MemoryContextResetAndDeleteChildren(_SPI_current->execCxt);
25692589
}

src/include/executor/spi_priv.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,9 @@ typedef struct
3131
MemoryContext execCxt; /* executor context */
3232
MemoryContext savedcxt; /* context of SPI_connect's caller */
3333
SubTransactionId connectSubid; /* ID of connecting subtransaction */
34+
35+
/* subtransaction in which current Executor call was started */
36+
SubTransactionId execSubid;
3437
} _SPI_connection;
3538

3639
/*

0 commit comments

Comments
 (0)