Skip to content

Commit 1ec7fca

Browse files
author
Etsuro Fujita
committed
postgres_fdw: Fix handling of pending asynchronous requests.
A pending asynchronous request is handled by process_pending_request(), which previously not only processed an in-progress remote query but performed ExecForeignScan() to produce a tuple to return to the local server asynchronously from the result of the remote query. But that led to a server crash when executing a query or led to an "InstrStartNode called twice in a row" or "InstrEndLoop called on running node" failure when doing EXPLAIN ANALYZE of it, in cases where the plan tree for it contained multiple async-capable nodes accessing the same initplan/subplan that contained multiple async-capable nodes scanning the same foreign tables as for the parent async-capable nodes, as reported by Andrey Lepikhov. The reason is that the second step in process_pending_request() invoked when executing the initplan/subplan for one of the parent async-capable nodes caused recursive execution of the initplan/subplan for another of the parent async-capable nodes. To fix, split process_pending_request() into the two steps and postpone the second step until ForeignAsyncConfigureWait() is called for each of the pending asynchronous requests. Also, in ExecAppendAsyncEventWait() we assumed that FDWs would register at least one wait event in a WaitEventSet created there when they were called from ForeignAsyncConfigureWait() in that function, but allow FDWs to register zero wait events in the WaitEventSet; modify ExecAppendAsyncEventWait() to just return in that case. Oversight in commit 27e1f14. Back-patch to v14 where that commit went in. Andrey Lepikhov and Etsuro Fujita Discussion: https://postgr.es/m/fe5eaa19-1704-e4a4-76ee-3b9d37ade399@postgrespro.ru
1 parent 16bd4be commit 1ec7fca

File tree

4 files changed

+126
-18
lines changed

4 files changed

+126
-18
lines changed

contrib/postgres_fdw/expected/postgres_fdw.out

+53-2
Original file line numberDiff line numberDiff line change
@@ -10300,6 +10300,59 @@ SELECT * FROM async_pt t1, async_p2 t2 WHERE t1.a = t2.a AND t1.b === 505;
1030010300
2505 | 505 | 0505 | 2505 | 505 | 0505
1030110301
(1 row)
1030210302

10303+
CREATE TABLE local_tbl (a int, b int, c text);
10304+
INSERT INTO local_tbl VALUES (1505, 505, 'foo');
10305+
ANALYZE local_tbl;
10306+
EXPLAIN (VERBOSE, COSTS OFF)
10307+
SELECT * FROM local_tbl t1 LEFT JOIN (SELECT *, (SELECT count(*) FROM async_pt WHERE a < 3000) FROM async_pt WHERE a < 3000) t2 ON t1.a = t2.a;
10308+
QUERY PLAN
10309+
----------------------------------------------------------------------------------------
10310+
Nested Loop Left Join
10311+
Output: t1.a, t1.b, t1.c, async_pt.a, async_pt.b, async_pt.c, ($0)
10312+
Join Filter: (t1.a = async_pt.a)
10313+
InitPlan 1 (returns $0)
10314+
-> Aggregate
10315+
Output: count(*)
10316+
-> Append
10317+
-> Async Foreign Scan on public.async_p1 async_pt_4
10318+
Remote SQL: SELECT NULL FROM public.base_tbl1 WHERE ((a < 3000))
10319+
-> Async Foreign Scan on public.async_p2 async_pt_5
10320+
Remote SQL: SELECT NULL FROM public.base_tbl2 WHERE ((a < 3000))
10321+
-> Seq Scan on public.local_tbl t1
10322+
Output: t1.a, t1.b, t1.c
10323+
-> Append
10324+
-> Async Foreign Scan on public.async_p1 async_pt_1
10325+
Output: async_pt_1.a, async_pt_1.b, async_pt_1.c, $0
10326+
Remote SQL: SELECT a, b, c FROM public.base_tbl1 WHERE ((a < 3000))
10327+
-> Async Foreign Scan on public.async_p2 async_pt_2
10328+
Output: async_pt_2.a, async_pt_2.b, async_pt_2.c, $0
10329+
Remote SQL: SELECT a, b, c FROM public.base_tbl2 WHERE ((a < 3000))
10330+
(20 rows)
10331+
10332+
EXPLAIN (ANALYZE, COSTS OFF, SUMMARY OFF, TIMING OFF)
10333+
SELECT * FROM local_tbl t1 LEFT JOIN (SELECT *, (SELECT count(*) FROM async_pt WHERE a < 3000) FROM async_pt WHERE a < 3000) t2 ON t1.a = t2.a;
10334+
QUERY PLAN
10335+
-----------------------------------------------------------------------------------------
10336+
Nested Loop Left Join (actual rows=1 loops=1)
10337+
Join Filter: (t1.a = async_pt.a)
10338+
Rows Removed by Join Filter: 399
10339+
InitPlan 1 (returns $0)
10340+
-> Aggregate (actual rows=1 loops=1)
10341+
-> Append (actual rows=400 loops=1)
10342+
-> Async Foreign Scan on async_p1 async_pt_4 (actual rows=200 loops=1)
10343+
-> Async Foreign Scan on async_p2 async_pt_5 (actual rows=200 loops=1)
10344+
-> Seq Scan on local_tbl t1 (actual rows=1 loops=1)
10345+
-> Append (actual rows=400 loops=1)
10346+
-> Async Foreign Scan on async_p1 async_pt_1 (actual rows=200 loops=1)
10347+
-> Async Foreign Scan on async_p2 async_pt_2 (actual rows=200 loops=1)
10348+
(12 rows)
10349+
10350+
SELECT * FROM local_tbl t1 LEFT JOIN (SELECT *, (SELECT count(*) FROM async_pt WHERE a < 3000) FROM async_pt WHERE a < 3000) t2 ON t1.a = t2.a;
10351+
a | b | c | a | b | c | count
10352+
------+-----+-----+------+-----+------+-------
10353+
1505 | 505 | foo | 1505 | 505 | 0505 | 400
10354+
(1 row)
10355+
1030310356
EXPLAIN (VERBOSE, COSTS OFF)
1030410357
SELECT * FROM async_pt t1 WHERE t1.b === 505 LIMIT 1;
1030510358
QUERY PLAN
@@ -10342,8 +10395,6 @@ SELECT * FROM async_pt t1 WHERE t1.b === 505 LIMIT 1;
1034210395
(1 row)
1034310396

1034410397
-- Check with foreign modify
10345-
CREATE TABLE local_tbl (a int, b int, c text);
10346-
INSERT INTO local_tbl VALUES (1505, 505, 'foo');
1034710398
CREATE TABLE base_tbl3 (a int, b int, c text);
1034810399
CREATE FOREIGN TABLE remote_tbl (a int, b int, c text)
1034910400
SERVER loopback OPTIONS (table_name 'base_tbl3');

contrib/postgres_fdw/postgres_fdw.c

+52-13
Original file line numberDiff line numberDiff line change
@@ -503,6 +503,7 @@ static void analyze_row_processor(PGresult *res, int row,
503503
PgFdwAnalyzeState *astate);
504504
static void produce_tuple_asynchronously(AsyncRequest *areq, bool fetch);
505505
static void fetch_more_data_begin(AsyncRequest *areq);
506+
static void complete_pending_request(AsyncRequest *areq);
506507
static HeapTuple make_tuple_from_result_row(PGresult *res,
507508
int row,
508509
Relation rel,
@@ -6826,6 +6827,22 @@ postgresForeignAsyncConfigureWait(AsyncRequest *areq)
68266827
/* This should not be called unless callback_pending */
68276828
Assert(areq->callback_pending);
68286829

6830+
/*
6831+
* If process_pending_request() has been invoked on the given request
6832+
* before we get here, we might have some tuples already; in which case
6833+
* complete the request
6834+
*/
6835+
if (fsstate->next_tuple < fsstate->num_tuples)
6836+
{
6837+
complete_pending_request(areq);
6838+
if (areq->request_complete)
6839+
return;
6840+
Assert(areq->callback_pending);
6841+
}
6842+
6843+
/* We must have run out of tuples */
6844+
Assert(fsstate->next_tuple >= fsstate->num_tuples);
6845+
68296846
/* The core code would have registered postmaster death event */
68306847
Assert(GetNumRegisteredWaitEvents(set) >= 1);
68316848

@@ -6838,12 +6855,15 @@ postgresForeignAsyncConfigureWait(AsyncRequest *areq)
68386855
* This is the case when the in-process request was made by another
68396856
* Append. Note that it might be useless to process the request,
68406857
* because the query might not need tuples from that Append anymore.
6841-
* Skip the given request if there are any configured events other
6842-
* than the postmaster death event; otherwise process the request,
6843-
* then begin a fetch to configure the event below, because otherwise
6844-
* we might end up with no configured events other than the postmaster
6845-
* death event.
6858+
* If there are any child subplans of the same parent that are ready
6859+
* for new requests, skip the given request. Likewise, if there are
6860+
* any configured events other than the postmaster death event, skip
6861+
* it. Otherwise, process the in-process request, then begin a fetch
6862+
* to configure the event below, because we might otherwise end up
6863+
* with no configured events other than the postmaster death event.
68466864
*/
6865+
if (!bms_is_empty(requestor->as_needrequest))
6866+
return;
68476867
if (GetNumRegisteredWaitEvents(set) > 1)
68486868
return;
68496869
process_pending_request(pendingAreq);
@@ -6995,23 +7015,44 @@ process_pending_request(AsyncRequest *areq)
69957015
{
69967016
ForeignScanState *node = (ForeignScanState *) areq->requestee;
69977017
PgFdwScanState *fsstate PG_USED_FOR_ASSERTS_ONLY = (PgFdwScanState *) node->fdw_state;
6998-
EState *estate = node->ss.ps.state;
6999-
MemoryContext oldcontext;
7018+
7019+
/* The request would have been pending for a callback */
7020+
Assert(areq->callback_pending);
70007021

70017022
/* The request should be currently in-process */
70027023
Assert(fsstate->conn_state->pendingAreq == areq);
70037024

7004-
oldcontext = MemoryContextSwitchTo(estate->es_query_cxt);
7025+
fetch_more_data(node);
70057026

7027+
/*
7028+
* If we didn't get any tuples, must be end of data; complete the request
7029+
* now. Otherwise, we postpone completing the request until we are called
7030+
* from postgresForeignAsyncConfigureWait().
7031+
*/
7032+
if (fsstate->next_tuple >= fsstate->num_tuples)
7033+
{
7034+
/* Unlike AsyncNotify, we unset callback_pending ourselves */
7035+
areq->callback_pending = false;
7036+
/* Mark the request as complete */
7037+
ExecAsyncRequestDone(areq, NULL);
7038+
/* Unlike AsyncNotify, we call ExecAsyncResponse ourselves */
7039+
ExecAsyncResponse(areq);
7040+
}
7041+
}
7042+
7043+
/*
7044+
* Complete a pending asynchronous request.
7045+
*/
7046+
static void
7047+
complete_pending_request(AsyncRequest *areq)
7048+
{
70067049
/* The request would have been pending for a callback */
70077050
Assert(areq->callback_pending);
70087051

70097052
/* Unlike AsyncNotify, we unset callback_pending ourselves */
70107053
areq->callback_pending = false;
70117054

7012-
fetch_more_data(node);
7013-
7014-
/* We need to send a new query afterwards; don't fetch */
7055+
/* We begin a fetch afterwards if necessary; don't fetch */
70157056
produce_tuple_asynchronously(areq, false);
70167057

70177058
/* Unlike AsyncNotify, we call ExecAsyncResponse ourselves */
@@ -7021,8 +7062,6 @@ process_pending_request(AsyncRequest *areq)
70217062
if (areq->requestee->instrument)
70227063
InstrUpdateTupleCount(areq->requestee->instrument,
70237064
TupIsNull(areq->result) ? 0.0 : 1.0);
7024-
7025-
MemoryContextSwitchTo(oldcontext);
70267065
}
70277066

70287067
/*

contrib/postgres_fdw/sql/postgres_fdw.sql

+10-3
Original file line numberDiff line numberDiff line change
@@ -3274,16 +3274,23 @@ EXPLAIN (VERBOSE, COSTS OFF)
32743274
SELECT * FROM async_pt t1, async_p2 t2 WHERE t1.a = t2.a AND t1.b === 505;
32753275
SELECT * FROM async_pt t1, async_p2 t2 WHERE t1.a = t2.a AND t1.b === 505;
32763276

3277+
CREATE TABLE local_tbl (a int, b int, c text);
3278+
INSERT INTO local_tbl VALUES (1505, 505, 'foo');
3279+
ANALYZE local_tbl;
3280+
3281+
EXPLAIN (VERBOSE, COSTS OFF)
3282+
SELECT * FROM local_tbl t1 LEFT JOIN (SELECT *, (SELECT count(*) FROM async_pt WHERE a < 3000) FROM async_pt WHERE a < 3000) t2 ON t1.a = t2.a;
3283+
EXPLAIN (ANALYZE, COSTS OFF, SUMMARY OFF, TIMING OFF)
3284+
SELECT * FROM local_tbl t1 LEFT JOIN (SELECT *, (SELECT count(*) FROM async_pt WHERE a < 3000) FROM async_pt WHERE a < 3000) t2 ON t1.a = t2.a;
3285+
SELECT * FROM local_tbl t1 LEFT JOIN (SELECT *, (SELECT count(*) FROM async_pt WHERE a < 3000) FROM async_pt WHERE a < 3000) t2 ON t1.a = t2.a;
3286+
32773287
EXPLAIN (VERBOSE, COSTS OFF)
32783288
SELECT * FROM async_pt t1 WHERE t1.b === 505 LIMIT 1;
32793289
EXPLAIN (ANALYZE, COSTS OFF, SUMMARY OFF, TIMING OFF)
32803290
SELECT * FROM async_pt t1 WHERE t1.b === 505 LIMIT 1;
32813291
SELECT * FROM async_pt t1 WHERE t1.b === 505 LIMIT 1;
32823292

32833293
-- Check with foreign modify
3284-
CREATE TABLE local_tbl (a int, b int, c text);
3285-
INSERT INTO local_tbl VALUES (1505, 505, 'foo');
3286-
32873294
CREATE TABLE base_tbl3 (a int, b int, c text);
32883295
CREATE FOREIGN TABLE remote_tbl (a int, b int, c text)
32893296
SERVER loopback OPTIONS (table_name 'base_tbl3');

src/backend/executor/nodeAppend.c

+11
Original file line numberDiff line numberDiff line change
@@ -1043,6 +1043,17 @@ ExecAppendAsyncEventWait(AppendState *node)
10431043
ExecAsyncConfigureWait(areq);
10441044
}
10451045

1046+
/*
1047+
* No need for further processing if there are no configured events other
1048+
* than the postmaster death event.
1049+
*/
1050+
if (GetNumRegisteredWaitEvents(node->as_eventset) == 1)
1051+
{
1052+
FreeWaitEventSet(node->as_eventset);
1053+
node->as_eventset = NULL;
1054+
return;
1055+
}
1056+
10461057
/* We wait on at most EVENT_BUFFER_SIZE events. */
10471058
if (nevents > EVENT_BUFFER_SIZE)
10481059
nevents = EVENT_BUFFER_SIZE;

0 commit comments

Comments
 (0)