Skip to content

Commit 0c84199

Browse files
committed
Fix parallel pg_dump/pg_restore for failure to create worker processes.
If we failed to fork a worker process, or create a communication pipe for one, WaitForTerminatingWorkers would suffer an assertion failure if assert-enabled, otherwise crash or go into an infinite loop. This was a consequence of not accounting for the startup condition where we've not yet forked all the workers. The original bug was that ParallelBackupStart would set workerStatus to WRKR_IDLE before it had successfully forked a worker. I made things worse in commit b7b8cc0 by not understanding the undocumented fact that the WRKR_TERMINATED state was also meant to represent the case where a worker hadn't been started yet: I changed enum T_WorkerStatus so that *all* the worker slots were initially in WRKR_IDLE state. But this wasn't any more broken in practice, since even one slot in the wrong state would keep WaitForTerminatingWorkers from terminating. In v10 and later, introduce an explicit T_WorkerStatus value for worker-not-started, in hopes of preventing future oversights of the same ilk. Before that, just document that WRKR_TERMINATED is supposed to cover that case (partly because it wasn't actively broken, and partly because the enum is exposed outside parallel.c in those branches, so there's microscopically more risk involved in changing it). In all branches, introduce a WORKER_IS_RUNNING status test macro to hide which T_WorkerStatus values mean that, and be more careful not to access ParallelSlot fields till we're sure they're valid. Per report from Vignesh C, though this is my patch not his. Back-patch to all supported branches. Discussion: https://postgr.es/m/CALDaNm1Luv-E3sarR+-unz-BjchquHHyfP+YC+2FS2pt_J+wxg@mail.gmail.com
1 parent 706ad6a commit 0c84199

File tree

1 file changed

+17
-12
lines changed

1 file changed

+17
-12
lines changed

src/bin/pg_dump/parallel.c

Lines changed: 17 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
*
4343
* In the master process, the workerStatus field for each worker has one of
4444
* the following values:
45+
* WRKR_NOT_STARTED: we've not yet forked this worker
4546
* WRKR_IDLE: it's waiting for a command
4647
* WRKR_WORKING: it's working on a command
4748
* WRKR_TERMINATED: process ended
@@ -76,11 +77,15 @@
7677
/* Worker process statuses */
7778
typedef enum
7879
{
80+
WRKR_NOT_STARTED = 0,
7981
WRKR_IDLE,
8082
WRKR_WORKING,
8183
WRKR_TERMINATED
8284
} T_WorkerStatus;
8385

86+
#define WORKER_IS_RUNNING(workerStatus) \
87+
((workerStatus) == WRKR_IDLE || (workerStatus) == WRKR_WORKING)
88+
8489
/*
8590
* Private per-parallel-worker state (typedef for this is in parallel.h).
8691
*
@@ -413,7 +418,9 @@ ShutdownWorkersHard(ParallelState *pstate)
413418

414419
/*
415420
* Close our write end of the sockets so that any workers waiting for
416-
* commands know they can exit.
421+
* commands know they can exit. (Note: some of the pipeWrite fields might
422+
* still be zero, if we failed to initialize all the workers. Hence, just
423+
* ignore errors here.)
417424
*/
418425
for (i = 0; i < pstate->numWorkers; i++)
419426
closesocket(pstate->parallelSlot[i].pipeWrite);
@@ -487,7 +494,7 @@ WaitForTerminatingWorkers(ParallelState *pstate)
487494

488495
for (j = 0; j < pstate->numWorkers; j++)
489496
{
490-
if (pstate->parallelSlot[j].workerStatus != WRKR_TERMINATED)
497+
if (WORKER_IS_RUNNING(pstate->parallelSlot[j].workerStatus))
491498
{
492499
lpHandles[nrun] = (HANDLE) pstate->parallelSlot[j].hThread;
493500
nrun++;
@@ -923,6 +930,7 @@ ParallelBackupStart(ArchiveHandle *AH)
923930
if (AH->public.numWorkers == 1)
924931
return pstate;
925932

933+
/* Create status arrays, being sure to initialize all fields to 0 */
926934
pstate->te = (TocEntry **)
927935
pg_malloc0(pstate->numWorkers * sizeof(TocEntry *));
928936
pstate->parallelSlot = (ParallelSlot *)
@@ -970,13 +978,6 @@ ParallelBackupStart(ArchiveHandle *AH)
970978
if (pgpipe(pipeMW) < 0 || pgpipe(pipeWM) < 0)
971979
fatal("could not create communication channels: %m");
972980

973-
pstate->te[i] = NULL; /* just for safety */
974-
975-
slot->workerStatus = WRKR_IDLE;
976-
slot->AH = NULL;
977-
slot->callback = NULL;
978-
slot->callback_data = NULL;
979-
980981
/* master's ends of the pipes */
981982
slot->pipeRead = pipeWM[PIPE_READ];
982983
slot->pipeWrite = pipeMW[PIPE_WRITE];
@@ -994,6 +995,7 @@ ParallelBackupStart(ArchiveHandle *AH)
994995
handle = _beginthreadex(NULL, 0, (void *) &init_spawned_worker_win32,
995996
wi, 0, &(slot->threadId));
996997
slot->hThread = handle;
998+
slot->workerStatus = WRKR_IDLE;
997999
#else /* !WIN32 */
9981000
pid = fork();
9991001
if (pid == 0)
@@ -1036,6 +1038,7 @@ ParallelBackupStart(ArchiveHandle *AH)
10361038

10371039
/* In Master after successful fork */
10381040
slot->pid = pid;
1041+
slot->workerStatus = WRKR_IDLE;
10391042

10401043
/* close read end of Master -> Worker */
10411044
closesocket(pipeMW[PIPE_READ]);
@@ -1263,7 +1266,7 @@ GetIdleWorker(ParallelState *pstate)
12631266
}
12641267

12651268
/*
1266-
* Return true iff every worker is in the WRKR_TERMINATED state.
1269+
* Return true iff no worker is running.
12671270
*/
12681271
static bool
12691272
HasEveryWorkerTerminated(ParallelState *pstate)
@@ -1272,7 +1275,7 @@ HasEveryWorkerTerminated(ParallelState *pstate)
12721275

12731276
for (i = 0; i < pstate->numWorkers; i++)
12741277
{
1275-
if (pstate->parallelSlot[i].workerStatus != WRKR_TERMINATED)
1278+
if (WORKER_IS_RUNNING(pstate->parallelSlot[i].workerStatus))
12761279
return false;
12771280
}
12781281
return true;
@@ -1604,7 +1607,7 @@ getMessageFromWorker(ParallelState *pstate, bool do_wait, int *worker)
16041607
FD_ZERO(&workerset);
16051608
for (i = 0; i < pstate->numWorkers; i++)
16061609
{
1607-
if (pstate->parallelSlot[i].workerStatus == WRKR_TERMINATED)
1610+
if (!WORKER_IS_RUNNING(pstate->parallelSlot[i].workerStatus))
16081611
continue;
16091612
FD_SET(pstate->parallelSlot[i].pipeRead, &workerset);
16101613
if (pstate->parallelSlot[i].pipeRead > maxFd)
@@ -1629,6 +1632,8 @@ getMessageFromWorker(ParallelState *pstate, bool do_wait, int *worker)
16291632
{
16301633
char *msg;
16311634

1635+
if (!WORKER_IS_RUNNING(pstate->parallelSlot[i].workerStatus))
1636+
continue;
16321637
if (!FD_ISSET(pstate->parallelSlot[i].pipeRead, &workerset))
16331638
continue;
16341639

0 commit comments

Comments
 (0)