Skip to content

Commit 5d60df8

Browse files
committed
Fix parallel pg_dump/pg_restore for failure to create worker processes.
If we failed to fork a worker process, or create a communication pipe for one, WaitForTerminatingWorkers would suffer an assertion failure if assert-enabled, otherwise crash or go into an infinite loop. This was a consequence of not accounting for the startup condition where we've not yet forked all the workers.

The original bug was that ParallelBackupStart would set workerStatus to WRKR_IDLE before it had successfully forked a worker. I made things worse in commit b7b8cc0 by not understanding the undocumented fact that the WRKR_TERMINATED state was also meant to represent the case where a worker hadn't been started yet: I changed enum T_WorkerStatus so that *all* the worker slots were initially in WRKR_IDLE state. But this wasn't any more broken in practice, since even one slot in the wrong state would keep WaitForTerminatingWorkers from terminating.

In v10 and later, introduce an explicit T_WorkerStatus value for worker-not-started, in hopes of preventing future oversights of the same ilk. Before that, just document that WRKR_TERMINATED is supposed to cover that case (partly because it wasn't actively broken, and partly because the enum is exposed outside parallel.c in those branches, so there's microscopically more risk involved in changing it). In all branches, introduce a WORKER_IS_RUNNING status test macro to hide which T_WorkerStatus values mean that, and be more careful not to access ParallelSlot fields till we're sure they're valid.

Per report from Vignesh C, though this is my patch not his. Back-patch to all supported branches.

Discussion: https://postgr.es/m/CALDaNm1Luv-E3sarR+-unz-BjchquHHyfP+YC+2FS2pt_J+wxg@mail.gmail.com
1 parent 56bc82a commit 5d60df8

File tree

2 files changed

+20
-11
lines changed

2 files changed

+20
-11
lines changed

src/bin/pg_dump/parallel.c

+17-11
Original file line number | Diff line number | Diff line change
@@ -50,7 +50,7 @@
5050
* WRKR_IDLE: it's waiting for a command
5151
* WRKR_WORKING: it's been sent a command
5252
* WRKR_FINISHED: it's returned a result
53-
* WRKR_TERMINATED: process ended
53+
* WRKR_TERMINATED: process ended (or not started yet)
5454
* The FINISHED state indicates that the worker is idle, but we've not yet
5555
* dealt with the status code it returned from the prior command.
5656
* ReapWorkerStatus() extracts the unhandled command status value and sets
@@ -381,7 +381,9 @@ ShutdownWorkersHard(ParallelState *pstate)
381381

382382
/*
383383
* Close our write end of the sockets so that any workers waiting for
384-
* commands know they can exit.
384+
* commands know they can exit. (Note: some of the pipeWrite fields might
385+
* still be zero, if we failed to initialize all the workers. Hence, just
386+
* ignore errors here.)
385387
*/
386388
for (i = 0; i < pstate->numWorkers; i++)
387389
closesocket(pstate->parallelSlot[i].pipeWrite);
@@ -455,7 +457,7 @@ WaitForTerminatingWorkers(ParallelState *pstate)
455457

456458
for (j = 0; j < pstate->numWorkers; j++)
457459
{
458-
if (pstate->parallelSlot[j].workerStatus != WRKR_TERMINATED)
460+
if (WORKER_IS_RUNNING(pstate->parallelSlot[j].workerStatus))
459461
{
460462
lpHandles[nrun] = (HANDLE) pstate->parallelSlot[j].hThread;
461463
nrun++;
@@ -891,6 +893,7 @@ ParallelBackupStart(ArchiveHandle *AH, RestoreOptions *ropt)
891893
if (AH->public.numWorkers == 1)
892894
return pstate;
893895

896+
/* Create status array, being sure to initialize all fields to 0 */
894897
pstate->parallelSlot = (ParallelSlot *) pg_malloc(slotSize);
895898
memset((void *) pstate->parallelSlot, 0, slotSize);
896899

@@ -932,17 +935,16 @@ ParallelBackupStart(ArchiveHandle *AH, RestoreOptions *ropt)
932935
int pipeMW[2],
933936
pipeWM[2];
934937

938+
slot->args = (ParallelArgs *) pg_malloc(sizeof(ParallelArgs));
939+
slot->args->AH = NULL;
940+
slot->args->te = NULL;
941+
935942
/* Create communication pipes for this worker */
936943
if (pgpipe(pipeMW) < 0 || pgpipe(pipeWM) < 0)
937944
exit_horribly(modulename,
938945
"could not create communication channels: %s\n",
939946
strerror(errno));
940947

941-
slot->workerStatus = WRKR_IDLE;
942-
slot->args = (ParallelArgs *) pg_malloc(sizeof(ParallelArgs));
943-
slot->args->AH = NULL;
944-
slot->args->te = NULL;
945-
946948
/* master's ends of the pipes */
947949
slot->pipeRead = pipeWM[PIPE_READ];
948950
slot->pipeWrite = pipeMW[PIPE_WRITE];
@@ -961,6 +963,7 @@ ParallelBackupStart(ArchiveHandle *AH, RestoreOptions *ropt)
961963
handle = _beginthreadex(NULL, 0, (void *) &init_spawned_worker_win32,
962964
wi, 0, &(slot->threadId));
963965
slot->hThread = handle;
966+
slot->workerStatus = WRKR_IDLE;
964967
#else /* !WIN32 */
965968
pid = fork();
966969
if (pid == 0)
@@ -1005,6 +1008,7 @@ ParallelBackupStart(ArchiveHandle *AH, RestoreOptions *ropt)
10051008

10061009
/* In Master after successful fork */
10071010
slot->pid = pid;
1011+
slot->workerStatus = WRKR_IDLE;
10081012

10091013
/* close read end of Master -> Worker */
10101014
closesocket(pipeMW[PIPE_READ]);
@@ -1121,7 +1125,7 @@ GetIdleWorker(ParallelState *pstate)
11211125
}
11221126

11231127
/*
1124-
* Return true iff every worker is in the WRKR_TERMINATED state.
1128+
* Return true iff no worker is running.
11251129
*/
11261130
static bool
11271131
HasEveryWorkerTerminated(ParallelState *pstate)
@@ -1130,7 +1134,7 @@ HasEveryWorkerTerminated(ParallelState *pstate)
11301134

11311135
for (i = 0; i < pstate->numWorkers; i++)
11321136
{
1133-
if (pstate->parallelSlot[i].workerStatus != WRKR_TERMINATED)
1137+
if (WORKER_IS_RUNNING(pstate->parallelSlot[i].workerStatus))
11341138
return false;
11351139
}
11361140
return true;
@@ -1530,7 +1534,7 @@ getMessageFromWorker(ParallelState *pstate, bool do_wait, int *worker)
15301534
FD_ZERO(&workerset);
15311535
for (i = 0; i < pstate->numWorkers; i++)
15321536
{
1533-
if (pstate->parallelSlot[i].workerStatus == WRKR_TERMINATED)
1537+
if (!WORKER_IS_RUNNING(pstate->parallelSlot[i].workerStatus))
15341538
continue;
15351539
FD_SET(pstate->parallelSlot[i].pipeRead, &workerset);
15361540
if (pstate->parallelSlot[i].pipeRead > maxFd)
@@ -1555,6 +1559,8 @@ getMessageFromWorker(ParallelState *pstate, bool do_wait, int *worker)
15551559
{
15561560
char *msg;
15571561

1562+
if (!WORKER_IS_RUNNING(pstate->parallelSlot[i].workerStatus))
1563+
continue;
15581564
if (!FD_ISSET(pstate->parallelSlot[i].pipeRead, &workerset))
15591565
continue;
15601566

src/bin/pg_dump/parallel.h

+3
Original file line number | Diff line number | Diff line change
@@ -32,6 +32,9 @@ typedef enum
3232
WRKR_FINISHED
3333
} T_WorkerStatus;
3434

35+
#define WORKER_IS_RUNNING(workerStatus) \
36+
((workerStatus) != WRKR_TERMINATED)
37+
3538
/* Arguments needed for a worker process */
3639
typedef struct ParallelArgs
3740
{

0 commit comments

Comments
 (0)