35
35
* the required action (dump or restore) and returns a malloc'd status string.
36
36
* The status string is passed back to the master where it is interpreted by
37
37
* AH->MasterEndParallelItemPtr, another format-specific routine. That
38
- * function can update state or catalog information on the master's side,
38
+ * function can update format-specific information on the master's side,
39
39
* depending on the reply from the worker process. In the end it returns a
40
- * status code, which is 0 for successful execution.
40
+ * status code, which we pass to the ParallelCompletionPtr callback function
41
+ * that was passed to DispatchJobForTocEntry(). The callback function does
42
+ * state updating for the master control logic in pg_backup_archiver.c.
41
43
*
42
44
* Remember that we have forked off the workers only after we have read in
43
45
* the catalog. That's why our worker processes can also access the catalog
48
50
* In the master process, the workerStatus field for each worker has one of
49
51
* the following values:
50
52
* WRKR_IDLE: it's waiting for a command
51
- * WRKR_WORKING: it's been sent a command
52
- * WRKR_FINISHED: it's returned a result
53
+ * WRKR_WORKING: it's working on a command
53
54
* WRKR_TERMINATED: process ended
54
- * The FINISHED state indicates that the worker is idle, but we've not yet
55
- * dealt with the status code it returned from the prior command.
56
- * ReapWorkerStatus() extracts the unhandled command status value and sets
57
- * the workerStatus back to WRKR_IDLE.
58
55
*/
59
56
60
57
#include "postgres_fe.h"
79
76
#define PIPE_READ 0
80
77
#define PIPE_WRITE 1
81
78
79
+ #define NO_SLOT (-1) /* Failure result for GetIdleWorker() */
80
+
82
81
#ifdef WIN32
83
82
84
83
/*
@@ -175,9 +174,12 @@ static void setup_cancel_handler(void);
175
174
static void set_cancel_pstate (ParallelState * pstate );
176
175
static void set_cancel_slot_archive (ParallelSlot * slot , ArchiveHandle * AH );
177
176
static void RunWorker (ArchiveHandle * AH , ParallelSlot * slot );
177
+ static int GetIdleWorker (ParallelState * pstate );
178
178
static bool HasEveryWorkerTerminated (ParallelState * pstate );
179
179
static void lockTableForWorker (ArchiveHandle * AH , TocEntry * te );
180
180
static void WaitForCommands (ArchiveHandle * AH , int pipefd [2 ]);
181
+ static bool ListenToWorkers (ArchiveHandle * AH , ParallelState * pstate ,
182
+ bool do_wait );
181
183
static char * getMessageFromMaster (int pipefd [2 ]);
182
184
static void sendMessageToMaster (int pipefd [2 ], const char * str );
183
185
static int select_loop (int maxFd , fd_set * workerset );
@@ -349,8 +351,8 @@ archive_close_connection(int code, void *arg)
349
351
* fail to detect it because there would be no EOF condition on
350
352
* the other end of the pipe.)
351
353
*/
352
- if (slot -> args -> AH )
353
- DisconnectDatabase (& (slot -> args -> AH -> public ));
354
+ if (slot -> AH )
355
+ DisconnectDatabase (& (slot -> AH -> public ));
354
356
355
357
#ifdef WIN32
356
358
closesocket (slot -> pipeRevRead );
@@ -407,7 +409,7 @@ ShutdownWorkersHard(ParallelState *pstate)
407
409
EnterCriticalSection (& signal_info_lock );
408
410
for (i = 0 ; i < pstate -> numWorkers ; i ++ )
409
411
{
410
- ArchiveHandle * AH = pstate -> parallelSlot [i ].args -> AH ;
412
+ ArchiveHandle * AH = pstate -> parallelSlot [i ].AH ;
411
413
char errbuf [1 ];
412
414
413
415
if (AH != NULL && AH -> connCancel != NULL )
@@ -634,7 +636,7 @@ consoleHandler(DWORD dwCtrlType)
634
636
for (i = 0 ; i < signal_info .pstate -> numWorkers ; i ++ )
635
637
{
636
638
ParallelSlot * slot = & (signal_info .pstate -> parallelSlot [i ]);
637
- ArchiveHandle * AH = slot -> args -> AH ;
639
+ ArchiveHandle * AH = slot -> AH ;
638
640
HANDLE hThread = (HANDLE ) slot -> hThread ;
639
641
640
642
/*
@@ -789,7 +791,7 @@ set_cancel_slot_archive(ParallelSlot *slot, ArchiveHandle *AH)
789
791
EnterCriticalSection (& signal_info_lock );
790
792
#endif
791
793
792
- slot -> args -> AH = AH ;
794
+ slot -> AH = AH ;
793
795
794
796
#ifdef WIN32
795
797
LeaveCriticalSection (& signal_info_lock );
@@ -935,9 +937,10 @@ ParallelBackupStart(ArchiveHandle *AH)
935
937
strerror (errno ));
936
938
937
939
slot -> workerStatus = WRKR_IDLE ;
938
- slot -> args = (ParallelArgs * ) pg_malloc (sizeof (ParallelArgs ));
939
- slot -> args -> AH = NULL ;
940
- slot -> args -> te = NULL ;
940
+ slot -> AH = NULL ;
941
+ slot -> te = NULL ;
942
+ slot -> callback = NULL ;
943
+ slot -> callback_data = NULL ;
941
944
942
945
/* master's ends of the pipes */
943
946
slot -> pipeRead = pipeWM [PIPE_READ ];
@@ -1071,20 +1074,28 @@ ParallelBackupEnd(ArchiveHandle *AH, ParallelState *pstate)
1071
1074
}
1072
1075
1073
1076
/*
1074
- * Dispatch a job to some free worker (caller must ensure there is one!)
1077
+ * Dispatch a job to some free worker.
1075
1078
*
1076
1079
* te is the TocEntry to be processed, act is the action to be taken on it.
1080
+ * callback is the function to call on completion of the job.
1081
+ *
1082
+ * If no worker is currently available, this will block, and previously
1083
+ * registered callback functions may be called.
1077
1084
*/
1078
1085
void
1079
- DispatchJobForTocEntry (ArchiveHandle * AH , ParallelState * pstate , TocEntry * te ,
1080
- T_Action act )
1086
+ DispatchJobForTocEntry (ArchiveHandle * AH ,
1087
+ ParallelState * pstate ,
1088
+ TocEntry * te ,
1089
+ T_Action act ,
1090
+ ParallelCompletionPtr callback ,
1091
+ void * callback_data )
1081
1092
{
1082
1093
int worker ;
1083
1094
char * arg ;
1084
1095
1085
- /* our caller makes sure that at least one worker is idle */
1086
- worker = GetIdleWorker (pstate );
1087
- Assert ( worker != NO_SLOT );
1096
+ /* Get a worker, waiting if none are idle */
1097
+ while (( worker = GetIdleWorker (pstate )) == NO_SLOT )
1098
+ WaitForWorkers ( AH , pstate , WFW_ONE_IDLE );
1088
1099
1089
1100
/* Construct and send command string */
1090
1101
arg = (AH -> MasterStartParallelItemPtr ) (AH , te , act );
@@ -1095,14 +1106,16 @@ DispatchJobForTocEntry(ArchiveHandle *AH, ParallelState *pstate, TocEntry *te,
1095
1106
1096
1107
/* Remember worker is busy, and which TocEntry it's working on */
1097
1108
pstate -> parallelSlot [worker ].workerStatus = WRKR_WORKING ;
1098
- pstate -> parallelSlot [worker ].args -> te = te ;
1109
+ pstate -> parallelSlot [worker ].te = te ;
1110
+ pstate -> parallelSlot [worker ].callback = callback ;
1111
+ pstate -> parallelSlot [worker ].callback_data = callback_data ;
1099
1112
}
1100
1113
1101
1114
/*
1102
1115
* Find an idle worker and return its slot number.
1103
1116
* Return NO_SLOT if none are idle.
1104
1117
*/
1105
- int
1118
+ static int
1106
1119
GetIdleWorker (ParallelState * pstate )
1107
1120
{
1108
1121
int i ;
@@ -1274,17 +1287,16 @@ WaitForCommands(ArchiveHandle *AH, int pipefd[2])
1274
1287
* immediately if there is none available.
1275
1288
*
1276
1289
* When we get a status message, we let MasterEndParallelItemPtr process it,
1277
- * then save the resulting status code and switch the worker's state to
1278
- * WRKR_FINISHED. Later, caller must call ReapWorkerStatus() to verify
1279
- * that the status was "OK" and push the worker back to IDLE state.
1290
+ * then pass the resulting status code to the callback function that was
1291
+ * specified to DispatchJobForTocEntry, then reset the worker status to IDLE.
1280
1292
*
1281
- * XXX Rube Goldberg would be proud of this API, but no one else should be .
1293
+ * Returns true if we collected a status message, else false .
1282
1294
*
1283
1295
* XXX is it worth checking for more than one status message per call?
1284
1296
* It seems somewhat unlikely that multiple workers would finish at exactly
1285
1297
* the same time.
1286
1298
*/
1287
- void
1299
+ static bool
1288
1300
ListenToWorkers (ArchiveHandle * AH , ParallelState * pstate , bool do_wait )
1289
1301
{
1290
1302
int worker ;
@@ -1298,34 +1310,39 @@ ListenToWorkers(ArchiveHandle *AH, ParallelState *pstate, bool do_wait)
1298
1310
/* If do_wait is true, we must have detected EOF on some socket */
1299
1311
if (do_wait )
1300
1312
exit_horribly (modulename , "a worker process died unexpectedly\n" );
1301
- return ;
1313
+ return false ;
1302
1314
}
1303
1315
1304
1316
/* Process it and update our idea of the worker's status */
1305
1317
if (messageStartsWith (msg , "OK " ))
1306
1318
{
1307
- TocEntry * te = pstate -> parallelSlot [worker ].args -> te ;
1319
+ ParallelSlot * slot = & pstate -> parallelSlot [worker ];
1320
+ TocEntry * te = slot -> te ;
1308
1321
char * statusString ;
1322
+ int status ;
1309
1323
1310
1324
if (messageStartsWith (msg , "OK RESTORE " ))
1311
1325
{
1312
1326
statusString = msg + strlen ("OK RESTORE " );
1313
- pstate -> parallelSlot [ worker ]. status =
1327
+ status =
1314
1328
(AH -> MasterEndParallelItemPtr )
1315
1329
(AH , te , statusString , ACT_RESTORE );
1330
+ slot -> callback (AH , te , status , slot -> callback_data );
1316
1331
}
1317
1332
else if (messageStartsWith (msg , "OK DUMP " ))
1318
1333
{
1319
1334
statusString = msg + strlen ("OK DUMP " );
1320
- pstate -> parallelSlot [ worker ]. status =
1335
+ status =
1321
1336
(AH -> MasterEndParallelItemPtr )
1322
1337
(AH , te , statusString , ACT_DUMP );
1338
+ slot -> callback (AH , te , status , slot -> callback_data );
1323
1339
}
1324
1340
else
1325
1341
exit_horribly (modulename ,
1326
1342
"invalid message received from worker: \"%s\"\n" ,
1327
1343
msg );
1328
- pstate -> parallelSlot [worker ].workerStatus = WRKR_FINISHED ;
1344
+ slot -> workerStatus = WRKR_IDLE ;
1345
+ slot -> te = NULL ;
1329
1346
}
1330
1347
else
1331
1348
exit_horribly (modulename ,
@@ -1334,110 +1351,79 @@ ListenToWorkers(ArchiveHandle *AH, ParallelState *pstate, bool do_wait)
1334
1351
1335
1352
/* Free the string returned from getMessageFromWorker */
1336
1353
free (msg );
1337
- }
1338
-
1339
- /*
1340
- * Check to see if any worker is in WRKR_FINISHED state. If so,
1341
- * return its command status code into *status, reset it to IDLE state,
1342
- * and return its slot number. Otherwise return NO_SLOT.
1343
- *
1344
- * This function is executed in the master process.
1345
- */
1346
- int
1347
- ReapWorkerStatus (ParallelState * pstate , int * status )
1348
- {
1349
- int i ;
1350
1354
1351
- for (i = 0 ; i < pstate -> numWorkers ; i ++ )
1352
- {
1353
- if (pstate -> parallelSlot [i ].workerStatus == WRKR_FINISHED )
1354
- {
1355
- * status = pstate -> parallelSlot [i ].status ;
1356
- pstate -> parallelSlot [i ].status = 0 ;
1357
- pstate -> parallelSlot [i ].workerStatus = WRKR_IDLE ;
1358
- return i ;
1359
- }
1360
- }
1361
- return NO_SLOT ;
1355
+ return true;
1362
1356
}
1363
1357
1364
1358
/*
1365
- * Wait, if necessary, until we have at least one idle worker.
1366
- * Reap worker status as necessary to move FINISHED workers to IDLE state.
1359
+ * Check for status results from workers, waiting if necessary.
1367
1360
*
1368
- * We assume that no extra processing is required when reaping a finished
1369
- * command, except for checking that the status was OK (zero).
1370
- * Caution: that assumption means that this function can only be used in
1371
- * parallel dump, not parallel restore, because the latter has a more
1372
- * complex set of rules about handling status.
1361
+ * Available wait modes are:
1362
+ * WFW_NO_WAIT: reap any available status, but don't block
1363
+ * WFW_GOT_STATUS: wait for at least one more worker to finish
1364
+ * WFW_ONE_IDLE: wait for at least one worker to be idle
1365
+ * WFW_ALL_IDLE: wait for all workers to be idle
1366
+ *
1367
+ * Any received results are passed to MasterEndParallelItemPtr and then
1368
+ * to the callback specified to DispatchJobForTocEntry.
1373
1369
*
1374
1370
* This function is executed in the master process.
1375
1371
*/
1376
1372
void
1377
- EnsureIdleWorker (ArchiveHandle * AH , ParallelState * pstate )
1373
+ WaitForWorkers (ArchiveHandle * AH , ParallelState * pstate , WFW_WaitOption mode )
1378
1374
{
1379
- int ret_worker ;
1380
- int work_status ;
1375
+ bool do_wait = false;
1381
1376
1382
- for (;;)
1377
+ /*
1378
+ * In GOT_STATUS mode, always block waiting for a message, since we can't
1379
+ * return till we get something. In other modes, we don't block the first
1380
+ * time through the loop.
1381
+ */
1382
+ if (mode == WFW_GOT_STATUS )
1383
1383
{
1384
- int nTerm = 0 ;
1385
-
1386
- while ((ret_worker = ReapWorkerStatus (pstate , & work_status )) != NO_SLOT )
1387
- {
1388
- if (work_status != 0 )
1389
- exit_horribly (modulename , "error processing a parallel work item\n" );
1390
-
1391
- nTerm ++ ;
1392
- }
1393
-
1394
- /*
1395
- * We need to make sure that we have an idle worker before dispatching
1396
- * the next item. If nTerm > 0 we already have that (quick check).
1397
- */
1398
- if (nTerm > 0 )
1399
- return ;
1400
-
1401
- /* explicit check for an idle worker */
1402
- if (GetIdleWorker (pstate ) != NO_SLOT )
1403
- return ;
1384
+ /* Assert that caller knows what it's doing */
1385
+ Assert (!IsEveryWorkerIdle (pstate ));
1386
+ do_wait = true;
1387
+ }
1404
1388
1389
+ for (;;)
1390
+ {
1405
1391
/*
1406
- * If we have no idle worker, read the result of one or more workers
1407
- * and loop the loop to call ReapWorkerStatus() on them
1392
+ * Check for status messages, even if we don't need to block. We do
1393
+ * not try very hard to reap all available messages, though, since
1394
+ * there's unlikely to be more than one.
1408
1395
*/
1409
- ListenToWorkers (AH , pstate , true);
1410
- }
1411
- }
1412
-
1413
- /*
1414
- * Wait for all workers to be idle.
1415
- * Reap worker status as necessary to move FINISHED workers to IDLE state.
1416
- *
1417
- * We assume that no extra processing is required when reaping a finished
1418
- * command, except for checking that the status was OK (zero).
1419
- * Caution: that assumption means that this function can only be used in
1420
- * parallel dump, not parallel restore, because the latter has a more
1421
- * complex set of rules about handling status.
1422
- *
1423
- * This function is executed in the master process.
1424
- */
1425
- void
1426
- EnsureWorkersFinished (ArchiveHandle * AH , ParallelState * pstate )
1427
- {
1428
- int work_status ;
1396
+ if (ListenToWorkers (AH , pstate , do_wait ))
1397
+ {
1398
+ /*
1399
+ * If we got a message, we are done by definition for GOT_STATUS
1400
+ * mode, and we can also be certain that there's at least one idle
1401
+ * worker. So we're done in all but ALL_IDLE mode.
1402
+ */
1403
+ if (mode != WFW_ALL_IDLE )
1404
+ return ;
1405
+ }
1429
1406
1430
- if (!pstate || pstate -> numWorkers == 1 )
1431
- return ;
1407
+ /* Check whether we must wait for new status messages */
1408
+ switch (mode )
1409
+ {
1410
+ case WFW_NO_WAIT :
1411
+ return ; /* never wait */
1412
+ case WFW_GOT_STATUS :
1413
+ Assert (false); /* can't get here, because we waited */
1414
+ break ;
1415
+ case WFW_ONE_IDLE :
1416
+ if (GetIdleWorker (pstate ) != NO_SLOT )
1417
+ return ;
1418
+ break ;
1419
+ case WFW_ALL_IDLE :
1420
+ if (IsEveryWorkerIdle (pstate ))
1421
+ return ;
1422
+ break ;
1423
+ }
1432
1424
1433
- /* Waiting for the remaining worker processes to finish */
1434
- while (!IsEveryWorkerIdle (pstate ))
1435
- {
1436
- if (ReapWorkerStatus (pstate , & work_status ) == NO_SLOT )
1437
- ListenToWorkers (AH , pstate , true);
1438
- else if (work_status != 0 )
1439
- exit_horribly (modulename ,
1440
- "error processing a parallel work item\n" );
1425
+ /* Loop back, and this time wait for something to happen */
1426
+ do_wait = true;
1441
1427
}
1442
1428
}
1443
1429
0 commit comments