Skip to content

Commit 807b453

Browse files
committed
Clean up thread management in parallel pg_dump for Windows.
Since we start the worker threads with _beginthreadex(), we should use _endthreadex() to terminate them. We got this right in the normal-exit code path, but not so much during an error exit from a worker. In addition, be sure to apply CloseHandle to the thread handle after each thread exits. It's not clear that these oversights cause any user-visible problems, since the pg_dump run is about to terminate anyway. Still, it's clearly better to follow Microsoft's API specifications than ignore them. Also a few cosmetic cleanups in WaitForTerminatingWorkers(), including being a bit less random about where to cast between uintptr_t and HANDLE, and being sure to clear the worker identity field for each dead worker (not that false matches should be possible later, but let's be careful). Original observation and patch by Armin Schöffmann, cosmetic improvements by Michael Paquier and me. (Armin's patch also included closing sockets in ShutdownWorkersHard(), but that's been dealt with already in commit df8d2d8.) Back-patch to 9.3 where parallel pg_dump was introduced. Discussion: <zarafa.570306bd.3418.074bf1420d8f2ba2@root.aegaeon.de>
1 parent d81ecb9 commit 807b453

File tree

2 files changed

+37
-18
lines changed

2 files changed

+37
-18
lines changed

src/bin/pg_dump/parallel.c

Lines changed: 36 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -327,7 +327,7 @@ ShutdownWorkersHard(ParallelState *pstate)
327327
}
328328

329329
/*
330-
* Wait for the termination of the processes using the OS-specific method.
330+
* Wait for all workers to terminate.
331331
*/
332332
static void
333333
WaitForTerminatingWorkers(ParallelState *pstate)
@@ -338,39 +338,58 @@ WaitForTerminatingWorkers(ParallelState *pstate)
338338
int j;
339339

340340
#ifndef WIN32
341+
/* On non-Windows, use wait() to wait for next worker to end */
341342
int status;
342343
pid_t pid = wait(&status);
343344

345+
/* Find dead worker's slot, and clear the PID field */
344346
for (j = 0; j < pstate->numWorkers; j++)
345-
if (pstate->parallelSlot[j].pid == pid)
346-
slot = &(pstate->parallelSlot[j]);
347-
#else
348-
uintptr_t hThread;
349-
DWORD ret;
350-
uintptr_t *lpHandles = pg_malloc(sizeof(HANDLE) * pstate->numWorkers);
347+
{
348+
slot = &(pstate->parallelSlot[j]);
349+
if (slot->pid == pid)
350+
{
351+
slot->pid = 0;
352+
break;
353+
}
354+
}
355+
#else /* WIN32 */
356+
/* On Windows, we must use WaitForMultipleObjects() */
357+
HANDLE *lpHandles = pg_malloc(sizeof(HANDLE) * pstate->numWorkers);
351358
int nrun = 0;
359+
DWORD ret;
360+
uintptr_t hThread;
352361

353362
for (j = 0; j < pstate->numWorkers; j++)
363+
{
354364
if (pstate->parallelSlot[j].workerStatus != WRKR_TERMINATED)
355365
{
356-
lpHandles[nrun] = pstate->parallelSlot[j].hThread;
366+
lpHandles[nrun] = (HANDLE) pstate->parallelSlot[j].hThread;
357367
nrun++;
358368
}
359-
ret = WaitForMultipleObjects(nrun, (HANDLE *) lpHandles, false, INFINITE);
369+
}
370+
ret = WaitForMultipleObjects(nrun, lpHandles, false, INFINITE);
360371
Assert(ret != WAIT_FAILED);
361-
hThread = lpHandles[ret - WAIT_OBJECT_0];
372+
hThread = (uintptr_t) lpHandles[ret - WAIT_OBJECT_0];
373+
free(lpHandles);
362374

375+
/* Find dead worker's slot, and clear the hThread field */
363376
for (j = 0; j < pstate->numWorkers; j++)
364-
if (pstate->parallelSlot[j].hThread == hThread)
365-
slot = &(pstate->parallelSlot[j]);
366-
367-
free(lpHandles);
368-
#endif
369-
Assert(slot);
377+
{
378+
slot = &(pstate->parallelSlot[j]);
379+
if (slot->hThread == hThread)
380+
{
381+
/* For cleanliness, close handles for dead threads */
382+
CloseHandle((HANDLE) slot->hThread);
383+
slot->hThread = (uintptr_t) INVALID_HANDLE_VALUE;
384+
break;
385+
}
386+
}
387+
#endif /* WIN32 */
370388

389+
/* On all platforms, update workerStatus as well */
390+
Assert(j < pstate->numWorkers);
371391
slot->workerStatus = WRKR_TERMINATED;
372392
}
373-
Assert(HasEveryWorkerTerminated(pstate));
374393
}
375394

376395
#ifndef WIN32

src/bin/pg_dump/pg_backup_utils.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -149,7 +149,7 @@ exit_nicely(int code)
149149

150150
#ifdef WIN32
151151
if (parallel_init_done && GetCurrentThreadId() != mainThreadId)
152-
ExitThread(code);
152+
_endthreadex(code);
153153
#endif
154154

155155
exit(code);

0 commit comments

Comments
 (0)