Skip to content

Commit 691b8d5

Browse files
committed
Allow for parallel execution whenever ExecutorRun() is done only once.
Previously, it was unsafe to execute a plan in parallel if ExecutorRun() might be called with a non-zero row count. However, it's quite easy to fix things up so that we can support that case, provided that it is known that we will never call ExecutorRun() a second time for the same QueryDesc. Add infrastructure to signal this, and cross-checks to make sure that a caller who claims this is true doesn't later reneg. While that pattern never happens with queries received directly from a client -- there's no way to know whether multiple Execute messages will be sent unless the first one requests all the rows -- it's pretty common for queries originating from procedural languages, which often limit the result to a single tuple or to a user-specified number of tuples. This commit doesn't actually enable parallelism in any additional cases, because currently none of the places that would be able to benefit from this infrastructure pass CURSOR_OPT_PARALLEL_OK in the first place, but it makes it much more palatable to pass CURSOR_OPT_PARALLEL_OK in places where we currently don't, because it eliminates some cases where we'd end up having to run the parallel plan serially. Patch by me, based on some ideas from Rafia Sabih and corrected by Rafia Sabih based on feedback from Dilip Kumar and myself. Discussion: http://postgr.es/m/CA+TgmobXEhvHbJtWDuPZM9bVSLiTj-kShxQJ2uM5GPDze9fRYA@mail.gmail.com
1 parent 218f515 commit 691b8d5

File tree

19 files changed

+73
-38
lines changed

19 files changed

+73
-38
lines changed

contrib/auto_explain/auto_explain.c

+5-4
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ void _PG_fini(void);
6161
static void explain_ExecutorStart(QueryDesc *queryDesc, int eflags);
6262
static void explain_ExecutorRun(QueryDesc *queryDesc,
6363
ScanDirection direction,
64-
uint64 count);
64+
uint64 count, bool execute_once);
6565
static void explain_ExecutorFinish(QueryDesc *queryDesc);
6666
static void explain_ExecutorEnd(QueryDesc *queryDesc);
6767

@@ -257,15 +257,16 @@ explain_ExecutorStart(QueryDesc *queryDesc, int eflags)
257257
* ExecutorRun hook: all we need do is track nesting depth
258258
*/
259259
static void
260-
explain_ExecutorRun(QueryDesc *queryDesc, ScanDirection direction, uint64 count)
260+
explain_ExecutorRun(QueryDesc *queryDesc, ScanDirection direction,
261+
uint64 count, bool execute_once)
261262
{
262263
nesting_level++;
263264
PG_TRY();
264265
{
265266
if (prev_ExecutorRun)
266-
prev_ExecutorRun(queryDesc, direction, count);
267+
prev_ExecutorRun(queryDesc, direction, count, execute_once);
267268
else
268-
standard_ExecutorRun(queryDesc, direction, count);
269+
standard_ExecutorRun(queryDesc, direction, count, execute_once);
269270
nesting_level--;
270271
}
271272
PG_CATCH();

contrib/pg_stat_statements/pg_stat_statements.c

+5-4
Original file line numberDiff line numberDiff line change
@@ -290,7 +290,7 @@ static void pgss_post_parse_analyze(ParseState *pstate, Query *query);
290290
static void pgss_ExecutorStart(QueryDesc *queryDesc, int eflags);
291291
static void pgss_ExecutorRun(QueryDesc *queryDesc,
292292
ScanDirection direction,
293-
uint64 count);
293+
uint64 count, bool execute_once);
294294
static void pgss_ExecutorFinish(QueryDesc *queryDesc);
295295
static void pgss_ExecutorEnd(QueryDesc *queryDesc);
296296
static void pgss_ProcessUtility(PlannedStmt *pstmt, const char *queryString,
@@ -871,15 +871,16 @@ pgss_ExecutorStart(QueryDesc *queryDesc, int eflags)
871871
* ExecutorRun hook: all we need do is track nesting depth
872872
*/
873873
static void
874-
pgss_ExecutorRun(QueryDesc *queryDesc, ScanDirection direction, uint64 count)
874+
pgss_ExecutorRun(QueryDesc *queryDesc, ScanDirection direction, uint64 count,
875+
bool execute_once)
875876
{
876877
nested_level++;
877878
PG_TRY();
878879
{
879880
if (prev_ExecutorRun)
880-
prev_ExecutorRun(queryDesc, direction, count);
881+
prev_ExecutorRun(queryDesc, direction, count, execute_once);
881882
else
882-
standard_ExecutorRun(queryDesc, direction, count);
883+
standard_ExecutorRun(queryDesc, direction, count, execute_once);
883884
nested_level--;
884885
}
885886
PG_CATCH();

src/backend/commands/copy.c

+1-1
Original file line numberDiff line numberDiff line change
@@ -2074,7 +2074,7 @@ CopyTo(CopyState cstate)
20742074
else
20752075
{
20762076
/* run the plan --- the dest receiver will send tuples */
2077-
ExecutorRun(cstate->queryDesc, ForwardScanDirection, 0L);
2077+
ExecutorRun(cstate->queryDesc, ForwardScanDirection, 0L, true);
20782078
processed = ((DR_copy *) cstate->queryDesc->dest)->processed;
20792079
}
20802080

src/backend/commands/createas.c

+1-1
Original file line numberDiff line numberDiff line change
@@ -347,7 +347,7 @@ ExecCreateTableAs(CreateTableAsStmt *stmt, const char *queryString,
347347
ExecutorStart(queryDesc, GetIntoRelEFlags(into));
348348

349349
/* run the plan to completion */
350-
ExecutorRun(queryDesc, ForwardScanDirection, 0L);
350+
ExecutorRun(queryDesc, ForwardScanDirection, 0L, true);
351351

352352
/* save the rowcount if we're given a completionTag to fill */
353353
if (completionTag)

src/backend/commands/explain.c

+1-1
Original file line numberDiff line numberDiff line change
@@ -530,7 +530,7 @@ ExplainOnePlan(PlannedStmt *plannedstmt, IntoClause *into, ExplainState *es,
530530
dir = ForwardScanDirection;
531531

532532
/* run the plan */
533-
ExecutorRun(queryDesc, dir, 0L);
533+
ExecutorRun(queryDesc, dir, 0L, true);
534534

535535
/* run cleanup too */
536536
ExecutorFinish(queryDesc);

src/backend/commands/extension.c

+1-1
Original file line numberDiff line numberDiff line change
@@ -742,7 +742,7 @@ execute_sql_string(const char *sql, const char *filename)
742742
dest, NULL, 0);
743743

744744
ExecutorStart(qdesc, 0);
745-
ExecutorRun(qdesc, ForwardScanDirection, 0);
745+
ExecutorRun(qdesc, ForwardScanDirection, 0, true);
746746
ExecutorFinish(qdesc);
747747
ExecutorEnd(qdesc);
748748

src/backend/commands/matview.c

+1-1
Original file line numberDiff line numberDiff line change
@@ -424,7 +424,7 @@ refresh_matview_datafill(DestReceiver *dest, Query *query,
424424
ExecutorStart(queryDesc, EXEC_FLAG_WITHOUT_OIDS);
425425

426426
/* run the plan */
427-
ExecutorRun(queryDesc, ForwardScanDirection, 0L);
427+
ExecutorRun(queryDesc, ForwardScanDirection, 0L, true);
428428

429429
processed = queryDesc->estate->es_processed;
430430

src/backend/commands/portalcmds.c

+1-1
Original file line numberDiff line numberDiff line change
@@ -395,7 +395,7 @@ PersistHoldablePortal(Portal portal)
395395
true);
396396

397397
/* Fetch the result set into the tuplestore */
398-
ExecutorRun(queryDesc, ForwardScanDirection, 0L);
398+
ExecutorRun(queryDesc, ForwardScanDirection, 0L, false);
399399

400400
(*queryDesc->dest->rDestroy) (queryDesc->dest);
401401
queryDesc->dest = NULL;

src/backend/commands/prepare.c

+1-1
Original file line numberDiff line numberDiff line change
@@ -301,7 +301,7 @@ ExecuteQuery(ExecuteStmt *stmt, IntoClause *intoClause,
301301
*/
302302
PortalStart(portal, paramLI, eflags, GetActiveSnapshot());
303303

304-
(void) PortalRun(portal, count, false, dest, dest, completionTag);
304+
(void) PortalRun(portal, count, false, true, dest, dest, completionTag);
305305

306306
PortalDrop(portal, false);
307307

src/backend/executor/execMain.c

+26-12
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,8 @@ static void ExecutePlan(EState *estate, PlanState *planstate,
8585
bool sendTuples,
8686
uint64 numberTuples,
8787
ScanDirection direction,
88-
DestReceiver *dest);
88+
DestReceiver *dest,
89+
bool execute_once);
8990
static bool ExecCheckRTEPerms(RangeTblEntry *rte);
9091
static bool ExecCheckRTEPermsModified(Oid relOid, Oid userid,
9192
Bitmapset *modifiedCols,
@@ -288,17 +289,18 @@ standard_ExecutorStart(QueryDesc *queryDesc, int eflags)
288289
*/
289290
void
290291
ExecutorRun(QueryDesc *queryDesc,
291-
ScanDirection direction, uint64 count)
292+
ScanDirection direction, uint64 count,
293+
bool execute_once)
292294
{
293295
if (ExecutorRun_hook)
294-
(*ExecutorRun_hook) (queryDesc, direction, count);
296+
(*ExecutorRun_hook) (queryDesc, direction, count, execute_once);
295297
else
296-
standard_ExecutorRun(queryDesc, direction, count);
298+
standard_ExecutorRun(queryDesc, direction, count, execute_once);
297299
}
298300

299301
void
300302
standard_ExecutorRun(QueryDesc *queryDesc,
301-
ScanDirection direction, uint64 count)
303+
ScanDirection direction, uint64 count, bool execute_once)
302304
{
303305
EState *estate;
304306
CmdType operation;
@@ -345,14 +347,21 @@ standard_ExecutorRun(QueryDesc *queryDesc,
345347
* run plan
346348
*/
347349
if (!ScanDirectionIsNoMovement(direction))
350+
{
351+
if (execute_once && queryDesc->already_executed)
352+
elog(ERROR, "can't re-execute query flagged for single execution");
353+
queryDesc->already_executed = true;
354+
348355
ExecutePlan(estate,
349356
queryDesc->planstate,
350357
queryDesc->plannedstmt->parallelModeNeeded,
351358
operation,
352359
sendTuples,
353360
count,
354361
direction,
355-
dest);
362+
dest,
363+
execute_once);
364+
}
356365

357366
/*
358367
* shutdown tuple receiver, if we started it
@@ -1595,7 +1604,8 @@ ExecutePlan(EState *estate,
15951604
bool sendTuples,
15961605
uint64 numberTuples,
15971606
ScanDirection direction,
1598-
DestReceiver *dest)
1607+
DestReceiver *dest,
1608+
bool execute_once)
15991609
{
16001610
TupleTableSlot *slot;
16011611
uint64 current_tuple_count;
@@ -1611,12 +1621,12 @@ ExecutePlan(EState *estate,
16111621
estate->es_direction = direction;
16121622

16131623
/*
1614-
* If a tuple count was supplied, we must force the plan to run without
1615-
* parallelism, because we might exit early. Also disable parallelism
1616-
* when writing into a relation, because no database changes are allowed
1617-
* in parallel mode.
1624+
* If the plan might potentially be executed multiple times, we must force
1625+
* it to run without parallelism, because we might exit early. Also
1626+
* disable parallelism when writing into a relation, because no database
1627+
* changes are allowed in parallel mode.
16181628
*/
1619-
if (numberTuples || dest->mydest == DestIntoRel)
1629+
if (!execute_once || dest->mydest == DestIntoRel)
16201630
use_parallel_mode = false;
16211631

16221632
if (use_parallel_mode)
@@ -1687,7 +1697,11 @@ ExecutePlan(EState *estate,
16871697
*/
16881698
current_tuple_count++;
16891699
if (numberTuples && numberTuples == current_tuple_count)
1700+
{
1701+
/* Allow nodes to release or shut down resources. */
1702+
(void) ExecShutdownNode(planstate);
16901703
break;
1704+
}
16911705
}
16921706

16931707
if (use_parallel_mode)

src/backend/executor/execParallel.c

+1-1
Original file line numberDiff line numberDiff line change
@@ -853,7 +853,7 @@ ParallelQueryMain(dsm_segment *seg, shm_toc *toc)
853853
ExecParallelInitializeWorker(queryDesc->planstate, toc);
854854

855855
/* Run the plan */
856-
ExecutorRun(queryDesc, ForwardScanDirection, 0L);
856+
ExecutorRun(queryDesc, ForwardScanDirection, 0L, true);
857857

858858
/* Shut down the executor */
859859
ExecutorFinish(queryDesc);

src/backend/executor/functions.c

+1-1
Original file line numberDiff line numberDiff line change
@@ -855,7 +855,7 @@ postquel_getnext(execution_state *es, SQLFunctionCachePtr fcache)
855855
/* Run regular commands to completion unless lazyEval */
856856
uint64 count = (es->lazyEval) ? 1 : 0;
857857

858-
ExecutorRun(es->qd, ForwardScanDirection, count);
858+
ExecutorRun(es->qd, ForwardScanDirection, count, !fcache->returnsSet || !es->lazyEval);
859859

860860
/*
861861
* If we requested run to completion OR there was no tuple returned,

src/backend/executor/spi.c

+1-1
Original file line numberDiff line numberDiff line change
@@ -2305,7 +2305,7 @@ _SPI_pquery(QueryDesc *queryDesc, bool fire_triggers, uint64 tcount)
23052305

23062306
ExecutorStart(queryDesc, eflags);
23072307

2308-
ExecutorRun(queryDesc, ForwardScanDirection, tcount);
2308+
ExecutorRun(queryDesc, ForwardScanDirection, tcount, true);
23092309

23102310
_SPI_current->processed = queryDesc->estate->es_processed;
23112311
_SPI_current->lastoid = queryDesc->estate->es_lastoid;

src/backend/tcop/postgres.c

+2
Original file line numberDiff line numberDiff line change
@@ -1101,6 +1101,7 @@ exec_simple_query(const char *query_string)
11011101
(void) PortalRun(portal,
11021102
FETCH_ALL,
11031103
isTopLevel,
1104+
true,
11041105
receiver,
11051106
receiver,
11061107
completionTag);
@@ -1985,6 +1986,7 @@ exec_execute_message(const char *portal_name, long max_rows)
19851986
completed = PortalRun(portal,
19861987
max_rows,
19871988
true, /* always top level */
1989+
!execute_is_fetch && max_rows == FETCH_ALL,
19881990
receiver,
19891991
receiver,
19901992
completionTag);

src/backend/tcop/pquery.c

+16-4
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,9 @@ CreateQueryDesc(PlannedStmt *plannedstmt,
9090
qd->planstate = NULL;
9191
qd->totaltime = NULL;
9292

93+
/* not yet executed */
94+
qd->already_executed = false;
95+
9396
return qd;
9497
}
9598

@@ -152,7 +155,7 @@ ProcessQuery(PlannedStmt *plan,
152155
/*
153156
* Run the plan to completion.
154157
*/
155-
ExecutorRun(queryDesc, ForwardScanDirection, 0L);
158+
ExecutorRun(queryDesc, ForwardScanDirection, 0L, true);
156159

157160
/*
158161
* Build command completion status string, if caller wants one.
@@ -679,7 +682,7 @@ PortalSetResultFormat(Portal portal, int nFormats, int16 *formats)
679682
* suspended due to exhaustion of the count parameter.
680683
*/
681684
bool
682-
PortalRun(Portal portal, long count, bool isTopLevel,
685+
PortalRun(Portal portal, long count, bool isTopLevel, bool run_once,
683686
DestReceiver *dest, DestReceiver *altdest,
684687
char *completionTag)
685688
{
@@ -712,6 +715,10 @@ PortalRun(Portal portal, long count, bool isTopLevel,
712715
*/
713716
MarkPortalActive(portal);
714717

718+
/* Set run_once flag. Shouldn't be clear if previously set. */
719+
Assert(!portal->run_once || run_once);
720+
portal->run_once = run_once;
721+
715722
/*
716723
* Set up global portal context pointers.
717724
*
@@ -918,7 +925,8 @@ PortalRunSelect(Portal portal,
918925
else
919926
{
920927
PushActiveSnapshot(queryDesc->snapshot);
921-
ExecutorRun(queryDesc, direction, (uint64) count);
928+
ExecutorRun(queryDesc, direction, (uint64) count,
929+
portal->run_once);
922930
nprocessed = queryDesc->estate->es_processed;
923931
PopActiveSnapshot();
924932
}
@@ -957,7 +965,8 @@ PortalRunSelect(Portal portal,
957965
else
958966
{
959967
PushActiveSnapshot(queryDesc->snapshot);
960-
ExecutorRun(queryDesc, direction, (uint64) count);
968+
ExecutorRun(queryDesc, direction, (uint64) count,
969+
portal->run_once);
961970
nprocessed = queryDesc->estate->es_processed;
962971
PopActiveSnapshot();
963972
}
@@ -1394,6 +1403,9 @@ PortalRunFetch(Portal portal,
13941403
*/
13951404
MarkPortalActive(portal);
13961405

1406+
/* If supporting FETCH, portal can't be run-once. */
1407+
Assert(!portal->run_once);
1408+
13971409
/*
13981410
* Set up global portal context pointers.
13991411
*/

src/include/executor/execdesc.h

+3
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,9 @@ typedef struct QueryDesc
4747
EState *estate; /* executor's query-wide state */
4848
PlanState *planstate; /* tree of per-plan-node state */
4949

50+
/* This field is set by ExecutorRun */
51+
bool already_executed; /* true if previously executed */
52+
5053
/* This is always set NULL by the core system, but plugins can change it */
5154
struct Instrumentation *totaltime; /* total time spent in ExecutorRun */
5255
} QueryDesc;

src/include/executor/executor.h

+4-3
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,8 @@ extern PGDLLIMPORT ExecutorStart_hook_type ExecutorStart_hook;
8181
/* Hook for plugins to get control in ExecutorRun() */
8282
typedef void (*ExecutorRun_hook_type) (QueryDesc *queryDesc,
8383
ScanDirection direction,
84-
uint64 count);
84+
uint64 count,
85+
bool execute_once);
8586
extern PGDLLIMPORT ExecutorRun_hook_type ExecutorRun_hook;
8687

8788
/* Hook for plugins to get control in ExecutorFinish() */
@@ -176,9 +177,9 @@ extern TupleTableSlot *ExecFilterJunk(JunkFilter *junkfilter,
176177
extern void ExecutorStart(QueryDesc *queryDesc, int eflags);
177178
extern void standard_ExecutorStart(QueryDesc *queryDesc, int eflags);
178179
extern void ExecutorRun(QueryDesc *queryDesc,
179-
ScanDirection direction, uint64 count);
180+
ScanDirection direction, uint64 count, bool execute_once);
180181
extern void standard_ExecutorRun(QueryDesc *queryDesc,
181-
ScanDirection direction, uint64 count);
182+
ScanDirection direction, uint64 count, bool execute_once);
182183
extern void ExecutorFinish(QueryDesc *queryDesc);
183184
extern void standard_ExecutorFinish(QueryDesc *queryDesc);
184185
extern void ExecutorEnd(QueryDesc *queryDesc);

src/include/tcop/pquery.h

+1-1
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ extern void PortalSetResultFormat(Portal portal, int nFormats,
3434
int16 *formats);
3535

3636
extern bool PortalRun(Portal portal, long count, bool isTopLevel,
37-
DestReceiver *dest, DestReceiver *altdest,
37+
bool run_once, DestReceiver *dest, DestReceiver *altdest,
3838
char *completionTag);
3939

4040
extern uint64 PortalRunFetch(Portal portal,

src/include/utils/portal.h

+1
Original file line numberDiff line numberDiff line change
@@ -141,6 +141,7 @@ typedef struct PortalData
141141
/* Features/options */
142142
PortalStrategy strategy; /* see above */
143143
int cursorOptions; /* DECLARE CURSOR option bits */
144+
bool run_once; /* portal will only be run once */
144145

145146
/* Status data */
146147
PortalStatus status; /* see above */

0 commit comments

Comments
 (0)