30
30
31
31
static void init_slot (ParallelSlot * slot , PGconn * conn );
32
32
static int select_loop (int maxFd , fd_set * workerset );
33
- static bool processQueryResult (PGconn * conn , PGresult * result );
33
+ static bool processQueryResult (ParallelSlot * slot , PGresult * result );
34
34
35
35
static void
36
36
init_slot (ParallelSlot * slot , PGconn * conn )
37
37
{
38
38
slot -> connection = conn ;
39
39
/* Initially assume connection is idle */
40
40
slot -> isFree = true;
41
+ ParallelSlotClearHandler (slot );
41
42
}
42
43
43
44
/*
44
- * Process (and delete) a query result. Returns true if there's no error ,
45
- * false otherwise -- but errors about trying to work on a missing relation
46
- * are reported and subsequently ignored .
45
+ * Process (and delete) a query result. Returns true if there's no problem ,
46
+ * false otherwise. It's up to the handler to decide what cosntitutes a
47
+ * problem .
47
48
*/
48
49
static bool
49
- processQueryResult (PGconn * conn , PGresult * result )
50
+ processQueryResult (ParallelSlot * slot , PGresult * result )
50
51
{
51
- /*
52
- * If it's an error, report it. Errors about a missing table are harmless
53
- * so we continue processing; but die for other errors.
54
- */
55
- if (PQresultStatus (result ) != PGRES_COMMAND_OK )
56
- {
57
- char * sqlState = PQresultErrorField (result , PG_DIAG_SQLSTATE );
52
+ Assert (slot -> handler != NULL );
58
53
59
- pg_log_error ("processing of database \"%s\" failed: %s" ,
60
- PQdb (conn ), PQerrorMessage (conn ));
61
-
62
- if (sqlState && strcmp (sqlState , ERRCODE_UNDEFINED_TABLE ) != 0 )
63
- {
64
- PQclear (result );
65
- return false;
66
- }
67
- }
54
+ /* On failure, the handler should return NULL after freeing the result */
55
+ if (!slot -> handler (result , slot -> connection , slot -> handler_context ))
56
+ return false;
68
57
58
+ /* Ok, we have to free it ourself */
69
59
PQclear (result );
70
60
return true;
71
61
}
@@ -76,15 +66,15 @@ processQueryResult(PGconn *conn, PGresult *result)
76
66
* Note that this will block if the connection is busy.
77
67
*/
78
68
static bool
79
- consumeQueryResult (PGconn * conn )
69
+ consumeQueryResult (ParallelSlot * slot )
80
70
{
81
71
bool ok = true;
82
72
PGresult * result ;
83
73
84
- SetCancelConn (conn );
85
- while ((result = PQgetResult (conn )) != NULL )
74
+ SetCancelConn (slot -> connection );
75
+ while ((result = PQgetResult (slot -> connection )) != NULL )
86
76
{
87
- if (!processQueryResult (conn , result ))
77
+ if (!processQueryResult (slot , result ))
88
78
ok = false;
89
79
}
90
80
ResetCancelConn ();
@@ -227,14 +217,15 @@ ParallelSlotsGetIdle(ParallelSlot *slots, int numslots)
227
217
228
218
if (result != NULL )
229
219
{
230
- /* Check and discard the command result */
231
- if (!processQueryResult (slots [ i ]. connection , result ))
220
+ /* Handle and discard the command result */
221
+ if (!processQueryResult (slots + i , result ))
232
222
return NULL ;
233
223
}
234
224
else
235
225
{
236
226
/* This connection has become idle */
237
227
slots [i ].isFree = true;
228
+ ParallelSlotClearHandler (slots + i );
238
229
if (firstFree < 0 )
239
230
firstFree = i ;
240
231
break ;
@@ -329,8 +320,52 @@ ParallelSlotsWaitCompletion(ParallelSlot *slots, int numslots)
329
320
330
321
for (i = 0 ; i < numslots ; i ++ )
331
322
{
332
- if (!consumeQueryResult ((slots + i )-> connection ))
323
+ if (!consumeQueryResult (slots + i ))
324
+ return false;
325
+ }
326
+
327
+ return true;
328
+ }
329
+
330
+ /*
331
+ * TableCommandResultHandler
332
+ *
333
+ * ParallelSlotResultHandler for results of commands (not queries) against
334
+ * tables.
335
+ *
336
+ * Requires that the result status is either PGRES_COMMAND_OK or an error about
337
+ * a missing table. This is useful for utilities that compile a list of tables
338
+ * to process and then run commands (vacuum, reindex, or whatever) against
339
+ * those tables, as there is a race condition between the time the list is
340
+ * compiled and the time the command attempts to open the table.
341
+ *
342
+ * For missing tables, logs an error but allows processing to continue.
343
+ *
344
+ * For all other errors, logs an error and terminates further processing.
345
+ *
346
+ * res: PGresult from the query executed on the slot's connection
347
+ * conn: connection belonging to the slot
348
+ * context: unused
349
+ */
350
+ bool
351
+ TableCommandResultHandler (PGresult * res , PGconn * conn , void * context )
352
+ {
353
+ /*
354
+ * If it's an error, report it. Errors about a missing table are harmless
355
+ * so we continue processing; but die for other errors.
356
+ */
357
+ if (PQresultStatus (res ) != PGRES_COMMAND_OK )
358
+ {
359
+ char * sqlState = PQresultErrorField (res , PG_DIAG_SQLSTATE );
360
+
361
+ pg_log_error ("processing of database \"%s\" failed: %s" ,
362
+ PQdb (conn ), PQerrorMessage (conn ));
363
+
364
+ if (sqlState && strcmp (sqlState , ERRCODE_UNDEFINED_TABLE ) != 0 )
365
+ {
366
+ PQclear (res );
333
367
return false;
368
+ }
334
369
}
335
370
336
371
return true;
0 commit comments