Skip to content

Commit 82a8f0f

Browse files
committed
Make dblink interruptible, via new libpqsrv APIs.
This replaces dblink's blocking libpq calls, allowing cancellation and allowing DROP DATABASE (of a database not involved in the query). Apart from explicit dblink_cancel_query() calls, dblink still doesn't cancel the remote side. The replacement for the blocking calls consists of new, general-purpose query execution wrappers in the libpqsrv facility. Out-of-tree extensions should adopt these. The original commit d3c5f37 did not back-patch. Back-patch now to v16-v13, bringing coverage to all supported versions. This back-patch omits the orignal's refactoring in postgres_fdw. Discussion: https://postgr.es/m/20231122012945.74@rfd.leadboat.com
1 parent 5fb9f96 commit 82a8f0f

File tree

3 files changed

+143
-17
lines changed

3 files changed

+143
-17
lines changed

contrib/dblink/dblink.c

Lines changed: 13 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@
6161
#include "utils/memutils.h"
6262
#include "utils/rel.h"
6363
#include "utils/varlena.h"
64+
#include "utils/wait_event.h"
6465

6566
PG_MODULE_MAGIC;
6667

@@ -430,7 +431,7 @@ dblink_open(PG_FUNCTION_ARGS)
430431
/* If we are not in a transaction, start one */
431432
if (PQtransactionStatus(conn) == PQTRANS_IDLE)
432433
{
433-
res = PQexec(conn, "BEGIN");
434+
res = libpqsrv_exec(conn, "BEGIN", PG_WAIT_EXTENSION);
434435
if (PQresultStatus(res) != PGRES_COMMAND_OK)
435436
dblink_res_internalerror(conn, res, "begin error");
436437
PQclear(res);
@@ -449,7 +450,7 @@ dblink_open(PG_FUNCTION_ARGS)
449450
(rconn->openCursorCount)++;
450451

451452
appendStringInfo(&buf, "DECLARE %s CURSOR FOR %s", curname, sql);
452-
res = PQexec(conn, buf.data);
453+
res = libpqsrv_exec(conn, buf.data, PG_WAIT_EXTENSION);
453454
if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
454455
{
455456
dblink_res_error(conn, conname, res, fail,
@@ -518,7 +519,7 @@ dblink_close(PG_FUNCTION_ARGS)
518519
appendStringInfo(&buf, "CLOSE %s", curname);
519520

520521
/* close the cursor */
521-
res = PQexec(conn, buf.data);
522+
res = libpqsrv_exec(conn, buf.data, PG_WAIT_EXTENSION);
522523
if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
523524
{
524525
dblink_res_error(conn, conname, res, fail,
@@ -538,7 +539,7 @@ dblink_close(PG_FUNCTION_ARGS)
538539
{
539540
rconn->newXactForCursor = false;
540541

541-
res = PQexec(conn, "COMMIT");
542+
res = libpqsrv_exec(conn, "COMMIT", PG_WAIT_EXTENSION);
542543
if (PQresultStatus(res) != PGRES_COMMAND_OK)
543544
dblink_res_internalerror(conn, res, "commit error");
544545
PQclear(res);
@@ -620,7 +621,7 @@ dblink_fetch(PG_FUNCTION_ARGS)
620621
* PGresult will be long-lived even though we are still in a short-lived
621622
* memory context.
622623
*/
623-
res = PQexec(conn, buf.data);
624+
res = libpqsrv_exec(conn, buf.data, PG_WAIT_EXTENSION);
624625
if (!res ||
625626
(PQresultStatus(res) != PGRES_COMMAND_OK &&
626627
PQresultStatus(res) != PGRES_TUPLES_OK))
@@ -768,7 +769,7 @@ dblink_record_internal(FunctionCallInfo fcinfo, bool is_async)
768769
else
769770
{
770771
/* async result retrieval, do it the old way */
771-
PGresult *res = PQgetResult(conn);
772+
PGresult *res = libpqsrv_get_result(conn, PG_WAIT_EXTENSION);
772773

773774
/* NULL means we're all done with the async results */
774775
if (res)
@@ -1076,7 +1077,8 @@ materializeQueryResult(FunctionCallInfo fcinfo,
10761077
PQclear(sinfo.last_res);
10771078
PQclear(sinfo.cur_res);
10781079
/* and clear out any pending data in libpq */
1079-
while ((res = PQgetResult(conn)) != NULL)
1080+
while ((res = libpqsrv_get_result(conn, PG_WAIT_EXTENSION)) !=
1081+
NULL)
10801082
PQclear(res);
10811083
PG_RE_THROW();
10821084
}
@@ -1103,7 +1105,7 @@ storeQueryResult(volatile storeInfo *sinfo, PGconn *conn, const char *sql)
11031105
{
11041106
CHECK_FOR_INTERRUPTS();
11051107

1106-
sinfo->cur_res = PQgetResult(conn);
1108+
sinfo->cur_res = libpqsrv_get_result(conn, PG_WAIT_EXTENSION);
11071109
if (!sinfo->cur_res)
11081110
break;
11091111

@@ -1431,7 +1433,7 @@ dblink_exec(PG_FUNCTION_ARGS)
14311433
if (!conn)
14321434
dblink_conn_not_avail(conname);
14331435

1434-
res = PQexec(conn, sql);
1436+
res = libpqsrv_exec(conn, sql, PG_WAIT_EXTENSION);
14351437
if (!res ||
14361438
(PQresultStatus(res) != PGRES_COMMAND_OK &&
14371439
PQresultStatus(res) != PGRES_TUPLES_OK))
@@ -2728,8 +2730,8 @@ dblink_res_error(PGconn *conn, const char *conname, PGresult *res,
27282730

27292731
/*
27302732
* If we don't get a message from the PGresult, try the PGconn. This is
2731-
* needed because for connection-level failures, PQexec may just return
2732-
* NULL, not a PGresult at all.
2733+
* needed because for connection-level failures, PQgetResult may just
2734+
* return NULL, not a PGresult at all.
27332735
*/
27342736
if (message_primary == NULL)
27352737
message_primary = pchomp(PQerrorMessage(conn));

src/backend/replication/libpqwalreceiver/libpqwalreceiver.c

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -705,12 +705,9 @@ libpqrcv_readtimelinehistoryfile(WalReceiverConn *conn,
705705
* Send a query and wait for the results by using the asynchronous libpq
706706
* functions and socket readiness events.
707707
*
708-
* We must not use the regular blocking libpq functions like PQexec()
709-
* since they are uninterruptible by signals on some platforms, such as
710-
* Windows.
711-
*
712-
* The function is modeled on PQexec() in libpq, but only implements
713-
* those parts that are in use in the walreceiver api.
708+
* The function is modeled on libpqsrv_exec(), with the behavior difference
709+
* being that it calls ProcessWalRcvInterrupts(). As an optimization, it
710+
* skips try/catch, since all errors terminate the process.
714711
*
715712
* May return NULL, rather than an error result, on failure.
716713
*/

src/include/libpq/libpq-be-fe-helpers.h

Lines changed: 127 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,8 @@
4949

5050
static inline void libpqsrv_connect_prepare(void);
5151
static inline void libpqsrv_connect_internal(PGconn *conn, uint32 wait_event_info);
52+
static inline PGresult *libpqsrv_get_result_last(PGconn *conn, uint32 wait_event_info);
53+
static inline PGresult *libpqsrv_get_result(PGconn *conn, uint32 wait_event_info);
5254

5355

5456
/*
@@ -239,4 +241,129 @@ libpqsrv_connect_internal(PGconn *conn, uint32 wait_event_info)
239241
PG_END_TRY();
240242
}
241243

244+
/*
245+
* PQexec() wrapper that processes interrupts.
246+
*
247+
* Unless PQsetnonblocking(conn, 1) is in effect, this can't process
248+
* interrupts while pushing the query text to the server. Consider that
249+
* setting if query strings can be long relative to TCP buffer size.
250+
*
251+
* This has the preconditions of PQsendQuery(), not those of PQexec(). Most
252+
* notably, PQexec() would silently discard any prior query results.
253+
*/
254+
static inline PGresult *
255+
libpqsrv_exec(PGconn *conn, const char *query, uint32 wait_event_info)
256+
{
257+
if (!PQsendQuery(conn, query))
258+
return NULL;
259+
return libpqsrv_get_result_last(conn, wait_event_info);
260+
}
261+
262+
/*
263+
* PQexecParams() wrapper that processes interrupts.
264+
*
265+
* See notes at libpqsrv_exec().
266+
*/
267+
static inline PGresult *
268+
libpqsrv_exec_params(PGconn *conn,
269+
const char *command,
270+
int nParams,
271+
const Oid *paramTypes,
272+
const char *const *paramValues,
273+
const int *paramLengths,
274+
const int *paramFormats,
275+
int resultFormat,
276+
uint32 wait_event_info)
277+
{
278+
if (!PQsendQueryParams(conn, command, nParams, paramTypes, paramValues,
279+
paramLengths, paramFormats, resultFormat))
280+
return NULL;
281+
return libpqsrv_get_result_last(conn, wait_event_info);
282+
}
283+
284+
/*
285+
* Like PQexec(), loop over PQgetResult() until it returns NULL or another
286+
* terminal state. Return the last non-NULL result or the terminal state.
287+
*/
288+
static inline PGresult *
289+
libpqsrv_get_result_last(PGconn *conn, uint32 wait_event_info)
290+
{
291+
PGresult *volatile lastResult = NULL;
292+
293+
/* In what follows, do not leak any PGresults on an error. */
294+
PG_TRY();
295+
{
296+
for (;;)
297+
{
298+
/* Wait for, and collect, the next PGresult. */
299+
PGresult *result;
300+
301+
result = libpqsrv_get_result(conn, wait_event_info);
302+
if (result == NULL)
303+
break; /* query is complete, or failure */
304+
305+
/*
306+
* Emulate PQexec()'s behavior of returning the last result when
307+
* there are many.
308+
*/
309+
PQclear(lastResult);
310+
lastResult = result;
311+
312+
if (PQresultStatus(lastResult) == PGRES_COPY_IN ||
313+
PQresultStatus(lastResult) == PGRES_COPY_OUT ||
314+
PQresultStatus(lastResult) == PGRES_COPY_BOTH ||
315+
PQstatus(conn) == CONNECTION_BAD)
316+
break;
317+
}
318+
}
319+
PG_CATCH();
320+
{
321+
PQclear(lastResult);
322+
PG_RE_THROW();
323+
}
324+
PG_END_TRY();
325+
326+
return lastResult;
327+
}
328+
329+
/*
330+
* Perform the equivalent of PQgetResult(), but watch for interrupts.
331+
*/
332+
static inline PGresult *
333+
libpqsrv_get_result(PGconn *conn, uint32 wait_event_info)
334+
{
335+
/*
336+
* Collect data until PQgetResult is ready to get the result without
337+
* blocking.
338+
*/
339+
while (PQisBusy(conn))
340+
{
341+
int rc;
342+
343+
rc = WaitLatchOrSocket(MyLatch,
344+
WL_EXIT_ON_PM_DEATH | WL_LATCH_SET |
345+
WL_SOCKET_READABLE,
346+
PQsocket(conn),
347+
0,
348+
wait_event_info);
349+
350+
/* Interrupted? */
351+
if (rc & WL_LATCH_SET)
352+
{
353+
ResetLatch(MyLatch);
354+
CHECK_FOR_INTERRUPTS();
355+
}
356+
357+
/* Consume whatever data is available from the socket */
358+
if (PQconsumeInput(conn) == 0)
359+
{
360+
/* trouble; expect PQgetResult() to return NULL */
361+
break;
362+
}
363+
}
364+
365+
/* Now we can collect and return the next PGresult */
366+
return PQgetResult(conn);
367+
}
368+
242369
#endif /* LIBPQ_BE_FE_HELPERS_H */

0 commit comments

Comments
 (0)