Skip to content

Commit a363bc6

Browse files
author
Etsuro Fujita
committed
Fix EXPLAIN ANALYZE for async-capable nodes.
EXPLAIN ANALYZE for an async-capable ForeignScan node associated with postgres_fdw is done just by using instrumentation for ExecProcNode() called from the node's callbacks, causing the following problems: 1) If the remote table to scan is empty, the node is incorrectly considered as "never executed" by the command even if the node is executed, as ExecProcNode() isn't called from the node's callbacks at all in that case. 2) The command fails to collect timings for things other than ExecProcNode() done in the node, such as creating a cursor for the node's remote query. To fix these problems, add instrumentation for async-capable nodes, and modify postgres_fdw accordingly. My oversight in commit 27e1f14. While at it, update a comment for the AsyncRequest struct in execnodes.h and the documentation for the ForeignAsyncRequest API in fdwhandler.sgml to match the code in ExecAsyncAppendResponse() in nodeAppend.c, and fix typos in comments in nodeAppend.c. Per report from Andrey Lepikhov, though I didn't use his patch. Reviewed-by: Andrey Lepikhov Discussion: https://postgr.es/m/2eb662bb-105d-fc20-7412-2f027cc3ca72%40postgrespro.ru
1 parent e135743 commit a363bc6

File tree

14 files changed

+134
-26
lines changed

14 files changed

+134
-26
lines changed

contrib/auto_explain/auto_explain.c

+1-1
Original file line numberDiff line numberDiff line change
@@ -314,7 +314,7 @@ explain_ExecutorStart(QueryDesc *queryDesc, int eflags)
314314
MemoryContext oldcxt;
315315

316316
oldcxt = MemoryContextSwitchTo(queryDesc->estate->es_query_cxt);
317-
queryDesc->totaltime = InstrAlloc(1, INSTRUMENT_ALL);
317+
queryDesc->totaltime = InstrAlloc(1, INSTRUMENT_ALL, false);
318318
MemoryContextSwitchTo(oldcxt);
319319
}
320320
}

contrib/pg_stat_statements/pg_stat_statements.c

+1-1
Original file line numberDiff line numberDiff line change
@@ -974,7 +974,7 @@ pgss_ExecutorStart(QueryDesc *queryDesc, int eflags)
974974
MemoryContext oldcxt;
975975

976976
oldcxt = MemoryContextSwitchTo(queryDesc->estate->es_query_cxt);
977-
queryDesc->totaltime = InstrAlloc(1, INSTRUMENT_ALL);
977+
queryDesc->totaltime = InstrAlloc(1, INSTRUMENT_ALL, false);
978978
MemoryContextSwitchTo(oldcxt);
979979
}
980980
}

contrib/postgres_fdw/expected/postgres_fdw.out

+34-5
Original file line numberDiff line numberDiff line change
@@ -10051,6 +10051,21 @@ SELECT * FROM async_pt t1 WHERE t1.b === 505 LIMIT 1;
1005110051
Filter: (t1_3.b === 505)
1005210052
(14 rows)
1005310053

10054+
EXPLAIN (ANALYZE, COSTS OFF, SUMMARY OFF, TIMING OFF)
10055+
SELECT * FROM async_pt t1 WHERE t1.b === 505 LIMIT 1;
10056+
QUERY PLAN
10057+
-------------------------------------------------------------------------
10058+
Limit (actual rows=1 loops=1)
10059+
-> Append (actual rows=1 loops=1)
10060+
-> Async Foreign Scan on async_p1 t1_1 (actual rows=0 loops=1)
10061+
Filter: (b === 505)
10062+
-> Async Foreign Scan on async_p2 t1_2 (actual rows=0 loops=1)
10063+
Filter: (b === 505)
10064+
-> Seq Scan on async_p3 t1_3 (actual rows=1 loops=1)
10065+
Filter: (b === 505)
10066+
Rows Removed by Filter: 101
10067+
(9 rows)
10068+
1005410069
SELECT * FROM async_pt t1 WHERE t1.b === 505 LIMIT 1;
1005510070
a | b | c
1005610071
------+-----+------
@@ -10132,18 +10147,32 @@ SELECT * FROM join_tbl ORDER BY a1;
1013210147
(3 rows)
1013310148

1013410149
DELETE FROM join_tbl;
10150+
DROP TABLE local_tbl;
10151+
DROP FOREIGN TABLE remote_tbl;
10152+
DROP FOREIGN TABLE insert_tbl;
10153+
DROP TABLE base_tbl3;
10154+
DROP TABLE base_tbl4;
1013510155
RESET enable_mergejoin;
1013610156
RESET enable_hashjoin;
10157+
-- Check EXPLAIN ANALYZE for a query that scans empty partitions asynchronously
10158+
DELETE FROM async_p1;
10159+
DELETE FROM async_p2;
10160+
DELETE FROM async_p3;
10161+
EXPLAIN (ANALYZE, COSTS OFF, SUMMARY OFF, TIMING OFF)
10162+
SELECT * FROM async_pt;
10163+
QUERY PLAN
10164+
-------------------------------------------------------------------------
10165+
Append (actual rows=0 loops=1)
10166+
-> Async Foreign Scan on async_p1 async_pt_1 (actual rows=0 loops=1)
10167+
-> Async Foreign Scan on async_p2 async_pt_2 (actual rows=0 loops=1)
10168+
-> Seq Scan on async_p3 async_pt_3 (actual rows=0 loops=1)
10169+
(4 rows)
10170+
1013710171
-- Clean up
1013810172
DROP TABLE async_pt;
1013910173
DROP TABLE base_tbl1;
1014010174
DROP TABLE base_tbl2;
1014110175
DROP TABLE result_tbl;
10142-
DROP TABLE local_tbl;
10143-
DROP FOREIGN TABLE remote_tbl;
10144-
DROP FOREIGN TABLE insert_tbl;
10145-
DROP TABLE base_tbl3;
10146-
DROP TABLE base_tbl4;
1014710176
DROP TABLE join_tbl;
1014810177
ALTER SERVER loopback OPTIONS (DROP async_capable);
1014910178
ALTER SERVER loopback2 OPTIONS (DROP async_capable);

contrib/postgres_fdw/postgres_fdw.c

+7-2
Original file line numberDiff line numberDiff line change
@@ -1542,7 +1542,7 @@ postgresBeginForeignScan(ForeignScanState *node, int eflags)
15421542
&fsstate->param_values);
15431543

15441544
/* Set the async-capable flag */
1545-
fsstate->async_capable = node->ss.ps.plan->async_capable;
1545+
fsstate->async_capable = node->ss.ps.async_capable;
15461546
}
15471547

15481548
/*
@@ -6867,7 +6867,7 @@ produce_tuple_asynchronously(AsyncRequest *areq, bool fetch)
68676867
}
68686868

68696869
/* Get a tuple from the ForeignScan node */
6870-
result = ExecProcNode((PlanState *) node);
6870+
result = areq->requestee->ExecProcNodeReal(areq->requestee);
68716871
if (!TupIsNull(result))
68726872
{
68736873
/* Mark the request as complete */
@@ -6956,6 +6956,11 @@ process_pending_request(AsyncRequest *areq)
69566956
/* Unlike AsyncNotify, we call ExecAsyncResponse ourselves */
69576957
ExecAsyncResponse(areq);
69586958

6959+
/* Also, we do instrumentation ourselves, if required */
6960+
if (areq->requestee->instrument)
6961+
InstrUpdateTupleCount(areq->requestee->instrument,
6962+
TupIsNull(areq->result) ? 0.0 : 1.0);
6963+
69596964
MemoryContextSwitchTo(oldcontext);
69606965
}
69616966

contrib/postgres_fdw/sql/postgres_fdw.sql

+16-5
Original file line numberDiff line numberDiff line change
@@ -3195,6 +3195,8 @@ SELECT * FROM async_pt t1, async_p2 t2 WHERE t1.a = t2.a AND t1.b === 505;
31953195

31963196
EXPLAIN (VERBOSE, COSTS OFF)
31973197
SELECT * FROM async_pt t1 WHERE t1.b === 505 LIMIT 1;
3198+
EXPLAIN (ANALYZE, COSTS OFF, SUMMARY OFF, TIMING OFF)
3199+
SELECT * FROM async_pt t1 WHERE t1.b === 505 LIMIT 1;
31983200
SELECT * FROM async_pt t1 WHERE t1.b === 505 LIMIT 1;
31993201

32003202
-- Check with foreign modify
@@ -3226,19 +3228,28 @@ INSERT INTO join_tbl SELECT * FROM async_pt LEFT JOIN t ON (async_pt.a = t.a AND
32263228
SELECT * FROM join_tbl ORDER BY a1;
32273229
DELETE FROM join_tbl;
32283230

3231+
DROP TABLE local_tbl;
3232+
DROP FOREIGN TABLE remote_tbl;
3233+
DROP FOREIGN TABLE insert_tbl;
3234+
DROP TABLE base_tbl3;
3235+
DROP TABLE base_tbl4;
3236+
32293237
RESET enable_mergejoin;
32303238
RESET enable_hashjoin;
32313239

3240+
-- Check EXPLAIN ANALYZE for a query that scans empty partitions asynchronously
3241+
DELETE FROM async_p1;
3242+
DELETE FROM async_p2;
3243+
DELETE FROM async_p3;
3244+
3245+
EXPLAIN (ANALYZE, COSTS OFF, SUMMARY OFF, TIMING OFF)
3246+
SELECT * FROM async_pt;
3247+
32323248
-- Clean up
32333249
DROP TABLE async_pt;
32343250
DROP TABLE base_tbl1;
32353251
DROP TABLE base_tbl2;
32363252
DROP TABLE result_tbl;
3237-
DROP TABLE local_tbl;
3238-
DROP FOREIGN TABLE remote_tbl;
3239-
DROP FOREIGN TABLE insert_tbl;
3240-
DROP TABLE base_tbl3;
3241-
DROP TABLE base_tbl4;
32423253
DROP TABLE join_tbl;
32433254

32443255
ALTER SERVER loopback OPTIONS (DROP async_capable);

doc/src/sgml/fdwhandler.sgml

+1-1
Original file line numberDiff line numberDiff line change
@@ -1597,7 +1597,7 @@ ForeignAsyncRequest(AsyncRequest *areq);
15971597
<literal>areq-&gt;callback_pending</literal> to <literal>true</literal>
15981598
for the <structname>ForeignScan</structname> node to get a callback from
15991599
the callback functions described below. If no more tuples are available,
1600-
set the slot to NULL, and the
1600+
set the slot to NULL or an empty slot, and the
16011601
<literal>areq-&gt;request_complete</literal> flag to
16021602
<literal>true</literal>. It's recommended to use
16031603
<function>ExecAsyncRequestDone</function> or

src/backend/executor/execAsync.c

+30
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
#include "postgres.h"
1616

1717
#include "executor/execAsync.h"
18+
#include "executor/executor.h"
1819
#include "executor/nodeAppend.h"
1920
#include "executor/nodeForeignscan.h"
2021

@@ -24,6 +25,13 @@
2425
void
2526
ExecAsyncRequest(AsyncRequest *areq)
2627
{
28+
if (areq->requestee->chgParam != NULL) /* something changed? */
29+
ExecReScan(areq->requestee); /* let ReScan handle this */
30+
31+
/* must provide our own instrumentation support */
32+
if (areq->requestee->instrument)
33+
InstrStartNode(areq->requestee->instrument);
34+
2735
switch (nodeTag(areq->requestee))
2836
{
2937
case T_ForeignScanState:
@@ -36,6 +44,11 @@ ExecAsyncRequest(AsyncRequest *areq)
3644
}
3745

3846
ExecAsyncResponse(areq);
47+
48+
/* must provide our own instrumentation support */
49+
if (areq->requestee->instrument)
50+
InstrStopNode(areq->requestee->instrument,
51+
TupIsNull(areq->result) ? 0.0 : 1.0);
3952
}
4053

4154
/*
@@ -48,6 +61,10 @@ ExecAsyncRequest(AsyncRequest *areq)
4861
void
4962
ExecAsyncConfigureWait(AsyncRequest *areq)
5063
{
64+
/* must provide our own instrumentation support */
65+
if (areq->requestee->instrument)
66+
InstrStartNode(areq->requestee->instrument);
67+
5168
switch (nodeTag(areq->requestee))
5269
{
5370
case T_ForeignScanState:
@@ -58,6 +75,10 @@ ExecAsyncConfigureWait(AsyncRequest *areq)
5875
elog(ERROR, "unrecognized node type: %d",
5976
(int) nodeTag(areq->requestee));
6077
}
78+
79+
/* must provide our own instrumentation support */
80+
if (areq->requestee->instrument)
81+
InstrStopNode(areq->requestee->instrument, 0.0);
6182
}
6283

6384
/*
@@ -66,6 +87,10 @@ ExecAsyncConfigureWait(AsyncRequest *areq)
6687
void
6788
ExecAsyncNotify(AsyncRequest *areq)
6889
{
90+
/* must provide our own instrumentation support */
91+
if (areq->requestee->instrument)
92+
InstrStartNode(areq->requestee->instrument);
93+
6994
switch (nodeTag(areq->requestee))
7095
{
7196
case T_ForeignScanState:
@@ -78,6 +103,11 @@ ExecAsyncNotify(AsyncRequest *areq)
78103
}
79104

80105
ExecAsyncResponse(areq);
106+
107+
/* must provide our own instrumentation support */
108+
if (areq->requestee->instrument)
109+
InstrStopNode(areq->requestee->instrument,
110+
TupIsNull(areq->result) ? 0.0 : 1.0);
81111
}
82112

83113
/*

src/backend/executor/execMain.c

+1-1
Original file line numberDiff line numberDiff line change
@@ -1214,7 +1214,7 @@ InitResultRelInfo(ResultRelInfo *resultRelInfo,
12141214
resultRelInfo->ri_TrigWhenExprs = (ExprState **)
12151215
palloc0(n * sizeof(ExprState *));
12161216
if (instrument_options)
1217-
resultRelInfo->ri_TrigInstrument = InstrAlloc(n, instrument_options);
1217+
resultRelInfo->ri_TrigInstrument = InstrAlloc(n, instrument_options, false);
12181218
}
12191219
else
12201220
{

src/backend/executor/execProcnode.c

+2-1
Original file line numberDiff line numberDiff line change
@@ -407,7 +407,8 @@ ExecInitNode(Plan *node, EState *estate, int eflags)
407407

408408
/* Set up instrumentation for this node if requested */
409409
if (estate->es_instrument)
410-
result->instrument = InstrAlloc(1, estate->es_instrument);
410+
result->instrument = InstrAlloc(1, estate->es_instrument,
411+
result->async_capable);
411412

412413
return result;
413414
}

src/backend/executor/instrument.c

+20-1
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ static void WalUsageAdd(WalUsage *dst, WalUsage *add);
2828

2929
/* Allocate new instrumentation structure(s) */
3030
Instrumentation *
31-
InstrAlloc(int n, int instrument_options)
31+
InstrAlloc(int n, int instrument_options, bool async_mode)
3232
{
3333
Instrumentation *instr;
3434

@@ -46,6 +46,7 @@ InstrAlloc(int n, int instrument_options)
4646
instr[i].need_bufusage = need_buffers;
4747
instr[i].need_walusage = need_wal;
4848
instr[i].need_timer = need_timer;
49+
instr[i].async_mode = async_mode;
4950
}
5051
}
5152

@@ -82,6 +83,7 @@ InstrStartNode(Instrumentation *instr)
8283
void
8384
InstrStopNode(Instrumentation *instr, double nTuples)
8485
{
86+
double save_tuplecount = instr->tuplecount;
8587
instr_time endtime;
8688

8789
/* count the returned tuples */
@@ -114,6 +116,23 @@ InstrStopNode(Instrumentation *instr, double nTuples)
114116
instr->running = true;
115117
instr->firsttuple = INSTR_TIME_GET_DOUBLE(instr->counter);
116118
}
119+
else
120+
{
121+
/*
122+
* In async mode, if the plan node hadn't emitted any tuples before,
123+
* this might be the first tuple
124+
*/
125+
if (instr->async_mode && save_tuplecount < 1.0)
126+
instr->firsttuple = INSTR_TIME_GET_DOUBLE(instr->counter);
127+
}
128+
}
129+
130+
/* Update tuple count */
131+
void
132+
InstrUpdateTupleCount(Instrumentation *instr, double nTuples)
133+
{
134+
/* count the returned tuples */
135+
instr->tuplecount += nTuples;
117136
}
118137

119138
/* Finish a run cycle for a plan node */

src/backend/executor/nodeAppend.c

+6-6
Original file line numberDiff line numberDiff line change
@@ -362,9 +362,9 @@ ExecAppend(PlanState *pstate)
362362
}
363363

364364
/*
365-
* wait or poll async events if any. We do this before checking for
366-
* the end of iteration, because it might drain the remaining async
367-
* subplans.
365+
* wait or poll for async events if any. We do this before checking
366+
* for the end of iteration, because it might drain the remaining
367+
* async subplans.
368368
*/
369369
if (node->as_nasyncremain > 0)
370370
ExecAppendAsyncEventWait(node);
@@ -440,7 +440,7 @@ ExecReScanAppend(AppendState *node)
440440

441441
/*
442442
* If chgParam of subnode is not null then plan will be re-scanned by
443-
* first ExecProcNode.
443+
* first ExecProcNode or by first ExecAsyncRequest.
444444
*/
445445
if (subnode->chgParam == NULL)
446446
ExecReScan(subnode);
@@ -911,7 +911,7 @@ ExecAppendAsyncGetNext(AppendState *node, TupleTableSlot **result)
911911
{
912912
CHECK_FOR_INTERRUPTS();
913913

914-
/* Wait or poll async events. */
914+
/* Wait or poll for async events. */
915915
ExecAppendAsyncEventWait(node);
916916

917917
/* Request a tuple asynchronously. */
@@ -1084,7 +1084,7 @@ ExecAsyncAppendResponse(AsyncRequest *areq)
10841084
/* Nothing to do if the request is pending. */
10851085
if (!areq->request_complete)
10861086
{
1087-
/* The request would have been pending for a callback */
1087+
/* The request would have been pending for a callback. */
10881088
Assert(areq->callback_pending);
10891089
return;
10901090
}

src/backend/executor/nodeForeignscan.c

+7
Original file line numberDiff line numberDiff line change
@@ -209,6 +209,13 @@ ExecInitForeignScan(ForeignScan *node, EState *estate, int eflags)
209209
scanstate->fdw_recheck_quals =
210210
ExecInitQual(node->fdw_recheck_quals, (PlanState *) scanstate);
211211

212+
/*
213+
* Determine whether to scan the foreign relation asynchronously or not;
214+
* this has to be kept in sync with the code in ExecInitAppend().
215+
*/
216+
scanstate->ss.ps.async_capable = (((Plan *) node)->async_capable &&
217+
estate->es_epq_active == NULL);
218+
212219
/*
213220
* Initialize FDW-related state.
214221
*/

src/include/executor/instrument.h

+4-1
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ typedef struct Instrumentation
5555
bool need_timer; /* true if we need timer data */
5656
bool need_bufusage; /* true if we need buffer usage data */
5757
bool need_walusage; /* true if we need WAL usage data */
58+
bool async_mode; /* true if node is in async mode */
5859
/* Info about current plan cycle: */
5960
bool running; /* true if we've completed first tuple */
6061
instr_time starttime; /* start time of current iteration of node */
@@ -84,10 +85,12 @@ typedef struct WorkerInstrumentation
8485
extern PGDLLIMPORT BufferUsage pgBufferUsage;
8586
extern PGDLLIMPORT WalUsage pgWalUsage;
8687

87-
extern Instrumentation *InstrAlloc(int n, int instrument_options);
88+
extern Instrumentation *InstrAlloc(int n, int instrument_options,
89+
bool async_mode);
8890
extern void InstrInit(Instrumentation *instr, int instrument_options);
8991
extern void InstrStartNode(Instrumentation *instr);
9092
extern void InstrStopNode(Instrumentation *instr, double nTuples);
93+
extern void InstrUpdateTupleCount(Instrumentation *instr, double nTuples);
9194
extern void InstrEndLoop(Instrumentation *instr);
9295
extern void InstrAggNode(Instrumentation *dst, Instrumentation *add);
9396
extern void InstrStartParallelQuery(void);

0 commit comments

Comments
 (0)