16
16
#include "postgres.h"
17
17
18
18
#include "access/relscan.h"
19
+ #include "access/xact.h"
19
20
#include "executor/execdebug.h"
20
21
#include "executor/execParallel.h"
21
22
#include "executor/nodeGather.h"
@@ -45,7 +46,6 @@ ExecInitGather(Gather *node, EState *estate, int eflags)
45
46
gatherstate = makeNode (GatherState );
46
47
gatherstate -> ps .plan = (Plan * ) node ;
47
48
gatherstate -> ps .state = estate ;
48
- gatherstate -> need_to_scan_workers = false;
49
49
gatherstate -> need_to_scan_locally = !node -> single_copy ;
50
50
51
51
/*
@@ -106,52 +106,57 @@ ExecGather(GatherState *node)
106
106
* needs to allocate large dynamic segement, so it is better to do if it
107
107
* is really needed.
108
108
*/
109
- if (!node -> pei )
109
+ if (!node -> initialized )
110
110
{
111
111
EState * estate = node -> ps .state ;
112
-
113
- /* Initialize the workers required to execute Gather node. */
114
- node -> pei = ExecInitParallelPlan (node -> ps .lefttree ,
115
- estate ,
116
- ((Gather * ) (node -> ps .plan ))-> num_workers );
112
+ Gather * gather = (Gather * ) node -> ps .plan ;
117
113
118
114
/*
119
- * Register backend workers. If the required number of workers are not
120
- * available then we perform the scan with available workers and if
121
- * there are no more workers available, then the Gather node will just
122
- * scan locally.
115
+ * Sometimes we might have to run without parallelism; but if
116
+ * parallel mode is active then we can try to fire up some workers.
123
117
*/
124
- LaunchParallelWorkers (node -> pei -> pcxt );
125
-
126
- node -> funnel = CreateTupleQueueFunnel ();
127
-
128
- for (i = 0 ; i < node -> pei -> pcxt -> nworkers ; ++ i )
118
+ if (gather -> num_workers > 0 && IsInParallelMode ())
129
119
{
130
- if (node -> pei -> pcxt -> worker [i ].bgwhandle )
120
+ bool got_any_worker = false;
121
+
122
+ /* Initialize the workers required to execute Gather node. */
123
+ node -> pei = ExecInitParallelPlan (node -> ps .lefttree ,
124
+ estate ,
125
+ gather -> num_workers );
126
+
127
+ /*
128
+ * Register backend workers. We might not get as many as we
129
+ * requested, or indeed any at all.
130
+ */
131
+ LaunchParallelWorkers (node -> pei -> pcxt );
132
+
133
+ /* Set up a tuple queue to collect the results. */
134
+ node -> funnel = CreateTupleQueueFunnel ();
135
+ for (i = 0 ; i < node -> pei -> pcxt -> nworkers ; ++ i )
131
136
{
132
- shm_mq_set_handle (node -> pei -> tqueue [i ],
133
- node -> pei -> pcxt -> worker [i ].bgwhandle );
134
- RegisterTupleQueueOnFunnel (node -> funnel , node -> pei -> tqueue [i ]);
135
- node -> need_to_scan_workers = true;
137
+ if (node -> pei -> pcxt -> worker [i ].bgwhandle )
138
+ {
139
+ shm_mq_set_handle (node -> pei -> tqueue [i ],
140
+ node -> pei -> pcxt -> worker [i ].bgwhandle );
141
+ RegisterTupleQueueOnFunnel (node -> funnel ,
142
+ node -> pei -> tqueue [i ]);
143
+ got_any_worker = true;
144
+ }
136
145
}
146
+
147
+ /* No workers? Then never mind. */
148
+ if (!got_any_worker )
149
+ ExecShutdownGather (node );
137
150
}
138
151
139
- /* If no workers are available, we must always scan locally. */
140
- if (!node -> need_to_scan_workers )
141
- node -> need_to_scan_locally = true;
152
+ /* Run plan locally if no workers or not single-copy. */
153
+ node -> need_to_scan_locally = (node -> funnel == NULL )
154
+ || !gather -> single_copy ;
155
+ node -> initialized = true;
142
156
}
143
157
144
158
slot = gather_getnext (node );
145
159
146
- if (TupIsNull (slot ))
147
- {
148
- /*
149
- * Destroy the parallel context once we complete fetching all the
150
- * tuples. Otherwise, the DSM and workers will stick around for the
151
- * lifetime of the entire statement.
152
- */
153
- ExecShutdownGather (node );
154
- }
155
160
return slot ;
156
161
}
157
162
@@ -194,10 +199,9 @@ gather_getnext(GatherState *gatherstate)
194
199
*/
195
200
slot = gatherstate -> ps .ps_ProjInfo -> pi_slot ;
196
201
197
- while (gatherstate -> need_to_scan_workers ||
198
- gatherstate -> need_to_scan_locally )
202
+ while (gatherstate -> funnel != NULL || gatherstate -> need_to_scan_locally )
199
203
{
200
- if (gatherstate -> need_to_scan_workers )
204
+ if (gatherstate -> funnel != NULL )
201
205
{
202
206
bool done = false;
203
207
@@ -206,7 +210,7 @@ gather_getnext(GatherState *gatherstate)
206
210
gatherstate -> need_to_scan_locally ,
207
211
& done );
208
212
if (done )
209
- gatherstate -> need_to_scan_workers = false ;
213
+ ExecShutdownGather ( gatherstate ) ;
210
214
211
215
if (HeapTupleIsValid (tup ))
212
216
{
@@ -247,30 +251,20 @@ gather_getnext(GatherState *gatherstate)
247
251
void
248
252
ExecShutdownGather (GatherState * node )
249
253
{
250
- Gather * gather ;
251
-
252
- if (node -> pei == NULL || node -> pei -> pcxt == NULL )
253
- return ;
254
-
255
- /*
256
- * Ensure all workers have finished before destroying the parallel context
257
- * to ensure a clean exit.
258
- */
259
- if (node -> funnel )
254
+ /* Shut down tuple queue funnel before shutting down workers. */
255
+ if (node -> funnel != NULL )
260
256
{
261
257
DestroyTupleQueueFunnel (node -> funnel );
262
258
node -> funnel = NULL ;
263
259
}
264
260
265
- ExecParallelFinish (node -> pei );
266
-
267
- /* destroy parallel context. */
268
- DestroyParallelContext (node -> pei -> pcxt );
269
- node -> pei -> pcxt = NULL ;
270
-
271
- gather = (Gather * ) node -> ps .plan ;
272
- node -> need_to_scan_locally = !gather -> single_copy ;
273
- node -> need_to_scan_workers = false;
261
+ /* Now shut down the workers. */
262
+ if (node -> pei != NULL )
263
+ {
264
+ ExecParallelFinish (node -> pei );
265
+ ExecParallelCleanup (node -> pei );
266
+ node -> pei = NULL ;
267
+ }
274
268
}
275
269
276
270
/* ----------------------------------------------------------------
@@ -295,5 +289,7 @@ ExecReScanGather(GatherState *node)
295
289
*/
296
290
ExecShutdownGather (node );
297
291
292
+ node -> initialized = false;
293
+
298
294
ExecReScan (node -> ps .lefttree );
299
295
}
0 commit comments