Skip to content

Commit 72ba986

Browse files
committed
JOIN cascades support.
I tested this on the simple queries: 1. select * from rt, pt, st WHERE rt.id=pt.id and pt.id=st.id; 2. select * from rt, pt, st WHERE rt.id=pt.id and pt.id=st.payload;
1 parent 8af4fe0 commit 72ba986

File tree

10 files changed

+170
-60
lines changed

10 files changed

+170
-60
lines changed

contrib/pg_exchange/exchange.c

Lines changed: 23 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -210,9 +210,7 @@ add_exchange_paths(PlannerInfo *root, RelOptInfo *rel, Index rti, RangeTblEntry
210210
ListCell *lc;
211211

212212
if (!rte->inh)
213-
/*
214-
* Relation is not contain any partitions.
215-
*/
213+
/* Relation is not contain any partitions. */
216214
return;
217215

218216
/* Traverse all possible paths and search for APPEND */
@@ -223,16 +221,31 @@ add_exchange_paths(PlannerInfo *root, RelOptInfo *rel, Index rti, RangeTblEntry
223221
AppendPath *appendPath = NULL;
224222
ListCell *lc1;
225223
Bitmapset *servers = NULL;
226-
List *subpaths = NIL;
224+
List *subpaths = NIL;
225+
List *append_paths;
227226

228-
if (path->pathtype != T_Append)
229-
continue;
227+
/*
228+
* In the case of partitioned relation all paths will be ended by Append
229+
* or MergeAppend path node.
230+
*/
231+
switch (path->pathtype)
232+
{
233+
case T_Append:
234+
append_paths = ((AppendPath *) path)->subpaths;
235+
break;
236+
case T_MergeAppend:
237+
append_paths = ((MergeAppendPath *) path)->subpaths;
238+
break;
239+
default:
240+
elog(FATAL, "Unexpected node type %d, pathtype %d", path->type,
241+
path->pathtype);
242+
}
230243

231244
/*
232245
* Traverse all APPEND subpaths, check for scan-type and search for
233246
* foreign scans
234247
*/
235-
foreach(lc1, ((AppendPath *) path)->subpaths)
248+
foreach(lc1, append_paths)
236249
{
237250
Path *subpath = (Path *) lfirst(lc1);
238251
Path *tmpPath;
@@ -279,6 +292,7 @@ add_exchange_paths(PlannerInfo *root, RelOptInfo *rel, Index rti, RangeTblEntry
279292
((AppendPath *) path)->partitioned_rels, -1);
280293
path = (Path *) create_exchange_path(root, rel, (Path *) appendPath, true);
281294
path = create_distexec_path(root, rel, path, servers);
295+
// elog(LOG, "Path added");
282296
add_path(rel, path);
283297
}
284298
}
@@ -740,6 +754,8 @@ EXCHANGE_Execute(CustomScanState *node)
740754
return slot;
741755
case 1:
742756
state->activeRemotes--;
757+
elog(LOG, "[%s] GOT NULL. activeRemotes: %d", state->stream,
758+
state->activeRemotes);
743759
break;
744760
case 2: /* Close EXCHANGE channel */
745761
break;

contrib/pg_exchange/hooks.c

Lines changed: 103 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323
static set_rel_pathlist_hook_type prev_set_rel_pathlist_hook = NULL;
2424
static shmem_startup_hook_type PreviousShmemStartupHook = NULL;
2525
static set_join_pathlist_hook_type prev_set_join_pathlist_hook = NULL;
26-
static void second_stage_paths(PlannerInfo *root, List *firstStagePaths, RelOptInfo *joinrel,
26+
static void second_stage_paths(PlannerInfo *root, RelOptInfo *joinrel,
2727
RelOptInfo *outerrel, RelOptInfo *innerrel,
2828
JoinType jointype, JoinPathExtraData *extra);
2929

@@ -95,15 +95,56 @@ HOOK_Join_pathlist(PlannerInfo *root, RelOptInfo *joinrel, RelOptInfo *outerrel,
9595
JoinPathExtraData *extra)
9696
{
9797
ListCell *lc;
98-
List *firstStagePaths = NIL; /* Trivial paths, made with exchange */
98+
int i=0;
99+
List *newpaths = NIL;
100+
List *delpaths = NIL;
101+
List *rpaths = NIL;
102+
int distpaths = 0;
99103

100104
if (prev_set_join_pathlist_hook)
101105
prev_set_join_pathlist_hook(root, joinrel, outerrel, innerrel,
102106
jointype, extra);
103107

104108
/*
105-
* At first, traverse all paths and search for the case with Exchanges at
106-
* the left or right subtree. We need to delete DistPlanExec nodes and
109+
* Search for distributed paths for children relations.
110+
*/
111+
foreach(lc, innerrel->pathlist)
112+
{
113+
Path *path = lfirst(lc);
114+
115+
if (IsDistExecNode(path))
116+
distpaths++;
117+
}
118+
119+
if (distpaths == 0)
120+
/* Distributed query execution does not needed. */
121+
return;
122+
123+
/*
124+
* Force try to add hash joins into the pathlist. Collect any paths that
125+
* do not satisfy the exchange rules and delete it.
126+
*/
127+
foreach(lc, joinrel->pathlist)
128+
{
129+
Path *path = lfirst(lc);
130+
if ((path->pathtype != T_HashJoin) && !IsDistExecNode(path))
131+
rpaths = lappend(rpaths, path);
132+
}
133+
foreach(lc, rpaths)
134+
joinrel->pathlist = list_delete_ptr(joinrel->pathlist, lfirst(lc));
135+
136+
/*
137+
* Try to create hash join path.
138+
* XXX: Here we have a problem of path additions: hash join may be not added
139+
* if it is not cheap. But cost of the this path includes costs of child paths.
140+
* Child paths will be tuned below and costs will be changed too.
141+
*/
142+
hash_inner_and_outer(root, joinrel, outerrel, innerrel, jointype, extra);
143+
Assert(list_length(joinrel->pathlist) > 0);
144+
145+
/*
146+
* Traverse all paths and search for the case with EXCHANGE nodes
147+
* at the left or right subtree. We need to delete DistPlanExec nodes and
107148
* insert only one at the head of join.
108149
*/
109150
foreach(lc, joinrel->pathlist)
@@ -115,9 +156,15 @@ HOOK_Join_pathlist(PlannerInfo *root, RelOptInfo *joinrel, RelOptInfo *outerrel,
115156
CustomPath *sub;
116157
Path *path = lfirst(lc);
117158

118-
if ((path->pathtype != T_NestLoop) &&
119-
(path->pathtype != T_MergeJoin) &&
120-
(path->pathtype != T_HashJoin))
159+
/*
160+
* NestLoop and MergeJoin need to change EXCHANGE node logic and
161+
* disabled for now.
162+
* For this we need to introduce remote PUSH or PULL operation for
163+
* force transfer tuples from instance to instance.
164+
*/
165+
Assert(path->pathtype != T_NestLoop && path->pathtype != T_MergeJoin);
166+
167+
if (path->pathtype != T_HashJoin)
121168
continue;
122169

123170
jp = (JoinPath *) path;
@@ -128,9 +175,7 @@ HOOK_Join_pathlist(PlannerInfo *root, RelOptInfo *joinrel, RelOptInfo *outerrel,
128175
* If inner path contains DistExec node - save its servers list and
129176
* delete it from the path.
130177
*/
131-
if ((inner->pathtype == T_CustomScan) &&
132-
(strcmp(((CustomPath *) inner)->methods->CustomName,
133-
DISTEXECPATHNAME) == 0))
178+
if (IsDistExecNode(inner))
134179
{
135180
ListCell *lc;
136181

@@ -145,15 +190,19 @@ HOOK_Join_pathlist(PlannerInfo *root, RelOptInfo *joinrel, RelOptInfo *outerrel,
145190
}
146191
Assert(list_length(sub->custom_paths) == 1);
147192
jp->innerjoinpath = (Path *) linitial(sub->custom_paths);
193+
} else
194+
{
195+
elog(LOG, "inner Path can't contain any FE nodes. JT=%d (innt=%d patht=%d), (outt=%d patht=%d) %d",
196+
jp->jointype, inner->type, inner->pathtype,
197+
outer->type, outer->pathtype, i++);
198+
148199
}
149200

150201
/*
151202
* If outer path contains DistExec node - save its servers list and
152203
* delete it from the path.
153204
*/
154-
if ((outer->pathtype == T_CustomScan) &&
155-
(strcmp(((CustomPath *) outer)->methods->CustomName,
156-
DISTEXECPATHNAME) == 0))
205+
if (IsDistExecNode(outer))
157206
{
158207
ListCell *lc;
159208

@@ -177,25 +226,29 @@ HOOK_Join_pathlist(PlannerInfo *root, RelOptInfo *joinrel, RelOptInfo *outerrel,
177226
path = create_distexec_path(root, joinrel,
178227
(Path *) copy_join_pathnode(jp),
179228
servers);
180-
add_path(joinrel, path);
229+
newpaths = lappend(newpaths, path);
230+
delpaths = lappend(delpaths, jp);
231+
}
181232

182-
/*
183-
* We need guarantee, that previous JOIN path was deleted. It was
184-
* incorrect.
185-
*/
186-
list_delete_ptr(joinrel->pathlist, jp);
233+
/*
234+
* We need to guarantee, that previous JOIN path was deleted from the path
235+
* list. It was incorrect.
236+
*/
237+
foreach(lc, delpaths)
238+
{
239+
Path *path = lfirst(lc);
240+
joinrel->pathlist = list_delete_ptr(joinrel->pathlist, path);
241+
}
187242

188-
/* Save link to the path for future works. */
189-
firstStagePaths = lappend(firstStagePaths, path);
243+
foreach(lc, newpaths)
244+
{
245+
Path *path = lfirst(lc);
246+
add_path(joinrel, path);
190247
}
191248

192-
second_stage_paths(root, firstStagePaths, joinrel, outerrel, innerrel, jointype,
193-
extra);
249+
second_stage_paths(root, joinrel, outerrel, innerrel, jointype, extra);
194250
}
195251

196-
#define IsDistExecNode(pathnode) ((pathnode->path.pathtype == T_CustomScan) && \
197-
(strcmp(((CustomPath *)pathnode)->methods->CustomName, DISTEXECPATHNAME) == 0))
198-
199252
static CustomPath *
200253
duplicate_join_path(CustomPath *distExecPath)
201254
{
@@ -321,10 +374,7 @@ arrange_partitioning_attrs(RelOptInfo *rel1,
321374
part_scheme->partnatts++;
322375
ReleaseSysCache(opclasstup);
323376
}
324-
//elog(INFO, "arrange_partitioning_attrs: ");
325-
//elog(INFO, "->1: %s ", nodeToString(rel1->partexprs[0]));
326-
//elog(INFO, "->2: %s ", nodeToString(rel2->partexprs[0]));
327-
//elog(INFO, "restrictlist: %s", nodeToString(restrictlist));
377+
328378
/* Now we use hash partition only */
329379
Assert((rel1->part_scheme->strategy == PARTITION_STRATEGY_HASH) &&
330380
(rel1->part_scheme->strategy == rel2->part_scheme->strategy));
@@ -394,24 +444,38 @@ arrange_partitions(RelOptInfo *rel1,
394444
* Add Paths same as the case of partitionwise join.
395445
*/
396446
static void
397-
second_stage_paths(PlannerInfo *root, List *firstStagePaths, RelOptInfo *joinrel, RelOptInfo *outerrel,
447+
second_stage_paths(PlannerInfo *root, RelOptInfo *joinrel, RelOptInfo *outerrel,
398448
RelOptInfo *innerrel, JoinType jointype, JoinPathExtraData *extra)
399449
{
400450
ListCell *lc;
401451

402-
if (list_length(firstStagePaths) == 0)
403-
return;
452+
elog(LOG, "List length=%d", list_length(joinrel->pathlist));
453+
foreach(lc, joinrel->pathlist)
454+
{
455+
Path *path = lfirst(lc);
456+
elog(LOG, "[%d] BEFORE SECOND STAGE: type=%d pathtype=%d",
457+
list_length(joinrel->pathlist), path->type, path->pathtype);
458+
if (path->type > 300)
459+
Assert(0);
460+
}
404461

405-
foreach(lc, firstStagePaths)
462+
foreach(lc, joinrel->pathlist)
406463
{
407-
CustomPath *path = (CustomPath *) lfirst(lc);
464+
Path *pathhead = (Path *) lfirst(lc);
465+
CustomPath *path;
408466
JoinPath *jp;
409467
ExchangePath *innerex;
410468
ExchangePath *outerex;
411469
ExchangePath *expath;
412470
int i;
413471

414-
Assert(IsDistExecNode(path));
472+
if (!IsDistExecNode(pathhead))
473+
{
474+
elog(LOG, "NO second_stage_paths. type=%d, pathtype=%d", pathhead->type, pathhead->pathtype);
475+
continue;
476+
}
477+
478+
path = (CustomPath *) pathhead;
415479

416480
/*
417481
* Add gather-type EXCHANGE node into the head of the path.
@@ -420,7 +484,9 @@ second_stage_paths(PlannerInfo *root, List *firstStagePaths, RelOptInfo *joinrel
420484
if (!IsA(((Path *) linitial(path->custom_paths)), CustomScan))
421485
{
422486
jp = (JoinPath *) linitial(path->custom_paths);
423-
Assert(jp->path.pathtype == T_HashJoin);
487+
488+
if (jp->path.pathtype != T_HashJoin)
489+
continue;
424490
expath = create_exchange_path(root, joinrel, (Path *) jp, GATHER_MODE);
425491
path->custom_paths = list_delete(path->custom_paths, jp);
426492
path->custom_paths = lappend(path->custom_paths, expath);
@@ -433,7 +499,6 @@ second_stage_paths(PlannerInfo *root, List *firstStagePaths, RelOptInfo *joinrel
433499
&innerex->altrel, extra->restrictlist, jointype))
434500
{
435501
/* Simple case like foreign-push-join case. */
436-
// elog(INFO, "--- MAKE SIMPLE PATH ---");
437502
innerex->mode = STEALTH_MODE;
438503
outerex->mode = STEALTH_MODE;
439504
}
@@ -442,7 +507,6 @@ second_stage_paths(PlannerInfo *root, List *firstStagePaths, RelOptInfo *joinrel
442507
CustomPath *newpath;
443508
bool res;
444509

445-
// elog(INFO, "--- MAKE SHUFFLE PATH ---");
446510
/* Get a copy of the simple path */
447511
newpath = duplicate_join_path(path);
448512
set_path_pointers(newpath, &jp, &expath, &outerex, &innerex);

contrib/pg_exchange/nodeDistPlanExec.c

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -238,7 +238,7 @@ EstablishDMQConnections(const lcontext *context, const char *serverName, PlanSta
238238
sprintf(connstr, "host=%s port=%d "
239239
"fallback_application_name=%s",
240240
host, port, senderName);
241-
elog(LOG, "Add destination: senderName=%s, receiverName=%s, connstr=%s", senderName, receiverName, connstr);
241+
// elog(LOG, "Add destination: senderName=%s, receiverName=%s, connstr=%s", senderName, receiverName, connstr);
242242
sub->dest_id = dmq_destination_add(connstr, senderName, receiverName, 10);
243243
memcpy(sub->node, receiverName, strlen(receiverName) + 1);
244244
}
@@ -337,7 +337,7 @@ ExecEndDistPlanExec(CustomScanState *node)
337337
PGresult *result;
338338

339339
while ((result = PQgetResult(dpe->conn[i])) != NULL);
340-
elog(LOG, "ExecEndDistPlanExec: %d", PQresultStatus(result));
340+
// elog(LOG, "ExecEndDistPlanExec: %d", PQresultStatus(result));
341341
}
342342
if (dpe->conn)
343343
pfree(dpe->conn);

contrib/pg_exchange/nodeDistPlanExec.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,9 @@ typedef struct
2828
extern char destsName[10];
2929
#define DISTEXECPATHNAME "DistExecPath"
3030

31+
#define IsDistExecNode(pathnode) ((((Path *) pathnode)->pathtype == T_CustomScan) && \
32+
(strcmp(((CustomPath *)pathnode)->methods->CustomName, DISTEXECPATHNAME) == 0))
33+
3134
extern void DistExec_Init_methods(void);
3235
extern CustomScan *make_distplanexec(List *custom_plans, List *tlist, List *private_data);
3336
extern Path *create_distexec_path(PlannerInfo *root, RelOptInfo *rel,

contrib/pg_execplan/tests/init_node0.sql

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,17 @@ CREATE TABLE rt_0 PARTITION OF rt FOR VALUES WITH (modulus 3, remainder 0);
2525
CREATE FOREIGN TABLE rt_1 PARTITION OF rt FOR VALUES WITH (modulus 3, remainder 1) SERVER remote1;
2626
CREATE FOREIGN TABLE rt_2 PARTITION OF rt FOR VALUES WITH (modulus 3, remainder 2) SERVER remote2;
2727

28+
DROP TABLE IF EXISTS st cascade;
29+
CREATE TABLE st (
30+
id integer not null,
31+
payload integer,
32+
test integer
33+
) PARTITION BY hash (id);
34+
35+
CREATE TABLE st_0 PARTITION OF st FOR VALUES WITH (modulus 3, remainder 0);
36+
CREATE FOREIGN TABLE st_1 PARTITION OF st FOR VALUES WITH (modulus 3, remainder 1) SERVER remote1;
37+
CREATE FOREIGN TABLE st_2 PARTITION OF st FOR VALUES WITH (modulus 3, remainder 2) SERVER remote2;
38+
2839

2940
-- For local tests
3041
CREATE TABLE a (

contrib/pg_execplan/tests/init_node1.sql

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,3 +24,14 @@ CREATE TABLE rt (
2424
CREATE FOREIGN TABLE rt_0 PARTITION OF rt FOR VALUES WITH (modulus 3, remainder 0) SERVER remote1;
2525
CREATE TABLE rt_1 PARTITION OF rt FOR VALUES WITH (modulus 3, remainder 1);
2626
CREATE FOREIGN TABLE rt_2 PARTITION OF rt FOR VALUES WITH (modulus 3, remainder 2) SERVER remote2;
27+
28+
DROP TABLE IF EXISTS st cascade;
29+
CREATE TABLE st (
30+
id integer not null,
31+
payload integer,
32+
test integer
33+
) PARTITION BY hash (id);
34+
35+
CREATE FOREIGN TABLE st_0 PARTITION OF st FOR VALUES WITH (modulus 3, remainder 0) SERVER remote1;
36+
CREATE TABLE st_1 PARTITION OF st FOR VALUES WITH (modulus 3, remainder 1);
37+
CREATE FOREIGN TABLE st_2 PARTITION OF st FOR VALUES WITH (modulus 3, remainder 2) SERVER remote2;

contrib/pg_execplan/tests/init_node2.sql

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,3 +24,14 @@ CREATE TABLE rt (
2424
CREATE FOREIGN TABLE rt_0 PARTITION OF rt FOR VALUES WITH (modulus 3, remainder 0) SERVER remote1;
2525
CREATE FOREIGN TABLE rt_1 PARTITION OF rt FOR VALUES WITH (modulus 3, remainder 1) SERVER remote2;
2626
CREATE TABLE rt_2 PARTITION OF rt FOR VALUES WITH (modulus 3, remainder 2);
27+
28+
DROP TABLE IF EXISTS st cascade;
29+
CREATE TABLE st (
30+
id integer not null,
31+
payload integer,
32+
test integer
33+
) PARTITION BY hash (id);
34+
35+
CREATE FOREIGN TABLE st_0 PARTITION OF st FOR VALUES WITH (modulus 3, remainder 0) SERVER remote1;
36+
CREATE FOREIGN TABLE st_1 PARTITION OF st FOR VALUES WITH (modulus 3, remainder 1) SERVER remote2;
37+
CREATE TABLE st_2 PARTITION OF st FOR VALUES WITH (modulus 3, remainder 2);

0 commit comments

Comments
 (0)