@@ -77,10 +77,6 @@ static ShutdownInformation shutdown_info;
77
77
static const char * modulename = gettext_noop ("parallel archiver" );
78
78
79
79
static ParallelSlot * GetMyPSlot (ParallelState * pstate );
80
- static void
81
- parallel_msg_master (ParallelSlot * slot , const char * modulename ,
82
- const char * fmt , va_list ap )
83
- __attribute__((format (PG_PRINTF_ATTRIBUTE , 3 , 0 )));
84
80
static void archive_close_connection (int code , void * arg );
85
81
static void ShutdownWorkersHard (ParallelState * pstate );
86
82
static void WaitForTerminatingWorkers (ParallelState * pstate );
@@ -165,65 +161,6 @@ GetMyPSlot(ParallelState *pstate)
165
161
return NULL ;
166
162
}
167
163
168
- /*
169
- * Fail and die, with a message to stderr. Parameters as for write_msg.
170
- *
171
- * This is defined in parallel.c, because in parallel mode, things are more
172
- * complicated. If the worker process does exit_horribly(), we forward its
173
- * last words to the master process. The master process then does
174
- * exit_horribly() with this error message itself and prints it normally.
175
- * After printing the message, exit_horribly() on the master will shut down
176
- * the remaining worker processes.
177
- */
178
- void
179
- exit_horribly (const char * modulename , const char * fmt ,...)
180
- {
181
- va_list ap ;
182
- ParallelState * pstate = shutdown_info .pstate ;
183
- ParallelSlot * slot ;
184
-
185
- va_start (ap , fmt );
186
-
187
- if (pstate == NULL )
188
- {
189
- /* Not in parallel mode, just write to stderr */
190
- vwrite_msg (modulename , fmt , ap );
191
- }
192
- else
193
- {
194
- slot = GetMyPSlot (pstate );
195
-
196
- if (!slot )
197
- /* We're the parent, just write the message out */
198
- vwrite_msg (modulename , fmt , ap );
199
- else
200
- /* If we're a worker process, send the msg to the master process */
201
- parallel_msg_master (slot , modulename , fmt , ap );
202
- }
203
-
204
- va_end (ap );
205
-
206
- exit_nicely (1 );
207
- }
208
-
209
- /* Sends the error message from the worker to the master process */
210
- static void
211
- parallel_msg_master (ParallelSlot * slot , const char * modulename ,
212
- const char * fmt , va_list ap )
213
- {
214
- char buf [512 ];
215
- int pipefd [2 ];
216
-
217
- pipefd [PIPE_READ ] = slot -> pipeRevRead ;
218
- pipefd [PIPE_WRITE ] = slot -> pipeRevWrite ;
219
-
220
- strcpy (buf , "ERROR " );
221
- vsnprintf (buf + strlen ("ERROR " ),
222
- sizeof (buf ) - strlen ("ERROR " ), fmt , ap );
223
-
224
- sendMessageToMaster (pipefd , buf );
225
- }
226
-
227
164
/*
228
165
* A thread-local version of getLocalPQExpBuffer().
229
166
*
@@ -274,7 +211,7 @@ getThreadLocalPQExpBuffer(void)
274
211
275
212
/*
276
213
* pg_dump and pg_restore register the Archive pointer for the exit handler
277
- * (called from exit_horribly ). This function mainly exists so that we can
214
+ * (called from exit_nicely ). This function mainly exists so that we can
278
215
* keep shutdown_info in file scope only.
279
216
*/
280
217
void
@@ -285,8 +222,8 @@ on_exit_close_archive(Archive *AHX)
285
222
}
286
223
287
224
/*
288
- * This function can close archives in both the parallel and non-parallel
289
- * case .
225
+ * on_exit_nicely handler for shutting down database connections and
226
+ * worker processes cleanly .
290
227
*/
291
228
static void
292
229
archive_close_connection (int code , void * arg )
@@ -295,42 +232,62 @@ archive_close_connection(int code, void *arg)
295
232
296
233
if (si -> pstate )
297
234
{
235
+ /* In parallel mode, must figure out who we are */
298
236
ParallelSlot * slot = GetMyPSlot (si -> pstate );
299
237
300
238
if (!slot )
301
239
{
302
240
/*
303
- * We're the master: We have already printed out the message
304
- * passed to exit_horribly() either from the master itself or from
305
- * a worker process. Now we need to close our own database
306
- * connection (only open during parallel dump but not restore) and
307
- * shut down the remaining workers.
241
+ * We're the master. Close our own database connection, if any,
242
+ * and then forcibly shut down workers.
308
243
*/
309
- DisconnectDatabase (si -> AHX );
244
+ if (si -> AHX )
245
+ DisconnectDatabase (si -> AHX );
246
+
310
247
#ifndef WIN32
311
248
312
249
/*
313
- * Setting aborting to true switches to best-effort-mode
314
- * (send/receive but ignore errors) in communicating with our
315
- * workers.
250
+ * Setting aborting to true shuts off error/warning messages that
251
+ * are no longer useful once we start killing workers.
316
252
*/
317
253
aborting = true;
318
254
#endif
319
255
ShutdownWorkersHard (si -> pstate );
320
256
}
321
- else if (slot -> args -> AH )
322
- DisconnectDatabase (& (slot -> args -> AH -> public ));
257
+ else
258
+ {
259
+ /*
260
+ * We're a worker. Shut down our own DB connection if any. On
261
+ * Windows, we also have to close our communication sockets, to
262
+ * emulate what will happen on Unix when the worker process exits.
263
+ * (Without this, if this is a premature exit, the master would
264
+ * fail to detect it because there would be no EOF condition on
265
+ * the other end of the pipe.)
266
+ */
267
+ if (slot -> args -> AH )
268
+ DisconnectDatabase (& (slot -> args -> AH -> public ));
269
+
270
+ #ifdef WIN32
271
+ closesocket (slot -> pipeRevRead );
272
+ closesocket (slot -> pipeRevWrite );
273
+ #endif
274
+ }
275
+ }
276
+ else
277
+ {
278
+ /* Non-parallel operation: just kill the master DB connection */
279
+ if (si -> AHX )
280
+ DisconnectDatabase (si -> AHX );
323
281
}
324
- else if (si -> AHX )
325
- DisconnectDatabase (si -> AHX );
326
282
}
327
283
328
284
/*
329
285
* If we have one worker that terminates for some reason, we'd like the other
330
286
* threads to terminate as well (and not finish with their 70 GB table dump
331
287
* first...). Now in UNIX we can just kill these processes, and let the signal
332
288
* handler set wantAbort to 1. In Windows we set a termEvent and this serves
333
- * as the signal for everyone to terminate.
289
+ * as the signal for everyone to terminate. We don't print any error message,
290
+ * that would just clutter the screen.
334
291
*/
335
292
void
336
293
checkAborting (ArchiveHandle * AH )
@@ -340,7 +297,7 @@ checkAborting(ArchiveHandle *AH)
340
297
#else
341
298
if (wantAbort )
342
299
#endif
343
- exit_horribly ( modulename , "worker is terminating\n" );
300
+ exit_nicely ( 1 );
344
301
}
345
302
346
303
/*
@@ -355,8 +312,6 @@ ShutdownWorkersHard(ParallelState *pstate)
355
312
#ifndef WIN32
356
313
int i ;
357
314
358
- signal (SIGPIPE , SIG_IGN );
359
-
360
315
/*
361
316
* Close our write end of the sockets so that the workers know they can
362
317
* exit.
@@ -431,28 +386,22 @@ sigTermHandler(int signum)
431
386
#endif
432
387
433
388
/*
434
- * This function is called by both UNIX and Windows variants to set up a
435
- * worker process.
389
+ * This function is called by both UNIX and Windows variants to set up
390
+ * and run a worker process. Caller should exit the process (or thread)
391
+ * upon return.
436
392
*/
437
393
static void
438
394
SetupWorker (ArchiveHandle * AH , int pipefd [2 ], int worker ,
439
395
RestoreOptions * ropt )
440
396
{
441
397
/*
442
398
* Call the setup worker function that's defined in the ArchiveHandle.
443
- *
444
- * We get the raw connection only for the reason that we can close it
445
- * properly when we shut down. This happens only that way when it is
446
- * brought down because of an error.
447
399
*/
448
400
(AH -> SetupWorkerPtr ) ((Archive * ) AH , ropt );
449
401
450
402
Assert (AH -> connection != NULL );
451
403
452
404
WaitForCommands (AH , pipefd );
453
-
454
- closesocket (pipefd [PIPE_READ ]);
455
- closesocket (pipefd [PIPE_WRITE ]);
456
405
}
457
406
458
407
#ifdef WIN32
@@ -539,15 +488,23 @@ ParallelBackupStart(ArchiveHandle *AH, RestoreOptions *ropt)
539
488
pstate -> parallelSlot [i ].args = (ParallelArgs * ) pg_malloc (sizeof (ParallelArgs ));
540
489
pstate -> parallelSlot [i ].args -> AH = NULL ;
541
490
pstate -> parallelSlot [i ].args -> te = NULL ;
491
+
492
+ /* master's ends of the pipes */
493
+ pstate -> parallelSlot [i ].pipeRead = pipeWM [PIPE_READ ];
494
+ pstate -> parallelSlot [i ].pipeWrite = pipeMW [PIPE_WRITE ];
495
+ /* child's ends of the pipes */
496
+ pstate -> parallelSlot [i ].pipeRevRead = pipeMW [PIPE_READ ];
497
+ pstate -> parallelSlot [i ].pipeRevWrite = pipeWM [PIPE_WRITE ];
498
+
542
499
#ifdef WIN32
543
500
/* Allocate a new structure for every worker */
544
501
wi = (WorkerInfo * ) pg_malloc (sizeof (WorkerInfo ));
545
502
546
503
wi -> ropt = ropt ;
547
504
wi -> worker = i ;
548
505
wi -> AH = AH ;
549
- wi -> pipeRead = pstate -> parallelSlot [ i ]. pipeRevRead = pipeMW [PIPE_READ ];
550
- wi -> pipeWrite = pstate -> parallelSlot [ i ]. pipeRevWrite = pipeWM [PIPE_WRITE ];
506
+ wi -> pipeRead = pipeMW [PIPE_READ ];
507
+ wi -> pipeWrite = pipeWM [PIPE_WRITE ];
551
508
552
509
handle = _beginthreadex (NULL , 0 , (void * ) & init_spawned_worker_win32 ,
553
510
wi , 0 , & (pstate -> parallelSlot [i ].threadId ));
@@ -563,15 +520,6 @@ ParallelBackupStart(ArchiveHandle *AH, RestoreOptions *ropt)
563
520
pipefd [0 ] = pipeMW [PIPE_READ ];
564
521
pipefd [1 ] = pipeWM [PIPE_WRITE ];
565
522
566
- /*
567
- * Store the fds for the reverse communication in pstate. Actually
568
- * we only use this in case of an error and don't use pstate
569
- * otherwise in the worker process. On Windows we write to the
570
- * global pstate, in Unix we write to our process-local copy but
571
- * that's also where we'd retrieve this information back from.
572
- */
573
- pstate -> parallelSlot [i ].pipeRevRead = pipefd [PIPE_READ ];
574
- pstate -> parallelSlot [i ].pipeRevWrite = pipefd [PIPE_WRITE ];
575
523
pstate -> parallelSlot [i ].pid = getpid ();
576
524
577
525
/*
@@ -590,7 +538,7 @@ ParallelBackupStart(ArchiveHandle *AH, RestoreOptions *ropt)
590
538
591
539
/*
592
540
* Close all inherited fds for communication of the master with
593
- * the other workers.
541
+ * previously-forked workers.
594
542
*/
595
543
for (j = 0 ; j < i ; j ++ )
596
544
{
@@ -618,11 +566,16 @@ ParallelBackupStart(ArchiveHandle *AH, RestoreOptions *ropt)
618
566
619
567
pstate -> parallelSlot [i ].pid = pid ;
620
568
#endif
621
-
622
- pstate -> parallelSlot [i ].pipeRead = pipeWM [PIPE_READ ];
623
- pstate -> parallelSlot [i ].pipeWrite = pipeMW [PIPE_WRITE ];
624
569
}
625
570
571
+ /*
572
+ * Having forked off the workers, disable SIGPIPE so that master isn't
573
+ * killed if it tries to send a command to a dead worker.
574
+ */
575
+ #ifndef WIN32
576
+ signal (SIGPIPE , SIG_IGN );
577
+ #endif
578
+
626
579
return pstate ;
627
580
}
628
581
@@ -983,16 +936,13 @@ ListenToWorkers(ArchiveHandle *AH, ParallelState *pstate, bool do_wait)
983
936
}
984
937
else
985
938
exit_horribly (modulename ,
986
- "invalid message received from worker: %s\n" , msg );
987
- }
988
- else if (messageStartsWith (msg , "ERROR " ))
989
- {
990
- Assert (AH -> format == archDirectory || AH -> format == archCustom );
991
- pstate -> parallelSlot [worker ].workerStatus = WRKR_TERMINATED ;
992
- exit_horribly (modulename , "%s" , msg + strlen ("ERROR " ));
939
+ "invalid message received from worker: \"%s\"\n" ,
940
+ msg );
993
941
}
994
942
else
995
- exit_horribly (modulename , "invalid message received from worker: %s\n" , msg );
943
+ exit_horribly (modulename ,
944
+ "invalid message received from worker: \"%s\"\n" ,
945
+ msg );
996
946
997
947
/* both Unix and Win32 return pg_malloc()ed space, so we free it */
998
948
free (msg );
0 commit comments