Commit 3a1f861

Update parallel executor support to reuse the same DSM.

Commit b0b0d84 purported to make it possible to relaunch workers using the same parallel context, but it had an unpleasant race condition: we might reinitialize after the workers have sent their last control message but before they have detached from the DSM, leading to crashes. Repair by introducing a new ParallelContext operation, ReinitializeParallelDSM.

Adjust execParallel.c to use this new support, so that we can rescan a Gather node by relaunching workers but without needing to recreate the DSM.

Amit Kapila, with some adjustments by me. Extracted from the latest parallel sequential scan patch.
1 parent c6baec9 commit 3a1f861

6 files changed, +166 -108 lines changed

src/backend/access/transam/README.parallel (+6 -4)

@@ -222,7 +222,9 @@ pattern looks like this:
 
 	ExitParallelMode();
 
-If desired, after WaitForParallelWorkersToFinish() has been called, another
-call to LaunchParallelWorkers() can be made using the same parallel context.
-Calls to these two functions can be alternated any number of times before
-destroying the parallel context.
+If desired, after WaitForParallelWorkersToFinish() has been called, the
+context can be reset so that workers can be launched anew using the same
+parallel context.  To do this, first call ReinitializeParallelDSM() to
+reinitialize state managed by the parallel context machinery itself; then,
+perform any other necessary resetting of state; after that, you can again
+call LaunchParallelWorkers.
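
The reuse cycle described in the added README text can be sketched as caller code like the following; the wrapper function, its arguments, and the loop shape are illustrative only, and it assumes the context was already set up with CreateParallelContext()/InitializeParallelDSM() inside EnterParallelMode(), per the surrounding README pattern:

	/* Minimal sketch of the reset-and-relaunch loop; not part of this commit. */
	static void
	reuse_parallel_context_sketch(ParallelContext *pcxt, int nloops)
	{
		int			i;

		for (i = 0; i < nloops; ++i)
		{
			if (i > 0)
			{
				/* Reset state managed by the parallel context machinery. */
				ReinitializeParallelDSM(pcxt);
				/* ...perform any other caller-specific resetting of state here... */
			}

			LaunchParallelWorkers(pcxt);

			/* Consume the workers' output here, then wait for them to finish. */
			WaitForParallelWorkersToFinish(pcxt);
		}

		DestroyParallelContext(pcxt);
	}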

src/backend/access/transam/parallel.c (+92 -83)

@@ -110,6 +110,7 @@ static void HandleParallelMessage(ParallelContext *, int, StringInfo msg);
 static void ParallelErrorContext(void *arg);
 static void ParallelExtensionTrampoline(dsm_segment *seg, shm_toc *toc);
 static void ParallelWorkerMain(Datum main_arg);
+static void WaitForParallelWorkersToExit(ParallelContext *pcxt);
 
 /*
  * Establish a new parallel context.  This should be done after entering
@@ -383,6 +384,46 @@ InitializeParallelDSM(ParallelContext *pcxt)
 	MemoryContextSwitchTo(oldcontext);
 }
 
+/*
+ * Reinitialize the dynamic shared memory segment for a parallel context such
+ * that we could launch workers for it again.
+ */
+void
+ReinitializeParallelDSM(ParallelContext *pcxt)
+{
+	FixedParallelState *fps;
+	char	   *error_queue_space;
+	int			i;
+
+	if (pcxt->nworkers_launched == 0)
+		return;
+
+	WaitForParallelWorkersToFinish(pcxt);
+	WaitForParallelWorkersToExit(pcxt);
+
+	/* Reset a few bits of fixed parallel state to a clean state. */
+	fps = shm_toc_lookup(pcxt->toc, PARALLEL_KEY_FIXED);
+	fps->workers_attached = 0;
+	fps->last_xlog_end = 0;
+
+	/* Recreate error queues. */
+	error_queue_space =
+		shm_toc_lookup(pcxt->toc, PARALLEL_KEY_ERROR_QUEUE);
+	for (i = 0; i < pcxt->nworkers; ++i)
+	{
+		char	   *start;
+		shm_mq	   *mq;
+
+		start = error_queue_space + i * PARALLEL_ERROR_QUEUE_SIZE;
+		mq = shm_mq_create(start, PARALLEL_ERROR_QUEUE_SIZE);
+		shm_mq_set_receiver(mq, MyProc);
+		pcxt->worker[i].error_mqh = shm_mq_attach(mq, pcxt->seg, NULL);
+	}
+
+	/* Reset number of workers launched. */
+	pcxt->nworkers_launched = 0;
+}
+
 /*
  * Launch parallel workers.
  */
@@ -404,52 +445,6 @@ LaunchParallelWorkers(ParallelContext *pcxt)
 	/* We might be running in a short-lived memory context. */
 	oldcontext = MemoryContextSwitchTo(TopTransactionContext);
 
-	/*
-	 * This function can be called for a parallel context for which it has
-	 * already been called previously, but only if all of the old workers
-	 * have already exited.  When this case arises, we need to do some extra
-	 * reinitialization.
-	 */
-	if (pcxt->nworkers_launched > 0)
-	{
-		FixedParallelState *fps;
-		char	   *error_queue_space;
-
-		/* Clean out old worker handles. */
-		for (i = 0; i < pcxt->nworkers; ++i)
-		{
-			if (pcxt->worker[i].error_mqh != NULL)
-				elog(ERROR, "previously launched worker still alive");
-			if (pcxt->worker[i].bgwhandle != NULL)
-			{
-				pfree(pcxt->worker[i].bgwhandle);
-				pcxt->worker[i].bgwhandle = NULL;
-			}
-		}
-
-		/* Reset a few bits of fixed parallel state to a clean state. */
-		fps = shm_toc_lookup(pcxt->toc, PARALLEL_KEY_FIXED);
-		fps->workers_attached = 0;
-		fps->last_xlog_end = 0;
-
-		/* Recreate error queues. */
-		error_queue_space =
-			shm_toc_lookup(pcxt->toc, PARALLEL_KEY_ERROR_QUEUE);
-		for (i = 0; i < pcxt->nworkers; ++i)
-		{
-			char	   *start;
-			shm_mq	   *mq;
-
-			start = error_queue_space + i * PARALLEL_ERROR_QUEUE_SIZE;
-			mq = shm_mq_create(start, PARALLEL_ERROR_QUEUE_SIZE);
-			shm_mq_set_receiver(mq, MyProc);
-			pcxt->worker[i].error_mqh = shm_mq_attach(mq, pcxt->seg, NULL);
-		}
-
-		/* Reset number of workers launched. */
-		pcxt->nworkers_launched = 0;
-	}
-
 	/* Configure a worker. */
 	snprintf(worker.bgw_name, BGW_MAXLEN, "parallel worker for PID %d",
 			 MyProcPid);
@@ -501,7 +496,7 @@ LaunchParallelWorkers(ParallelContext *pcxt)
 }
 
 /*
- * Wait for all workers to exit.
+ * Wait for all workers to finish computing.
  *
  * Even if the parallel operation seems to have completed successfully, it's
  * important to call this function afterwards.  We must not miss any errors
@@ -552,6 +547,46 @@ WaitForParallelWorkersToFinish(ParallelContext *pcxt)
 	}
 }
 
+/*
+ * Wait for all workers to exit.
+ *
+ * This function ensures that workers have been completely shutdown.  The
+ * difference between WaitForParallelWorkersToFinish and this function is
+ * that former just ensures that last message sent by worker backend is
+ * received by master backend whereas this ensures the complete shutdown.
+ */
+static void
+WaitForParallelWorkersToExit(ParallelContext *pcxt)
+{
+	int			i;
+
+	/* Wait until the workers actually die. */
+	for (i = 0; i < pcxt->nworkers; ++i)
+	{
+		BgwHandleStatus status;
+
+		if (pcxt->worker == NULL || pcxt->worker[i].bgwhandle == NULL)
+			continue;
+
+		status = WaitForBackgroundWorkerShutdown(pcxt->worker[i].bgwhandle);
+
+		/*
+		 * If the postmaster kicked the bucket, we have no chance of cleaning
+		 * up safely -- we won't be able to tell when our workers are actually
+		 * dead.  This doesn't necessitate a PANIC since they will all abort
+		 * eventually, but we can't safely continue this session.
+		 */
+		if (status == BGWH_POSTMASTER_DIED)
+			ereport(FATAL,
+					(errcode(ERRCODE_ADMIN_SHUTDOWN),
+					 errmsg("postmaster exited during a parallel transaction")));
+
+		/* Release memory. */
+		pfree(pcxt->worker[i].bgwhandle);
+		pcxt->worker[i].bgwhandle = NULL;
+	}
+}
+
 /*
  * Destroy a parallel context.
  *
@@ -578,10 +613,10 @@ DestroyParallelContext(ParallelContext *pcxt)
 	{
 		for (i = 0; i < pcxt->nworkers; ++i)
 		{
-			if (pcxt->worker[i].bgwhandle != NULL)
-				TerminateBackgroundWorker(pcxt->worker[i].bgwhandle);
 			if (pcxt->worker[i].error_mqh != NULL)
 			{
+				TerminateBackgroundWorker(pcxt->worker[i].bgwhandle);
+
 				pfree(pcxt->worker[i].error_mqh);
 				pcxt->worker[i].error_mqh = NULL;
 			}
@@ -609,38 +644,14 @@ DestroyParallelContext(ParallelContext *pcxt)
 		pcxt->private_memory = NULL;
 	}
 
-	/* Wait until the workers actually die. */
-	for (i = 0; i < pcxt->nworkers; ++i)
-	{
-		BgwHandleStatus status;
-
-		if (pcxt->worker == NULL || pcxt->worker[i].bgwhandle == NULL)
-			continue;
-
-		/*
-		 * We can't finish transaction commit or abort until all of the
-		 * workers are dead.  This means, in particular, that we can't respond
-		 * to interrupts at this stage.
-		 */
-		HOLD_INTERRUPTS();
-		status = WaitForBackgroundWorkerShutdown(pcxt->worker[i].bgwhandle);
-		RESUME_INTERRUPTS();
-
-		/*
-		 * If the postmaster kicked the bucket, we have no chance of cleaning
-		 * up safely -- we won't be able to tell when our workers are actually
-		 * dead.  This doesn't necessitate a PANIC since they will all abort
-		 * eventually, but we can't safely continue this session.
-		 */
-		if (status == BGWH_POSTMASTER_DIED)
-			ereport(FATAL,
-					(errcode(ERRCODE_ADMIN_SHUTDOWN),
-					 errmsg("postmaster exited during a parallel transaction")));
-
-		/* Release memory. */
-		pfree(pcxt->worker[i].bgwhandle);
-		pcxt->worker[i].bgwhandle = NULL;
-	}
+	/*
+	 * We can't finish transaction commit or abort until all of the
+	 * workers have exited.  This means, in particular, that we can't respond
+	 * to interrupts at this stage.
+	 */
+	HOLD_INTERRUPTS();
+	WaitForParallelWorkersToExit(pcxt);
+	RESUME_INTERRUPTS();
 
 	/* Free the worker array itself. */
 	if (pcxt->worker != NULL)
@@ -799,9 +810,7 @@ HandleParallelMessage(ParallelContext *pcxt, int i, StringInfo msg)
 
 		case 'X':				/* Terminate, indicating clean exit */
 			{
-				pfree(pcxt->worker[i].bgwhandle);
 				pfree(pcxt->worker[i].error_mqh);
-				pcxt->worker[i].bgwhandle = NULL;
 				pcxt->worker[i].error_mqh = NULL;
 				break;
 			}

src/backend/executor/execParallel.c (+26 -7)

@@ -84,7 +84,8 @@ static bool ExecParallelEstimate(PlanState *node,
 				 ExecParallelEstimateContext *e);
 static bool ExecParallelInitializeDSM(PlanState *node,
 				  ExecParallelInitializeDSMContext *d);
-static shm_mq_handle **ExecParallelSetupTupleQueues(ParallelContext *pcxt);
+static shm_mq_handle **ExecParallelSetupTupleQueues(ParallelContext *pcxt,
+							 bool reinitialize);
 static bool ExecParallelRetrieveInstrumentation(PlanState *planstate,
 					  SharedExecutorInstrumentation *instrumentation);
 
@@ -217,7 +218,7 @@ ExecParallelInitializeDSM(PlanState *planstate,
  * to the main backend and start the workers.
  */
 static shm_mq_handle **
-ExecParallelSetupTupleQueues(ParallelContext *pcxt)
+ExecParallelSetupTupleQueues(ParallelContext *pcxt, bool reinitialize)
 {
 	shm_mq_handle **responseq;
 	char	   *tqueuespace;
@@ -231,9 +232,16 @@ ExecParallelSetupTupleQueues(ParallelContext *pcxt)
 	responseq = (shm_mq_handle **)
 		palloc(pcxt->nworkers * sizeof(shm_mq_handle *));
 
-	/* Allocate space from the DSM for the queues themselves. */
-	tqueuespace = shm_toc_allocate(pcxt->toc,
-								   PARALLEL_TUPLE_QUEUE_SIZE * pcxt->nworkers);
+	/*
+	 * If not reinitializing, allocate space from the DSM for the queues;
+	 * otherwise, find the already allocated space.
+	 */
+	if (!reinitialize)
+		tqueuespace =
+			shm_toc_allocate(pcxt->toc,
+							 PARALLEL_TUPLE_QUEUE_SIZE * pcxt->nworkers);
+	else
+		tqueuespace = shm_toc_lookup(pcxt->toc, PARALLEL_KEY_TUPLE_QUEUE);
 
 	/* Create the queues, and become the receiver for each. */
 	for (i = 0; i < pcxt->nworkers; ++i)
@@ -248,12 +256,23 @@ ExecParallelSetupTupleQueues(ParallelContext *pcxt)
 	}
 
 	/* Add array of queues to shm_toc, so others can find it. */
-	shm_toc_insert(pcxt->toc, PARALLEL_KEY_TUPLE_QUEUE, tqueuespace);
+	if (!reinitialize)
+		shm_toc_insert(pcxt->toc, PARALLEL_KEY_TUPLE_QUEUE, tqueuespace);
 
 	/* Return array of handles. */
 	return responseq;
 }
 
+/*
+ * Re-initialize the response queues for backend workers to return tuples
+ * to the main backend and start the workers.
+ */
+shm_mq_handle **
+ExecParallelReinitializeTupleQueues(ParallelContext *pcxt)
+{
+	return ExecParallelSetupTupleQueues(pcxt, true);
+}
+
 /*
  * Sets up the required infrastructure for backend workers to perform
  * execution and return results to the main backend.
@@ -363,7 +382,7 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers)
 	pei->buffer_usage = bufusage_space;
 
 	/* Set up tuple queues. */
-	pei->tqueue = ExecParallelSetupTupleQueues(pcxt);
+	pei->tqueue = ExecParallelSetupTupleQueues(pcxt, false);
 
 	/*
 	 * If instrumentation options were supplied, allocate space for the
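
Taken together with the parallel.c changes, a rescan path that reuses the DSM might drive the new entry points roughly as follows. This is a hypothetical sketch, not code from this diff: the wrapper name is invented, the actual Gather rescan wiring is not shown here, and it assumes the pcxt and tqueue fields of ParallelExecutorInfo used in the hunks above.

	/* Hypothetical sketch: re-execute a parallel plan over the existing DSM. */
	static void
	exec_parallel_rescan_sketch(ParallelExecutorInfo *pei)
	{
		/* Waits for the old workers to finish and exit, then resets shared state. */
		ReinitializeParallelDSM(pei->pcxt);

		/* Reattach to the tuple queue space already allocated in the segment. */
		pei->tqueue = ExecParallelReinitializeTupleQueues(pei->pcxt);

		/* Launch a fresh set of workers against the same parallel context. */
		LaunchParallelWorkers(pei->pcxt);
	}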
