Commit cdc7169

Use MinimalTuple for tuple queues.
This representation saves 8 bytes per tuple compared to HeapTuple, and
avoids the need to allocate, copy and free on the receiving side.

Gather can emit the returned MinimalTuple directly, but GatherMerge now
needs to make an explicit copy because it buffers multiple tuples at a
time. That should be no worse than before.

Reviewed-by: Soumyadeep Chakraborty <soumyadeep2007@gmail.com>
Discussion: https://postgr.es/m/CA%2BhUKG%2B8T_ggoUTAE-U%3DA%2BOcPc4%3DB0nPPHcSfffuQhvXXjML6w%40mail.gmail.com
1 parent d2bddc2 commit cdc7169
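
The 8-byte figure comes from the tuple header: a MinimalTuple is a heap tuple with the leading MINIMAL_TUPLE_OFFSET bytes of header (visibility and ctid bookkeeping that a transient, queue-borne tuple doesn't need) lopped off. A minimal sketch of that accounting, assuming the usual backend headers; the helper name is invented for illustration:

#include "postgres.h"
#include "access/htup_details.h"

/*
 * Hypothetical helper (not part of this commit): how many bytes each
 * queued tuple shrinks by under the MinimalTuple representation.
 * MINIMAL_TUPLE_OFFSET is the MAXALIGN'd header prefix that MinimalTuple
 * omits; on common 64-bit builds it is 8, hence "8 bytes per tuple".
 */
static Size
tuple_queue_bytes_saved_per_tuple(void)
{
    return MINIMAL_TUPLE_OFFSET;
}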

5 files changed: +51, -47 lines

src/backend/executor/nodeGather.c

Lines changed: 8 additions & 8 deletions

@@ -46,7 +46,7 @@
 
 static TupleTableSlot *ExecGather(PlanState *pstate);
 static TupleTableSlot *gather_getnext(GatherState *gatherstate);
-static HeapTuple gather_readnext(GatherState *gatherstate);
+static MinimalTuple gather_readnext(GatherState *gatherstate);
 static void ExecShutdownGatherWorkers(GatherState *node);
 
 
@@ -120,7 +120,7 @@ ExecInitGather(Gather *node, EState *estate, int eflags)
     * Initialize funnel slot to same tuple descriptor as outer plan.
     */
    gatherstate->funnel_slot = ExecInitExtraTupleSlot(estate, tupDesc,
-                                                     &TTSOpsHeapTuple);
+                                                     &TTSOpsMinimalTuple);
 
    /*
     * Gather doesn't support checking a qual (it's always more efficient to
@@ -266,7 +266,7 @@ gather_getnext(GatherState *gatherstate)
    PlanState  *outerPlan = outerPlanState(gatherstate);
    TupleTableSlot *outerTupleSlot;
    TupleTableSlot *fslot = gatherstate->funnel_slot;
-   HeapTuple   tup;
+   MinimalTuple tup;
 
    while (gatherstate->nreaders > 0 || gatherstate->need_to_scan_locally)
    {
@@ -278,9 +278,9 @@ gather_getnext(GatherState *gatherstate)
 
        if (HeapTupleIsValid(tup))
        {
-           ExecStoreHeapTuple(tup,     /* tuple to store */
-                              fslot,   /* slot to store the tuple */
-                              true);   /* pfree tuple when done with it */
+           ExecStoreMinimalTuple(tup,      /* tuple to store */
+                                 fslot,    /* slot to store the tuple */
+                                 false);   /* don't pfree tuple */
            return fslot;
        }
    }
@@ -308,15 +308,15 @@ gather_getnext(GatherState *gatherstate)
 /*
  * Attempt to read a tuple from one of our parallel workers.
  */
-static HeapTuple
+static MinimalTuple
 gather_readnext(GatherState *gatherstate)
 {
    int         nvisited = 0;
 
    for (;;)
    {
        TupleQueueReader *reader;
-       HeapTuple   tup;
+       MinimalTuple tup;
        bool        readerdone;
 
        /* Check for async events, particularly messages from workers. */
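
Since the funnel slot is now backed by TTSOpsMinimalTuple, Gather can reference the queue's memory directly instead of copying it. A condensed sketch of the resulting zero-copy pattern (hypothetical helper, not in the tree; the slot must be a minimal-tuple slot, and its contents must be consumed before the next read from the same reader):

#include "postgres.h"
#include "executor/tqueue.h"
#include "executor/tuptable.h"

/*
 * Store a queued tuple into a TTSOpsMinimalTuple slot without copying.
 * shouldFree = false because the memory belongs to the queue (or to the
 * reader's private buffer), not to the slot.
 */
static TupleTableSlot *
read_into_slot(TupleQueueReader *reader, TupleTableSlot *slot, bool *done)
{
    MinimalTuple tup = TupleQueueReaderNext(reader, false, done);

    if (tup == NULL)
        return NULL;
    return ExecStoreMinimalTuple(tup, slot, false);
}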

src/backend/executor/nodeGatherMerge.c

Lines changed: 22 additions & 18 deletions

@@ -45,7 +45,7 @@
  */
 typedef struct GMReaderTupleBuffer
 {
-   HeapTuple  *tuple;          /* array of length MAX_TUPLE_STORE */
+   MinimalTuple *tuple;        /* array of length MAX_TUPLE_STORE */
    int         nTuples;        /* number of tuples currently stored */
    int         readCounter;    /* index of next tuple to extract */
    bool        done;           /* true if reader is known exhausted */
@@ -54,8 +54,8 @@ typedef struct GMReaderTupleBuffer
 static TupleTableSlot *ExecGatherMerge(PlanState *pstate);
 static int32 heap_compare_slots(Datum a, Datum b, void *arg);
 static TupleTableSlot *gather_merge_getnext(GatherMergeState *gm_state);
-static HeapTuple gm_readnext_tuple(GatherMergeState *gm_state, int nreader,
-                                   bool nowait, bool *done);
+static MinimalTuple gm_readnext_tuple(GatherMergeState *gm_state, int nreader,
+                                      bool nowait, bool *done);
 static void ExecShutdownGatherMergeWorkers(GatherMergeState *node);
 static void gather_merge_setup(GatherMergeState *gm_state);
 static void gather_merge_init(GatherMergeState *gm_state);
@@ -419,12 +419,12 @@ gather_merge_setup(GatherMergeState *gm_state)
    {
        /* Allocate the tuple array with length MAX_TUPLE_STORE */
        gm_state->gm_tuple_buffers[i].tuple =
-           (HeapTuple *) palloc0(sizeof(HeapTuple) * MAX_TUPLE_STORE);
+           (MinimalTuple *) palloc0(sizeof(MinimalTuple) * MAX_TUPLE_STORE);
 
        /* Initialize tuple slot for worker */
        gm_state->gm_slots[i + 1] =
            ExecInitExtraTupleSlot(gm_state->ps.state, gm_state->tupDesc,
-                                  &TTSOpsHeapTuple);
+                                  &TTSOpsMinimalTuple);
    }
 
    /* Allocate the resources for the merge */
@@ -533,7 +533,7 @@ gather_merge_clear_tuples(GatherMergeState *gm_state)
        GMReaderTupleBuffer *tuple_buffer = &gm_state->gm_tuple_buffers[i];
 
        while (tuple_buffer->readCounter < tuple_buffer->nTuples)
-           heap_freetuple(tuple_buffer->tuple[tuple_buffer->readCounter++]);
+           pfree(tuple_buffer->tuple[tuple_buffer->readCounter++]);
 
        ExecClearTuple(gm_state->gm_slots[i + 1]);
    }
@@ -613,13 +613,13 @@ load_tuple_array(GatherMergeState *gm_state, int reader)
    /* Try to fill additional slots in the array. */
    for (i = tuple_buffer->nTuples; i < MAX_TUPLE_STORE; i++)
    {
-       HeapTuple   tuple;
+       MinimalTuple tuple;
 
        tuple = gm_readnext_tuple(gm_state,
                                  reader,
                                  true,
                                  &tuple_buffer->done);
-       if (!HeapTupleIsValid(tuple))
+       if (!tuple)
            break;
        tuple_buffer->tuple[i] = tuple;
        tuple_buffer->nTuples++;
@@ -637,7 +637,7 @@ static bool
 gather_merge_readnext(GatherMergeState *gm_state, int reader, bool nowait)
 {
    GMReaderTupleBuffer *tuple_buffer;
-   HeapTuple   tup;
+   MinimalTuple tup;
 
    /*
     * If we're being asked to generate a tuple from the leader, then we just
@@ -687,7 +687,7 @@ gather_merge_readnext(GatherMergeState *gm_state, int reader, bool nowait)
                                  reader,
                                  nowait,
                                  &tuple_buffer->done);
-       if (!HeapTupleIsValid(tup))
+       if (!tup)
            return false;
 
        /*
@@ -697,26 +697,26 @@ gather_merge_readnext(GatherMergeState *gm_state, int reader, bool nowait)
        load_tuple_array(gm_state, reader);
    }
 
-   Assert(HeapTupleIsValid(tup));
+   Assert(tup);
 
    /* Build the TupleTableSlot for the given tuple */
-   ExecStoreHeapTuple(tup,     /* tuple to store */
-                      gm_state->gm_slots[reader],  /* slot in which to store
-                                                    * the tuple */
-                      true);   /* pfree tuple when done with it */
+   ExecStoreMinimalTuple(tup,  /* tuple to store */
+                         gm_state->gm_slots[reader],   /* slot in which to
+                                                        * store the tuple */
+                         true);    /* pfree tuple when done with it */
 
    return true;
 }
 
 /*
  * Attempt to read a tuple from given worker.
  */
-static HeapTuple
+static MinimalTuple
 gm_readnext_tuple(GatherMergeState *gm_state, int nreader, bool nowait,
                   bool *done)
 {
    TupleQueueReader *reader;
-   HeapTuple   tup;
+   MinimalTuple tup;
 
    /* Check for async events, particularly messages from workers. */
    CHECK_FOR_INTERRUPTS();
@@ -732,7 +732,11 @@ gm_readnext_tuple(GatherMergeState *gm_state, int nreader, bool nowait,
    reader = gm_state->reader[nreader - 1];
    tup = TupleQueueReaderNext(reader, nowait, done);
 
-   return tup;
+   /*
+    * Since we'll be buffering these across multiple calls, we need to make a
+    * copy.
+    */
+   return tup ? heap_copy_minimal_tuple(tup) : NULL;
 }
 
 /*
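
The explicit heap_copy_minimal_tuple() above is what keeps GatherMerge "no worse than before": previously TupleQueueReaderNext() itself copied every tuple with heap_copytuple(), so GatherMerge still pays exactly one copy per tuple, while Gather now pays none. A minimal sketch of the buffering contract (hypothetical helper, names invented):

#include "postgres.h"
#include "access/htup_details.h"
#include "executor/tqueue.h"

/*
 * Read one tuple for buffering.  The pointer returned by
 * TupleQueueReaderNext() is only valid until the next call, but
 * GatherMerge stashes up to MAX_TUPLE_STORE tuples per reader, so each
 * one must be copied into backend-local memory first.
 */
static MinimalTuple
read_for_buffering(TupleQueueReader *reader, bool *done)
{
    MinimalTuple tup = TupleQueueReaderNext(reader, true, done);

    return tup ? heap_copy_minimal_tuple(tup) : NULL;
}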

src/backend/executor/tqueue.c

Lines changed: 14 additions & 16 deletions

@@ -54,16 +54,16 @@ static bool
 tqueueReceiveSlot(TupleTableSlot *slot, DestReceiver *self)
 {
    TQueueDestReceiver *tqueue = (TQueueDestReceiver *) self;
-   HeapTuple   tuple;
+   MinimalTuple tuple;
    shm_mq_result result;
    bool        should_free;
 
    /* Send the tuple itself. */
-   tuple = ExecFetchSlotHeapTuple(slot, true, &should_free);
-   result = shm_mq_send(tqueue->queue, tuple->t_len, tuple->t_data, false);
+   tuple = ExecFetchSlotMinimalTuple(slot, &should_free);
+   result = shm_mq_send(tqueue->queue, tuple->t_len, tuple, false);
 
    if (should_free)
-       heap_freetuple(tuple);
+       pfree(tuple);
 
    /* Check for failure. */
    if (result == SHM_MQ_DETACHED)
@@ -164,18 +164,18 @@ DestroyTupleQueueReader(TupleQueueReader *reader)
  * nowait = true and no tuple is ready to return.  *done, if not NULL,
  * is set to true when there are no remaining tuples and otherwise to false.
  *
- * The returned tuple, if any, is allocated in CurrentMemoryContext.
- * Note that this routine must not leak memory!  (We used to allow that,
- * but not any more.)
+ * The returned tuple, if any, is either in shared memory or a private buffer
+ * and should not be freed.  The pointer is invalid after the next call to
+ * TupleQueueReaderNext().
  *
  * Even when shm_mq_receive() returns SHM_MQ_WOULD_BLOCK, this can still
  * accumulate bytes from a partially-read message, so it's useful to call
  * this with nowait = true even if nothing is returned.
  */
-HeapTuple
+MinimalTuple
 TupleQueueReaderNext(TupleQueueReader *reader, bool nowait, bool *done)
 {
-   HeapTupleData htup;
+   MinimalTuple tuple;
    shm_mq_result result;
    Size        nbytes;
    void       *data;
@@ -200,13 +200,11 @@ TupleQueueReaderNext(TupleQueueReader *reader, bool nowait, bool *done)
    Assert(result == SHM_MQ_SUCCESS);
 
    /*
-    * Set up a dummy HeapTupleData pointing to the data from the shm_mq
-    * (which had better be sufficiently aligned).
+    * Return a pointer to the queue memory directly (which had better be
+    * sufficiently aligned).
    */
-   ItemPointerSetInvalid(&htup.t_self);
-   htup.t_tableOid = InvalidOid;
-   htup.t_len = nbytes;
-   htup.t_data = data;
+   tuple = (MinimalTuple) data;
+   Assert(tuple->t_len == nbytes);
 
-   return heap_copytuple(&htup);
+   return tuple;
 }
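
On the sending side, the rewrite leans on t_len being the first field of MinimalTupleData and counting the whole tuple, so the tuple is a self-describing message: no wrapper struct and no separate length word. A sketch of that invariant (hypothetical helper mirroring tqueueReceiveSlot() above; assumes the four-argument shm_mq_send() of this era):

#include "postgres.h"
#include "access/htup_details.h"
#include "executor/tuptable.h"
#include "storage/shm_mq.h"

/*
 * Send a slot's tuple through a shared-memory queue verbatim.  The
 * receiver can reconstitute it with a simple cast, as
 * TupleQueueReaderNext() now does.
 */
static shm_mq_result
send_slot_minimal(shm_mq_handle *queue, TupleTableSlot *slot)
{
    bool        should_free;
    MinimalTuple tuple = ExecFetchSlotMinimalTuple(slot, &should_free);
    shm_mq_result result = shm_mq_send(queue, tuple->t_len, tuple, false);

    if (should_free)
        pfree(tuple);
    return result;
}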

src/backend/optimizer/plan/createplan.c

Lines changed: 5 additions & 3 deletions

@@ -1730,8 +1730,10 @@ create_gather_plan(PlannerInfo *root, GatherPath *best_path)
    List       *tlist;
 
    /*
-    * Although the Gather node can project, we prefer to push down such work
-    * to its child node, so demand an exact tlist from the child.
+    * Push projection down to the child node.  That way, the projection work
+    * is parallelized, and there can be no system columns in the result (they
+    * can't travel through a tuple queue because it uses MinimalTuple
+    * representation).
    */
    subplan = create_plan_recurse(root, best_path->subpath, CP_EXACT_TLIST);
 
@@ -1766,7 +1768,7 @@ create_gather_merge_plan(PlannerInfo *root, GatherMergePath *best_path)
    List       *pathkeys = best_path->path.pathkeys;
    List       *tlist = build_path_tlist(root, &best_path->path);
 
-   /* As with Gather, it's best to project away columns in the workers. */
+   /* As with Gather, project away columns in the workers. */
    subplan = create_plan_recurse(root, best_path->subpath, CP_EXACT_TLIST);
 
    /* Create a shell for a GatherMerge plan. */

src/include/executor/tqueue.h

Lines changed: 2 additions & 2 deletions

@@ -26,7 +26,7 @@ extern DestReceiver *CreateTupleQueueDestReceiver(shm_mq_handle *handle);
 /* Use these to receive tuples from a shm_mq. */
 extern TupleQueueReader *CreateTupleQueueReader(shm_mq_handle *handle);
 extern void DestroyTupleQueueReader(TupleQueueReader *reader);
-extern HeapTuple TupleQueueReaderNext(TupleQueueReader *reader,
-                                      bool nowait, bool *done);
+extern MinimalTuple TupleQueueReaderNext(TupleQueueReader *reader,
+                                         bool nowait, bool *done);
 
 #endif                          /* TQUEUE_H */
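
For orientation, the two halves of this header pair up across the process boundary: a worker wraps its queue end in a DestReceiver, and the leader wraps its end in a TupleQueueReader. A rough wiring sketch (hypothetical; in the tree execParallel.c does this setup, and each process normally holds only its own end of the queue):

#include "postgres.h"
#include "executor/tqueue.h"
#include "storage/shm_mq.h"
#include "tcop/dest.h"

static void
wire_up_tuple_queue(shm_mq_handle *worker_end, shm_mq_handle *leader_end)
{
    /* Worker side: tuples pushed in via the DestReceiver interface. */
    DestReceiver *dest = CreateTupleQueueDestReceiver(worker_end);

    /* Leader side: MinimalTuples pulled back out, zero-copy. */
    TupleQueueReader *reader = CreateTupleQueueReader(leader_end);

    /* ... worker calls dest->receiveSlot(slot, dest) per tuple ... */
    /* ... leader calls TupleQueueReaderNext(reader, nowait, &done) ... */

    DestroyTupleQueueReader(reader);
    dest->rDestroy(dest);
}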
