Skip to content

Commit 3b9ebfd

Browse files
committed
Raw commit
1 parent 8491aa4 commit 3b9ebfd

File tree

4 files changed

+198
-46
lines changed

4 files changed

+198
-46
lines changed

contrib/pg_exchange/exchange.c

Lines changed: 94 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -263,15 +263,18 @@ set_exchange_altrel(ExchangeMode mode, ExchangePath *path, RelOptInfo *outerrel,
263263
RelOptInfo *rel = &path->altrel;
264264

265265
Assert(rel && (outerrel || innerrel));
266-
Assert(!bms_is_empty(servers));
266+
Assert(!bms_is_empty(servers) || mode != EXCH_SHUFFLE);
267267

268-
rel->nparts = bms_num_members(servers);
269-
rel->part_rels = palloc(sizeof(RelOptInfo *) * rel->nparts);
270-
for (i = 0; i < rel->nparts; i++)
268+
if (!bms_is_empty(servers))
271269
{
272-
sid = bms_next_member(servers, sid);
273-
rel->part_rels[i] = palloc0(sizeof(RelOptInfo));
274-
rel->part_rels[i]->serverid = (Oid) sid;
270+
rel->nparts = bms_num_members(servers);
271+
rel->part_rels = palloc(sizeof(RelOptInfo *) * rel->nparts);
272+
for (i = 0; i < rel->nparts; i++)
273+
{
274+
sid = bms_next_member(servers, sid);
275+
rel->part_rels[i] = palloc0(sizeof(RelOptInfo));
276+
rel->part_rels[i]->serverid = (Oid) sid;
277+
}
275278
}
276279

277280
switch (mode)
@@ -399,6 +402,82 @@ make_local_scan_path(Path *localPath, RelOptInfo *rel,
399402
return pathnode;
400403
}
401404

405+
static Path *
406+
foreign_to_seqscan(PlannerInfo *root, RelOptInfo *rel, ForeignPath *fpath)
407+
{
408+
Path *seqScan = makeNode(Path);
409+
Cost startup_cost = 0;
410+
Cost cpu_run_cost;
411+
412+
seqScan->pathtype = T_SeqScan;
413+
seqScan->parallel_aware = fpath->path.parallel_aware;
414+
seqScan->parallel_safe = fpath->path.parallel_safe;
415+
seqScan->parallel_workers = fpath->path.parallel_workers;
416+
seqScan->param_info = get_baserel_parampathinfo(root, rel, rel->lateral_relids);
417+
418+
seqScan->parent = fpath->path.parent;
419+
seqScan->pathkeys = fpath->path.pathkeys;
420+
seqScan->pathtarget = fpath->path.pathtarget;
421+
seqScan->rows = fpath->path.rows;
422+
seqScan->startup_cost = fpath->path.startup_cost;
423+
seqScan->total_cost = fpath->path.total_cost;
424+
425+
cpu_run_cost = cpu_tuple_cost * rel->tuples;
426+
cpu_run_cost += seqScan->pathtarget->cost.per_tuple * seqScan->rows;
427+
seqScan->startup_cost = startup_cost;
428+
/* TODO: Disk run costs and qual costs? */
429+
seqScan->total_cost = startup_cost + cpu_run_cost;
430+
431+
return seqScan;
432+
}
433+
434+
static void
435+
add_trivial_distributed_path(PlannerInfo *root, RelOptInfo *rel, Index rti, RangeTblEntry *rte)
436+
{
437+
ListCell *lc;
438+
439+
foreach(lc, rel->pathlist)
440+
{
441+
Path *path = (Path *) lfirst(lc);
442+
Bitmapset *servers = NULL;
443+
IndexOptInfo *indexinfo = NULL;
444+
445+
switch (nodeTag(path))
446+
{
447+
case T_Path:
448+
case T_BitmapHeapPath:
449+
case T_IndexPath:
450+
case T_TidPath:
451+
case T_SubqueryScanPath:
452+
break;
453+
454+
case T_ForeignPath:
455+
servers = bms_add_member(servers, (int) path->parent->serverid);
456+
path = foreign_to_seqscan(root, rel, (ForeignPath *) path);
457+
break;
458+
459+
default:
460+
Assert(0);
461+
elog(FATAL, "Unexpected path node: %d", nodeTag(path));
462+
}
463+
path = (Path *) create_exchange_path(root, rel, (Path *) path,
464+
EXCH_GATHER);
465+
set_exchange_altrel(EXCH_GATHER, (ExchangePath *) path, rel, NULL, NULL,
466+
servers);
467+
if (indexinfo)
468+
{
469+
List **private;
470+
471+
private = &((ExchangePath *) path)->cp.custom_private;
472+
*private = lappend(*private, indexinfo);
473+
}
474+
475+
path = (Path *) create_distexec_path(root, rel, path, servers);
476+
477+
force_add_path(rel, path);
478+
}
479+
}
480+
402481
/*
403482
* Add one path for a base relation target: replace all ForeignScan nodes by
404483
* local Scan nodes.
@@ -415,8 +494,11 @@ add_exchange_paths(PlannerInfo *root, RelOptInfo *rel, Index rti, RangeTblEntry
415494
ListCell *lc;
416495

417496
if (!rte->inh)
497+
{
418498
/* Relation does not contain any partitions. */
499+
// add_trivial_distributed_path(root, rel, rti, rte);
419500
return;
501+
}
420502

421503
/* Traverse all possible paths and search for APPEND */
422504
foreach(lc, rel->pathlist)
@@ -427,7 +509,7 @@ add_exchange_paths(PlannerInfo *root, RelOptInfo *rel, Index rti, RangeTblEntry
427509
ListCell *lc1;
428510
Bitmapset *servers = NULL;
429511
List *subpaths = NIL;
430-
List *append_paths;
512+
List **append_paths;
431513
IndexOptInfo *indexinfo = NULL;
432514

433515
/*
@@ -437,10 +519,10 @@ add_exchange_paths(PlannerInfo *root, RelOptInfo *rel, Index rti, RangeTblEntry
437519
switch (nodeTag(path))
438520
{
439521
case T_AppendPath:
440-
append_paths = ((AppendPath *) path)->subpaths;
522+
append_paths = &((AppendPath *) path)->subpaths;
441523
break;
442524
case T_MergeAppendPath:
443-
append_paths = ((MergeAppendPath *) path)->subpaths;
525+
append_paths = &((MergeAppendPath *) path)->subpaths;
444526
break;
445527
default:
446528
elog(FATAL, "Unexpected node type %d, pathtype %d", path->type,
@@ -452,7 +534,7 @@ add_exchange_paths(PlannerInfo *root, RelOptInfo *rel, Index rti, RangeTblEntry
452534
* at the same node has the only strategy. It is caused by symmetry of
453535
* data placement.
454536
*/
455-
for (lc1 = list_head(append_paths); lc1 != NULL; lc1 = lnext(lc1))
537+
for (lc1 = list_head(*append_paths); lc1 != NULL; lc1 = lnext(lc1))
456538
{
457539
Path *subpath = (Path *) lfirst(lc1);
458540

@@ -472,7 +554,7 @@ add_exchange_paths(PlannerInfo *root, RelOptInfo *rel, Index rti, RangeTblEntry
472554
/*
473555
* Traverse all APPEND subpaths. Form new path list.
474556
*/
475-
foreach(lc1, append_paths)
557+
foreach(lc1, *append_paths)
476558
{
477559
Path *subpath = (Path *) lfirst(lc1);
478560
Path *tmpPath = NULL;

contrib/pg_exchange/exchange.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,9 @@
3939
#define cstmSubPath1(customPath) (Path *) linitial(((CustomPath *) \
4040
customPath)->custom_paths)
4141

42+
/*
 * First element of the custom_plans list of a CustomScan node, cast to
 * Plan *.  The argument is parenthesized so that an expression argument is
 * cast as a whole.
 */
#define cstmSubPlan1(custom) \
	((Plan *) linitial(((CustomScan *) (custom))->custom_plans))
44+
4245
typedef enum ExchangeMode
4346
{
4447
EXCH_GATHER,

contrib/pg_exchange/hooks.c

Lines changed: 76 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,25 @@ reset_cheapest(RelOptInfo *rel)
8888
set_cheapest(rel);
8989
}
9090

91+
static bool
92+
contain_distributed_paths(List *pathlist)
93+
{
94+
ListCell *lc;
95+
96+
foreach(lc, pathlist)
97+
{
98+
Path *path = lfirst(lc);
99+
100+
if (IsDistExecNode(path))
101+
return true;
102+
}
103+
return false;
104+
}
105+
106+
/*
107+
* TODO: We need a cost_recalculate() routine that will walk across the path
108+
* and call the cost function at each node, from the leaves to the root of the path tree.
109+
*/
91110
static List *
92111
create_distributed_join_paths(PlannerInfo *root, RelOptInfo *joinrel,
93112
RelOptInfo *outerrel, RelOptInfo *innerrel,
@@ -99,13 +118,18 @@ create_distributed_join_paths(PlannerInfo *root, RelOptInfo *joinrel,
99118
List *prev_outer_pathlist;
100119
List *prev_join_pathlist;
101120
List *dist_paths = NIL;
121+
bool distributedOuter;
122+
bool distributedInner;
102123

103124
/* Save old pathlists. */
104125
prev_inner_pathlist = innerrel->pathlist;
105126
prev_outer_pathlist = outerrel->pathlist;
106127
prev_join_pathlist = joinrel->pathlist;
107128
innerrel->pathlist = outerrel->pathlist = joinrel->pathlist = NIL;
108129

130+
distributedOuter = contain_distributed_paths(prev_outer_pathlist);
131+
distributedInner = contain_distributed_paths(prev_inner_pathlist);
132+
109133
foreach(lc, prev_inner_pathlist)
110134
{
111135
Path *innpath = lfirst(lc);
@@ -114,20 +138,26 @@ create_distributed_join_paths(PlannerInfo *root, RelOptInfo *joinrel,
114138
ListCell *olc;
115139
ExchangePath *gather;
116140

117-
/* Use only distributed paths */
118-
if (!IsDistExecNode(innpath))
119-
continue;
120-
121-
inner_servers = extractForeignServers((CustomPath *) innpath);
122-
inn_child = (ExchangePath *) cstmSubPath1(innpath);
123-
Assert(inn_child->mode == EXCH_GATHER);
124-
if (IsExchangeNode(inn_child))
125-
inn_child = create_exchange_path(root, innerrel,
126-
cstmSubPath1(inn_child),
127-
inner_mode);
141+
if (IsDistExecNode(innpath))
142+
{
143+
inner_servers = extractForeignServers((CustomPath *) innpath);
144+
inn_child = (ExchangePath *) cstmSubPath1(innpath);
145+
Assert(inn_child->mode == EXCH_GATHER);
146+
if (IsExchangeNode(inn_child))
147+
inn_child = create_exchange_path(root, innerrel,
148+
cstmSubPath1(inn_child), inner_mode);
149+
else
150+
inn_child = create_exchange_path(root, innerrel,
151+
(Path *) inn_child, inner_mode);
152+
}
153+
else if (!distributedInner && distributedOuter)
154+
{
155+
/* The case of JOIN partitioned and simple relation */
156+
inn_child = create_exchange_path(root, innerrel, innpath, EXCH_GATHER);
157+
}
128158
else
129-
inn_child = create_exchange_path(root, innerrel, (Path *) inn_child,
130-
inner_mode);
159+
/* Use only distributed paths */
160+
continue;
131161

132162
innerrel->pathlist = lappend(innerrel->pathlist, inn_child);
133163
Assert(list_length(innerrel->pathlist) == 1);
@@ -141,20 +171,33 @@ create_distributed_join_paths(PlannerInfo *root, RelOptInfo *joinrel,
141171
bool res;
142172

143173
/* Use only distributed paths */
144-
if (!IsDistExecNode(outpath))
145-
continue;
146-
147-
outer_servers = extractForeignServers((CustomPath *) outpath);
148-
out_child = (ExchangePath *) cstmSubPath1(outpath);
149-
Assert(out_child->mode == EXCH_GATHER);
150-
if (IsExchangeNode(out_child))
151-
out_child = create_exchange_path(root, outerrel,
152-
cstmSubPath1(out_child),
153-
outer_mode);
174+
if (IsDistExecNode(outpath))
175+
{
176+
outer_servers = extractForeignServers((CustomPath *) outpath);
177+
if (!outer_servers && !outer_servers &&
178+
inner_mode ==EXCH_SHUFFLE && outer_mode ==EXCH_SHUFFLE)
179+
continue;
180+
181+
out_child = (ExchangePath *) cstmSubPath1(outpath);
182+
Assert(out_child->mode == EXCH_GATHER);
183+
if (IsExchangeNode(out_child))
184+
out_child = create_exchange_path(root, outerrel,
185+
cstmSubPath1(out_child), outer_mode);
186+
else
187+
out_child = create_exchange_path(root, outerrel,
188+
(Path *) out_child, inner_mode);
189+
if (!distributedInner)
190+
set_exchange_altrel(EXCH_GATHER, inn_child, innerrel, NULL,
191+
NULL, outer_servers);
192+
}
193+
else if (distributedInner && !distributedOuter)
194+
{
195+
out_child = create_exchange_path(root, outerrel, outpath, EXCH_GATHER);
196+
set_exchange_altrel(EXCH_GATHER, out_child, outerrel, NULL,
197+
NULL, inner_servers);
198+
}
154199
else
155-
out_child = create_exchange_path(root, outerrel,
156-
(Path *) out_child,
157-
inner_mode);
200+
continue;
158201

159202
if (inner_mode == EXCH_SHUFFLE)
160203
{
@@ -204,16 +247,18 @@ create_distributed_join_paths(PlannerInfo *root, RelOptInfo *joinrel,
204247
{
205248
JoinPath *jp = (JoinPath *) path;
206249

250+
set_exchange_altrel(EXCH_BROADCAST, out_child, NULL,
251+
&((ExchangePath *)cstmSubPath1(out_child))->altrel,
252+
NIL, bms_union(inner_servers, outer_servers));
253+
inn_child->mode = EXCH_STEALTH;
254+
cost_exchange(root, outerrel, out_child);
255+
cost_exchange(root, innerrel, inn_child);
256+
207257
if (jp->innerjoinpath->pathtype != T_Material)
208258
jp->innerjoinpath = (Path *) create_material_path(innerrel,
209259
jp->innerjoinpath);
210260
Assert(jp->innerjoinpath->pathtype == T_Material);
211-
212-
set_exchange_altrel(EXCH_BROADCAST, out_child, NULL,
213-
&((ExchangePath *)cstmSubPath1(out_child))->altrel,
214-
NIL, bms_union(inner_servers, outer_servers));
215261
cost_exchange(root, joinrel, out_child);
216-
inn_child->mode = EXCH_STEALTH;
217262
}
218263
else if (inner_mode == EXCH_SHUFFLE)
219264
{

contrib/pg_exchange/nodeDistPlanExec.c

Lines changed: 25 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -572,12 +572,26 @@ localize_plan(Plan *node, lcontext *context)
572572
if (IsExchangePlanNode(node))
573573
{
574574
List *private = ((CustomScan *) node)->custom_private;
575-
575+
elog(LOG, "LOCALIZE: exchange");
576576
if (lnext(lnext(list_head(private))))
577577
context->indexinfo = (IndexOptInfo *) lthird(private);
578578
}
579579

580+
context->foreign_scans = NIL;
580581
plan_tree_walker(node, localize_plan, context);
582+
if (context->foreign_scans != NIL)
583+
{
584+
CustomScan *css = (CustomScan *) node;
585+
// Index scanrelid = ((Scan *) cstmSubPlan1(node))->scanrelid;
586+
587+
Assert(list_length(context->foreign_scans) == 1);
588+
css->custom_plans = list_delete_ptr(css->custom_plans,
589+
cstmSubPlan1(node));
590+
css->custom_plans = lappend(css->custom_plans,
591+
make_dummyscan(0));
592+
list_free(context->foreign_scans);
593+
context->foreign_scans = NIL;
594+
}
581595
context->indexinfo = NULL;
582596
break;
583597

@@ -602,6 +616,9 @@ localize_plan(Plan *node, lcontext *context)
602616
*plans = list_delete_ptr(*plans, lfirst(lc));
603617
*plans = lappend(*plans, make_dummyscan(scanrelid));
604618
}
619+
620+
list_free(context->foreign_scans);
621+
context->foreign_scans = NIL;
605622
}
606623
break;
607624

@@ -616,8 +633,8 @@ localize_plan(Plan *node, lcontext *context)
616633
Oid reloid;
617634

618635
reloid = getrelid(scan->scanrelid, context->pstmt->rtable);
619-
rel = heap_open(reloid, NoLock);
620-
if (rel->rd_rel->relkind == RELKIND_FOREIGN_TABLE)
636+
rel = try_relation_open(reloid, NoLock);
637+
if (rel && rel->rd_rel->relkind == RELKIND_FOREIGN_TABLE)
621638
{
622639
Oid serverid;
623640

@@ -628,6 +645,11 @@ localize_plan(Plan *node, lcontext *context)
628645
relation_close(rel, NoLock);
629646
break;
630647
}
648+
else if (!rel)
649+
{
650+
context->foreign_scans = lappend(context->foreign_scans, node);
651+
break;
652+
}
631653

632654
/* Need to localize scan */
633655
if (IsA(node, IndexScan) || IsA(node, IndexOnlyScan) ||

0 commit comments

Comments
 (0)