Skip to content

Commit 7448baa

Browse files
committed
Cost model for EXCHANGE and DistExec nodes.
1 parent 7469e15 commit 7448baa

File tree

4 files changed

+61
-86
lines changed

4 files changed

+61
-86
lines changed

contrib/pg_exchange/exchange.c

Lines changed: 39 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,13 @@
3737
#include "partutils.h"
3838
#include "stream.h"
3939

40-
#define DEFAULT_EXCHANGE_STARTUP_COST 100.0
40+
/*
41+
* Startup cost of EXCHANGE node is smaller, than DistExec node, because cost
42+
* of DistExec node contains cost of query transfer and localization at each
43+
* instance. Startup cost of EXCHANGE node includes only connection channel
44+
* establishing between instances.
45+
*/
46+
#define DEFAULT_EXCHANGE_STARTUP_COST 10.0
4147
#define DEFAULT_TRANSFER_TUPLE_COST 0.01
4248

4349

@@ -111,6 +117,7 @@ static void create_gather_dfn(EPPNode *epp, RelOptInfo *rel);
111117
static void create_stealth_dfn(EPPNode *epp, RelOptInfo *rel, PlannerInfo *root);
112118
static void create_shuffle_dfn(EPPNode *epp, RelOptInfo *rel, PlannerInfo *root);
113119
static void create_broadcast_dfn(EPPNode *epp, RelOptInfo *rel, PlannerInfo *root);
120+
static void force_add_path(RelOptInfo *rel, Path *path);
114121

115122

116123
/*
@@ -400,42 +407,52 @@ add_exchange_paths(PlannerInfo *root, RelOptInfo *rel, Index rti, RangeTblEntry
400407
set_exchange_altrel(EXCH_GATHER, (ExchangePath *) path, rel, NULL, NULL,
401408
servers);
402409
path = (Path *) create_distexec_path(root, rel, path, servers);
403-
add_path(rel, path);
410+
411+
force_add_path(rel, path);
404412
}
405413
}
406414

415+
static void
416+
force_add_path(RelOptInfo *rel, Path *path)
417+
{
418+
List *pathlist = rel->pathlist;
419+
420+
rel->pathlist = NIL;
421+
rel->cheapest_parameterized_paths = NIL;
422+
rel->cheapest_startup_path = rel->cheapest_total_path =
423+
rel->cheapest_unique_path = NULL;
424+
add_path(rel, path);
425+
rel->pathlist = list_concat(rel->pathlist, pathlist);
426+
set_cheapest(rel);
427+
}
428+
407429
#include "optimizer/cost.h"
408430

409-
static void
431+
void
410432
cost_exchange(PlannerInfo *root, RelOptInfo *baserel, ExchangePath *expath)
411433
{
412434
Path *subpath;
413435

414-
if (baserel->pages == 0 && baserel->tuples == 0)
415-
{
416-
baserel->pages = 10;
417-
baserel->tuples =
418-
(10 * BLCKSZ) / (baserel->reltarget->width +
419-
MAXALIGN(SizeofHeapTupleHeader));
420-
}
421-
422436
/* Estimate baserel size as best we can with local statistics. */
423-
// set_baserel_size_estimates(root, baserel);
424437
subpath = cstmSubPath1(expath);
425-
expath->cp.path.rows = baserel->tuples;
426-
expath->cp.path.startup_cost = subpath->startup_cost;
438+
expath->cp.path.rows = subpath->rows;
439+
expath->cp.path.startup_cost = 0.;
440+
expath->cp.path.total_cost = subpath->total_cost;
441+
427442
switch (expath->mode)
428443
{
429444
case EXCH_GATHER:
430445
expath->cp.path.startup_cost += DEFAULT_EXCHANGE_STARTUP_COST;
431-
expath->cp.path.rows = subpath->rows * expath->altrel.nparts;
446+
expath->cp.path.total_cost += cpu_tuple_cost * expath->cp.path.rows;
432447
break;
433448
case EXCH_STEALTH:
434-
expath->cp.path.rows = subpath->rows;
449+
expath->cp.path.startup_cost = 0.;
450+
expath->cp.path.total_cost += cpu_tuple_cost * expath->cp.path.rows /
451+
expath->altrel.nparts;
435452
break;
436453
case EXCH_BROADCAST:
437454
expath->cp.path.startup_cost += DEFAULT_EXCHANGE_STARTUP_COST;
438-
expath->cp.path.rows = subpath->rows * expath->altrel.nparts;
455+
expath->cp.path.total_cost += cpu_tuple_cost * expath->cp.path.rows;
439456
break;
440457
case EXCH_SHUFFLE:
441458
{
@@ -446,15 +463,14 @@ cost_exchange(PlannerInfo *root, RelOptInfo *baserel, ExchangePath *expath)
446463
Path *path = &expath->cp.path;
447464

448465
path->startup_cost += DEFAULT_EXCHANGE_STARTUP_COST;
449-
path->total_cost += path->startup_cost;
450466

451467
/*
452-
* We count on perfect balance of tuple distribution:
453-
* If we have N instances, M tuples from subtree, than we send up to
454-
* local subtree M/N tuples, send to network [M-M/N] tuples and same to
468+
* We assume perfect balance of tuple distribution:
469+
* If we have N instances, M tuples from subtree, than we send up by
470+
* subtree M/N local tuples, send to network [M-M/N] tuples and same to
455471
* receive.
456472
*/
457-
path->rows = subpath->rows;
473+
path->rows /= expath->altrel.nparts;
458474
instances = expath->altrel.nparts > 0 ? expath->altrel.nparts : 2;
459475
send_rows = path->rows - (path->rows/instances);
460476
received_rows = send_rows;
@@ -466,8 +482,7 @@ cost_exchange(PlannerInfo *root, RelOptInfo *baserel, ExchangePath *expath)
466482
default:
467483
elog(FATAL, "Unknown EXCHANGE mode.");
468484
}
469-
470-
expath->cp.path.total_cost = 0.1;
485+
expath->cp.path.total_cost += expath->cp.path.startup_cost;
471486
}
472487

473488
/*
@@ -493,13 +508,6 @@ ExchangePlanCustomPath(PlannerInfo *root,
493508
exchange = make_exchange(custom_plans, tlist);
494509
private->node.extnodename = EXCHANGE_PRIVATE_NAME;
495510

496-
exchange->scan.plan.startup_cost = best_path->path.startup_cost;
497-
exchange->scan.plan.total_cost = best_path->path.total_cost;
498-
exchange->scan.plan.plan_rows = best_path->path.rows;
499-
exchange->scan.plan.plan_width = best_path->path.pathtarget->width;
500-
exchange->scan.plan.parallel_aware = best_path->path.parallel_aware;
501-
exchange->scan.plan.parallel_safe = best_path->path.parallel_safe;
502-
503511
/* Add stream name into private field*/
504512
GetMyServerName(&host, &port);
505513
sprintf(streamName, "%s-%d-%d", host, port, exchange_counter++);
@@ -791,15 +799,9 @@ make_exchange(List *custom_plans, List *tlist)
791799
Plan *plan = &node->scan.plan;
792800
List *child_tlist;
793801

794-
plan->startup_cost = 1;
795-
plan->total_cost = 1;
796-
plan->plan_rows = 1;
797-
plan->plan_width =1;
798802
plan->qual = NIL;
799803
plan->lefttree = NULL;
800804
plan->righttree = NULL;
801-
plan->parallel_aware = false; /* Use Shared Memory in parallel worker */
802-
plan->parallel_safe = false;
803805
plan->targetlist = tlist;
804806

805807
/* Setup methods and child plan */
@@ -1026,7 +1028,6 @@ EXCHANGE_Execute(CustomScanState *node)
10261028
else
10271029
{
10281030
state->stuples++;
1029-
// elog(LOG, "SEND TUPLE to stream [%s]", state->stream);
10301031
SendTuple(dest, state->stream, slot, false);
10311032
}
10321033
}
@@ -1043,8 +1044,6 @@ EXCHANGE_End(CustomScanState *node)
10431044

10441045
if (state->mode != EXCH_STEALTH)
10451046
Stream_unsubscribe(state->stream);
1046-
1047-
// elog(INFO, "EXCHANGE_END");
10481047
}
10491048

10501049
static void

contrib/pg_exchange/exchange.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -106,5 +106,7 @@ extern CustomScan *make_exchange(List *custom_plans, List *tlist);
106106
extern ExchangePath *create_exchange_path(PlannerInfo *root, RelOptInfo *rel,
107107
Path *children, ExchangeMode mode);
108108
extern void createNodeName(char *nodeName, const char *hostname, int port);
109+
extern void cost_exchange(PlannerInfo *root, RelOptInfo *baserel,
110+
ExchangePath *expath);
109111

110112
#endif /* EXCHANGE_H_ */

contrib/pg_exchange/hooks.c

Lines changed: 18 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -179,6 +179,7 @@ create_distributed_join_paths(PlannerInfo *root, RelOptInfo *joinrel,
179179
reset_cheapest(innerrel);
180180
add_paths_to_joinrel(root, joinrel, outerrel, innerrel, jointype,
181181
extra->sjinfo, extra->restrictlist);
182+
182183
set_cheapest(joinrel);
183184
Assert(!join_path_contains_distexec(joinrel->cheapest_total_path));
184185
Assert(joinrel->cheapest_total_path);
@@ -195,6 +196,8 @@ create_distributed_join_paths(PlannerInfo *root, RelOptInfo *joinrel,
195196
bms_union(inner_servers, outer_servers));
196197
set_exchange_altrel(inner_mode, inn_child, rel2, NULL, NULL,
197198
bms_union(inner_servers, outer_servers));
199+
cost_exchange(root, outerrel, out_child);
200+
cost_exchange(root, innerrel, inn_child);
198201

199202
/* Check for special case */
200203
if (path->pathtype == T_NestLoop)
@@ -209,7 +212,7 @@ create_distributed_join_paths(PlannerInfo *root, RelOptInfo *joinrel,
209212
set_exchange_altrel(EXCH_BROADCAST, out_child, NULL,
210213
&((ExchangePath *)cstmSubPath1(out_child))->altrel,
211214
NIL, bms_union(inner_servers, outer_servers));
212-
215+
cost_exchange(root, joinrel, out_child);
213216
inn_child->mode = EXCH_STEALTH;
214217
}
215218
else if (inner_mode == EXCH_SHUFFLE)
@@ -225,6 +228,7 @@ create_distributed_join_paths(PlannerInfo *root, RelOptInfo *joinrel,
225228
set_exchange_altrel(EXCH_GATHER, gather, &out_child->altrel,
226229
&inn_child->altrel, extra->restrictlist,
227230
bms_union(inner_servers, outer_servers));
231+
cost_exchange(root, joinrel, gather);
228232
Assert(gather->altrel.part_scheme != NULL);
229233
path = (Path *) create_distexec_path(root, joinrel, (Path *) gather,
230234
bms_union(inner_servers, outer_servers));
@@ -259,7 +263,6 @@ HOOK_Join_pathlist(PlannerInfo *root, RelOptInfo *joinrel, RelOptInfo *outerrel,
259263
{
260264
ListCell *lc;
261265
List *delpaths = NIL;
262-
List *dist_paths = NIL;
263266
List *pathlist = NIL;
264267

265268
if (recursion)
@@ -289,35 +292,21 @@ HOOK_Join_pathlist(PlannerInfo *root, RelOptInfo *joinrel, RelOptInfo *outerrel,
289292
foreach(lc, delpaths)
290293
joinrel->pathlist = list_delete_ptr(joinrel->pathlist, lfirst(lc));
291294

292-
/* Add trivial paths */
293-
dist_paths = create_distributed_join_paths(root, joinrel, outerrel, innerrel,
294-
jointype, extra, EXCH_GATHER, EXCH_GATHER);
295-
foreach(lc, dist_paths)
296-
{
297-
Path *path = lfirst(lc);
298-
add_path(joinrel, path);
299-
}
300-
pathlist = create_distributed_join_paths(root, joinrel, outerrel, innerrel,
301-
jointype, extra, EXCH_BROADCAST, EXCH_STEALTH);
302-
foreach(lc, pathlist)
303-
{
304-
Path *path = lfirst(lc);
305-
path->total_cost -= 0.1;
306-
add_path(joinrel, path);
307-
}
308-
list_free(pathlist);
309-
pathlist = NIL;
295+
/* Add distributed paths */
296+
pathlist = list_concat(pathlist, create_distributed_join_paths(root, joinrel,
297+
outerrel, innerrel, jointype, extra, EXCH_GATHER,
298+
EXCH_GATHER));
299+
300+
pathlist = list_concat(pathlist, create_distributed_join_paths(root, joinrel,
301+
outerrel, innerrel, jointype, extra, EXCH_BROADCAST,
302+
EXCH_STEALTH));
303+
304+
pathlist = list_concat(pathlist, create_distributed_join_paths(root, joinrel,
305+
outerrel, innerrel, jointype, extra, EXCH_SHUFFLE,
306+
EXCH_SHUFFLE));
310307

311-
pathlist = create_distributed_join_paths(root, joinrel, outerrel, innerrel,
312-
jointype, extra, EXCH_SHUFFLE, EXCH_SHUFFLE);
313308
foreach(lc, pathlist)
314-
{
315-
Path *path = lfirst(lc);
316-
path->total_cost -= 0.2;
317-
add_path(joinrel, path);
318-
}
319-
list_free(pathlist);
320-
pathlist = NIL;
309+
add_path(joinrel, lfirst(lc));
321310

322311
recursion = false;
323312
}
@@ -499,4 +488,3 @@ EXEC_Hooks_init(void)
499488
PreviousShmemStartupHook = shmem_startup_hook;
500489
shmem_startup_hook = HOOK_shmem_startup;
501490
}
502-

contrib/pg_exchange/nodeDistPlanExec.c

Lines changed: 2 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -395,14 +395,6 @@ CreateDistExecPlan(PlannerInfo *root,
395395
CustomScan *distExecNode;
396396

397397
distExecNode = make_distplanexec(custom_plans, tlist, best_path->custom_private);
398-
399-
distExecNode->scan.plan.startup_cost = best_path->path.startup_cost;
400-
distExecNode->scan.plan.total_cost = best_path->path.total_cost;
401-
distExecNode->scan.plan.plan_rows = best_path->path.rows;
402-
distExecNode->scan.plan.plan_width = best_path->path.pathtarget->width;
403-
distExecNode->scan.plan.parallel_aware = best_path->path.parallel_aware;
404-
distExecNode->scan.plan.parallel_safe = best_path->path.parallel_safe;
405-
406398
return &distExecNode->scan.plan;
407399
}
408400

@@ -442,15 +434,9 @@ make_distplanexec(List *custom_plans, List *tlist, List *private_data)
442434
ListCell *lc;
443435
List *child_tlist;
444436

445-
plan->startup_cost = 10;
446-
plan->total_cost = 10;
447-
plan->plan_rows = 1000;
448-
plan->plan_width =10;
449437
plan->qual = NIL;
450438
plan->lefttree = NULL;
451439
plan->righttree = NULL;
452-
plan->parallel_aware = false;
453-
plan->parallel_safe = false;
454440
plan->targetlist = tlist;
455441

456442
/* Setup methods and child plan */
@@ -502,8 +488,8 @@ create_distexec_path(PlannerInfo *root, RelOptInfo *rel, Path *children,
502488
path->methods = &distplanexec_path_methods;
503489

504490
pathnode->rows = children->rows;
505-
pathnode->startup_cost = 100.;
506-
pathnode->total_cost = pathnode->startup_cost;
491+
pathnode->startup_cost = children->startup_cost + 100.;
492+
pathnode->total_cost = children->total_cost + pathnode->startup_cost;
507493

508494
return path;
509495
}

0 commit comments

Comments
 (0)