@@ -77,8 +77,6 @@ static ShutdownInformation shutdown_info;
77
77
static const char * modulename = gettext_noop ("parallel archiver" );
78
78
79
79
static ParallelSlot * GetMyPSlot (ParallelState * pstate );
80
- static void parallel_msg_master (ParallelSlot * slot , const char * modulename ,
81
- const char * fmt , va_list ap ) pg_attribute_printf (3 , 0 );
82
80
static void archive_close_connection (int code , void * arg );
83
81
static void ShutdownWorkersHard (ParallelState * pstate );
84
82
static void WaitForTerminatingWorkers (ParallelState * pstate );
@@ -162,65 +160,6 @@ GetMyPSlot(ParallelState *pstate)
162
160
return NULL ;
163
161
}
164
162
165
- /*
166
- * Fail and die, with a message to stderr. Parameters as for write_msg.
167
- *
168
- * This is defined in parallel.c, because in parallel mode, things are more
169
- * complicated. If the worker process does exit_horribly(), we forward its
170
- * last words to the master process. The master process then does
171
- * exit_horribly() with this error message itself and prints it normally.
172
- * After printing the message, exit_horribly() on the master will shut down
173
- * the remaining worker processes.
174
- */
175
- void
176
- exit_horribly (const char * modulename , const char * fmt ,...)
177
- {
178
- va_list ap ;
179
- ParallelState * pstate = shutdown_info .pstate ;
180
- ParallelSlot * slot ;
181
-
182
- va_start (ap , fmt );
183
-
184
- if (pstate == NULL )
185
- {
186
- /* Not in parallel mode, just write to stderr */
187
- vwrite_msg (modulename , fmt , ap );
188
- }
189
- else
190
- {
191
- slot = GetMyPSlot (pstate );
192
-
193
- if (!slot )
194
- /* We're the parent, just write the message out */
195
- vwrite_msg (modulename , fmt , ap );
196
- else
197
- /* If we're a worker process, send the msg to the master process */
198
- parallel_msg_master (slot , modulename , fmt , ap );
199
- }
200
-
201
- va_end (ap );
202
-
203
- exit_nicely (1 );
204
- }
205
-
206
- /* Sends the error message from the worker to the master process */
207
- static void
208
- parallel_msg_master (ParallelSlot * slot , const char * modulename ,
209
- const char * fmt , va_list ap )
210
- {
211
- char buf [512 ];
212
- int pipefd [2 ];
213
-
214
- pipefd [PIPE_READ ] = slot -> pipeRevRead ;
215
- pipefd [PIPE_WRITE ] = slot -> pipeRevWrite ;
216
-
217
- strcpy (buf , "ERROR " );
218
- vsnprintf (buf + strlen ("ERROR " ),
219
- sizeof (buf ) - strlen ("ERROR " ), fmt , ap );
220
-
221
- sendMessageToMaster (pipefd , buf );
222
- }
223
-
224
163
/*
225
164
* A thread-local version of getLocalPQExpBuffer().
226
165
*
@@ -271,7 +210,7 @@ getThreadLocalPQExpBuffer(void)
271
210
272
211
/*
273
212
* pg_dump and pg_restore register the Archive pointer for the exit handler
274
- * (called from exit_horribly ). This function mainly exists so that we can
213
+ * (called from exit_nicely ). This function mainly exists so that we can
275
214
* keep shutdown_info in file scope only.
276
215
*/
277
216
void
@@ -282,8 +221,8 @@ on_exit_close_archive(Archive *AHX)
282
221
}
283
222
284
223
/*
285
- * This function can close archives in both the parallel and non-parallel
286
- * case .
224
+ * on_exit_nicely handler for shutting down database connections and
225
+ * worker processes cleanly .
287
226
*/
288
227
static void
289
228
archive_close_connection (int code , void * arg )
@@ -292,42 +231,62 @@ archive_close_connection(int code, void *arg)
292
231
293
232
if (si -> pstate )
294
233
{
234
+ /* In parallel mode, must figure out who we are */
295
235
ParallelSlot * slot = GetMyPSlot (si -> pstate );
296
236
297
237
if (!slot )
298
238
{
299
239
/*
300
- * We're the master: We have already printed out the message
301
- * passed to exit_horribly() either from the master itself or from
302
- * a worker process. Now we need to close our own database
303
- * connection (only open during parallel dump but not restore) and
304
- * shut down the remaining workers.
240
+ * We're the master. Close our own database connection, if any,
241
+ * and then forcibly shut down workers.
305
242
*/
306
- DisconnectDatabase (si -> AHX );
243
+ if (si -> AHX )
244
+ DisconnectDatabase (si -> AHX );
245
+
307
246
#ifndef WIN32
308
247
309
248
/*
310
- * Setting aborting to true switches to best-effort-mode
311
- * (send/receive but ignore errors) in communicating with our
312
- * workers.
249
+ * Setting aborting to true shuts off error/warning messages that
250
+ * are no longer useful once we start killing workers.
313
251
*/
314
252
aborting = true;
315
253
#endif
316
254
ShutdownWorkersHard (si -> pstate );
317
255
}
318
- else if (slot -> args -> AH )
319
- DisconnectDatabase (& (slot -> args -> AH -> public ));
256
+ else
257
+ {
258
+ /*
259
+ * We're a worker. Shut down our own DB connection if any. On
260
+ * Windows, we also have to close our communication sockets, to
261
+ * emulate what will happen on Unix when the worker process exits.
262
+ * (Without this, if this is a premature exit, the master would
263
+ * fail to detect it because there would be no EOF condition on
264
+ * the other end of the pipe.)
265
+ */
266
+ if (slot -> args -> AH )
267
+ DisconnectDatabase (& (slot -> args -> AH -> public ));
268
+
269
+ #ifdef WIN32
270
+ closesocket (slot -> pipeRevRead );
271
+ closesocket (slot -> pipeRevWrite );
272
+ #endif
273
+ }
274
+ }
275
+ else
276
+ {
277
+ /* Non-parallel operation: just kill the master DB connection */
278
+ if (si -> AHX )
279
+ DisconnectDatabase (si -> AHX );
320
280
}
321
- else if (si -> AHX )
322
- DisconnectDatabase (si -> AHX );
323
281
}
324
282
325
283
/*
326
284
* If we have one worker that terminates for some reason, we'd like the other
327
285
* threads to terminate as well (and not finish with their 70 GB table dump
328
286
* first...). Now in UNIX we can just kill these processes, and let the signal
329
287
* handler set wantAbort to 1. In Windows we set a termEvent and this serves
330
- * as the signal for everyone to terminate.
288
+ * as the signal for everyone to terminate. We don't print any error message;
289
+ * that would just clutter the screen.
331
290
*/
332
291
void
333
292
checkAborting (ArchiveHandle * AH )
@@ -337,7 +296,7 @@ checkAborting(ArchiveHandle *AH)
337
296
#else
338
297
if (wantAbort )
339
298
#endif
340
- exit_horribly ( modulename , "worker is terminating\n" );
299
+ exit_nicely ( 1 );
341
300
}
342
301
343
302
/*
@@ -352,8 +311,6 @@ ShutdownWorkersHard(ParallelState *pstate)
352
311
#ifndef WIN32
353
312
int i ;
354
313
355
- signal (SIGPIPE , SIG_IGN );
356
-
357
314
/*
358
315
* Close our write end of the sockets so that the workers know they can
359
316
* exit.
@@ -428,27 +385,21 @@ sigTermHandler(int signum)
428
385
#endif
429
386
430
387
/*
431
- * This function is called by both UNIX and Windows variants to set up a
432
- * worker process.
388
+ * This function is called by both UNIX and Windows variants to set up
389
+ * and run a worker process. Caller should exit the process (or thread)
390
+ * upon return.
433
391
*/
434
392
static void
435
393
SetupWorker (ArchiveHandle * AH , int pipefd [2 ], int worker )
436
394
{
437
395
/*
438
396
* Call the setup worker function that's defined in the ArchiveHandle.
439
- *
440
- * We get the raw connection only for the reason that we can close it
441
- * properly when we shut down. This happens only that way when it is
442
- * brought down because of an error.
443
397
*/
444
398
(AH -> SetupWorkerPtr ) ((Archive * ) AH );
445
399
446
400
Assert (AH -> connection != NULL );
447
401
448
402
WaitForCommands (AH , pipefd );
449
-
450
- closesocket (pipefd [PIPE_READ ]);
451
- closesocket (pipefd [PIPE_WRITE ]);
452
403
}
453
404
454
405
#ifdef WIN32
@@ -534,14 +485,22 @@ ParallelBackupStart(ArchiveHandle *AH)
534
485
pstate -> parallelSlot [i ].args = (ParallelArgs * ) pg_malloc (sizeof (ParallelArgs ));
535
486
pstate -> parallelSlot [i ].args -> AH = NULL ;
536
487
pstate -> parallelSlot [i ].args -> te = NULL ;
488
+
489
+ /* master's ends of the pipes */
490
+ pstate -> parallelSlot [i ].pipeRead = pipeWM [PIPE_READ ];
491
+ pstate -> parallelSlot [i ].pipeWrite = pipeMW [PIPE_WRITE ];
492
+ /* child's ends of the pipes */
493
+ pstate -> parallelSlot [i ].pipeRevRead = pipeMW [PIPE_READ ];
494
+ pstate -> parallelSlot [i ].pipeRevWrite = pipeWM [PIPE_WRITE ];
495
+
537
496
#ifdef WIN32
538
497
/* Allocate a new structure for every worker */
539
498
wi = (WorkerInfo * ) pg_malloc (sizeof (WorkerInfo ));
540
499
541
500
wi -> worker = i ;
542
501
wi -> AH = AH ;
543
- wi -> pipeRead = pstate -> parallelSlot [ i ]. pipeRevRead = pipeMW [PIPE_READ ];
544
- wi -> pipeWrite = pstate -> parallelSlot [ i ]. pipeRevWrite = pipeWM [PIPE_WRITE ];
502
+ wi -> pipeRead = pipeMW [PIPE_READ ];
503
+ wi -> pipeWrite = pipeWM [PIPE_WRITE ];
545
504
546
505
handle = _beginthreadex (NULL , 0 , (void * ) & init_spawned_worker_win32 ,
547
506
wi , 0 , & (pstate -> parallelSlot [i ].threadId ));
@@ -557,15 +516,6 @@ ParallelBackupStart(ArchiveHandle *AH)
557
516
pipefd [0 ] = pipeMW [PIPE_READ ];
558
517
pipefd [1 ] = pipeWM [PIPE_WRITE ];
559
518
560
- /*
561
- * Store the fds for the reverse communication in pstate. Actually
562
- * we only use this in case of an error and don't use pstate
563
- * otherwise in the worker process. On Windows we write to the
564
- * global pstate, in Unix we write to our process-local copy but
565
- * that's also where we'd retrieve this information back from.
566
- */
567
- pstate -> parallelSlot [i ].pipeRevRead = pipefd [PIPE_READ ];
568
- pstate -> parallelSlot [i ].pipeRevWrite = pipefd [PIPE_WRITE ];
569
519
pstate -> parallelSlot [i ].pid = getpid ();
570
520
571
521
/*
@@ -584,7 +534,7 @@ ParallelBackupStart(ArchiveHandle *AH)
584
534
585
535
/*
586
536
* Close all inherited fds for communication of the master with
587
- * the other workers.
537
+ * previously-forked workers.
588
538
*/
589
539
for (j = 0 ; j < i ; j ++ )
590
540
{
@@ -612,11 +562,16 @@ ParallelBackupStart(ArchiveHandle *AH)
612
562
613
563
pstate -> parallelSlot [i ].pid = pid ;
614
564
#endif
615
-
616
- pstate -> parallelSlot [i ].pipeRead = pipeWM [PIPE_READ ];
617
- pstate -> parallelSlot [i ].pipeWrite = pipeMW [PIPE_WRITE ];
618
565
}
619
566
567
+ /*
568
+ * Having forked off the workers, disable SIGPIPE so that the master isn't
569
+ * killed if it tries to send a command to a dead worker.
570
+ */
571
+ #ifndef WIN32
572
+ signal (SIGPIPE , SIG_IGN );
573
+ #endif
574
+
620
575
return pstate ;
621
576
}
622
577
@@ -977,16 +932,13 @@ ListenToWorkers(ArchiveHandle *AH, ParallelState *pstate, bool do_wait)
977
932
}
978
933
else
979
934
exit_horribly (modulename ,
980
- "invalid message received from worker: %s\n" , msg );
981
- }
982
- else if (messageStartsWith (msg , "ERROR " ))
983
- {
984
- Assert (AH -> format == archDirectory || AH -> format == archCustom );
985
- pstate -> parallelSlot [worker ].workerStatus = WRKR_TERMINATED ;
986
- exit_horribly (modulename , "%s" , msg + strlen ("ERROR " ));
935
+ "invalid message received from worker: \"%s\"\n" ,
936
+ msg );
987
937
}
988
938
else
989
- exit_horribly (modulename , "invalid message received from worker: %s\n" , msg );
939
+ exit_horribly (modulename ,
940
+ "invalid message received from worker: \"%s\"\n" ,
941
+ msg );
990
942
991
943
/* both Unix and Win32 return pg_malloc()ed space, so we free it */
992
944
free (msg );
0 commit comments