Skip to content

Commit 8b97208

Browse files
committed
Clean up thread management in parallel pg_dump for Windows.
Since we start the worker threads with _beginthreadex(), we should use _endthreadex() to terminate them. We got this right in the normal-exit code path, but not so much during an error exit from a worker. In addition, be sure to apply CloseHandle to the thread handle after each thread exits. It's not clear that these oversights cause any user-visible problems, since the pg_dump run is about to terminate anyway. Still, it's clearly better to follow Microsoft's API specifications than ignore them. Also a few cosmetic cleanups in WaitForTerminatingWorkers(), including being a bit less random about where to cast between uintptr_t and HANDLE, and being sure to clear the worker identity field for each dead worker (not that false matches should be possible later, but let's be careful). Original observation and patch by Armin Schöffmann, cosmetic improvements by Michael Paquier and me. (Armin's patch also included closing sockets in ShutdownWorkersHard(), but that's been dealt with already in commit df8d2d8.) Back-patch to 9.3 where parallel pg_dump was introduced. Discussion: <zarafa.570306bd.3418.074bf1420d8f2ba2@root.aegaeon.de>
1 parent 1f1e70a commit 8b97208

File tree

2 files changed

+37
-18
lines changed

2 files changed

+37
-18
lines changed

src/bin/pg_dump/parallel.c

Lines changed: 36 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -328,7 +328,7 @@ ShutdownWorkersHard(ParallelState *pstate)
328328
}
329329

330330
/*
331-
* Wait for the termination of the processes using the OS-specific method.
331+
* Wait for all workers to terminate.
332332
*/
333333
static void
334334
WaitForTerminatingWorkers(ParallelState *pstate)
@@ -339,39 +339,58 @@ WaitForTerminatingWorkers(ParallelState *pstate)
339339
int j;
340340

341341
#ifndef WIN32
342+
/* On non-Windows, use wait() to wait for next worker to end */
342343
int status;
343344
pid_t pid = wait(&status);
344345

346+
/* Find dead worker's slot, and clear the PID field */
345347
for (j = 0; j < pstate->numWorkers; j++)
346-
if (pstate->parallelSlot[j].pid == pid)
347-
slot = &(pstate->parallelSlot[j]);
348-
#else
349-
uintptr_t hThread;
350-
DWORD ret;
351-
uintptr_t *lpHandles = pg_malloc(sizeof(HANDLE) * pstate->numWorkers);
348+
{
349+
slot = &(pstate->parallelSlot[j]);
350+
if (slot->pid == pid)
351+
{
352+
slot->pid = 0;
353+
break;
354+
}
355+
}
356+
#else /* WIN32 */
357+
/* On Windows, we must use WaitForMultipleObjects() */
358+
HANDLE *lpHandles = pg_malloc(sizeof(HANDLE) * pstate->numWorkers);
352359
int nrun = 0;
360+
DWORD ret;
361+
uintptr_t hThread;
353362

354363
for (j = 0; j < pstate->numWorkers; j++)
364+
{
355365
if (pstate->parallelSlot[j].workerStatus != WRKR_TERMINATED)
356366
{
357-
lpHandles[nrun] = pstate->parallelSlot[j].hThread;
367+
lpHandles[nrun] = (HANDLE) pstate->parallelSlot[j].hThread;
358368
nrun++;
359369
}
360-
ret = WaitForMultipleObjects(nrun, (HANDLE *) lpHandles, false, INFINITE);
370+
}
371+
ret = WaitForMultipleObjects(nrun, lpHandles, false, INFINITE);
361372
Assert(ret != WAIT_FAILED);
362-
hThread = lpHandles[ret - WAIT_OBJECT_0];
373+
hThread = (uintptr_t) lpHandles[ret - WAIT_OBJECT_0];
374+
free(lpHandles);
363375

376+
/* Find dead worker's slot, and clear the hThread field */
364377
for (j = 0; j < pstate->numWorkers; j++)
365-
if (pstate->parallelSlot[j].hThread == hThread)
366-
slot = &(pstate->parallelSlot[j]);
367-
368-
free(lpHandles);
369-
#endif
370-
Assert(slot);
378+
{
379+
slot = &(pstate->parallelSlot[j]);
380+
if (slot->hThread == hThread)
381+
{
382+
/* For cleanliness, close handles for dead threads */
383+
CloseHandle((HANDLE) slot->hThread);
384+
slot->hThread = (uintptr_t) INVALID_HANDLE_VALUE;
385+
break;
386+
}
387+
}
388+
#endif /* WIN32 */
371389

390+
/* On all platforms, update workerStatus as well */
391+
Assert(j < pstate->numWorkers);
372392
slot->workerStatus = WRKR_TERMINATED;
373393
}
374-
Assert(HasEveryWorkerTerminated(pstate));
375394
}
376395

377396
#ifndef WIN32

src/bin/pg_dump/pg_backup_utils.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -149,7 +149,7 @@ exit_nicely(int code)
149149

150150
#ifdef WIN32
151151
if (parallel_init_done && GetCurrentThreadId() != mainThreadId)
152-
ExitThread(code);
152+
_endthreadex(code);
153153
#endif
154154

155155
exit(code);

0 commit comments

Comments
 (0)