37
37
#include "partutils.h"
38
38
#include "stream.h"
39
39
40
- #define DEFAULT_EXCHANGE_STARTUP_COST 100.0
40
+ /*
41
+ * Startup cost of EXCHANGE node is smaller, than DistExec node, because cost
42
+ * of DistExec node contains cost of query transfer and localization at each
43
+ * instance. Startup cost of EXCHANGE node includes only connection channel
44
+ * establishing between instances.
45
+ */
46
+ #define DEFAULT_EXCHANGE_STARTUP_COST 10.0
41
47
#define DEFAULT_TRANSFER_TUPLE_COST 0.01
42
48
43
49
@@ -111,6 +117,7 @@ static void create_gather_dfn(EPPNode *epp, RelOptInfo *rel);
111
117
static void create_stealth_dfn (EPPNode * epp , RelOptInfo * rel , PlannerInfo * root );
112
118
static void create_shuffle_dfn (EPPNode * epp , RelOptInfo * rel , PlannerInfo * root );
113
119
static void create_broadcast_dfn (EPPNode * epp , RelOptInfo * rel , PlannerInfo * root );
120
+ static void force_add_path (RelOptInfo * rel , Path * path );
114
121
115
122
116
123
/*
@@ -400,42 +407,52 @@ add_exchange_paths(PlannerInfo *root, RelOptInfo *rel, Index rti, RangeTblEntry
400
407
set_exchange_altrel (EXCH_GATHER , (ExchangePath * ) path , rel , NULL , NULL ,
401
408
servers );
402
409
path = (Path * ) create_distexec_path (root , rel , path , servers );
403
- add_path (rel , path );
410
+
411
+ force_add_path (rel , path );
404
412
}
405
413
}
406
414
415
+ static void
416
+ force_add_path (RelOptInfo * rel , Path * path )
417
+ {
418
+ List * pathlist = rel -> pathlist ;
419
+
420
+ rel -> pathlist = NIL ;
421
+ rel -> cheapest_parameterized_paths = NIL ;
422
+ rel -> cheapest_startup_path = rel -> cheapest_total_path =
423
+ rel -> cheapest_unique_path = NULL ;
424
+ add_path (rel , path );
425
+ rel -> pathlist = list_concat (rel -> pathlist , pathlist );
426
+ set_cheapest (rel );
427
+ }
428
+
407
429
#include "optimizer/cost.h"
408
430
409
- static void
431
+ void
410
432
cost_exchange (PlannerInfo * root , RelOptInfo * baserel , ExchangePath * expath )
411
433
{
412
434
Path * subpath ;
413
435
414
- if (baserel -> pages == 0 && baserel -> tuples == 0 )
415
- {
416
- baserel -> pages = 10 ;
417
- baserel -> tuples =
418
- (10 * BLCKSZ ) / (baserel -> reltarget -> width +
419
- MAXALIGN (SizeofHeapTupleHeader ));
420
- }
421
-
422
436
/* Estimate baserel size as best we can with local statistics. */
423
- // set_baserel_size_estimates(root, baserel);
424
437
subpath = cstmSubPath1 (expath );
425
- expath -> cp .path .rows = baserel -> tuples ;
426
- expath -> cp .path .startup_cost = subpath -> startup_cost ;
438
+ expath -> cp .path .rows = subpath -> rows ;
439
+ expath -> cp .path .startup_cost = 0. ;
440
+ expath -> cp .path .total_cost = subpath -> total_cost ;
441
+
427
442
switch (expath -> mode )
428
443
{
429
444
case EXCH_GATHER :
430
445
expath -> cp .path .startup_cost += DEFAULT_EXCHANGE_STARTUP_COST ;
431
- expath -> cp .path .rows = subpath -> rows * expath -> altrel . nparts ;
446
+ expath -> cp .path .total_cost += cpu_tuple_cost * expath -> cp . path . rows ;
432
447
break ;
433
448
case EXCH_STEALTH :
434
- expath -> cp .path .rows = subpath -> rows ;
449
+ expath -> cp .path .startup_cost = 0. ;
450
+ expath -> cp .path .total_cost += cpu_tuple_cost * expath -> cp .path .rows /
451
+ expath -> altrel .nparts ;
435
452
break ;
436
453
case EXCH_BROADCAST :
437
454
expath -> cp .path .startup_cost += DEFAULT_EXCHANGE_STARTUP_COST ;
438
- expath -> cp .path .rows = subpath -> rows * expath -> altrel . nparts ;
455
+ expath -> cp .path .total_cost += cpu_tuple_cost * expath -> cp . path . rows ;
439
456
break ;
440
457
case EXCH_SHUFFLE :
441
458
{
@@ -446,15 +463,14 @@ cost_exchange(PlannerInfo *root, RelOptInfo *baserel, ExchangePath *expath)
446
463
Path * path = & expath -> cp .path ;
447
464
448
465
path -> startup_cost += DEFAULT_EXCHANGE_STARTUP_COST ;
449
- path -> total_cost += path -> startup_cost ;
450
466
451
467
/*
452
- * We count on perfect balance of tuple distribution:
453
- * If we have N instances, M tuples from subtree, than we send up to
454
- * local subtree M/N tuples, send to network [M-M/N] tuples and same to
468
+ * We assume perfect balance of tuple distribution:
469
+ * If we have N instances, M tuples from subtree, than we send up by
470
+ * subtree M/N local tuples, send to network [M-M/N] tuples and same to
455
471
* receive.
456
472
*/
457
- path -> rows = subpath -> rows ;
473
+ path -> rows /= expath -> altrel . nparts ;
458
474
instances = expath -> altrel .nparts > 0 ? expath -> altrel .nparts : 2 ;
459
475
send_rows = path -> rows - (path -> rows /instances );
460
476
received_rows = send_rows ;
@@ -466,8 +482,7 @@ cost_exchange(PlannerInfo *root, RelOptInfo *baserel, ExchangePath *expath)
466
482
default :
467
483
elog (FATAL , "Unknown EXCHANGE mode." );
468
484
}
469
-
470
- expath -> cp .path .total_cost = 0.1 ;
485
+ expath -> cp .path .total_cost += expath -> cp .path .startup_cost ;
471
486
}
472
487
473
488
/*
@@ -493,13 +508,6 @@ ExchangePlanCustomPath(PlannerInfo *root,
493
508
exchange = make_exchange (custom_plans , tlist );
494
509
private -> node .extnodename = EXCHANGE_PRIVATE_NAME ;
495
510
496
- exchange -> scan .plan .startup_cost = best_path -> path .startup_cost ;
497
- exchange -> scan .plan .total_cost = best_path -> path .total_cost ;
498
- exchange -> scan .plan .plan_rows = best_path -> path .rows ;
499
- exchange -> scan .plan .plan_width = best_path -> path .pathtarget -> width ;
500
- exchange -> scan .plan .parallel_aware = best_path -> path .parallel_aware ;
501
- exchange -> scan .plan .parallel_safe = best_path -> path .parallel_safe ;
502
-
503
511
/* Add stream name into private field*/
504
512
GetMyServerName (& host , & port );
505
513
sprintf (streamName , "%s-%d-%d" , host , port , exchange_counter ++ );
@@ -791,15 +799,9 @@ make_exchange(List *custom_plans, List *tlist)
791
799
Plan * plan = & node -> scan .plan ;
792
800
List * child_tlist ;
793
801
794
- plan -> startup_cost = 1 ;
795
- plan -> total_cost = 1 ;
796
- plan -> plan_rows = 1 ;
797
- plan -> plan_width = 1 ;
798
802
plan -> qual = NIL ;
799
803
plan -> lefttree = NULL ;
800
804
plan -> righttree = NULL ;
801
- plan -> parallel_aware = false; /* Use Shared Memory in parallel worker */
802
- plan -> parallel_safe = false;
803
805
plan -> targetlist = tlist ;
804
806
805
807
/* Setup methods and child plan */
@@ -1026,7 +1028,6 @@ EXCHANGE_Execute(CustomScanState *node)
1026
1028
else
1027
1029
{
1028
1030
state -> stuples ++ ;
1029
- // elog(LOG, "SEND TUPLE to stream [%s]", state->stream);
1030
1031
SendTuple (dest , state -> stream , slot , false);
1031
1032
}
1032
1033
}
@@ -1043,8 +1044,6 @@ EXCHANGE_End(CustomScanState *node)
1043
1044
1044
1045
if (state -> mode != EXCH_STEALTH )
1045
1046
Stream_unsubscribe (state -> stream );
1046
-
1047
- // elog(INFO, "EXCHANGE_END");
1048
1047
}
1049
1048
1050
1049
static void
0 commit comments