Skip to content

Commit 474338a

Browse files
committed
make binary heap work for ArrangeAppend
1 parent 9275817 commit 474338a

File tree

2 files changed

+55
-19
lines changed

2 files changed

+55
-19
lines changed

arrangeappend.c

Lines changed: 54 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -276,8 +276,6 @@ create_arrangeappend_plan(PlannerInfo *root, RelOptInfo *rel,
276276
lfirst(plan_cell) = subplan;
277277
}
278278

279-
/* TODO: write node_XXX wariables to custom_private */
280-
281279
pack_arrangeappend_private(node, &mag);
282280

283281
return plan;
@@ -299,46 +297,75 @@ arrangeappend_create_scan_state(CustomScan *node)
299297
void
300298
arrangeappend_begin(CustomScanState *node, EState *estate, int eflags)
301299
{
302-
ArrangeAppendState *scan_state = (ArrangeAppendState *) node;
303-
304300
begin_append_common(node, estate, eflags);
305301
}
306302

307303
TupleTableSlot *
308304
arrangeappend_exec(CustomScanState *node)
309305
{
310-
ArrangeAppendState *scan_state = (ArrangeAppendState *) node;
311-
RuntimeAppendState *rstate = &scan_state->rstate;
306+
ArrangeAppendState *scan_state = (ArrangeAppendState *) node;
307+
RuntimeAppendState *rstate = &scan_state->rstate;
308+
PlanState *ps;
309+
int i;
312310

313311
if (scan_state->rstate.ncur_plans == 0)
314312
ExecReScan(&node->ss.ps);
315313

316-
while (rstate->running_idx < rstate->ncur_plans)
314+
if (!scan_state->ms_initialized)
315+
{
316+
for (i = 0; i < scan_state->rstate.ncur_plans; i++)
317+
{
318+
ChildScanCommon child = scan_state->rstate.cur_plans[i];
319+
PlanState *ps = child->content.plan_state;
320+
321+
Assert(child->content_type == CHILD_PLAN_STATE);
322+
323+
scan_state->ms_slots[i] = ExecProcNode(ps);
324+
if (!TupIsNull(scan_state->ms_slots[i]))
325+
binaryheap_add_unordered(scan_state->ms_heap, Int32GetDatum(i));
326+
}
327+
binaryheap_build(scan_state->ms_heap);
328+
scan_state->ms_initialized = true;
329+
}
330+
else
317331
{
318-
ChildScanCommon child = rstate->cur_plans[rstate->running_idx];
319-
PlanState *state = child->content.plan_state;
320-
TupleTableSlot *slot = NULL;
321-
bool quals;
332+
i = DatumGetInt32(binaryheap_first(scan_state->ms_heap));
333+
ps = scan_state->rstate.cur_plans[i]->content.plan_state;
322334

323335
for (;;)
324336
{
325-
slot = ExecProcNode(state);
337+
bool quals;
326338

327-
if (TupIsNull(slot))
339+
scan_state->ms_slots[i] = ExecProcNode(ps);
340+
341+
if (TupIsNull(scan_state->ms_slots[i]))
342+
{
343+
(void) binaryheap_remove_first(scan_state->ms_heap);
328344
break;
345+
}
329346

330-
node->ss.ps.ps_ExprContext->ecxt_scantuple = slot;
347+
node->ss.ps.ps_ExprContext->ecxt_scantuple = scan_state->ms_slots[i];
331348
quals = ExecQual(rstate->custom_expr_states,
332349
node->ss.ps.ps_ExprContext, false);
333350

334351
if (quals)
335-
return slot;
352+
{
353+
binaryheap_replace_first(scan_state->ms_heap, Int32GetDatum(i));
354+
break;
355+
}
336356
}
337-
338-
rstate->running_idx++;
339357
}
340358

341-
return NULL;
359+
if (binaryheap_empty(scan_state->ms_heap))
360+
{
361+
/* All the subplans are exhausted, and so is the heap */
362+
return NULL;
363+
}
364+
else
365+
{
366+
i = DatumGetInt32(binaryheap_first(scan_state->ms_heap));
367+
return scan_state->ms_slots[i];
368+
}
342369
}
343370

344371
void
@@ -347,17 +374,22 @@ arrangeappend_end(CustomScanState *node)
347374
ArrangeAppendState *scan_state = (ArrangeAppendState *) node;
348375

349376
end_append_common(node);
377+
378+
if (scan_state->ms_heap)
379+
binaryheap_free(scan_state->ms_heap);
350380
}
351381

352382
void
353383
arrangeappend_rescan(CustomScanState *node)
354384
{
355385
ArrangeAppendState *scan_state = (ArrangeAppendState *) node;
356-
int nplans = scan_state->rstate.ncur_plans;
386+
int nplans;
357387
int i;
358388

359389
rescan_append_common(node);
360390

391+
nplans = scan_state->rstate.ncur_plans;
392+
361393
scan_state->ms_slots = (TupleTableSlot **) palloc0(sizeof(TupleTableSlot *) * nplans);
362394
scan_state->ms_heap = binaryheap_allocate(nplans, heap_compare_slots, scan_state);
363395

@@ -387,6 +419,9 @@ arrangeappend_rescan(CustomScanState *node)
387419

388420
PrepareSortSupportFromOrderingOp(scan_state->sortOperators[i], sortKey);
389421
}
422+
423+
binaryheap_reset(scan_state->ms_heap);
424+
scan_state->ms_initialized = false;
390425
}
391426

392427
void

arrangeappend.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ typedef struct
2727
SortSupport ms_sortkeys;
2828
TupleTableSlot **ms_slots;
2929
struct binaryheap *ms_heap;
30+
bool ms_initialized;
3031
} ArrangeAppendState;
3132

3233

0 commit comments

Comments
 (0)