36
36
#include "executor/nodeGather.h"
37
37
#include "executor/nodeSubplan.h"
38
38
#include "executor/tqueue.h"
39
+ #include "utils/memutils.h"
39
40
#include "utils/rel.h"
40
41
41
42
@@ -50,6 +51,9 @@ GatherState *
50
51
ExecInitGather (Gather * node , EState * estate , int eflags )
51
52
{
52
53
GatherState * gatherstate ;
54
+ Plan * outerNode ;
55
+ bool hasoid ;
56
+ TupleDesc tupDesc ;
53
57
54
58
/* Gather node doesn't have innerPlan node. */
55
59
Assert (innerPlan (node ) == NULL );
@@ -82,13 +86,14 @@ ExecInitGather(Gather *node, EState *estate, int eflags)
82
86
/*
83
87
* tuple table initialization
84
88
*/
89
+ gatherstate -> funnel_slot = ExecInitExtraTupleSlot (estate );
85
90
ExecInitResultTupleSlot (estate , & gatherstate -> ps );
86
91
87
92
/*
88
93
* now initialize outer plan
89
94
*/
90
- outerPlanState ( gatherstate ) = ExecInitNode ( outerPlan (node ), estate , eflags );
91
-
95
+ outerNode = outerPlan (node );
96
+ outerPlanState ( gatherstate ) = ExecInitNode ( outerNode , estate , eflags );
92
97
93
98
gatherstate -> ps .ps_TupFromTlist = false;
94
99
@@ -98,6 +103,14 @@ ExecInitGather(Gather *node, EState *estate, int eflags)
98
103
ExecAssignResultTypeFromTL (& gatherstate -> ps );
99
104
ExecAssignProjectionInfo (& gatherstate -> ps , NULL );
100
105
106
+ /*
107
+ * Initialize funnel slot to same tuple descriptor as outer plan.
108
+ */
109
+ if (!ExecContextForcesOids (& gatherstate -> ps , & hasoid ))
110
+ hasoid = false;
111
+ tupDesc = ExecTypeFromTL (outerNode -> targetlist , hasoid );
112
+ ExecSetSlotDescriptor (gatherstate -> funnel_slot , tupDesc );
113
+
101
114
return gatherstate ;
102
115
}
103
116
@@ -113,6 +126,9 @@ ExecGather(GatherState *node)
113
126
{
114
127
int i ;
115
128
TupleTableSlot * slot ;
129
+ TupleTableSlot * resultSlot ;
130
+ ExprDoneCond isDone ;
131
+ ExprContext * econtext ;
116
132
117
133
/*
118
134
* Initialize the parallel context and workers on first execution. We do
@@ -169,7 +185,53 @@ ExecGather(GatherState *node)
169
185
node -> initialized = true;
170
186
}
171
187
172
- slot = gather_getnext (node );
188
+ /*
189
+ * Check to see if we're still projecting out tuples from a previous scan
190
+ * tuple (because there is a function-returning-set in the projection
191
+ * expressions). If so, try to project another one.
192
+ */
193
+ if (node -> ps .ps_TupFromTlist )
194
+ {
195
+ resultSlot = ExecProject (node -> ps .ps_ProjInfo , & isDone );
196
+ if (isDone == ExprMultipleResult )
197
+ return resultSlot ;
198
+ /* Done with that source tuple... */
199
+ node -> ps .ps_TupFromTlist = false;
200
+ }
201
+
202
+ /*
203
+ * Reset per-tuple memory context to free any expression evaluation
204
+ * storage allocated in the previous tuple cycle. Note we can't do this
205
+ * until we're done projecting.
206
+ */
207
+ econtext = node -> ps .ps_ExprContext ;
208
+ ResetExprContext (econtext );
209
+
210
+ /* Get and return the next tuple, projecting if necessary. */
211
+ for (;;)
212
+ {
213
+ /*
214
+ * Get next tuple, either from one of our workers, or by running the
215
+ * plan ourselves.
216
+ */
217
+ slot = gather_getnext (node );
218
+ if (TupIsNull (slot ))
219
+ return NULL ;
220
+
221
+ /*
222
+ * form the result tuple using ExecProject(), and return it --- unless
223
+ * the projection produces an empty set, in which case we must loop
224
+ * back around for another tuple
225
+ */
226
+ econtext -> ecxt_outertuple = slot ;
227
+ resultSlot = ExecProject (node -> ps .ps_ProjInfo , & isDone );
228
+
229
+ if (isDone != ExprEndResult )
230
+ {
231
+ node -> ps .ps_TupFromTlist = (isDone == ExprMultipleResult );
232
+ return resultSlot ;
233
+ }
234
+ }
173
235
174
236
return slot ;
175
237
}
@@ -201,18 +263,11 @@ ExecEndGather(GatherState *node)
201
263
static TupleTableSlot *
202
264
gather_getnext (GatherState * gatherstate )
203
265
{
204
- PlanState * outerPlan ;
266
+ PlanState * outerPlan = outerPlanState ( gatherstate ) ;
205
267
TupleTableSlot * outerTupleSlot ;
206
- TupleTableSlot * slot ;
268
+ TupleTableSlot * fslot = gatherstate -> funnel_slot ;
207
269
HeapTuple tup ;
208
270
209
- /*
210
- * We can use projection info of Gather for the tuples received from
211
- * worker backends as currently for all cases worker backends sends the
212
- * projected tuple as required by Gather node.
213
- */
214
- slot = gatherstate -> ps .ps_ProjInfo -> pi_slot ;
215
-
216
271
while (gatherstate -> funnel != NULL || gatherstate -> need_to_scan_locally )
217
272
{
218
273
if (gatherstate -> funnel != NULL )
@@ -229,19 +284,17 @@ gather_getnext(GatherState *gatherstate)
229
284
if (HeapTupleIsValid (tup ))
230
285
{
231
286
ExecStoreTuple (tup , /* tuple to store */
232
- slot , /* slot to store in */
287
+ fslot , /* slot in which to store the tuple */
233
288
InvalidBuffer , /* buffer associated with this
234
289
* tuple */
235
290
true); /* pfree this pointer if not from heap */
236
291
237
- return slot ;
292
+ return fslot ;
238
293
}
239
294
}
240
295
241
296
if (gatherstate -> need_to_scan_locally )
242
297
{
243
- outerPlan = outerPlanState (gatherstate );
244
-
245
298
outerTupleSlot = ExecProcNode (outerPlan );
246
299
247
300
if (!TupIsNull (outerTupleSlot ))
@@ -251,7 +304,7 @@ gather_getnext(GatherState *gatherstate)
251
304
}
252
305
}
253
306
254
- return ExecClearTuple (slot );
307
+ return ExecClearTuple (fslot );
255
308
}
256
309
257
310
/* ----------------------------------------------------------------
0 commit comments