45
45
*/
46
46
typedef struct GMReaderTupleBuffer
47
47
{
48
- HeapTuple * tuple ; /* array of length MAX_TUPLE_STORE */
48
+ MinimalTuple * tuple ; /* array of length MAX_TUPLE_STORE */
49
49
int nTuples ; /* number of tuples currently stored */
50
50
int readCounter ; /* index of next tuple to extract */
51
51
bool done ; /* true if reader is known exhausted */
@@ -54,8 +54,8 @@ typedef struct GMReaderTupleBuffer
54
54
static TupleTableSlot * ExecGatherMerge (PlanState * pstate );
55
55
static int32 heap_compare_slots (Datum a , Datum b , void * arg );
56
56
static TupleTableSlot * gather_merge_getnext (GatherMergeState * gm_state );
57
- static HeapTuple gm_readnext_tuple (GatherMergeState * gm_state , int nreader ,
58
- bool nowait , bool * done );
57
+ static MinimalTuple gm_readnext_tuple (GatherMergeState * gm_state , int nreader ,
58
+ bool nowait , bool * done );
59
59
static void ExecShutdownGatherMergeWorkers (GatherMergeState * node );
60
60
static void gather_merge_setup (GatherMergeState * gm_state );
61
61
static void gather_merge_init (GatherMergeState * gm_state );
@@ -419,12 +419,12 @@ gather_merge_setup(GatherMergeState *gm_state)
419
419
{
420
420
/* Allocate the tuple array with length MAX_TUPLE_STORE */
421
421
gm_state -> gm_tuple_buffers [i ].tuple =
422
- (HeapTuple * ) palloc0 (sizeof (HeapTuple ) * MAX_TUPLE_STORE );
422
+ (MinimalTuple * ) palloc0 (sizeof (MinimalTuple ) * MAX_TUPLE_STORE );
423
423
424
424
/* Initialize tuple slot for worker */
425
425
gm_state -> gm_slots [i + 1 ] =
426
426
ExecInitExtraTupleSlot (gm_state -> ps .state , gm_state -> tupDesc ,
427
- & TTSOpsHeapTuple );
427
+ & TTSOpsMinimalTuple );
428
428
}
429
429
430
430
/* Allocate the resources for the merge */
@@ -533,7 +533,7 @@ gather_merge_clear_tuples(GatherMergeState *gm_state)
533
533
GMReaderTupleBuffer * tuple_buffer = & gm_state -> gm_tuple_buffers [i ];
534
534
535
535
while (tuple_buffer -> readCounter < tuple_buffer -> nTuples )
536
- heap_freetuple (tuple_buffer -> tuple [tuple_buffer -> readCounter ++ ]);
536
+ pfree (tuple_buffer -> tuple [tuple_buffer -> readCounter ++ ]);
537
537
538
538
ExecClearTuple (gm_state -> gm_slots [i + 1 ]);
539
539
}
@@ -613,13 +613,13 @@ load_tuple_array(GatherMergeState *gm_state, int reader)
613
613
/* Try to fill additional slots in the array. */
614
614
for (i = tuple_buffer -> nTuples ; i < MAX_TUPLE_STORE ; i ++ )
615
615
{
616
- HeapTuple tuple ;
616
+ MinimalTuple tuple ;
617
617
618
618
tuple = gm_readnext_tuple (gm_state ,
619
619
reader ,
620
620
true,
621
621
& tuple_buffer -> done );
622
- if (!HeapTupleIsValid ( tuple ) )
622
+ if (!tuple )
623
623
break ;
624
624
tuple_buffer -> tuple [i ] = tuple ;
625
625
tuple_buffer -> nTuples ++ ;
@@ -637,7 +637,7 @@ static bool
637
637
gather_merge_readnext (GatherMergeState * gm_state , int reader , bool nowait )
638
638
{
639
639
GMReaderTupleBuffer * tuple_buffer ;
640
- HeapTuple tup ;
640
+ MinimalTuple tup ;
641
641
642
642
/*
643
643
* If we're being asked to generate a tuple from the leader, then we just
@@ -687,7 +687,7 @@ gather_merge_readnext(GatherMergeState *gm_state, int reader, bool nowait)
687
687
reader ,
688
688
nowait ,
689
689
& tuple_buffer -> done );
690
- if (!HeapTupleIsValid ( tup ) )
690
+ if (!tup )
691
691
return false;
692
692
693
693
/*
@@ -697,26 +697,26 @@ gather_merge_readnext(GatherMergeState *gm_state, int reader, bool nowait)
697
697
load_tuple_array (gm_state , reader );
698
698
}
699
699
700
- Assert (HeapTupleIsValid ( tup ) );
700
+ Assert (tup );
701
701
702
702
/* Build the TupleTableSlot for the given tuple */
703
- ExecStoreHeapTuple (tup , /* tuple to store */
704
- gm_state -> gm_slots [reader ], /* slot in which to store
705
- * the tuple */
706
- true); /* pfree tuple when done with it */
703
+ ExecStoreMinimalTuple (tup , /* tuple to store */
704
+ gm_state -> gm_slots [reader ], /* slot in which to store
705
+ * the tuple */
706
+ true); /* pfree tuple when done with it */
707
707
708
708
return true;
709
709
}
710
710
711
711
/*
712
712
* Attempt to read a tuple from given worker.
713
713
*/
714
- static HeapTuple
714
+ static MinimalTuple
715
715
gm_readnext_tuple (GatherMergeState * gm_state , int nreader , bool nowait ,
716
716
bool * done )
717
717
{
718
718
TupleQueueReader * reader ;
719
- HeapTuple tup ;
719
+ MinimalTuple tup ;
720
720
721
721
/* Check for async events, particularly messages from workers. */
722
722
CHECK_FOR_INTERRUPTS ();
@@ -732,7 +732,11 @@ gm_readnext_tuple(GatherMergeState *gm_state, int nreader, bool nowait,
732
732
reader = gm_state -> reader [nreader - 1 ];
733
733
tup = TupleQueueReaderNext (reader , nowait , done );
734
734
735
- return tup ;
735
+ /*
736
+ * Since we'll be buffering these across multiple calls, we need to make a
737
+ * copy.
738
+ */
739
+ return tup ? heap_copy_minimal_tuple (tup ) : NULL ;
736
740
}
737
741
738
742
/*
0 commit comments