Skip to content

Commit 3bd909b

Browse files
committed
Add a Gather executor node.
A Gather executor node runs any number of copies of a plan in an equal number of workers and merges all of the results into a single tuple stream. It can also run the plan itself, if the workers are unavailable or haven't started up yet. It is intended to work with the Partial Seq Scan node which will be added in future commits. It could also be used to implement parallel query of a different sort by itself, without help from Partial Seq Scan, if the single_copy mode is used. In that mode, a worker executes the plan, and the parallel leader does not, merely collecting the worker's results. So, a Gather node could be inserted into a plan to split the execution of that plan across two processes. Nested Gather nodes aren't currently supported, but we might want to add support for that in the future. There's nothing in the planner to actually generate Gather nodes yet, so it's not quite time to break out the champagne. But we're getting close. Amit Kapila. Some design suggestions were provided by me, and I also reviewed the patch. Single-copy mode, documentation, and other minor changes also by me.
1 parent 227d57f commit 3bd909b

26 files changed

+709
-8
lines changed

doc/src/sgml/config.sgml

+46
Original file line numberDiff line numberDiff line change
@@ -1928,6 +1928,22 @@ include_dir 'conf.d'
19281928
</para>
19291929
</listitem>
19301930
</varlistentry>
1931+
1932+
<varlistentry id="guc-max-parallel-degree" xreflabel="max_parallel_degree">
1933+
<term><varname>max_parallel_degree</varname> (<type>integer</type>)
1934+
<indexterm>
1935+
<primary><varname>max_parallel_degree</> configuration parameter</primary>
1936+
</indexterm>
1937+
</term>
1938+
<listitem>
1939+
<para>
1940+
Sets the maximum degree of parallelism for an individual parallel
1941+
operation. Note that the requested number of workers may not actually
1942+
be available at runtime. Parallel workers are taken from the pool
1943+
of processes established by <xref linkend="guc-max-worker-processes">.
1944+
</para>
1945+
</listitem>
1946+
</varlistentry>
19311947
</variablelist>
19321948
</sect2>
19331949
</sect1>
@@ -3398,6 +3414,36 @@ include_dir 'conf.d'
33983414
</listitem>
33993415
</varlistentry>
34003416

3417+
<varlistentry id="parallel-tuple-cost" xreflabel="parallel_tuple_cost">
3418+
<term><varname>parallel_tuple_cost</varname> (<type>floating point</type>)
3419+
<indexterm>
3420+
<primary><varname>parallel_tuple_cost</> configuration parameter</primary>
3421+
</indexterm>
3422+
</term>
3423+
<listitem>
3424+
<para>
3425+
Sets the planner's estimate of the cost of transferring a tuple
3426+
from a parallel worker process to another process.
3427+
The default is 0.1.
3428+
</para>
3429+
</listitem>
3430+
</varlistentry>
3431+
3432+
<varlistentry id="parallel-setup-cost" xreflabel="parallel_setup_cost">
3433+
<term><varname>parallel_setup_cost</varname> (<type>floating point</type>)
3434+
<indexterm>
3435+
<primary><varname>parallel_setup_cost</> configuration parameter</primary>
3436+
</indexterm>
3437+
</term>
3438+
<listitem>
3439+
<para>
3440+
Sets the planner's estimate of the cost of launching parallel worker
3441+
processes.
3442+
The default is 1000.
3443+
</para>
3444+
</listitem>
3445+
</varlistentry>
3446+
34013447
<varlistentry id="guc-effective-cache-size" xreflabel="effective_cache_size">
34023448
<term><varname>effective_cache_size</varname> (<type>integer</type>)
34033449
<indexterm>

src/backend/commands/explain.c

+19
Original file line numberDiff line numberDiff line change
@@ -853,6 +853,9 @@ ExplainNode(PlanState *planstate, List *ancestors,
853853
case T_SampleScan:
854854
pname = sname = "Sample Scan";
855855
break;
856+
case T_Gather:
857+
pname = sname = "Gather";
858+
break;
856859
case T_IndexScan:
857860
pname = sname = "Index Scan";
858861
break;
@@ -1276,6 +1279,22 @@ ExplainNode(PlanState *planstate, List *ancestors,
12761279
show_instrumentation_count("Rows Removed by Filter", 1,
12771280
planstate, es);
12781281
break;
1282+
case T_Gather:
1283+
{
1284+
Gather *gather = (Gather *) plan;
1285+
1286+
show_scan_qual(plan->qual, "Filter", planstate, ancestors, es);
1287+
if (plan->qual)
1288+
show_instrumentation_count("Rows Removed by Filter", 1,
1289+
planstate, es);
1290+
ExplainPropertyInteger("Number of Workers",
1291+
gather->num_workers, es);
1292+
if (gather->single_copy)
1293+
ExplainPropertyText("Single Copy",
1294+
gather->single_copy ? "true" : "false",
1295+
es);
1296+
}
1297+
break;
12791298
case T_FunctionScan:
12801299
if (es->verbose)
12811300
{

src/backend/executor/Makefile

+2-2
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,8 @@ OBJS = execAmi.o execCurrent.o execGrouping.o execIndexing.o execJunk.o \
1717
execScan.o execTuples.o \
1818
execUtils.o functions.o instrument.o nodeAppend.o nodeAgg.o \
1919
nodeBitmapAnd.o nodeBitmapOr.o \
20-
nodeBitmapHeapscan.o nodeBitmapIndexscan.o nodeCustom.o nodeHash.o \
21-
nodeHashjoin.o nodeIndexscan.o nodeIndexonlyscan.o \
20+
nodeBitmapHeapscan.o nodeBitmapIndexscan.o nodeCustom.o nodeGather.o \
21+
nodeHash.o nodeHashjoin.o nodeIndexscan.o nodeIndexonlyscan.o \
2222
nodeLimit.o nodeLockRows.o \
2323
nodeMaterial.o nodeMergeAppend.o nodeMergejoin.o nodeModifyTable.o \
2424
nodeNestloop.o nodeFunctionscan.o nodeRecursiveunion.o nodeResult.o \

src/backend/executor/execAmi.c

+8
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
#include "executor/nodeCustom.h"
2525
#include "executor/nodeForeignscan.h"
2626
#include "executor/nodeFunctionscan.h"
27+
#include "executor/nodeGather.h"
2728
#include "executor/nodeGroup.h"
2829
#include "executor/nodeGroup.h"
2930
#include "executor/nodeHash.h"
@@ -160,6 +161,10 @@ ExecReScan(PlanState *node)
160161
ExecReScanSampleScan((SampleScanState *) node);
161162
break;
162163

164+
case T_GatherState:
165+
ExecReScanGather((GatherState *) node);
166+
break;
167+
163168
case T_IndexScanState:
164169
ExecReScanIndexScan((IndexScanState *) node);
165170
break;
@@ -467,6 +472,9 @@ ExecSupportsBackwardScan(Plan *node)
467472
/* Simplify life for tablesample methods by disallowing this */
468473
return false;
469474

475+
case T_Gather:
476+
return false;
477+
470478
case T_IndexScan:
471479
return IndexSupportsBackwardScan(((IndexScan *) node)->indexid) &&
472480
TargetListSupportsBackwardScan(node->targetlist);

src/backend/executor/execMain.c

+3
Original file line numberDiff line numberDiff line change
@@ -347,6 +347,9 @@ standard_ExecutorRun(QueryDesc *queryDesc,
347347
direction,
348348
dest);
349349

350+
/* Allow nodes to release or shut down resources. */
351+
(void) ExecShutdownNode(queryDesc->planstate);
352+
350353
/*
351354
* shutdown tuple receiver, if we started it
352355
*/

src/backend/executor/execParallel.c

+8-6
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ typedef struct ExecParallelInitializeDSMContext
7171
} ExecParallelInitializeDSMContext;
7272

7373
/* Helper functions that run in the parallel leader. */
74-
static char *ExecSerializePlan(Plan *plan, List *rangetable);
74+
static char *ExecSerializePlan(Plan *plan, EState *estate);
7575
static bool ExecParallelEstimate(PlanState *node,
7676
ExecParallelEstimateContext *e);
7777
static bool ExecParallelInitializeDSM(PlanState *node,
@@ -88,7 +88,7 @@ static DestReceiver *ExecParallelGetReceiver(dsm_segment *seg, shm_toc *toc);
8888
* Create a serialized representation of the plan to be sent to each worker.
8989
*/
9090
static char *
91-
ExecSerializePlan(Plan *plan, List *rangetable)
91+
ExecSerializePlan(Plan *plan, EState *estate)
9292
{
9393
PlannedStmt *pstmt;
9494
ListCell *tlist;
@@ -125,13 +125,13 @@ ExecSerializePlan(Plan *plan, List *rangetable)
125125
pstmt->canSetTag = 1;
126126
pstmt->transientPlan = 0;
127127
pstmt->planTree = plan;
128-
pstmt->rtable = rangetable;
128+
pstmt->rtable = estate->es_range_table;
129129
pstmt->resultRelations = NIL;
130130
pstmt->utilityStmt = NULL;
131131
pstmt->subplans = NIL;
132132
pstmt->rewindPlanIDs = NULL;
133133
pstmt->rowMarks = NIL;
134-
pstmt->nParamExec = 0;
134+
pstmt->nParamExec = estate->es_plannedstmt->nParamExec;
135135
pstmt->relationOids = NIL;
136136
pstmt->invalItems = NIL; /* workers can't replan anyway... */
137137
pstmt->hasRowSecurity = false;
@@ -271,7 +271,7 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers)
271271
pei->planstate = planstate;
272272

273273
/* Fix up and serialize plan to be sent to workers. */
274-
pstmt_data = ExecSerializePlan(planstate->plan, estate->es_range_table);
274+
pstmt_data = ExecSerializePlan(planstate->plan, estate);
275275

276276
/* Create a parallel context. */
277277
pcxt = CreateParallelContext(ParallelQueryMain, nworkers);
@@ -568,7 +568,6 @@ ParallelQueryMain(dsm_segment *seg, shm_toc *toc)
568568
ExecutorStart(queryDesc, 0);
569569
ExecutorRun(queryDesc, ForwardScanDirection, 0L);
570570
ExecutorFinish(queryDesc);
571-
ExecutorEnd(queryDesc);
572571

573572
/* Report buffer usage during parallel execution. */
574573
buffer_usage = shm_toc_lookup(toc, PARALLEL_KEY_BUFFER_USAGE);
@@ -579,6 +578,9 @@ ParallelQueryMain(dsm_segment *seg, shm_toc *toc)
579578
ExecParallelReportInstrumentation(queryDesc->planstate,
580579
instrumentation);
581580

581+
/* Must do this after capturing instrumentation. */
582+
ExecutorEnd(queryDesc);
583+
582584
/* Cleanup. */
583585
FreeQueryDesc(queryDesc);
584586
(*receiver->rDestroy) (receiver);

src/backend/executor/execProcnode.c

+46
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,7 @@
100100
#include "executor/nodeMergejoin.h"
101101
#include "executor/nodeModifyTable.h"
102102
#include "executor/nodeNestloop.h"
103+
#include "executor/nodeGather.h"
103104
#include "executor/nodeRecursiveunion.h"
104105
#include "executor/nodeResult.h"
105106
#include "executor/nodeSamplescan.h"
@@ -113,6 +114,7 @@
113114
#include "executor/nodeValuesscan.h"
114115
#include "executor/nodeWindowAgg.h"
115116
#include "executor/nodeWorktablescan.h"
117+
#include "nodes/nodeFuncs.h"
116118
#include "miscadmin.h"
117119

118120

@@ -307,6 +309,11 @@ ExecInitNode(Plan *node, EState *estate, int eflags)
307309
estate, eflags);
308310
break;
309311

312+
case T_Gather:
313+
result = (PlanState *) ExecInitGather((Gather *) node,
314+
estate, eflags);
315+
break;
316+
310317
case T_Hash:
311318
result = (PlanState *) ExecInitHash((Hash *) node,
312319
estate, eflags);
@@ -504,6 +511,10 @@ ExecProcNode(PlanState *node)
504511
result = ExecUnique((UniqueState *) node);
505512
break;
506513

514+
case T_GatherState:
515+
result = ExecGather((GatherState *) node);
516+
break;
517+
507518
case T_HashState:
508519
result = ExecHash((HashState *) node);
509520
break;
@@ -658,6 +669,10 @@ ExecEndNode(PlanState *node)
658669
ExecEndSampleScan((SampleScanState *) node);
659670
break;
660671

672+
case T_GatherState:
673+
ExecEndGather((GatherState *) node);
674+
break;
675+
661676
case T_IndexScanState:
662677
ExecEndIndexScan((IndexScanState *) node);
663678
break;
@@ -769,3 +784,34 @@ ExecEndNode(PlanState *node)
769784
break;
770785
}
771786
}
787+
788+
/*
789+
* ExecShutdownNode
790+
*
791+
* Give execution nodes a chance to stop asynchronous resource consumption
792+
* and release any resources still held. Currently, this is only used for
793+
* parallel query, but we might want to extend it to other cases also (e.g.
794+
* FDW). We might also want to call it sooner, as soon as it's evident that
795+
* no more rows will be needed (e.g. when a Limit is filled) rather than only
796+
* at the end of ExecutorRun.
797+
*/
798+
bool
799+
ExecShutdownNode(PlanState *node)
800+
{
801+
if (node == NULL)
802+
return false;
803+
804+
switch (nodeTag(node))
805+
{
806+
case T_GatherState:
807+
{
808+
ExecShutdownGather((GatherState *) node);
809+
return true;
810+
}
811+
break;
812+
default:
813+
break;
814+
}
815+
816+
return planstate_tree_walker(node, ExecShutdownNode, NULL);
817+
}

0 commit comments

Comments
 (0)