Skip to content

Commit 63f6ecb

Browse files
committed
Make dblink interruptible, via new libpqsrv APIs.
This replaces dblink's blocking libpq calls, allowing cancellation and allowing DROP DATABASE (of a database not involved in the query). Apart from explicit dblink_cancel_query() calls, dblink still doesn't cancel the remote side. The replacement for the blocking calls consists of new, general-purpose query execution wrappers in the libpqsrv facility. Out-of-tree extensions should adopt these. The original commit d3c5f37 did not back-patch. Back-patch now to v16-v13, bringing coverage to all supported versions. This back-patch omits the orignal's refactoring in postgres_fdw. Discussion: https://postgr.es/m/20231122012945.74@rfd.leadboat.com
1 parent 9e129a2 commit 63f6ecb

File tree

3 files changed

+144
-17
lines changed

3 files changed

+144
-17
lines changed

contrib/dblink/dblink.c

Lines changed: 14 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@
4848
#include "funcapi.h"
4949
#include "lib/stringinfo.h"
5050
#include "libpq-fe.h"
51+
#include "libpq/libpq-be-fe-helpers.h"
5152
#include "mb/pg_wchar.h"
5253
#include "miscadmin.h"
5354
#include "parser/scansup.h"
@@ -59,6 +60,7 @@
5960
#include "utils/memutils.h"
6061
#include "utils/rel.h"
6162
#include "utils/varlena.h"
63+
#include "utils/wait_event.h"
6264

6365
PG_MODULE_MAGIC;
6466

@@ -478,7 +480,7 @@ dblink_open(PG_FUNCTION_ARGS)
478480
/* If we are not in a transaction, start one */
479481
if (PQtransactionStatus(conn) == PQTRANS_IDLE)
480482
{
481-
res = PQexec(conn, "BEGIN");
483+
res = libpqsrv_exec(conn, "BEGIN", PG_WAIT_EXTENSION);
482484
if (PQresultStatus(res) != PGRES_COMMAND_OK)
483485
dblink_res_internalerror(conn, res, "begin error");
484486
PQclear(res);
@@ -497,7 +499,7 @@ dblink_open(PG_FUNCTION_ARGS)
497499
(rconn->openCursorCount)++;
498500

499501
appendStringInfo(&buf, "DECLARE %s CURSOR FOR %s", curname, sql);
500-
res = PQexec(conn, buf.data);
502+
res = libpqsrv_exec(conn, buf.data, PG_WAIT_EXTENSION);
501503
if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
502504
{
503505
dblink_res_error(conn, conname, res, fail,
@@ -566,7 +568,7 @@ dblink_close(PG_FUNCTION_ARGS)
566568
appendStringInfo(&buf, "CLOSE %s", curname);
567569

568570
/* close the cursor */
569-
res = PQexec(conn, buf.data);
571+
res = libpqsrv_exec(conn, buf.data, PG_WAIT_EXTENSION);
570572
if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
571573
{
572574
dblink_res_error(conn, conname, res, fail,
@@ -586,7 +588,7 @@ dblink_close(PG_FUNCTION_ARGS)
586588
{
587589
rconn->newXactForCursor = false;
588590

589-
res = PQexec(conn, "COMMIT");
591+
res = libpqsrv_exec(conn, "COMMIT", PG_WAIT_EXTENSION);
590592
if (PQresultStatus(res) != PGRES_COMMAND_OK)
591593
dblink_res_internalerror(conn, res, "commit error");
592594
PQclear(res);
@@ -668,7 +670,7 @@ dblink_fetch(PG_FUNCTION_ARGS)
668670
* PGresult will be long-lived even though we are still in a short-lived
669671
* memory context.
670672
*/
671-
res = PQexec(conn, buf.data);
673+
res = libpqsrv_exec(conn, buf.data, PG_WAIT_EXTENSION);
672674
if (!res ||
673675
(PQresultStatus(res) != PGRES_COMMAND_OK &&
674676
PQresultStatus(res) != PGRES_TUPLES_OK))
@@ -816,7 +818,7 @@ dblink_record_internal(FunctionCallInfo fcinfo, bool is_async)
816818
else
817819
{
818820
/* async result retrieval, do it the old way */
819-
PGresult *res = PQgetResult(conn);
821+
PGresult *res = libpqsrv_get_result(conn, PG_WAIT_EXTENSION);
820822

821823
/* NULL means we're all done with the async results */
822824
if (res)
@@ -1127,7 +1129,8 @@ materializeQueryResult(FunctionCallInfo fcinfo,
11271129
PQclear(sinfo.last_res);
11281130
PQclear(sinfo.cur_res);
11291131
/* and clear out any pending data in libpq */
1130-
while ((res = PQgetResult(conn)) != NULL)
1132+
while ((res = libpqsrv_get_result(conn, PG_WAIT_EXTENSION)) !=
1133+
NULL)
11311134
PQclear(res);
11321135
PG_RE_THROW();
11331136
}
@@ -1154,7 +1157,7 @@ storeQueryResult(volatile storeInfo *sinfo, PGconn *conn, const char *sql)
11541157
{
11551158
CHECK_FOR_INTERRUPTS();
11561159

1157-
sinfo->cur_res = PQgetResult(conn);
1160+
sinfo->cur_res = libpqsrv_get_result(conn, PG_WAIT_EXTENSION);
11581161
if (!sinfo->cur_res)
11591162
break;
11601163

@@ -1482,7 +1485,7 @@ dblink_exec(PG_FUNCTION_ARGS)
14821485
if (!conn)
14831486
dblink_conn_not_avail(conname);
14841487

1485-
res = PQexec(conn, sql);
1488+
res = libpqsrv_exec(conn, sql, PG_WAIT_EXTENSION);
14861489
if (!res ||
14871490
(PQresultStatus(res) != PGRES_COMMAND_OK &&
14881491
PQresultStatus(res) != PGRES_TUPLES_OK))
@@ -2744,8 +2747,8 @@ dblink_res_error(PGconn *conn, const char *conname, PGresult *res,
27442747

27452748
/*
27462749
* If we don't get a message from the PGresult, try the PGconn. This is
2747-
* needed because for connection-level failures, PQexec may just return
2748-
* NULL, not a PGresult at all.
2750+
* needed because for connection-level failures, PQgetResult may just
2751+
* return NULL, not a PGresult at all.
27492752
*/
27502753
if (message_primary == NULL)
27512754
message_primary = pchomp(PQerrorMessage(conn));

src/backend/replication/libpqwalreceiver/libpqwalreceiver.c

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -639,12 +639,9 @@ libpqrcv_readtimelinehistoryfile(WalReceiverConn *conn,
639639
* Send a query and wait for the results by using the asynchronous libpq
640640
* functions and socket readiness events.
641641
*
642-
* We must not use the regular blocking libpq functions like PQexec()
643-
* since they are uninterruptible by signals on some platforms, such as
644-
* Windows.
645-
*
646-
* The function is modeled on PQexec() in libpq, but only implements
647-
* those parts that are in use in the walreceiver api.
642+
* The function is modeled on libpqsrv_exec(), with the behavior difference
643+
* being that it calls ProcessWalRcvInterrupts(). As an optimization, it
644+
* skips try/catch, since all errors terminate the process.
648645
*
649646
* May return NULL, rather than an error result, on failure.
650647
*/

src/include/libpq/libpq-be-fe-helpers.h

Lines changed: 127 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,8 @@
4949

5050
static inline void libpqsrv_connect_prepare(void);
5151
static inline void libpqsrv_connect_internal(PGconn *conn, uint32 wait_event_info);
52+
static inline PGresult *libpqsrv_get_result_last(PGconn *conn, uint32 wait_event_info);
53+
static inline PGresult *libpqsrv_get_result(PGconn *conn, uint32 wait_event_info);
5254

5355

5456
/*
@@ -239,4 +241,129 @@ libpqsrv_connect_internal(PGconn *conn, uint32 wait_event_info)
239241
PG_END_TRY();
240242
}
241243

244+
/*
245+
* PQexec() wrapper that processes interrupts.
246+
*
247+
* Unless PQsetnonblocking(conn, 1) is in effect, this can't process
248+
* interrupts while pushing the query text to the server. Consider that
249+
* setting if query strings can be long relative to TCP buffer size.
250+
*
251+
* This has the preconditions of PQsendQuery(), not those of PQexec(). Most
252+
* notably, PQexec() would silently discard any prior query results.
253+
*/
254+
static inline PGresult *
255+
libpqsrv_exec(PGconn *conn, const char *query, uint32 wait_event_info)
256+
{
257+
if (!PQsendQuery(conn, query))
258+
return NULL;
259+
return libpqsrv_get_result_last(conn, wait_event_info);
260+
}
261+
262+
/*
263+
* PQexecParams() wrapper that processes interrupts.
264+
*
265+
* See notes at libpqsrv_exec().
266+
*/
267+
static inline PGresult *
268+
libpqsrv_exec_params(PGconn *conn,
269+
const char *command,
270+
int nParams,
271+
const Oid *paramTypes,
272+
const char *const *paramValues,
273+
const int *paramLengths,
274+
const int *paramFormats,
275+
int resultFormat,
276+
uint32 wait_event_info)
277+
{
278+
if (!PQsendQueryParams(conn, command, nParams, paramTypes, paramValues,
279+
paramLengths, paramFormats, resultFormat))
280+
return NULL;
281+
return libpqsrv_get_result_last(conn, wait_event_info);
282+
}
283+
284+
/*
285+
* Like PQexec(), loop over PQgetResult() until it returns NULL or another
286+
* terminal state. Return the last non-NULL result or the terminal state.
287+
*/
288+
static inline PGresult *
289+
libpqsrv_get_result_last(PGconn *conn, uint32 wait_event_info)
290+
{
291+
PGresult *volatile lastResult = NULL;
292+
293+
/* In what follows, do not leak any PGresults on an error. */
294+
PG_TRY();
295+
{
296+
for (;;)
297+
{
298+
/* Wait for, and collect, the next PGresult. */
299+
PGresult *result;
300+
301+
result = libpqsrv_get_result(conn, wait_event_info);
302+
if (result == NULL)
303+
break; /* query is complete, or failure */
304+
305+
/*
306+
* Emulate PQexec()'s behavior of returning the last result when
307+
* there are many.
308+
*/
309+
PQclear(lastResult);
310+
lastResult = result;
311+
312+
if (PQresultStatus(lastResult) == PGRES_COPY_IN ||
313+
PQresultStatus(lastResult) == PGRES_COPY_OUT ||
314+
PQresultStatus(lastResult) == PGRES_COPY_BOTH ||
315+
PQstatus(conn) == CONNECTION_BAD)
316+
break;
317+
}
318+
}
319+
PG_CATCH();
320+
{
321+
PQclear(lastResult);
322+
PG_RE_THROW();
323+
}
324+
PG_END_TRY();
325+
326+
return lastResult;
327+
}
328+
329+
/*
330+
* Perform the equivalent of PQgetResult(), but watch for interrupts.
331+
*/
332+
static inline PGresult *
333+
libpqsrv_get_result(PGconn *conn, uint32 wait_event_info)
334+
{
335+
/*
336+
* Collect data until PQgetResult is ready to get the result without
337+
* blocking.
338+
*/
339+
while (PQisBusy(conn))
340+
{
341+
int rc;
342+
343+
rc = WaitLatchOrSocket(MyLatch,
344+
WL_EXIT_ON_PM_DEATH | WL_LATCH_SET |
345+
WL_SOCKET_READABLE,
346+
PQsocket(conn),
347+
0,
348+
wait_event_info);
349+
350+
/* Interrupted? */
351+
if (rc & WL_LATCH_SET)
352+
{
353+
ResetLatch(MyLatch);
354+
CHECK_FOR_INTERRUPTS();
355+
}
356+
357+
/* Consume whatever data is available from the socket */
358+
if (PQconsumeInput(conn) == 0)
359+
{
360+
/* trouble; expect PQgetResult() to return NULL */
361+
break;
362+
}
363+
}
364+
365+
/* Now we can collect and return the next PGresult */
366+
return PQgetResult(conn);
367+
}
368+
242369
#endif /* LIBPQ_BE_FE_HELPERS_H */

0 commit comments

Comments
 (0)