Skip to content

Commit fccbb2c

Browse files
committed
Add dummy parallel join
1 parent 03a11dd commit fccbb2c

File tree

11 files changed

+382
-120
lines changed

11 files changed

+382
-120
lines changed

contrib/pg_exchange/exchange.c

Lines changed: 82 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,8 @@ static void EXCHANGE_InitializeWorker(CustomScanState *node,
6565
static Node *EXCHANGE_Create_state(CustomScan *node);
6666

6767

68+
#define END_OF_TUPLES 'E'
69+
#define END_OF_EXCHANGE 'Q'
6870
void
6971
EXCHANGE_Init_methods(void)
7072
{
@@ -95,6 +97,7 @@ EXCHANGE_Init_methods(void)
9597
DistExec_Init_methods();
9698
}
9799

100+
#include "nodes/relation.h"
98101
/*
99102
* Add one path for a base relation target: replace all ForeignScan nodes by
100103
* local Scan nodes.
@@ -116,7 +119,7 @@ add_exchange_paths(PlannerInfo *root, RelOptInfo *rel, Index rti, RangeTblEntry
116119
*/
117120
return;
118121

119-
elog(INFO, "INSERT EXCHANGE");
122+
elog(INFO, "INSERT EXCHANGE. paths: %d", list_length(rel->pathlist));
120123

121124
/* Traverse all possible paths and search for APPEND */
122125
foreach(lc, rel->pathlist)
@@ -125,16 +128,15 @@ add_exchange_paths(PlannerInfo *root, RelOptInfo *rel, Index rti, RangeTblEntry
125128
Path *tmpLocalScanPath = NULL;
126129
AppendPath *appendPath = NULL;
127130
ListCell *lc1;
128-
List *private_data = NIL;
129-
130-
Assert(path->pathtype != T_MergeAppend); /* Do it later */
131+
Bitmapset *servers = NULL;
132+
List *subpaths = NIL;
131133

132134
if (path->pathtype != T_Append)
133135
continue;
134136

135-
appendPath = makeNode(AppendPath);
136-
memcpy(appendPath, path, sizeof(AppendPath));
137-
appendPath->subpaths = NIL;
137+
// elog(INFO, "-> IE. path params: %hhu, ptype: %d, tcost: %f, scost: %f",
138+
// path->param_info != NULL, path->pathtype,
139+
// path->total_cost, path->startup_cost);
138140

139141
/*
140142
* Traverse all APPEND subpaths, check for scan-type and search for
@@ -145,7 +147,9 @@ add_exchange_paths(PlannerInfo *root, RelOptInfo *rel, Index rti, RangeTblEntry
145147
Path *subpath = (Path *) lfirst(lc1);
146148
Path *tmpPath;
147149
Oid serverid = InvalidOid;
148-
150+
elog(INFO, "--> IE. subpath params: %hhu, ptype: %d, tcost: %f, scost: %f",
151+
subpath->param_info != NULL, subpath->pathtype,
152+
subpath->total_cost, subpath->startup_cost);
149153
if ((subpath->pathtype != T_ForeignScan) && (tmpLocalScanPath))
150154
/* Check assumption No.1 */
151155
Assert(tmpLocalScanPath->pathtype == subpath->pathtype);
@@ -159,8 +163,11 @@ add_exchange_paths(PlannerInfo *root, RelOptInfo *rel, Index rti, RangeTblEntry
159163

160164
case T_ForeignScan:
161165
serverid = subpath->parent->serverid;
166+
if (PATH_REQ_OUTER(subpath) != NULL)
167+
continue;
162168
tmpPath = (Path *) makeNode(SeqScan);
163-
tmpPath = create_seqscan_path(root, subpath->parent, subpath->parent->lateral_relids, 0);
169+
tmpPath = create_seqscan_path(root, subpath->parent,
170+
PATH_REQ_OUTER(subpath), 0);
164171
break;
165172

166173
default:
@@ -170,22 +177,25 @@ add_exchange_paths(PlannerInfo *root, RelOptInfo *rel, Index rti, RangeTblEntry
170177
if (!tmpLocalScanPath)
171178
tmpLocalScanPath = tmpPath;
172179

173-
appendPath->subpaths = lappend(appendPath->subpaths, tmpPath);
174-
if (OidIsValid(serverid))
175-
private_data = lappend_oid(private_data, serverid);
180+
subpaths = lappend(subpaths, tmpPath);
181+
// appendPath->subpaths = lappend(appendPath->subpaths, tmpPath);
182+
if (OidIsValid(serverid) && !bms_is_member((int)serverid, servers))
183+
servers = bms_add_member(servers, serverid);
176184
}
177185

178-
if (private_data == NIL)
186+
if (servers == NULL)
179187
{
180-
pfree(appendPath);
181188
elog(INFO, "NO one foreign source found");
182189
continue;
183190
}
184191
else
185-
elog(INFO, "Source found: %d", list_length(private_data));
192+
elog(INFO, "Source found: %d", bms_num_members(servers));
186193

194+
appendPath = create_append_path(root, rel, subpaths, NIL,
195+
PATH_REQ_OUTER(tmpLocalScanPath), 0, false,
196+
((AppendPath *)path)->partitioned_rels, -1);
187197
path = create_exchange_path(root, rel, (Path *) appendPath);
188-
path = create_distexec_path(root, rel, path, private_data);
198+
path = create_distexec_path(root, rel, path, servers);
189199
add_path(rel, path);
190200
}
191201
}
@@ -206,8 +216,8 @@ cost_exchange(PlannerInfo *root, RelOptInfo *baserel, Path *path)
206216

207217
/* Now I do not want to think about cost estimations. */
208218
path->rows = baserel->tuples;
209-
path->startup_cost = 0.0001;
210-
path->total_cost = path->startup_cost + 0.0001 * path->rows;
219+
path->startup_cost = 10000.0001;
220+
path->total_cost = path->startup_cost + 100000.0001 * path->rows;
211221
}
212222

213223
/* XXX: Need to be placed in shared memory */
@@ -342,7 +352,7 @@ EXCHANGE_Begin(CustomScanState *node, EState *estate, int eflags)
342352
{
343353
CustomScan *cscan = (CustomScan *) node->ss.ps.plan;
344354
Plan *scan_plan;
345-
bool explain_only = ((eflags & EXEC_FLAG_EXPLAIN_ONLY) != 0);
355+
// bool explain_only = ((eflags & EXEC_FLAG_EXPLAIN_ONLY) != 0);
346356
PlanState *planState;
347357
ExchangeState *state = (ExchangeState *) node;
348358
TupleDesc scan_tupdesc;
@@ -353,7 +363,7 @@ EXCHANGE_Begin(CustomScanState *node, EState *estate, int eflags)
353363
planState = (PlanState *) ExecInitNode(scan_plan, estate, eflags);
354364
node->custom_ps = lappend(node->custom_ps, planState);
355365

356-
Assert(Stream_subscribe(state->stream));
366+
Stream_subscribe(state->stream);
357367

358368
state->init = false;
359369
state->ltuples = 0;
@@ -375,23 +385,33 @@ distribution_fn_gather(TupleTableSlot *slot, DMQDestCont *dcont)
375385
return -1;
376386
}
377387

378-
static TupleTableSlot *
379-
EXCHANGE_Execute(CustomScanState *node)
388+
static void
389+
init_state_ifany(ExchangeState *state)
380390
{
381-
ScanState *ss = &node->ss;
382-
ScanState *subPlanState = linitial(node->custom_ps);
383-
ExchangeState *state = (ExchangeState *) node;
384-
bool readRemote = true;
385-
386391
if (!state->init)
387392
{
388393
EphemeralNamedRelation enr = get_ENR(state->estate->es_queryEnv, destsName);
394+
395+
Assert(enr != NULL && enr->reldata != NULL);
389396
state->dests = (DMQDestCont *) enr->reldata;
390-
state->init = true;
391397
state->hasLocal = true;
392398
state->activeRemotes = state->dests->nservers;
399+
state->init = true;
400+
// elog(INFO, "[%d] EXCHANGE Init", getpid());
393401
}
394402

403+
}
404+
405+
static TupleTableSlot *
406+
EXCHANGE_Execute(CustomScanState *node)
407+
{
408+
ScanState *ss = &node->ss;
409+
ScanState *subPlanState = linitial(node->custom_ps);
410+
ExchangeState *state = (ExchangeState *) node;
411+
bool readRemote = true;
412+
413+
init_state_ifany(state);
414+
395415
for(;;)
396416
{
397417
TupleTableSlot *slot = NULL;
@@ -405,22 +425,23 @@ EXCHANGE_Execute(CustomScanState *node)
405425

406426
slot = RecvTuple(ss->ss_ScanTupleSlot->tts_tupleDescriptor,
407427
state->stream, &status);
408-
if (status == 0)
428+
switch (status)
409429
{
410-
if (TupIsNull(slot))
411-
{
412-
state->activeRemotes--;
413-
// elog(LOG, "Finish remote receiving. r=%d", state->rtuples);
414-
}
415-
else
416-
{
417-
state->rtuples++;
418-
// elog(LOG, "GOT tuple from another node. r=%d", state->rtuples);
419-
return slot;
420-
}
430+
case -1:
431+
/* No tuples currently */
432+
break;
433+
case 0:
434+
Assert(!TupIsNull(slot));
435+
state->rtuples++;
436+
return slot;
437+
case 1:
438+
state->activeRemotes--;
439+
break;
440+
case 2: /* Close EXCHANGE channel */
441+
break;
442+
default:
443+
Assert(0);
421444
}
422-
// else
423-
// elog(LOG, "No remote tuples for now");
424445
}
425446

426447
if ((state->hasLocal) && (!readRemote))
@@ -429,9 +450,9 @@ EXCHANGE_Execute(CustomScanState *node)
429450
if (TupIsNull(slot))
430451
{
431452
int i;
432-
//elog(LOG, "FINISH Local store: l=%d, r=%d", state->ltuples, state->rtuples);
453+
// elog(LOG, "[%s] FINISH Local store: l=%d, r=%d", state->stream, state->ltuples, state->rtuples);
433454
for (i = 0; i < state->dests->nservers; i++)
434-
SendTuple(state->dests->dests[i].dest_id, state->stream, NULL);
455+
SendByteMessage(state->dests->dests[i].dest_id, state->stream, END_OF_TUPLES);
435456
state->hasLocal = false;
436457
continue;
437458
}
@@ -444,7 +465,8 @@ EXCHANGE_Execute(CustomScanState *node)
444465

445466
if ((state->activeRemotes == 0) && (!state->hasLocal))
446467
{
447-
elog(LOG, "Exchange returns NULL: %d %d", state->ltuples, state->rtuples);
468+
elog(LOG, "[%s] Exchange returns NULL: %d %d", state->stream,
469+
state->ltuples, state->rtuples);
448470
return NULL;
449471
}
450472

@@ -457,7 +479,6 @@ EXCHANGE_Execute(CustomScanState *node)
457479
return slot;
458480
else
459481
{
460-
// elog(LOG, "Send real tuple");
461482
SendTuple(dest, state->stream, slot);
462483
}
463484
}
@@ -471,20 +492,25 @@ EXCHANGE_End(CustomScanState *node)
471492

472493
Assert(list_length(node->custom_ps) == 1);
473494
ExecEndNode(linitial(node->custom_ps));
474-
Assert(Stream_unsubscribe(state->stream));
475-
elog(LOG, "EXCHANGE_END");
476-
/*
477-
* Clean out exchange state
478-
*/
495+
Stream_unsubscribe(state->stream);
496+
497+
elog(INFO, "EXCHANGE_END");
479498
}
480499

481500
static void
482501
EXCHANGE_Rescan(CustomScanState *node)
483502
{
484-
PlanState *outerPlan = outerPlanState(node);
503+
ExchangeState *state = (ExchangeState *) node;
504+
PlanState *subPlan = (PlanState *) linitial(node->custom_ps);
485505

486-
if (outerPlan->chgParam == NULL)
487-
ExecReScan(outerPlan);
506+
init_state_ifany(state);
507+
elog(INFO, "Rescan exchange! %d", getpid());
508+
if (subPlan->chgParam == NULL)
509+
ExecReScan(subPlan);
510+
state->activeRemotes = state->dests->nservers;
511+
state->ltuples = 0;
512+
state->rtuples = 0;
513+
state->hasLocal = true;
488514
}
489515

490516
static void
@@ -500,8 +526,10 @@ static void
500526
EXCHANGE_Explain(CustomScanState *node, List *ancestors, ExplainState *es)
501527
{
502528
StringInfoData str;
529+
ExchangeState *state = (ExchangeState *) node;
503530

504531
initStringInfo(&str);
532+
appendStringInfo(&str, "stream: %s. ", state->stream);
505533
ExplainPropertyText("Exchange", str.data, es);
506534
}
507535

0 commit comments

Comments
 (0)