Skip to content

Commit 2badb5a

Browse files
committed
Report an ERROR if a parallel worker fails to start properly.
Commit 28724fd fixed things so that if a background worker fails to start due to fork() failure or because it is terminated before startup succeeds, BGWH_STOPPED will be reported. However, that only helps if the code that uses the background worker machinery notices the change in status, and the code in parallel.c did not. To fix that, do two things. First, make sure that when a worker exits, it triggers the leader to read from error queues. That way, if a worker which has attached to an error queue exits uncleanly, the leader is sure to throw some error, either the contents of the ErrorResponse sent by the worker, or "lost connection to parallel worker" if it exited without sending one. To cover the case where the worker never starts up in the first place or exits before attaching to the error queue, the ParallelContext now keeps track of which workers have sent at least one message via the error queue. A worker which sends no messages by the time the parallel operation finishes will be checked to see whether it exited before attaching to the error queue; if so, a new error message, "parallel worker failed to initialize", will be reported. If not, we'll continue to wait until it either starts up and exits cleanly, starts up and exits uncleanly, or fails to start, and then take the appropriate action. Patch by me, reviewed by Amit Kapila. Discussion: http://postgr.es/m/CA+TgmoYnBgXgdTu6wk5YPdWhmgabYc9nY_pFLq=tB=FSLYkD8Q@mail.gmail.com
1 parent 160a4f6 commit 2badb5a

File tree

2 files changed

+110
-9
lines changed

2 files changed

+110
-9
lines changed

src/backend/access/transam/parallel.c

Lines changed: 109 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,9 @@ static FixedParallelState *MyFixedParallelState;
113113
/* List of active parallel contexts. */
114114
static dlist_head pcxt_list = DLIST_STATIC_INIT(pcxt_list);
115115

116+
/* Backend-local copy of data from FixedParallelState. */
117+
static pid_t ParallelMasterPid;
118+
116119
/*
117120
* List of internal parallel worker entry points. We need this for
118121
* reasons explained in LookupParallelWorkerFunction(), below.
@@ -133,6 +136,7 @@ static const struct
133136
static void HandleParallelMessage(ParallelContext *pcxt, int i, StringInfo msg);
134137
static void WaitForParallelWorkersToExit(ParallelContext *pcxt);
135138
static parallel_worker_main_type LookupParallelWorkerFunction(const char *libraryname, const char *funcname);
139+
static void ParallelWorkerShutdown(int code, Datum arg);
136140

137141

138142
/*
@@ -433,6 +437,11 @@ ReinitializeParallelDSM(ParallelContext *pcxt)
433437
WaitForParallelWorkersToFinish(pcxt);
434438
WaitForParallelWorkersToExit(pcxt);
435439
pcxt->nworkers_launched = 0;
440+
if (pcxt->any_message_received)
441+
{
442+
pfree(pcxt->any_message_received);
443+
pcxt->any_message_received = NULL;
444+
}
436445
}
437446

438447
/* Reset a few bits of fixed parallel state to a clean state. */
@@ -531,6 +540,14 @@ LaunchParallelWorkers(ParallelContext *pcxt)
531540
}
532541
}
533542

543+
/*
544+
* Now that nworkers_launched has taken its final value, we can initialize
545+
* any_message_received.
546+
*/
547+
if (pcxt->nworkers_launched > 0)
548+
pcxt->any_message_received =
549+
palloc0(sizeof(bool) * pcxt->nworkers_launched);
550+
534551
/* Restore previous memory context. */
535552
MemoryContextSwitchTo(oldcontext);
536553
}
@@ -552,6 +569,7 @@ WaitForParallelWorkersToFinish(ParallelContext *pcxt)
552569
for (;;)
553570
{
554571
bool anyone_alive = false;
572+
int nfinished = 0;
555573
int i;
556574

557575
/*
@@ -563,15 +581,78 @@ WaitForParallelWorkersToFinish(ParallelContext *pcxt)
563581

564582
for (i = 0; i < pcxt->nworkers_launched; ++i)
565583
{
566-
if (pcxt->worker[i].error_mqh != NULL)
584+
/*
585+
* If error_mqh is NULL, then the worker has already exited
586+
* cleanly. If we have received a message through error_mqh from
587+
* the worker, we know it started up cleanly, and therefore we're
588+
* certain to be notified when it exits.
589+
*/
590+
if (pcxt->worker[i].error_mqh == NULL)
591+
++nfinished;
592+
else if (pcxt->any_message_received[i])
567593
{
568594
anyone_alive = true;
569595
break;
570596
}
571597
}
572598

573599
if (!anyone_alive)
574-
break;
600+
{
601+
/* If all workers are known to have finished, we're done. */
602+
if (nfinished >= pcxt->nworkers_launched)
603+
{
604+
Assert(nfinished == pcxt->nworkers_launched);
605+
break;
606+
}
607+
608+
/*
609+
* We didn't detect any living workers, but not all workers are
610+
* known to have exited cleanly. Either not all workers have
611+
* launched yet, or maybe some of them failed to start or
612+
* terminated abnormally.
613+
*/
614+
for (i = 0; i < pcxt->nworkers_launched; ++i)
615+
{
616+
pid_t pid;
617+
shm_mq *mq;
618+
619+
/*
620+
* If the worker is BGWH_NOT_YET_STARTED or BGWH_STARTED, we
621+
* should just keep waiting. If it is BGWH_STOPPED, then
622+
* further investigation is needed.
623+
*/
624+
if (pcxt->worker[i].error_mqh == NULL ||
625+
pcxt->worker[i].bgwhandle == NULL ||
626+
GetBackgroundWorkerPid(pcxt->worker[i].bgwhandle,
627+
&pid) != BGWH_STOPPED)
628+
continue;
629+
630+
/*
631+
* Check whether the worker ended up stopped without ever
632+
* attaching to the error queue. If so, the postmaster was
633+
* unable to fork the worker or it exited without initializing
634+
* properly. We must throw an error, since the caller may
635+
* have been expecting the worker to do some work before
636+
* exiting.
637+
*/
638+
mq = shm_mq_get_queue(pcxt->worker[i].error_mqh);
639+
if (shm_mq_get_sender(mq) == NULL)
640+
ereport(ERROR,
641+
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
642+
errmsg("parallel worker failed to initialize"),
643+
errhint("More details may be available in the server log.")));
644+
645+
/*
646+
* The worker is stopped, but is attached to the error queue.
647+
* Unless there's a bug somewhere, this will only happen when
648+
* the worker writes messages and terminates after the
649+
* CHECK_FOR_INTERRUPTS() near the top of this function and
650+
* before the call to GetBackgroundWorkerPid(). In that case,
651+
* or latch should have been set as well and the right things
652+
* will happen on the next pass through the loop.
653+
*/
654+
}
655+
}
575656

576657
WaitLatch(MyLatch, WL_LATCH_SET, -1,
577658
WAIT_EVENT_PARALLEL_FINISH);
@@ -828,6 +909,9 @@ HandleParallelMessage(ParallelContext *pcxt, int i, StringInfo msg)
828909
{
829910
char msgtype;
830911

912+
if (pcxt->any_message_received != NULL)
913+
pcxt->any_message_received[i] = true;
914+
831915
msgtype = pq_getmsgbyte(msg);
832916

833917
switch (msgtype)
@@ -1024,11 +1108,16 @@ ParallelWorkerMain(Datum main_arg)
10241108
fps = shm_toc_lookup(toc, PARALLEL_KEY_FIXED, false);
10251109
MyFixedParallelState = fps;
10261110

1111+
/* Arrange to signal the leader if we exit. */
1112+
ParallelMasterPid = fps->parallel_master_pid;
1113+
ParallelMasterBackendId = fps->parallel_master_backend_id;
1114+
on_shmem_exit(ParallelWorkerShutdown, (Datum) 0);
1115+
10271116
/*
1028-
* Now that we have a worker number, we can find and attach to the error
1029-
* queue provided for us. That's good, because until we do that, any
1030-
* errors that happen here will not be reported back to the process that
1031-
* requested that this worker be launched.
1117+
* Now we can find and attach to the error queue provided for us. That's
1118+
* good, because until we do that, any errors that happen here will not be
1119+
* reported back to the process that requested that this worker be
1120+
* launched.
10321121
*/
10331122
error_queue_space = shm_toc_lookup(toc, PARALLEL_KEY_ERROR_QUEUE, false);
10341123
mq = (shm_mq *) (error_queue_space +
@@ -1146,9 +1235,6 @@ ParallelWorkerMain(Datum main_arg)
11461235
SetTempNamespaceState(fps->temp_namespace_id,
11471236
fps->temp_toast_namespace_id);
11481237

1149-
/* Set ParallelMasterBackendId so we know how to address temp relations. */
1150-
ParallelMasterBackendId = fps->parallel_master_backend_id;
1151-
11521238
/* Restore reindex state. */
11531239
reindexspace = shm_toc_lookup(toc, PARALLEL_KEY_REINDEX_STATE, false);
11541240
RestoreReindexState(reindexspace);
@@ -1197,6 +1283,20 @@ ParallelWorkerReportLastRecEnd(XLogRecPtr last_xlog_end)
11971283
SpinLockRelease(&fps->mutex);
11981284
}
11991285

1286+
/*
1287+
* Make sure the leader tries to read from our error queue one more time.
1288+
* This guards against the case where we exit uncleanly without sending an
1289+
* ErrorResponse to the leader, for example because some code calls proc_exit
1290+
* directly.
1291+
*/
1292+
static void
1293+
ParallelWorkerShutdown(int code, Datum arg)
1294+
{
1295+
SendProcSignal(ParallelMasterPid,
1296+
PROCSIG_PARALLEL_MESSAGE,
1297+
ParallelMasterBackendId);
1298+
}
1299+
12001300
/*
12011301
* Look up (and possibly load) a parallel worker entry point function.
12021302
*

src/include/access/parallel.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ typedef struct ParallelContext
4343
void *private_memory;
4444
shm_toc *toc;
4545
ParallelWorkerInfo *worker;
46+
bool *any_message_received;
4647
} ParallelContext;
4748

4849
typedef struct ParallelWorkerContext

0 commit comments

Comments
 (0)