Skip to content

Commit 418611c

Browse files
committed
Generalize parallel slot result handling.
Instead of having a hard-coded behavior that we ignore missing tables and report all other errors, let the caller decide what to do by setting a callback. Mark Dilger, reviewed and somewhat revised by me. The larger patch series of which this is a part has also had review from Peter Geoghegan, Andres Freund, Álvaro Herrera, Michael Paquier, and Amul Sul, but I don't know whether any of them have reviewed this bit specifically. Discussion: http://postgr.es/m/12ED3DA8-25F0-4B68-937D-D907CFBF08E7@enterprisedb.com Discussion: http://postgr.es/m/5F743835-3399-419C-8324-2D424237E999@enterprisedb.com Discussion: http://postgr.es/m/70655DF3-33CE-4527-9A4D-DDEB582B6BA0@enterprisedb.com
1 parent e955bd4 commit 418611c

File tree

4 files changed

+94
-28
lines changed

4 files changed

+94
-28
lines changed

src/bin/scripts/reindexdb.c

+1
Original file line numberDiff line numberDiff line change
@@ -466,6 +466,7 @@ reindex_one_database(const ConnParams *cparams, ReindexType type,
466466
goto finish;
467467
}
468468

469+
ParallelSlotSetHandler(free_slot, TableCommandResultHandler, NULL);
469470
run_reindex_command(free_slot->connection, process_type, objname,
470471
echo, verbose, concurrently, true);
471472

src/bin/scripts/vacuumdb.c

+1
Original file line numberDiff line numberDiff line change
@@ -713,6 +713,7 @@ vacuum_one_database(const ConnParams *cparams,
713713
* Execute the vacuum. All errors are handled in processQueryResult
714714
* through ParallelSlotsGetIdle.
715715
*/
716+
ParallelSlotSetHandler(free_slot, TableCommandResultHandler, NULL);
716717
run_vacuum_command(free_slot->connection, sql.data,
717718
echo, tabname);
718719

src/fe_utils/parallel_slot.c

+63-28
Original file line numberDiff line numberDiff line change
@@ -30,42 +30,32 @@
3030

3131
static void init_slot(ParallelSlot *slot, PGconn *conn);
3232
static int select_loop(int maxFd, fd_set *workerset);
33-
static bool processQueryResult(PGconn *conn, PGresult *result);
33+
static bool processQueryResult(ParallelSlot *slot, PGresult *result);
3434

3535
static void
3636
init_slot(ParallelSlot *slot, PGconn *conn)
3737
{
3838
slot->connection = conn;
3939
/* Initially assume connection is idle */
4040
slot->isFree = true;
41+
ParallelSlotClearHandler(slot);
4142
}
4243

4344
/*
44-
* Process (and delete) a query result. Returns true if there's no error,
45-
* false otherwise -- but errors about trying to work on a missing relation
46-
* are reported and subsequently ignored.
45+
* Process (and delete) a query result. Returns true if there's no problem,
46+
* false otherwise. It's up to the handler to decide what cosntitutes a
47+
* problem.
4748
*/
4849
static bool
49-
processQueryResult(PGconn *conn, PGresult *result)
50+
processQueryResult(ParallelSlot *slot, PGresult *result)
5051
{
51-
/*
52-
* If it's an error, report it. Errors about a missing table are harmless
53-
* so we continue processing; but die for other errors.
54-
*/
55-
if (PQresultStatus(result) != PGRES_COMMAND_OK)
56-
{
57-
char *sqlState = PQresultErrorField(result, PG_DIAG_SQLSTATE);
52+
Assert(slot->handler != NULL);
5853

59-
pg_log_error("processing of database \"%s\" failed: %s",
60-
PQdb(conn), PQerrorMessage(conn));
61-
62-
if (sqlState && strcmp(sqlState, ERRCODE_UNDEFINED_TABLE) != 0)
63-
{
64-
PQclear(result);
65-
return false;
66-
}
67-
}
54+
/* On failure, the handler should return NULL after freeing the result */
55+
if (!slot->handler(result, slot->connection, slot->handler_context))
56+
return false;
6857

58+
/* Ok, we have to free it ourself */
6959
PQclear(result);
7060
return true;
7161
}
@@ -76,15 +66,15 @@ processQueryResult(PGconn *conn, PGresult *result)
7666
* Note that this will block if the connection is busy.
7767
*/
7868
static bool
79-
consumeQueryResult(PGconn *conn)
69+
consumeQueryResult(ParallelSlot *slot)
8070
{
8171
bool ok = true;
8272
PGresult *result;
8373

84-
SetCancelConn(conn);
85-
while ((result = PQgetResult(conn)) != NULL)
74+
SetCancelConn(slot->connection);
75+
while ((result = PQgetResult(slot->connection)) != NULL)
8676
{
87-
if (!processQueryResult(conn, result))
77+
if (!processQueryResult(slot, result))
8878
ok = false;
8979
}
9080
ResetCancelConn();
@@ -227,14 +217,15 @@ ParallelSlotsGetIdle(ParallelSlot *slots, int numslots)
227217

228218
if (result != NULL)
229219
{
230-
/* Check and discard the command result */
231-
if (!processQueryResult(slots[i].connection, result))
220+
/* Handle and discard the command result */
221+
if (!processQueryResult(slots + i, result))
232222
return NULL;
233223
}
234224
else
235225
{
236226
/* This connection has become idle */
237227
slots[i].isFree = true;
228+
ParallelSlotClearHandler(slots + i);
238229
if (firstFree < 0)
239230
firstFree = i;
240231
break;
@@ -329,8 +320,52 @@ ParallelSlotsWaitCompletion(ParallelSlot *slots, int numslots)
329320

330321
for (i = 0; i < numslots; i++)
331322
{
332-
if (!consumeQueryResult((slots + i)->connection))
323+
if (!consumeQueryResult(slots + i))
324+
return false;
325+
}
326+
327+
return true;
328+
}
329+
330+
/*
331+
* TableCommandResultHandler
332+
*
333+
* ParallelSlotResultHandler for results of commands (not queries) against
334+
* tables.
335+
*
336+
* Requires that the result status is either PGRES_COMMAND_OK or an error about
337+
* a missing table. This is useful for utilities that compile a list of tables
338+
* to process and then run commands (vacuum, reindex, or whatever) against
339+
* those tables, as there is a race condition between the time the list is
340+
* compiled and the time the command attempts to open the table.
341+
*
342+
* For missing tables, logs an error but allows processing to continue.
343+
*
344+
* For all other errors, logs an error and terminates further processing.
345+
*
346+
* res: PGresult from the query executed on the slot's connection
347+
* conn: connection belonging to the slot
348+
* context: unused
349+
*/
350+
bool
351+
TableCommandResultHandler(PGresult *res, PGconn *conn, void *context)
352+
{
353+
/*
354+
* If it's an error, report it. Errors about a missing table are harmless
355+
* so we continue processing; but die for other errors.
356+
*/
357+
if (PQresultStatus(res) != PGRES_COMMAND_OK)
358+
{
359+
char *sqlState = PQresultErrorField(res, PG_DIAG_SQLSTATE);
360+
361+
pg_log_error("processing of database \"%s\" failed: %s",
362+
PQdb(conn), PQerrorMessage(conn));
363+
364+
if (sqlState && strcmp(sqlState, ERRCODE_UNDEFINED_TABLE) != 0)
365+
{
366+
PQclear(res);
333367
return false;
368+
}
334369
}
335370

336371
return true;

src/include/fe_utils/parallel_slot.h

+29
Original file line numberDiff line numberDiff line change
@@ -15,12 +15,39 @@
1515
#include "fe_utils/connect_utils.h"
1616
#include "libpq-fe.h"
1717

18+
typedef bool (*ParallelSlotResultHandler) (PGresult *res, PGconn *conn,
19+
void *context);
20+
1821
typedef struct ParallelSlot
1922
{
2023
PGconn *connection; /* One connection */
2124
bool isFree; /* Is it known to be idle? */
25+
26+
/*
27+
* Prior to issuing a command or query on 'connection', a handler callback
28+
* function may optionally be registered to be invoked to process the
29+
* results, and context information may optionally be registered for use
30+
* by the handler. If unset, these fields should be NULL.
31+
*/
32+
ParallelSlotResultHandler handler;
33+
void *handler_context;
2234
} ParallelSlot;
2335

36+
static inline void
37+
ParallelSlotSetHandler(ParallelSlot *slot, ParallelSlotResultHandler handler,
38+
void *context)
39+
{
40+
slot->handler = handler;
41+
slot->handler_context = context;
42+
}
43+
44+
static inline void
45+
ParallelSlotClearHandler(ParallelSlot *slot)
46+
{
47+
slot->handler = NULL;
48+
slot->handler_context = NULL;
49+
}
50+
2451
extern ParallelSlot *ParallelSlotsGetIdle(ParallelSlot *slots, int numslots);
2552

2653
extern ParallelSlot *ParallelSlotsSetup(const ConnParams *cparams,
@@ -31,5 +58,7 @@ extern void ParallelSlotsTerminate(ParallelSlot *slots, int numslots);
3158

3259
extern bool ParallelSlotsWaitCompletion(ParallelSlot *slots, int numslots);
3360

61+
extern bool TableCommandResultHandler(PGresult *res, PGconn *conn,
62+
void *context);
3463

3564
#endif /* PARALLEL_SLOT_H */

0 commit comments

Comments
 (0)