Commit 9abd64e

Fix broken error handling in parallel pg_dump/pg_restore.
In the original design for parallel dump, worker processes reported errors by sending them up to the master process, which would print the messages. This is unworkably fragile for a couple of reasons: it risks deadlock if a worker sends an error at an unexpected time, and if the master has already died for some reason, the user will never get to see the error at all. Revert that idea and go back to just always printing messages to stderr. This approach means that if all the workers fail for similar reasons (eg, bad password or server shutdown), the user will see N copies of that message, not only one as before. While that's slightly annoying, it's certainly better than not seeing any message; not to mention that we shouldn't assume that only the first failure is interesting.

An additional problem in the same area was that the master failed to disable SIGPIPE (at least until much too late), which meant that sending a command to an already-dead worker would cause the master to crash silently. That was bad enough in itself but was made worse by the total reliance on the master to print errors: even if the worker had reported an error, you would probably not see it, depending on timing. Instead disable SIGPIPE right after we've forked the workers, before attempting to send them anything.

Additionally, the master relies on seeing socket EOF to realize that a worker has exited prematurely --- but on Windows, there would be no EOF since the socket is attached to the process that includes both the master and worker threads, so it remains open. Make archive_close_connection() close the worker end of the sockets so that this acts more like the Unix case. It's not perfect, because if a worker thread exits without going through exit_nicely() the closures won't happen; but that's not really supposed to happen.

This has been wrong all along, so back-patch to 9.3 where parallel dump was introduced.

Report: <2458.1450894615@sss.pgh.pa.us>
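As a rough illustration of the SIGPIPE point in the message above (this is not code from the commit): once SIGPIPE is ignored, writing to a channel whose reader is gone fails with EPIPE instead of silently killing the writer. The helper name write_to_dead_reader is hypothetical, and a plain pipe() stands in for the master-to-worker channel.

```c
#include <assert.h>		/* only for the assert() shown in the usage note */
#include <errno.h>
#include <signal.h>
#include <unistd.h>

/*
 * Hypothetical helper, not part of pg_dump: write one byte to a pipe whose
 * read end has already been closed, with SIGPIPE ignored.
 *
 * Returns the errno reported by write(), 0 if the write unexpectedly
 * succeeded, or -1 if pipe() itself failed.
 */
int
write_to_dead_reader(void)
{
	int			fds[2];
	int			saved_errno = 0;
	void		(*old_handler) (int);

	if (pipe(fds) != 0)
		return -1;

	/* Ignore SIGPIPE, as the master now does right after forking workers. */
	old_handler = signal(SIGPIPE, SIG_IGN);

	/* Simulate an already-dead worker by closing the read end. */
	close(fds[0]);

	/* Without SIG_IGN this write would terminate the process. */
	if (write(fds[1], "x", 1) < 0)
		saved_errno = errno;

	close(fds[1]);

	/* Restore the previous disposition so callers are not affected. */
	if (old_handler != SIG_ERR)
		signal(SIGPIPE, old_handler);

	return saved_errno;
}
```

A caller could check the behavior with assert(write_to_dead_reader() == EPIPE); the point is that the failure becomes an ordinary error return that the master can act on.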
1 parent 627e360 commit 9abd64e

3 files changed: +98 -116 lines changed

src/bin/pg_dump/parallel.c

Lines changed: 65 additions & 113 deletions
@@ -77,8 +77,6 @@ static ShutdownInformation shutdown_info;
 static const char *modulename = gettext_noop("parallel archiver");
 
 static ParallelSlot *GetMyPSlot(ParallelState *pstate);
-static void parallel_msg_master(ParallelSlot *slot, const char *modulename,
-					const char *fmt, va_list ap) pg_attribute_printf(3, 0);
 static void archive_close_connection(int code, void *arg);
 static void ShutdownWorkersHard(ParallelState *pstate);
 static void WaitForTerminatingWorkers(ParallelState *pstate);
@@ -162,65 +160,6 @@ GetMyPSlot(ParallelState *pstate)
 	return NULL;
 }
 
-/*
- * Fail and die, with a message to stderr. Parameters as for write_msg.
- *
- * This is defined in parallel.c, because in parallel mode, things are more
- * complicated. If the worker process does exit_horribly(), we forward its
- * last words to the master process. The master process then does
- * exit_horribly() with this error message itself and prints it normally.
- * After printing the message, exit_horribly() on the master will shut down
- * the remaining worker processes.
- */
-void
-exit_horribly(const char *modulename, const char *fmt,...)
-{
-	va_list ap;
-	ParallelState *pstate = shutdown_info.pstate;
-	ParallelSlot *slot;
-
-	va_start(ap, fmt);
-
-	if (pstate == NULL)
-	{
-		/* Not in parallel mode, just write to stderr */
-		vwrite_msg(modulename, fmt, ap);
-	}
-	else
-	{
-		slot = GetMyPSlot(pstate);
-
-		if (!slot)
-			/* We're the parent, just write the message out */
-			vwrite_msg(modulename, fmt, ap);
-		else
-			/* If we're a worker process, send the msg to the master process */
-			parallel_msg_master(slot, modulename, fmt, ap);
-	}
-
-	va_end(ap);
-
-	exit_nicely(1);
-}
-
-/* Sends the error message from the worker to the master process */
-static void
-parallel_msg_master(ParallelSlot *slot, const char *modulename,
-					const char *fmt, va_list ap)
-{
-	char buf[512];
-	int pipefd[2];
-
-	pipefd[PIPE_READ] = slot->pipeRevRead;
-	pipefd[PIPE_WRITE] = slot->pipeRevWrite;
-
-	strcpy(buf, "ERROR ");
-	vsnprintf(buf + strlen("ERROR "),
-			  sizeof(buf) - strlen("ERROR "), fmt, ap);
-
-	sendMessageToMaster(pipefd, buf);
-}
-
 /*
  * A thread-local version of getLocalPQExpBuffer().
  *
@@ -271,7 +210,7 @@ getThreadLocalPQExpBuffer(void)
 
 /*
  * pg_dump and pg_restore register the Archive pointer for the exit handler
- * (called from exit_horribly). This function mainly exists so that we can
+ * (called from exit_nicely). This function mainly exists so that we can
  * keep shutdown_info in file scope only.
  */
 void
@@ -282,8 +221,8 @@ on_exit_close_archive(Archive *AHX)
 }
 
 /*
- * This function can close archives in both the parallel and non-parallel
- * case.
+ * on_exit_nicely handler for shutting down database connections and
+ * worker processes cleanly.
  */
 static void
 archive_close_connection(int code, void *arg)
@@ -292,42 +231,62 @@ archive_close_connection(int code, void *arg)
 
 	if (si->pstate)
 	{
+		/* In parallel mode, must figure out who we are */
 		ParallelSlot *slot = GetMyPSlot(si->pstate);
 
 		if (!slot)
 		{
 			/*
-			 * We're the master: We have already printed out the message
-			 * passed to exit_horribly() either from the master itself or from
-			 * a worker process. Now we need to close our own database
-			 * connection (only open during parallel dump but not restore) and
-			 * shut down the remaining workers.
+			 * We're the master. Close our own database connection, if any,
+			 * and then forcibly shut down workers.
 			 */
-			DisconnectDatabase(si->AHX);
+			if (si->AHX)
+				DisconnectDatabase(si->AHX);
+
 #ifndef WIN32
 
 			/*
-			 * Setting aborting to true switches to best-effort-mode
-			 * (send/receive but ignore errors) in communicating with our
-			 * workers.
+			 * Setting aborting to true shuts off error/warning messages that
+			 * are no longer useful once we start killing workers.
 			 */
 			aborting = true;
 #endif
 			ShutdownWorkersHard(si->pstate);
 		}
-		else if (slot->args->AH)
-			DisconnectDatabase(&(slot->args->AH->public));
+		else
+		{
+			/*
+			 * We're a worker. Shut down our own DB connection if any. On
+			 * Windows, we also have to close our communication sockets, to
+			 * emulate what will happen on Unix when the worker process exits.
+			 * (Without this, if this is a premature exit, the master would
+			 * fail to detect it because there would be no EOF condition on
+			 * the other end of the pipe.)
+			 */
+			if (slot->args->AH)
+				DisconnectDatabase(&(slot->args->AH->public));
+
+#ifdef WIN32
+			closesocket(slot->pipeRevRead);
+			closesocket(slot->pipeRevWrite);
+#endif
+		}
+	}
+	else
+	{
+		/* Non-parallel operation: just kill the master DB connection */
+		if (si->AHX)
+			DisconnectDatabase(si->AHX);
 	}
-	else if (si->AHX)
-		DisconnectDatabase(si->AHX);
 }
 
 /*
  * If we have one worker that terminates for some reason, we'd like the other
  * threads to terminate as well (and not finish with their 70 GB table dump
  * first...). Now in UNIX we can just kill these processes, and let the signal
  * handler set wantAbort to 1. In Windows we set a termEvent and this serves
- * as the signal for everyone to terminate.
+ * as the signal for everyone to terminate. We don't print any error message,
+ * that would just clutter the screen.
  */
 void
 checkAborting(ArchiveHandle *AH)
@@ -337,7 +296,7 @@ checkAborting(ArchiveHandle *AH)
 #else
 	if (wantAbort)
 #endif
-		exit_horribly(modulename, "worker is terminating\n");
+		exit_nicely(1);
 }
 
 /*
@@ -352,8 +311,6 @@ ShutdownWorkersHard(ParallelState *pstate)
 #ifndef WIN32
 	int i;
 
-	signal(SIGPIPE, SIG_IGN);
-
 	/*
 	 * Close our write end of the sockets so that the workers know they can
 	 * exit.
@@ -428,27 +385,21 @@ sigTermHandler(int signum)
 #endif
 
 /*
- * This function is called by both UNIX and Windows variants to set up a
- * worker process.
+ * This function is called by both UNIX and Windows variants to set up
+ * and run a worker process. Caller should exit the process (or thread)
+ * upon return.
  */
 static void
 SetupWorker(ArchiveHandle *AH, int pipefd[2], int worker)
 {
 	/*
 	 * Call the setup worker function that's defined in the ArchiveHandle.
-	 *
-	 * We get the raw connection only for the reason that we can close it
-	 * properly when we shut down. This happens only that way when it is
-	 * brought down because of an error.
 	 */
 	(AH->SetupWorkerPtr) ((Archive *) AH);
 
 	Assert(AH->connection != NULL);
 
 	WaitForCommands(AH, pipefd);
-
-	closesocket(pipefd[PIPE_READ]);
-	closesocket(pipefd[PIPE_WRITE]);
 }
 
 #ifdef WIN32
@@ -534,14 +485,22 @@ ParallelBackupStart(ArchiveHandle *AH)
 		pstate->parallelSlot[i].args = (ParallelArgs *) pg_malloc(sizeof(ParallelArgs));
 		pstate->parallelSlot[i].args->AH = NULL;
 		pstate->parallelSlot[i].args->te = NULL;
+
+		/* master's ends of the pipes */
+		pstate->parallelSlot[i].pipeRead = pipeWM[PIPE_READ];
+		pstate->parallelSlot[i].pipeWrite = pipeMW[PIPE_WRITE];
+		/* child's ends of the pipes */
+		pstate->parallelSlot[i].pipeRevRead = pipeMW[PIPE_READ];
+		pstate->parallelSlot[i].pipeRevWrite = pipeWM[PIPE_WRITE];
+
 #ifdef WIN32
 		/* Allocate a new structure for every worker */
 		wi = (WorkerInfo *) pg_malloc(sizeof(WorkerInfo));
 
 		wi->worker = i;
 		wi->AH = AH;
-		wi->pipeRead = pstate->parallelSlot[i].pipeRevRead = pipeMW[PIPE_READ];
-		wi->pipeWrite = pstate->parallelSlot[i].pipeRevWrite = pipeWM[PIPE_WRITE];
+		wi->pipeRead = pipeMW[PIPE_READ];
+		wi->pipeWrite = pipeWM[PIPE_WRITE];
 
 		handle = _beginthreadex(NULL, 0, (void *) &init_spawned_worker_win32,
 								wi, 0, &(pstate->parallelSlot[i].threadId));
@@ -557,15 +516,6 @@ ParallelBackupStart(ArchiveHandle *AH)
 			pipefd[0] = pipeMW[PIPE_READ];
 			pipefd[1] = pipeWM[PIPE_WRITE];
 
-			/*
-			 * Store the fds for the reverse communication in pstate. Actually
-			 * we only use this in case of an error and don't use pstate
-			 * otherwise in the worker process. On Windows we write to the
-			 * global pstate, in Unix we write to our process-local copy but
-			 * that's also where we'd retrieve this information back from.
-			 */
-			pstate->parallelSlot[i].pipeRevRead = pipefd[PIPE_READ];
-			pstate->parallelSlot[i].pipeRevWrite = pipefd[PIPE_WRITE];
 			pstate->parallelSlot[i].pid = getpid();
 
 			/*
@@ -584,7 +534,7 @@ ParallelBackupStart(ArchiveHandle *AH)
 
 			/*
 			 * Close all inherited fds for communication of the master with
-			 * the other workers.
+			 * previously-forked workers.
 			 */
 			for (j = 0; j < i; j++)
 			{
@@ -612,11 +562,16 @@ ParallelBackupStart(ArchiveHandle *AH)
 
 		pstate->parallelSlot[i].pid = pid;
 #endif
-
-		pstate->parallelSlot[i].pipeRead = pipeWM[PIPE_READ];
-		pstate->parallelSlot[i].pipeWrite = pipeMW[PIPE_WRITE];
 	}
 
+	/*
+	 * Having forked off the workers, disable SIGPIPE so that master isn't
+	 * killed if it tries to send a command to a dead worker.
+	 */
+#ifndef WIN32
+	signal(SIGPIPE, SIG_IGN);
+#endif
+
 	return pstate;
 }
 
@@ -977,16 +932,13 @@ ListenToWorkers(ArchiveHandle *AH, ParallelState *pstate, bool do_wait)
 		}
 		else
 			exit_horribly(modulename,
-						  "invalid message received from worker: %s\n", msg);
-	}
-	else if (messageStartsWith(msg, "ERROR "))
-	{
-		Assert(AH->format == archDirectory || AH->format == archCustom);
-		pstate->parallelSlot[worker].workerStatus = WRKR_TERMINATED;
-		exit_horribly(modulename, "%s", msg + strlen("ERROR "));
+						  "invalid message received from worker: \"%s\"\n",
+						  msg);
 	}
 	else
-		exit_horribly(modulename, "invalid message received from worker: %s\n", msg);
+		exit_horribly(modulename,
+					  "invalid message received from worker: \"%s\"\n",
+					  msg);
 
 	/* both Unix and Win32 return pg_malloc()ed space, so we free it */
 	free(msg);
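
The worker branch of archive_close_connection() above depends on the master seeing EOF once the worker's end of the channel is closed. A minimal sketch of that behavior on Unix follows, assuming a socketpair() as a stand-in for the real master/worker channel; the helper name read_after_peer_close is hypothetical and does not appear in pg_dump.

```c
#include <assert.h>		/* only for the assert() shown in the usage note */
#include <sys/types.h>
#include <sys/socket.h>
#include <unistd.h>

/*
 * Hypothetical helper, not part of pg_dump: create a connected socket pair,
 * close the "worker" end, and report what read() returns on the "master" end.
 *
 * Returns the read() result (0 means EOF), or -1 if socketpair() failed.
 */
ssize_t
read_after_peer_close(void)
{
	int			sv[2];
	char		buf[16];
	ssize_t		nread;

	if (socketpair(AF_UNIX, SOCK_STREAM, 0, sv) != 0)
		return -1;

	/* The worker's end goes away, as it does when a worker process exits. */
	close(sv[1]);

	/* With no peer left and no pending data, read() reports EOF at once. */
	nread = read(sv[0], buf, sizeof(buf));

	close(sv[0]);
	return nread;
}
```

A caller could check this with assert(read_after_peer_close() == 0). On Windows the workers are threads of the same process, so nothing closes the worker end automatically, which is why the diff adds explicit closesocket() calls.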

src/bin/pg_dump/parallel.h

Lines changed: 2 additions & 2 deletions
@@ -42,9 +42,9 @@ typedef struct ParallelSlot
 	ParallelArgs *args;
 	T_WorkerStatus workerStatus;
 	int status;
-	int pipeRead;
+	int pipeRead; /* master's end of the pipes */
 	int pipeWrite;
-	int pipeRevRead;
+	int pipeRevRead; /* child's end of the pipes */
 	int pipeRevWrite;
 #ifdef WIN32
 	uintptr_t hThread;

src/bin/pg_dump/pg_backup_utils.c

Lines changed: 31 additions & 1 deletion
@@ -93,6 +93,23 @@ vwrite_msg(const char *modulename, const char *fmt, va_list ap)
 	vfprintf(stderr, _(fmt), ap);
 }
 
+/*
+ * Fail and die, with a message to stderr. Parameters as for write_msg.
+ *
+ * Note that on_exit_nicely callbacks will get run.
+ */
+void
+exit_horribly(const char *modulename, const char *fmt,...)
+{
+	va_list ap;
+
+	va_start(ap, fmt);
+	vwrite_msg(modulename, fmt, ap);
+	va_end(ap);
+
+	exit_nicely(1);
+}
+
 /* Register a callback to be run when exit_nicely is invoked. */
 void
 on_exit_nicely(on_exit_nicely_callback function, void *arg)
@@ -106,7 +123,20 @@ on_exit_nicely(on_exit_nicely_callback function, void *arg)
 
 /*
  * Run accumulated on_exit_nicely callbacks in reverse order and then exit
- * quietly. This needs to be thread-safe.
+ * without printing any message.
+ *
+ * If running in a parallel worker thread on Windows, we only exit the thread,
+ * not the whole process.
+ *
+ * Note that in parallel operation on Windows, the callback(s) will be run
+ * by each thread since the list state is necessarily shared by all threads;
+ * each callback must contain logic to ensure it does only what's appropriate
+ * for its thread. On Unix, callbacks are also run by each process, but only
+ * for callbacks established before we fork off the child processes. (It'd
+ * be cleaner to reset the list after fork(), and let each child establish
+ * its own callbacks; but then the behavior would be completely inconsistent
+ * between Windows and Unix. For now, just be sure to establish callbacks
+ * before forking to avoid inconsistency.)
  */
 void
 exit_nicely(int code)
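
The comment above describes callbacks registered with on_exit_nicely() being run in reverse order. A simplified sketch of such a registry is shown here. It is not the pg_dump implementation: register_callback, run_callbacks, record_tag, trace and MAX_CALLBACKS are hypothetical names, and the final exit() is left out so the ordering can be observed.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>		/* strcmp() in the usage note */

#define MAX_CALLBACKS 20		/* hypothetical limit for this sketch */

typedef void (*exit_callback) (int code, void *arg);

static struct
{
	exit_callback function;
	void	   *arg;
}			callbacks[MAX_CALLBACKS];
static int	n_callbacks = 0;

/* Remember a callback; later registrations are run first. */
void
register_callback(exit_callback function, void *arg)
{
	assert(n_callbacks < MAX_CALLBACKS);
	callbacks[n_callbacks].function = function;
	callbacks[n_callbacks].arg = arg;
	n_callbacks++;
}

/* Run the registered callbacks in reverse order of registration. */
void
run_callbacks(int code)
{
	int			i;

	for (i = n_callbacks - 1; i >= 0; i--)
		callbacks[i].function(code, callbacks[i].arg);
}

/* Demo state: each callback appends one character to this string. */
char		trace[MAX_CALLBACKS + 1];
size_t		trace_len = 0;

/* Demo callback: records the first character of the string passed as arg. */
void
record_tag(int code, void *arg)
{
	(void) code;				/* exit code is unused in this demo */
	if (trace_len < MAX_CALLBACKS)
	{
		trace[trace_len++] = *(const char *) arg;
		trace[trace_len] = '\0';
	}
}
```

Registering record_tag with "a" and then with "b", followed by run_callbacks(1), leaves trace equal to "ba", so strcmp(trace, "ba") == 0. Since every process or thread runs the same list, each real callback has to work out who it is, as archive_close_connection() does with GetMyPSlot().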
