Skip to content

Commit 5e6d8d2

Browse files
committed
Allow parallel workers to execute subplans.
This doesn't do anything to make Param nodes anything other than parallel-restricted, so this only helps with uncorrelated subplans, and it's not necessarily very cheap because each worker will run the subplan separately (just as a Hash Join will build a separate copy of the hash table in each participating process), but it's a first step toward supporting cases that are more likely to help in practice, and is occasionally useful on its own. Amit Kapila, reviewed and tested by Rafia Sabih, Dilip Kumar, and me. Discussion: http://postgr.es/m/CAA4eK1+e8Z45D2n+rnDMDYsVEb5iW7jqaCH_tvPMYau=1Rru9w@mail.gmail.com
1 parent 8da9a22 commit 5e6d8d2

File tree

10 files changed

+61
-14
lines changed

10 files changed

+61
-14
lines changed

src/backend/executor/execParallel.c

+1-1
Original file line numberDiff line numberDiff line change
@@ -156,7 +156,7 @@ ExecSerializePlan(Plan *plan, EState *estate)
156156
pstmt->planTree = plan;
157157
pstmt->rtable = estate->es_range_table;
158158
pstmt->resultRelations = NIL;
159-
pstmt->subplans = NIL;
159+
pstmt->subplans = estate->es_plannedstmt->subplans;
160160
pstmt->rewindPlanIDs = NULL;
161161
pstmt->rowMarks = NIL;
162162
pstmt->relationOids = NIL;

src/backend/nodes/copyfuncs.c

+1
Original file line numberDiff line numberDiff line change
@@ -1495,6 +1495,7 @@ _copySubPlan(const SubPlan *from)
14951495
COPY_SCALAR_FIELD(firstColCollation);
14961496
COPY_SCALAR_FIELD(useHashTable);
14971497
COPY_SCALAR_FIELD(unknownEqFalse);
1498+
COPY_SCALAR_FIELD(parallel_safe);
14981499
COPY_NODE_FIELD(setParam);
14991500
COPY_NODE_FIELD(parParam);
15001501
COPY_NODE_FIELD(args);

src/backend/nodes/equalfuncs.c

+1
Original file line numberDiff line numberDiff line change
@@ -423,6 +423,7 @@ _equalSubPlan(const SubPlan *a, const SubPlan *b)
423423
COMPARE_SCALAR_FIELD(firstColCollation);
424424
COMPARE_SCALAR_FIELD(useHashTable);
425425
COMPARE_SCALAR_FIELD(unknownEqFalse);
426+
COMPARE_SCALAR_FIELD(parallel_safe);
426427
COMPARE_NODE_FIELD(setParam);
427428
COMPARE_NODE_FIELD(parParam);
428429
COMPARE_NODE_FIELD(args);

src/backend/nodes/outfuncs.c

+1
Original file line numberDiff line numberDiff line change
@@ -1226,6 +1226,7 @@ _outSubPlan(StringInfo str, const SubPlan *node)
12261226
WRITE_OID_FIELD(firstColCollation);
12271227
WRITE_BOOL_FIELD(useHashTable);
12281228
WRITE_BOOL_FIELD(unknownEqFalse);
1229+
WRITE_BOOL_FIELD(parallel_safe);
12291230
WRITE_NODE_FIELD(setParam);
12301231
WRITE_NODE_FIELD(parParam);
12311232
WRITE_NODE_FIELD(args);

src/backend/nodes/readfuncs.c

+1
Original file line numberDiff line numberDiff line change
@@ -2233,6 +2233,7 @@ _readSubPlan(void)
22332233
READ_OID_FIELD(firstColCollation);
22342234
READ_BOOL_FIELD(useHashTable);
22352235
READ_BOOL_FIELD(unknownEqFalse);
2236+
READ_BOOL_FIELD(parallel_safe);
22362237
READ_NODE_FIELD(setParam);
22372238
READ_NODE_FIELD(parParam);
22382239
READ_NODE_FIELD(args);

src/backend/optimizer/plan/subselect.c

+13-4
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ static Node *build_subplan(PlannerInfo *root, Plan *plan, PlannerInfo *subroot,
5858
List *plan_params,
5959
SubLinkType subLinkType, int subLinkId,
6060
Node *testexpr, bool adjust_testexpr,
61-
bool unknownEqFalse);
61+
bool unknownEqFalse, bool parallel_safe);
6262
static List *generate_subquery_params(PlannerInfo *root, List *tlist,
6363
List **paramIds);
6464
static List *generate_subquery_vars(PlannerInfo *root, List *tlist,
@@ -551,7 +551,8 @@ make_subplan(PlannerInfo *root, Query *orig_subquery,
551551
/* And convert to SubPlan or InitPlan format. */
552552
result = build_subplan(root, plan, subroot, plan_params,
553553
subLinkType, subLinkId,
554-
testexpr, true, isTopQual);
554+
testexpr, true, isTopQual,
555+
best_path->parallel_safe);
555556

556557
/*
557558
* If it's a correlated EXISTS with an unimportant targetlist, we might be
@@ -604,7 +605,8 @@ make_subplan(PlannerInfo *root, Query *orig_subquery,
604605
plan_params,
605606
ANY_SUBLINK, 0,
606607
newtestexpr,
607-
false, true);
608+
false, true,
609+
best_path->parallel_safe);
608610
/* Check we got what we expected */
609611
Assert(IsA(hashplan, SubPlan));
610612
Assert(hashplan->parParam == NIL);
@@ -634,7 +636,7 @@ build_subplan(PlannerInfo *root, Plan *plan, PlannerInfo *subroot,
634636
List *plan_params,
635637
SubLinkType subLinkType, int subLinkId,
636638
Node *testexpr, bool adjust_testexpr,
637-
bool unknownEqFalse)
639+
bool unknownEqFalse, bool parallel_safe)
638640
{
639641
Node *result;
640642
SubPlan *splan;
@@ -653,6 +655,7 @@ build_subplan(PlannerInfo *root, Plan *plan, PlannerInfo *subroot,
653655
&splan->firstColCollation);
654656
splan->useHashTable = false;
655657
splan->unknownEqFalse = unknownEqFalse;
658+
splan->parallel_safe = parallel_safe;
656659
splan->setParam = NIL;
657660
splan->parParam = NIL;
658661
splan->args = NIL;
@@ -1213,6 +1216,12 @@ SS_process_ctes(PlannerInfo *root)
12131216
&splan->firstColCollation);
12141217
splan->useHashTable = false;
12151218
splan->unknownEqFalse = false;
1219+
1220+
/*
1221+
* CTE scans are not considered for parallelism (cf
1222+
* set_rel_consider_parallel).
1223+
*/
1224+
splan->parallel_safe = false;
12161225
splan->setParam = NIL;
12171226
splan->parParam = NIL;
12181227
splan->args = NIL;

src/backend/optimizer/util/clauses.c

+7-9
Original file line numberDiff line numberDiff line change
@@ -1162,21 +1162,19 @@ max_parallel_hazard_walker(Node *node, max_parallel_hazard_context *context)
11621162
}
11631163

11641164
/*
1165-
* Since we don't have the ability to push subplans down to workers at
1166-
* present, we treat subplan references as parallel-restricted. We need
1167-
* not worry about examining their contents; if they are unsafe, we would
1168-
* have found that out while examining the whole tree before reduction of
1169-
* sublinks to subplans. (Really we should not see SubLink during a
1170-
* max_interesting == restricted scan, but if we do, return true.)
1165+
* Really we should not see SubLink during a max_interesting == restricted
1166+
* scan, but if we do, return true.
11711167
*/
1172-
else if (IsA(node, SubLink) ||
1173-
IsA(node, SubPlan) ||
1174-
IsA(node, AlternativeSubPlan))
1168+
else if (IsA(node, SubLink))
11751169
{
11761170
if (max_parallel_hazard_test(PROPARALLEL_RESTRICTED, context))
11771171
return true;
11781172
}
11791173

1174+
/* We can push the subplans only if they are parallel-safe. */
1175+
else if (IsA(node, SubPlan))
1176+
return !((SubPlan *) node)->parallel_safe;
1177+
11801178
/*
11811179
* We can't pass Params to workers at the moment either, so they are also
11821180
* parallel-restricted.

src/include/nodes/primnodes.h

+1
Original file line numberDiff line numberDiff line change
@@ -677,6 +677,7 @@ typedef struct SubPlan
677677
bool unknownEqFalse; /* TRUE if it's okay to return FALSE when the
678678
* spec result is UNKNOWN; this allows much
679679
* simpler handling of null values */
680+
bool parallel_safe; /* OK to use as part of parallel plan? */
680681
/* Information for passing params into and out of the subselect: */
681682
/* setParam and parParam are lists of integers (param IDs) */
682683
List *setParam; /* initplan subqueries have to set these

src/test/regress/expected/select_parallel.out

+26
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,32 @@ explain (costs off)
9999
-> Index Only Scan using tenk1_unique1 on tenk1
100100
(3 rows)
101101

102+
-- test parallel plans for queries containing un-correlated subplans.
103+
alter table tenk2 set (parallel_workers = 0);
104+
explain (costs off)
105+
select count(*) from tenk1 where (two, four) not in
106+
(select hundred, thousand from tenk2 where thousand > 100);
107+
QUERY PLAN
108+
------------------------------------------------------
109+
Finalize Aggregate
110+
-> Gather
111+
Workers Planned: 4
112+
-> Partial Aggregate
113+
-> Parallel Seq Scan on tenk1
114+
Filter: (NOT (hashed SubPlan 1))
115+
SubPlan 1
116+
-> Seq Scan on tenk2
117+
Filter: (thousand > 100)
118+
(9 rows)
119+
120+
select count(*) from tenk1 where (two, four) not in
121+
(select hundred, thousand from tenk2 where thousand > 100);
122+
count
123+
-------
124+
10000
125+
(1 row)
126+
127+
alter table tenk2 reset (parallel_workers);
102128
set force_parallel_mode=1;
103129
explain (costs off)
104130
select stringu1::int2 from tenk1 where unique1 = 1;

src/test/regress/sql/select_parallel.sql

+9
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,15 @@ explain (costs off)
3939
select sum(parallel_restricted(unique1)) from tenk1
4040
group by(parallel_restricted(unique1));
4141

42+
-- test parallel plans for queries containing un-correlated subplans.
43+
alter table tenk2 set (parallel_workers = 0);
44+
explain (costs off)
45+
select count(*) from tenk1 where (two, four) not in
46+
(select hundred, thousand from tenk2 where thousand > 100);
47+
select count(*) from tenk1 where (two, four) not in
48+
(select hundred, thousand from tenk2 where thousand > 100);
49+
alter table tenk2 reset (parallel_workers);
50+
4251
set force_parallel_mode=1;
4352

4453
explain (costs off)

0 commit comments

Comments
 (0)