@@ -110,6 +110,7 @@ static void HandleParallelMessage(ParallelContext *, int, StringInfo msg);
110
110
static void ParallelErrorContext (void * arg );
111
111
static void ParallelExtensionTrampoline (dsm_segment * seg , shm_toc * toc );
112
112
static void ParallelWorkerMain (Datum main_arg );
113
+ static void WaitForParallelWorkersToExit (ParallelContext * pcxt );
113
114
114
115
/*
115
116
* Establish a new parallel context. This should be done after entering
@@ -383,6 +384,46 @@ InitializeParallelDSM(ParallelContext *pcxt)
383
384
MemoryContextSwitchTo (oldcontext );
384
385
}
385
386
387
+ /*
388
+ * Reinitialize the dynamic shared memory segment for a parallel context such
389
+ * that we could launch workers for it again.
390
+ */
391
+ void
392
+ ReinitializeParallelDSM (ParallelContext * pcxt )
393
+ {
394
+ FixedParallelState * fps ;
395
+ char * error_queue_space ;
396
+ int i ;
397
+
398
+ if (pcxt -> nworkers_launched == 0 )
399
+ return ;
400
+
401
+ WaitForParallelWorkersToFinish (pcxt );
402
+ WaitForParallelWorkersToExit (pcxt );
403
+
404
+ /* Reset a few bits of fixed parallel state to a clean state. */
405
+ fps = shm_toc_lookup (pcxt -> toc , PARALLEL_KEY_FIXED );
406
+ fps -> workers_attached = 0 ;
407
+ fps -> last_xlog_end = 0 ;
408
+
409
+ /* Recreate error queues. */
410
+ error_queue_space =
411
+ shm_toc_lookup (pcxt -> toc , PARALLEL_KEY_ERROR_QUEUE );
412
+ for (i = 0 ; i < pcxt -> nworkers ; ++ i )
413
+ {
414
+ char * start ;
415
+ shm_mq * mq ;
416
+
417
+ start = error_queue_space + i * PARALLEL_ERROR_QUEUE_SIZE ;
418
+ mq = shm_mq_create (start , PARALLEL_ERROR_QUEUE_SIZE );
419
+ shm_mq_set_receiver (mq , MyProc );
420
+ pcxt -> worker [i ].error_mqh = shm_mq_attach (mq , pcxt -> seg , NULL );
421
+ }
422
+
423
+ /* Reset number of workers launched. */
424
+ pcxt -> nworkers_launched = 0 ;
425
+ }
426
+
386
427
/*
387
428
* Launch parallel workers.
388
429
*/
@@ -404,52 +445,6 @@ LaunchParallelWorkers(ParallelContext *pcxt)
404
445
/* We might be running in a short-lived memory context. */
405
446
oldcontext = MemoryContextSwitchTo (TopTransactionContext );
406
447
407
- /*
408
- * This function can be called for a parallel context for which it has
409
- * already been called previously, but only if all of the old workers
410
- * have already exited. When this case arises, we need to do some extra
411
- * reinitialization.
412
- */
413
- if (pcxt -> nworkers_launched > 0 )
414
- {
415
- FixedParallelState * fps ;
416
- char * error_queue_space ;
417
-
418
- /* Clean out old worker handles. */
419
- for (i = 0 ; i < pcxt -> nworkers ; ++ i )
420
- {
421
- if (pcxt -> worker [i ].error_mqh != NULL )
422
- elog (ERROR , "previously launched worker still alive" );
423
- if (pcxt -> worker [i ].bgwhandle != NULL )
424
- {
425
- pfree (pcxt -> worker [i ].bgwhandle );
426
- pcxt -> worker [i ].bgwhandle = NULL ;
427
- }
428
- }
429
-
430
- /* Reset a few bits of fixed parallel state to a clean state. */
431
- fps = shm_toc_lookup (pcxt -> toc , PARALLEL_KEY_FIXED );
432
- fps -> workers_attached = 0 ;
433
- fps -> last_xlog_end = 0 ;
434
-
435
- /* Recreate error queues. */
436
- error_queue_space =
437
- shm_toc_lookup (pcxt -> toc , PARALLEL_KEY_ERROR_QUEUE );
438
- for (i = 0 ; i < pcxt -> nworkers ; ++ i )
439
- {
440
- char * start ;
441
- shm_mq * mq ;
442
-
443
- start = error_queue_space + i * PARALLEL_ERROR_QUEUE_SIZE ;
444
- mq = shm_mq_create (start , PARALLEL_ERROR_QUEUE_SIZE );
445
- shm_mq_set_receiver (mq , MyProc );
446
- pcxt -> worker [i ].error_mqh = shm_mq_attach (mq , pcxt -> seg , NULL );
447
- }
448
-
449
- /* Reset number of workers launched. */
450
- pcxt -> nworkers_launched = 0 ;
451
- }
452
-
453
448
/* Configure a worker. */
454
449
snprintf (worker .bgw_name , BGW_MAXLEN , "parallel worker for PID %d" ,
455
450
MyProcPid );
@@ -501,7 +496,7 @@ LaunchParallelWorkers(ParallelContext *pcxt)
501
496
}
502
497
503
498
/*
504
- * Wait for all workers to exit .
499
+ * Wait for all workers to finish computing .
505
500
*
506
501
* Even if the parallel operation seems to have completed successfully, it's
507
502
* important to call this function afterwards. We must not miss any errors
@@ -552,6 +547,46 @@ WaitForParallelWorkersToFinish(ParallelContext *pcxt)
552
547
}
553
548
}
554
549
550
+ /*
551
+ * Wait for all workers to exit.
552
+ *
553
+ * This function ensures that workers have been completely shutdown. The
554
+ * difference between WaitForParallelWorkersToFinish and this function is
555
+ * that former just ensures that last message sent by worker backend is
556
+ * received by master backend whereas this ensures the complete shutdown.
557
+ */
558
+ static void
559
+ WaitForParallelWorkersToExit (ParallelContext * pcxt )
560
+ {
561
+ int i ;
562
+
563
+ /* Wait until the workers actually die. */
564
+ for (i = 0 ; i < pcxt -> nworkers ; ++ i )
565
+ {
566
+ BgwHandleStatus status ;
567
+
568
+ if (pcxt -> worker == NULL || pcxt -> worker [i ].bgwhandle == NULL )
569
+ continue ;
570
+
571
+ status = WaitForBackgroundWorkerShutdown (pcxt -> worker [i ].bgwhandle );
572
+
573
+ /*
574
+ * If the postmaster kicked the bucket, we have no chance of cleaning
575
+ * up safely -- we won't be able to tell when our workers are actually
576
+ * dead. This doesn't necessitate a PANIC since they will all abort
577
+ * eventually, but we can't safely continue this session.
578
+ */
579
+ if (status == BGWH_POSTMASTER_DIED )
580
+ ereport (FATAL ,
581
+ (errcode (ERRCODE_ADMIN_SHUTDOWN ),
582
+ errmsg ("postmaster exited during a parallel transaction" )));
583
+
584
+ /* Release memory. */
585
+ pfree (pcxt -> worker [i ].bgwhandle );
586
+ pcxt -> worker [i ].bgwhandle = NULL ;
587
+ }
588
+ }
589
+
555
590
/*
556
591
* Destroy a parallel context.
557
592
*
@@ -578,10 +613,10 @@ DestroyParallelContext(ParallelContext *pcxt)
578
613
{
579
614
for (i = 0 ; i < pcxt -> nworkers ; ++ i )
580
615
{
581
- if (pcxt -> worker [i ].bgwhandle != NULL )
582
- TerminateBackgroundWorker (pcxt -> worker [i ].bgwhandle );
583
616
if (pcxt -> worker [i ].error_mqh != NULL )
584
617
{
618
+ TerminateBackgroundWorker (pcxt -> worker [i ].bgwhandle );
619
+
585
620
pfree (pcxt -> worker [i ].error_mqh );
586
621
pcxt -> worker [i ].error_mqh = NULL ;
587
622
}
@@ -609,38 +644,14 @@ DestroyParallelContext(ParallelContext *pcxt)
609
644
pcxt -> private_memory = NULL ;
610
645
}
611
646
612
- /* Wait until the workers actually die. */
613
- for (i = 0 ; i < pcxt -> nworkers ; ++ i )
614
- {
615
- BgwHandleStatus status ;
616
-
617
- if (pcxt -> worker == NULL || pcxt -> worker [i ].bgwhandle == NULL )
618
- continue ;
619
-
620
- /*
621
- * We can't finish transaction commit or abort until all of the
622
- * workers are dead. This means, in particular, that we can't respond
623
- * to interrupts at this stage.
624
- */
625
- HOLD_INTERRUPTS ();
626
- status = WaitForBackgroundWorkerShutdown (pcxt -> worker [i ].bgwhandle );
627
- RESUME_INTERRUPTS ();
628
-
629
- /*
630
- * If the postmaster kicked the bucket, we have no chance of cleaning
631
- * up safely -- we won't be able to tell when our workers are actually
632
- * dead. This doesn't necessitate a PANIC since they will all abort
633
- * eventually, but we can't safely continue this session.
634
- */
635
- if (status == BGWH_POSTMASTER_DIED )
636
- ereport (FATAL ,
637
- (errcode (ERRCODE_ADMIN_SHUTDOWN ),
638
- errmsg ("postmaster exited during a parallel transaction" )));
639
-
640
- /* Release memory. */
641
- pfree (pcxt -> worker [i ].bgwhandle );
642
- pcxt -> worker [i ].bgwhandle = NULL ;
643
- }
647
+ /*
648
+ * We can't finish transaction commit or abort until all of the
649
+ * workers have exited. This means, in particular, that we can't respond
650
+ * to interrupts at this stage.
651
+ */
652
+ HOLD_INTERRUPTS ();
653
+ WaitForParallelWorkersToExit (pcxt );
654
+ RESUME_INTERRUPTS ();
644
655
645
656
/* Free the worker array itself. */
646
657
if (pcxt -> worker != NULL )
@@ -799,9 +810,7 @@ HandleParallelMessage(ParallelContext *pcxt, int i, StringInfo msg)
799
810
800
811
case 'X' : /* Terminate, indicating clean exit */
801
812
{
802
- pfree (pcxt -> worker [i ].bgwhandle );
803
813
pfree (pcxt -> worker [i ].error_mqh );
804
- pcxt -> worker [i ].bgwhandle = NULL ;
805
814
pcxt -> worker [i ].error_mqh = NULL ;
806
815
break ;
807
816
}
0 commit comments