Skip to content

Commit 715db0b

Browse files
committed
Clean up thread management in parallel pg_dump for Windows.
Since we start the worker threads with _beginthreadex(), we should use _endthreadex() to terminate them. We got this right in the normal-exit code path, but not so much during an error exit from a worker. In addition, be sure to apply CloseHandle to the thread handle after each thread exits. It's not clear that these oversights cause any user-visible problems, since the pg_dump run is about to terminate anyway. Still, it's clearly better to follow Microsoft's API specifications than ignore them. Also a few cosmetic cleanups in WaitForTerminatingWorkers(), including being a bit less random about where to cast between uintptr_t and HANDLE, and being sure to clear the worker identity field for each dead worker (not that false matches should be possible later, but let's be careful). Original observation and patch by Armin Schöffmann, cosmetic improvements by Michael Paquier and me. (Armin's patch also included closing sockets in ShutdownWorkersHard(), but that's been dealt with already in commit df8d2d8.) Back-patch to 9.3 where parallel pg_dump was introduced. Discussion: <zarafa.570306bd.3418.074bf1420d8f2ba2@root.aegaeon.de>
1 parent cea17ba commit 715db0b

File tree

2 files changed

+37
-18
lines changed

2 files changed

+37
-18
lines changed

src/bin/pg_dump/parallel.c

Lines changed: 36 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -326,7 +326,7 @@ ShutdownWorkersHard(ParallelState *pstate)
326326
}
327327

328328
/*
329-
* Wait for the termination of the processes using the OS-specific method.
329+
* Wait for all workers to terminate.
330330
*/
331331
static void
332332
WaitForTerminatingWorkers(ParallelState *pstate)
@@ -337,39 +337,58 @@ WaitForTerminatingWorkers(ParallelState *pstate)
337337
int j;
338338

339339
#ifndef WIN32
340+
/* On non-Windows, use wait() to wait for next worker to end */
340341
int status;
341342
pid_t pid = wait(&status);
342343

344+
/* Find dead worker's slot, and clear the PID field */
343345
for (j = 0; j < pstate->numWorkers; j++)
344-
if (pstate->parallelSlot[j].pid == pid)
345-
slot = &(pstate->parallelSlot[j]);
346-
#else
347-
uintptr_t hThread;
348-
DWORD ret;
349-
uintptr_t *lpHandles = pg_malloc(sizeof(HANDLE) * pstate->numWorkers);
346+
{
347+
slot = &(pstate->parallelSlot[j]);
348+
if (slot->pid == pid)
349+
{
350+
slot->pid = 0;
351+
break;
352+
}
353+
}
354+
#else /* WIN32 */
355+
/* On Windows, we must use WaitForMultipleObjects() */
356+
HANDLE *lpHandles = pg_malloc(sizeof(HANDLE) * pstate->numWorkers);
350357
int nrun = 0;
358+
DWORD ret;
359+
uintptr_t hThread;
351360

352361
for (j = 0; j < pstate->numWorkers; j++)
362+
{
353363
if (pstate->parallelSlot[j].workerStatus != WRKR_TERMINATED)
354364
{
355-
lpHandles[nrun] = pstate->parallelSlot[j].hThread;
365+
lpHandles[nrun] = (HANDLE) pstate->parallelSlot[j].hThread;
356366
nrun++;
357367
}
358-
ret = WaitForMultipleObjects(nrun, (HANDLE *) lpHandles, false, INFINITE);
368+
}
369+
ret = WaitForMultipleObjects(nrun, lpHandles, false, INFINITE);
359370
Assert(ret != WAIT_FAILED);
360-
hThread = lpHandles[ret - WAIT_OBJECT_0];
371+
hThread = (uintptr_t) lpHandles[ret - WAIT_OBJECT_0];
372+
free(lpHandles);
361373

374+
/* Find dead worker's slot, and clear the hThread field */
362375
for (j = 0; j < pstate->numWorkers; j++)
363-
if (pstate->parallelSlot[j].hThread == hThread)
364-
slot = &(pstate->parallelSlot[j]);
365-
366-
free(lpHandles);
367-
#endif
368-
Assert(slot);
376+
{
377+
slot = &(pstate->parallelSlot[j]);
378+
if (slot->hThread == hThread)
379+
{
380+
/* For cleanliness, close handles for dead threads */
381+
CloseHandle((HANDLE) slot->hThread);
382+
slot->hThread = (uintptr_t) INVALID_HANDLE_VALUE;
383+
break;
384+
}
385+
}
386+
#endif /* WIN32 */
369387

388+
/* On all platforms, update workerStatus as well */
389+
Assert(j < pstate->numWorkers);
370390
slot->workerStatus = WRKR_TERMINATED;
371391
}
372-
Assert(HasEveryWorkerTerminated(pstate));
373392
}
374393

375394
#ifndef WIN32

src/bin/pg_dump/pg_backup_utils.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -149,7 +149,7 @@ exit_nicely(int code)
149149

150150
#ifdef WIN32
151151
if (parallel_init_done && GetCurrentThreadId() != mainThreadId)
152-
ExitThread(code);
152+
_endthreadex(code);
153153
#endif
154154

155155
exit(code);

0 commit comments

Comments
 (0)