Skip to content

Commit c2bb02b

Browse files
author
Etsuro Fujita
committed
Allow asynchronous execution in more cases.
In commit 27e1f14, create_append_plan() only allowed the subplan created from a given subpath to be executed asynchronously when it was an async-capable ForeignPath. To extend coverage, this patch handles cases when the given subpath includes some other Path types as well that can be omitted in the plan processing, such as a ProjectionPath directly atop an async-capable ForeignPath, allowing asynchronous execution in partitioned-scan/partitioned-join queries with non-Var tlist expressions and more UNION queries. Andrey Lepikhov and Etsuro Fujita, reviewed by Alexander Pyhalov and Zhihong Yu. Discussion: https://postgr.es/m/659c37a8-3e71-0ff2-394c-f04428c76f08%40postgrespro.ru
1 parent 376dc43 commit c2bb02b

File tree

9 files changed

+287
-15
lines changed

9 files changed

+287
-15
lines changed

contrib/postgres_fdw/expected/postgres_fdw.out

+170
Original file line numberDiff line numberDiff line change
@@ -10221,6 +10221,31 @@ SELECT * FROM result_tbl ORDER BY a;
1022110221
2505 | 505 | 0505
1022210222
(2 rows)
1022310223

10224+
DELETE FROM result_tbl;
10225+
EXPLAIN (VERBOSE, COSTS OFF)
10226+
INSERT INTO result_tbl SELECT a, b, 'AAA' || c FROM async_pt WHERE b === 505;
10227+
QUERY PLAN
10228+
---------------------------------------------------------------------------------
10229+
Insert on public.result_tbl
10230+
-> Append
10231+
-> Async Foreign Scan on public.async_p1 async_pt_1
10232+
Output: async_pt_1.a, async_pt_1.b, ('AAA'::text || async_pt_1.c)
10233+
Filter: (async_pt_1.b === 505)
10234+
Remote SQL: SELECT a, b, c FROM public.base_tbl1
10235+
-> Async Foreign Scan on public.async_p2 async_pt_2
10236+
Output: async_pt_2.a, async_pt_2.b, ('AAA'::text || async_pt_2.c)
10237+
Filter: (async_pt_2.b === 505)
10238+
Remote SQL: SELECT a, b, c FROM public.base_tbl2
10239+
(10 rows)
10240+
10241+
INSERT INTO result_tbl SELECT a, b, 'AAA' || c FROM async_pt WHERE b === 505;
10242+
SELECT * FROM result_tbl ORDER BY a;
10243+
a | b | c
10244+
------+-----+---------
10245+
1505 | 505 | AAA0505
10246+
2505 | 505 | AAA0505
10247+
(2 rows)
10248+
1022410249
DELETE FROM result_tbl;
1022510250
-- Check case where multiple partitions use the same connection
1022610251
CREATE TABLE base_tbl3 (a int, b int, c text);
@@ -10358,6 +10383,69 @@ SELECT * FROM join_tbl ORDER BY a1;
1035810383
3900 | 900 | 0900 | 3900 | 900 | 0900
1035910384
(30 rows)
1036010385

10386+
DELETE FROM join_tbl;
10387+
EXPLAIN (VERBOSE, COSTS OFF)
10388+
INSERT INTO join_tbl SELECT t1.a, t1.b, 'AAA' || t1.c, t2.a, t2.b, 'AAA' || t2.c FROM async_pt t1, async_pt t2 WHERE t1.a = t2.a AND t1.b = t2.b AND t1.b % 100 = 0;
10389+
QUERY PLAN
10390+
-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
10391+
Insert on public.join_tbl
10392+
-> Append
10393+
-> Async Foreign Scan
10394+
Output: t1_1.a, t1_1.b, ('AAA'::text || t1_1.c), t2_1.a, t2_1.b, ('AAA'::text || t2_1.c)
10395+
Relations: (public.async_p1 t1_1) INNER JOIN (public.async_p1 t2_1)
10396+
Remote SQL: SELECT r5.a, r5.b, r5.c, r8.a, r8.b, r8.c FROM (public.base_tbl1 r5 INNER JOIN public.base_tbl1 r8 ON (((r5.a = r8.a)) AND ((r5.b = r8.b)) AND (((r5.b % 100) = 0))))
10397+
-> Async Foreign Scan
10398+
Output: t1_2.a, t1_2.b, ('AAA'::text || t1_2.c), t2_2.a, t2_2.b, ('AAA'::text || t2_2.c)
10399+
Relations: (public.async_p2 t1_2) INNER JOIN (public.async_p2 t2_2)
10400+
Remote SQL: SELECT r6.a, r6.b, r6.c, r9.a, r9.b, r9.c FROM (public.base_tbl2 r6 INNER JOIN public.base_tbl2 r9 ON (((r6.a = r9.a)) AND ((r6.b = r9.b)) AND (((r6.b % 100) = 0))))
10401+
-> Hash Join
10402+
Output: t1_3.a, t1_3.b, ('AAA'::text || t1_3.c), t2_3.a, t2_3.b, ('AAA'::text || t2_3.c)
10403+
Hash Cond: ((t2_3.a = t1_3.a) AND (t2_3.b = t1_3.b))
10404+
-> Seq Scan on public.async_p3 t2_3
10405+
Output: t2_3.a, t2_3.b, t2_3.c
10406+
-> Hash
10407+
Output: t1_3.a, t1_3.b, t1_3.c
10408+
-> Seq Scan on public.async_p3 t1_3
10409+
Output: t1_3.a, t1_3.b, t1_3.c
10410+
Filter: ((t1_3.b % 100) = 0)
10411+
(20 rows)
10412+
10413+
INSERT INTO join_tbl SELECT t1.a, t1.b, 'AAA' || t1.c, t2.a, t2.b, 'AAA' || t2.c FROM async_pt t1, async_pt t2 WHERE t1.a = t2.a AND t1.b = t2.b AND t1.b % 100 = 0;
10414+
SELECT * FROM join_tbl ORDER BY a1;
10415+
a1 | b1 | c1 | a2 | b2 | c2
10416+
------+-----+---------+------+-----+---------
10417+
1000 | 0 | AAA0000 | 1000 | 0 | AAA0000
10418+
1100 | 100 | AAA0100 | 1100 | 100 | AAA0100
10419+
1200 | 200 | AAA0200 | 1200 | 200 | AAA0200
10420+
1300 | 300 | AAA0300 | 1300 | 300 | AAA0300
10421+
1400 | 400 | AAA0400 | 1400 | 400 | AAA0400
10422+
1500 | 500 | AAA0500 | 1500 | 500 | AAA0500
10423+
1600 | 600 | AAA0600 | 1600 | 600 | AAA0600
10424+
1700 | 700 | AAA0700 | 1700 | 700 | AAA0700
10425+
1800 | 800 | AAA0800 | 1800 | 800 | AAA0800
10426+
1900 | 900 | AAA0900 | 1900 | 900 | AAA0900
10427+
2000 | 0 | AAA0000 | 2000 | 0 | AAA0000
10428+
2100 | 100 | AAA0100 | 2100 | 100 | AAA0100
10429+
2200 | 200 | AAA0200 | 2200 | 200 | AAA0200
10430+
2300 | 300 | AAA0300 | 2300 | 300 | AAA0300
10431+
2400 | 400 | AAA0400 | 2400 | 400 | AAA0400
10432+
2500 | 500 | AAA0500 | 2500 | 500 | AAA0500
10433+
2600 | 600 | AAA0600 | 2600 | 600 | AAA0600
10434+
2700 | 700 | AAA0700 | 2700 | 700 | AAA0700
10435+
2800 | 800 | AAA0800 | 2800 | 800 | AAA0800
10436+
2900 | 900 | AAA0900 | 2900 | 900 | AAA0900
10437+
3000 | 0 | AAA0000 | 3000 | 0 | AAA0000
10438+
3100 | 100 | AAA0100 | 3100 | 100 | AAA0100
10439+
3200 | 200 | AAA0200 | 3200 | 200 | AAA0200
10440+
3300 | 300 | AAA0300 | 3300 | 300 | AAA0300
10441+
3400 | 400 | AAA0400 | 3400 | 400 | AAA0400
10442+
3500 | 500 | AAA0500 | 3500 | 500 | AAA0500
10443+
3600 | 600 | AAA0600 | 3600 | 600 | AAA0600
10444+
3700 | 700 | AAA0700 | 3700 | 700 | AAA0700
10445+
3800 | 800 | AAA0800 | 3800 | 800 | AAA0800
10446+
3900 | 900 | AAA0900 | 3900 | 900 | AAA0900
10447+
(30 rows)
10448+
1036110449
DELETE FROM join_tbl;
1036210450
RESET enable_partitionwise_join;
1036310451
-- Test rescan of an async Append node with do_exec_prune=false
@@ -10536,6 +10624,88 @@ DROP TABLE local_tbl;
1053610624
DROP INDEX base_tbl1_idx;
1053710625
DROP INDEX base_tbl2_idx;
1053810626
DROP INDEX async_p3_idx;
10627+
-- UNION queries
10628+
EXPLAIN (VERBOSE, COSTS OFF)
10629+
INSERT INTO result_tbl
10630+
(SELECT a, b, 'AAA' || c FROM async_p1 ORDER BY a LIMIT 10)
10631+
UNION
10632+
(SELECT a, b, 'AAA' || c FROM async_p2 WHERE b < 10);
10633+
QUERY PLAN
10634+
-----------------------------------------------------------------------------------------------------------------
10635+
Insert on public.result_tbl
10636+
-> HashAggregate
10637+
Output: async_p1.a, async_p1.b, (('AAA'::text || async_p1.c))
10638+
Group Key: async_p1.a, async_p1.b, (('AAA'::text || async_p1.c))
10639+
-> Append
10640+
-> Async Foreign Scan on public.async_p1
10641+
Output: async_p1.a, async_p1.b, ('AAA'::text || async_p1.c)
10642+
Remote SQL: SELECT a, b, c FROM public.base_tbl1 ORDER BY a ASC NULLS LAST LIMIT 10::bigint
10643+
-> Async Foreign Scan on public.async_p2
10644+
Output: async_p2.a, async_p2.b, ('AAA'::text || async_p2.c)
10645+
Remote SQL: SELECT a, b, c FROM public.base_tbl2 WHERE ((b < 10))
10646+
(11 rows)
10647+
10648+
INSERT INTO result_tbl
10649+
(SELECT a, b, 'AAA' || c FROM async_p1 ORDER BY a LIMIT 10)
10650+
UNION
10651+
(SELECT a, b, 'AAA' || c FROM async_p2 WHERE b < 10);
10652+
SELECT * FROM result_tbl ORDER BY a;
10653+
a | b | c
10654+
------+----+---------
10655+
1000 | 0 | AAA0000
10656+
1005 | 5 | AAA0005
10657+
1010 | 10 | AAA0010
10658+
1015 | 15 | AAA0015
10659+
1020 | 20 | AAA0020
10660+
1025 | 25 | AAA0025
10661+
1030 | 30 | AAA0030
10662+
1035 | 35 | AAA0035
10663+
1040 | 40 | AAA0040
10664+
1045 | 45 | AAA0045
10665+
2000 | 0 | AAA0000
10666+
2005 | 5 | AAA0005
10667+
(12 rows)
10668+
10669+
DELETE FROM result_tbl;
10670+
EXPLAIN (VERBOSE, COSTS OFF)
10671+
INSERT INTO result_tbl
10672+
(SELECT a, b, 'AAA' || c FROM async_p1 ORDER BY a LIMIT 10)
10673+
UNION ALL
10674+
(SELECT a, b, 'AAA' || c FROM async_p2 WHERE b < 10);
10675+
QUERY PLAN
10676+
-----------------------------------------------------------------------------------------------------------
10677+
Insert on public.result_tbl
10678+
-> Append
10679+
-> Async Foreign Scan on public.async_p1
10680+
Output: async_p1.a, async_p1.b, ('AAA'::text || async_p1.c)
10681+
Remote SQL: SELECT a, b, c FROM public.base_tbl1 ORDER BY a ASC NULLS LAST LIMIT 10::bigint
10682+
-> Async Foreign Scan on public.async_p2
10683+
Output: async_p2.a, async_p2.b, ('AAA'::text || async_p2.c)
10684+
Remote SQL: SELECT a, b, c FROM public.base_tbl2 WHERE ((b < 10))
10685+
(8 rows)
10686+
10687+
INSERT INTO result_tbl
10688+
(SELECT a, b, 'AAA' || c FROM async_p1 ORDER BY a LIMIT 10)
10689+
UNION ALL
10690+
(SELECT a, b, 'AAA' || c FROM async_p2 WHERE b < 10);
10691+
SELECT * FROM result_tbl ORDER BY a;
10692+
a | b | c
10693+
------+----+---------
10694+
1000 | 0 | AAA0000
10695+
1005 | 5 | AAA0005
10696+
1010 | 10 | AAA0010
10697+
1015 | 15 | AAA0015
10698+
1020 | 20 | AAA0020
10699+
1025 | 25 | AAA0025
10700+
1030 | 30 | AAA0030
10701+
1035 | 35 | AAA0035
10702+
1040 | 40 | AAA0040
10703+
1045 | 45 | AAA0045
10704+
2000 | 0 | AAA0000
10705+
2005 | 5 | AAA0005
10706+
(12 rows)
10707+
10708+
DELETE FROM result_tbl;
1053910709
-- Test that pending requests are processed properly
1054010710
SET enable_mergejoin TO false;
1054110711
SET enable_hashjoin TO false;

contrib/postgres_fdw/sql/postgres_fdw.sql

+41
Original file line numberDiff line numberDiff line change
@@ -3245,6 +3245,13 @@ INSERT INTO result_tbl SELECT * FROM async_pt WHERE b === 505;
32453245
SELECT * FROM result_tbl ORDER BY a;
32463246
DELETE FROM result_tbl;
32473247

3248+
EXPLAIN (VERBOSE, COSTS OFF)
3249+
INSERT INTO result_tbl SELECT a, b, 'AAA' || c FROM async_pt WHERE b === 505;
3250+
INSERT INTO result_tbl SELECT a, b, 'AAA' || c FROM async_pt WHERE b === 505;
3251+
3252+
SELECT * FROM result_tbl ORDER BY a;
3253+
DELETE FROM result_tbl;
3254+
32483255
-- Check case where multiple partitions use the same connection
32493256
CREATE TABLE base_tbl3 (a int, b int, c text);
32503257
CREATE FOREIGN TABLE async_p3 PARTITION OF async_pt FOR VALUES FROM (3000) TO (4000)
@@ -3286,6 +3293,13 @@ INSERT INTO join_tbl SELECT * FROM async_pt t1, async_pt t2 WHERE t1.a = t2.a AN
32863293
SELECT * FROM join_tbl ORDER BY a1;
32873294
DELETE FROM join_tbl;
32883295

3296+
EXPLAIN (VERBOSE, COSTS OFF)
3297+
INSERT INTO join_tbl SELECT t1.a, t1.b, 'AAA' || t1.c, t2.a, t2.b, 'AAA' || t2.c FROM async_pt t1, async_pt t2 WHERE t1.a = t2.a AND t1.b = t2.b AND t1.b % 100 = 0;
3298+
INSERT INTO join_tbl SELECT t1.a, t1.b, 'AAA' || t1.c, t2.a, t2.b, 'AAA' || t2.c FROM async_pt t1, async_pt t2 WHERE t1.a = t2.a AND t1.b = t2.b AND t1.b % 100 = 0;
3299+
3300+
SELECT * FROM join_tbl ORDER BY a1;
3301+
DELETE FROM join_tbl;
3302+
32893303
RESET enable_partitionwise_join;
32903304

32913305
-- Test rescan of an async Append node with do_exec_prune=false
@@ -3357,6 +3371,33 @@ DROP INDEX base_tbl1_idx;
33573371
DROP INDEX base_tbl2_idx;
33583372
DROP INDEX async_p3_idx;
33593373

3374+
-- UNION queries
3375+
EXPLAIN (VERBOSE, COSTS OFF)
3376+
INSERT INTO result_tbl
3377+
(SELECT a, b, 'AAA' || c FROM async_p1 ORDER BY a LIMIT 10)
3378+
UNION
3379+
(SELECT a, b, 'AAA' || c FROM async_p2 WHERE b < 10);
3380+
INSERT INTO result_tbl
3381+
(SELECT a, b, 'AAA' || c FROM async_p1 ORDER BY a LIMIT 10)
3382+
UNION
3383+
(SELECT a, b, 'AAA' || c FROM async_p2 WHERE b < 10);
3384+
3385+
SELECT * FROM result_tbl ORDER BY a;
3386+
DELETE FROM result_tbl;
3387+
3388+
EXPLAIN (VERBOSE, COSTS OFF)
3389+
INSERT INTO result_tbl
3390+
(SELECT a, b, 'AAA' || c FROM async_p1 ORDER BY a LIMIT 10)
3391+
UNION ALL
3392+
(SELECT a, b, 'AAA' || c FROM async_p2 WHERE b < 10);
3393+
INSERT INTO result_tbl
3394+
(SELECT a, b, 'AAA' || c FROM async_p1 ORDER BY a LIMIT 10)
3395+
UNION ALL
3396+
(SELECT a, b, 'AAA' || c FROM async_p2 WHERE b < 10);
3397+
3398+
SELECT * FROM result_tbl ORDER BY a;
3399+
DELETE FROM result_tbl;
3400+
33603401
-- Test that pending requests are processed properly
33613402
SET enable_mergejoin TO false;
33623403
SET enable_hashjoin TO false;

src/backend/nodes/copyfuncs.c

+1
Original file line numberDiff line numberDiff line change
@@ -632,6 +632,7 @@ _copySubqueryScan(const SubqueryScan *from)
632632
* copy remainder of node
633633
*/
634634
COPY_NODE_FIELD(subplan);
635+
COPY_SCALAR_FIELD(scanstatus);
635636

636637
return newnode;
637638
}

src/backend/nodes/outfuncs.c

+1
Original file line numberDiff line numberDiff line change
@@ -638,6 +638,7 @@ _outSubqueryScan(StringInfo str, const SubqueryScan *node)
638638
_outScanInfo(str, (const Scan *) node);
639639

640640
WRITE_NODE_FIELD(subplan);
641+
WRITE_ENUM_FIELD(scanstatus, SubqueryScanStatus);
641642
}
642643

643644
static void

src/backend/nodes/readfuncs.c

+1
Original file line numberDiff line numberDiff line change
@@ -2194,6 +2194,7 @@ _readSubqueryScan(void)
21942194
ReadCommonScan(&local_node->scan);
21952195

21962196
READ_NODE_FIELD(subplan);
2197+
READ_ENUM_FIELD(scanstatus, SubqueryScanStatus);
21972198

21982199
READ_DONE();
21992200
}

src/backend/optimizer/plan/createplan.c

+44-13
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ static List *get_gating_quals(PlannerInfo *root, List *quals);
8282
static Plan *create_gating_plan(PlannerInfo *root, Path *path, Plan *plan,
8383
List *gating_quals);
8484
static Plan *create_join_plan(PlannerInfo *root, JoinPath *best_path);
85-
static bool is_async_capable_path(Path *path);
85+
static bool mark_async_capable_plan(Plan *plan, Path *path);
8686
static Plan *create_append_plan(PlannerInfo *root, AppendPath *best_path,
8787
int flags);
8888
static Plan *create_merge_append_plan(PlannerInfo *root, MergeAppendPath *best_path,
@@ -1110,28 +1110,58 @@ create_join_plan(PlannerInfo *root, JoinPath *best_path)
11101110
}
11111111

11121112
/*
1113-
* is_async_capable_path
1114-
* Check whether a given Path node is async-capable.
1113+
* mark_async_capable_plan
1114+
* Check whether a given Path node is async-capable, and if so, mark the
1115+
* Plan node created from it as such and return true, otherwise return
1116+
* false.
11151117
*/
11161118
static bool
1117-
is_async_capable_path(Path *path)
1119+
mark_async_capable_plan(Plan *plan, Path *path)
11181120
{
11191121
switch (nodeTag(path))
11201122
{
1123+
case T_SubqueryScanPath:
1124+
{
1125+
SubqueryScan *scan_plan = (SubqueryScan *) plan;
1126+
1127+
/*
1128+
* If a SubqueryScan node atop of an async-capable plan node
1129+
* is deletable, consider it as async-capable.
1130+
*/
1131+
if (trivial_subqueryscan(scan_plan) &&
1132+
mark_async_capable_plan(scan_plan->subplan,
1133+
((SubqueryScanPath *) path)->subpath))
1134+
break;
1135+
return false;
1136+
}
11211137
case T_ForeignPath:
11221138
{
11231139
FdwRoutine *fdwroutine = path->parent->fdwroutine;
11241140

11251141
Assert(fdwroutine != NULL);
11261142
if (fdwroutine->IsForeignPathAsyncCapable != NULL &&
11271143
fdwroutine->IsForeignPathAsyncCapable((ForeignPath *) path))
1128-
return true;
1144+
break;
1145+
return false;
11291146
}
1130-
break;
1147+
case T_ProjectionPath:
1148+
1149+
/*
1150+
* If the generated plan node doesn't include a Result node,
1151+
* consider it as async-capable if the subpath is async-capable.
1152+
*/
1153+
if (!IsA(plan, Result) &&
1154+
mark_async_capable_plan(plan,
1155+
((ProjectionPath *) path)->subpath))
1156+
return true;
1157+
return false;
11311158
default:
1132-
break;
1159+
return false;
11331160
}
1134-
return false;
1161+
1162+
plan->async_capable = true;
1163+
1164+
return true;
11351165
}
11361166

11371167
/*
@@ -1294,14 +1324,14 @@ create_append_plan(PlannerInfo *root, AppendPath *best_path, int flags)
12941324
}
12951325
}
12961326

1297-
subplans = lappend(subplans, subplan);
1298-
1299-
/* Check to see if subplan can be executed asynchronously */
1300-
if (consider_async && is_async_capable_path(subpath))
1327+
/* If needed, check to see if subplan can be executed asynchronously */
1328+
if (consider_async && mark_async_capable_plan(subplan, subpath))
13011329
{
1302-
subplan->async_capable = true;
1330+
Assert(subplan->async_capable);
13031331
++nasyncplans;
13041332
}
1333+
1334+
subplans = lappend(subplans, subplan);
13051335
}
13061336

13071337
/*
@@ -5598,6 +5628,7 @@ make_subqueryscan(List *qptlist,
55985628
plan->righttree = NULL;
55995629
node->scan.scanrelid = scanrelid;
56005630
node->subplan = subplan;
5631+
node->scanstatus = SUBQUERY_SCAN_UNKNOWN;
56015632

56025633
return node;
56035634
}

0 commit comments

Comments
 (0)