Skip to content

Commit 186c586

Browse files
committed
Make dblink interruptible, via new libpqsrv APIs.
This replaces dblink's blocking libpq calls, allowing cancellation and allowing DROP DATABASE (of a database not involved in the query). Apart from explicit dblink_cancel_query() calls, dblink still doesn't cancel the remote side. The replacement for the blocking calls consists of new, general-purpose query execution wrappers in the libpqsrv facility. Out-of-tree extensions should adopt these. The original commit d3c5f37 did not back-patch. Back-patch now to v16-v13, bringing coverage to all supported versions. This back-patch omits the orignal's refactoring in postgres_fdw. Discussion: https://postgr.es/m/20231122012945.74@rfd.leadboat.com
1 parent 5a3d5c0 commit 186c586

File tree

3 files changed

+144
-17
lines changed

3 files changed

+144
-17
lines changed

contrib/dblink/dblink.c

+14-11
Original file line numberDiff line numberDiff line change
@@ -49,9 +49,11 @@
4949
#include "funcapi.h"
5050
#include "lib/stringinfo.h"
5151
#include "libpq-fe.h"
52+
#include "libpq/libpq-be-fe-helpers.h"
5253
#include "mb/pg_wchar.h"
5354
#include "miscadmin.h"
5455
#include "parser/scansup.h"
56+
#include "pgstat.h"
5557
#include "utils/acl.h"
5658
#include "utils/builtins.h"
5759
#include "utils/fmgroids.h"
@@ -479,7 +481,7 @@ dblink_open(PG_FUNCTION_ARGS)
479481
/* If we are not in a transaction, start one */
480482
if (PQtransactionStatus(conn) == PQTRANS_IDLE)
481483
{
482-
res = PQexec(conn, "BEGIN");
484+
res = libpqsrv_exec(conn, "BEGIN", PG_WAIT_EXTENSION);
483485
if (PQresultStatus(res) != PGRES_COMMAND_OK)
484486
dblink_res_internalerror(conn, res, "begin error");
485487
PQclear(res);
@@ -498,7 +500,7 @@ dblink_open(PG_FUNCTION_ARGS)
498500
(rconn->openCursorCount)++;
499501

500502
appendStringInfo(&buf, "DECLARE %s CURSOR FOR %s", curname, sql);
501-
res = PQexec(conn, buf.data);
503+
res = libpqsrv_exec(conn, buf.data, PG_WAIT_EXTENSION);
502504
if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
503505
{
504506
dblink_res_error(conn, conname, res, fail,
@@ -567,7 +569,7 @@ dblink_close(PG_FUNCTION_ARGS)
567569
appendStringInfo(&buf, "CLOSE %s", curname);
568570

569571
/* close the cursor */
570-
res = PQexec(conn, buf.data);
572+
res = libpqsrv_exec(conn, buf.data, PG_WAIT_EXTENSION);
571573
if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
572574
{
573575
dblink_res_error(conn, conname, res, fail,
@@ -587,7 +589,7 @@ dblink_close(PG_FUNCTION_ARGS)
587589
{
588590
rconn->newXactForCursor = false;
589591

590-
res = PQexec(conn, "COMMIT");
592+
res = libpqsrv_exec(conn, "COMMIT", PG_WAIT_EXTENSION);
591593
if (PQresultStatus(res) != PGRES_COMMAND_OK)
592594
dblink_res_internalerror(conn, res, "commit error");
593595
PQclear(res);
@@ -669,7 +671,7 @@ dblink_fetch(PG_FUNCTION_ARGS)
669671
* PGresult will be long-lived even though we are still in a short-lived
670672
* memory context.
671673
*/
672-
res = PQexec(conn, buf.data);
674+
res = libpqsrv_exec(conn, buf.data, PG_WAIT_EXTENSION);
673675
if (!res ||
674676
(PQresultStatus(res) != PGRES_COMMAND_OK &&
675677
PQresultStatus(res) != PGRES_TUPLES_OK))
@@ -817,7 +819,7 @@ dblink_record_internal(FunctionCallInfo fcinfo, bool is_async)
817819
else
818820
{
819821
/* async result retrieval, do it the old way */
820-
PGresult *res = PQgetResult(conn);
822+
PGresult *res = libpqsrv_get_result(conn, PG_WAIT_EXTENSION);
821823

822824
/* NULL means we're all done with the async results */
823825
if (res)
@@ -1131,7 +1133,8 @@ materializeQueryResult(FunctionCallInfo fcinfo,
11311133
PQclear(sinfo.last_res);
11321134
PQclear(sinfo.cur_res);
11331135
/* and clear out any pending data in libpq */
1134-
while ((res = PQgetResult(conn)) != NULL)
1136+
while ((res = libpqsrv_get_result(conn, PG_WAIT_EXTENSION)) !=
1137+
NULL)
11351138
PQclear(res);
11361139
PG_RE_THROW();
11371140
}
@@ -1158,7 +1161,7 @@ storeQueryResult(volatile storeInfo *sinfo, PGconn *conn, const char *sql)
11581161
{
11591162
CHECK_FOR_INTERRUPTS();
11601163

1161-
sinfo->cur_res = PQgetResult(conn);
1164+
sinfo->cur_res = libpqsrv_get_result(conn, PG_WAIT_EXTENSION);
11621165
if (!sinfo->cur_res)
11631166
break;
11641167

@@ -1486,7 +1489,7 @@ dblink_exec(PG_FUNCTION_ARGS)
14861489
if (!conn)
14871490
dblink_conn_not_avail(conname);
14881491

1489-
res = PQexec(conn, sql);
1492+
res = libpqsrv_exec(conn, sql, PG_WAIT_EXTENSION);
14901493
if (!res ||
14911494
(PQresultStatus(res) != PGRES_COMMAND_OK &&
14921495
PQresultStatus(res) != PGRES_TUPLES_OK))
@@ -2771,8 +2774,8 @@ dblink_res_error(PGconn *conn, const char *conname, PGresult *res,
27712774

27722775
/*
27732776
* If we don't get a message from the PGresult, try the PGconn. This is
2774-
* needed because for connection-level failures, PQexec may just return
2775-
* NULL, not a PGresult at all.
2777+
* needed because for connection-level failures, PQgetResult may just
2778+
* return NULL, not a PGresult at all.
27762779
*/
27772780
if (message_primary == NULL)
27782781
message_primary = pchomp(PQerrorMessage(conn));

src/backend/replication/libpqwalreceiver/libpqwalreceiver.c

+3-6
Original file line numberDiff line numberDiff line change
@@ -603,12 +603,9 @@ libpqrcv_readtimelinehistoryfile(WalReceiverConn *conn,
603603
* Send a query and wait for the results by using the asynchronous libpq
604604
* functions and socket readiness events.
605605
*
606-
* We must not use the regular blocking libpq functions like PQexec()
607-
* since they are uninterruptible by signals on some platforms, such as
608-
* Windows.
609-
*
610-
* The function is modeled on PQexec() in libpq, but only implements
611-
* those parts that are in use in the walreceiver api.
606+
* The function is modeled on libpqsrv_exec(), with the behavior difference
607+
* being that it calls ProcessWalRcvInterrupts(). As an optimization, it
608+
* skips try/catch, since all errors terminate the process.
612609
*
613610
* May return NULL, rather than an error result, on failure.
614611
*/

src/include/libpq/libpq-be-fe-helpers.h

+127
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,8 @@
4949

5050
static inline void libpqsrv_connect_prepare(void);
5151
static inline void libpqsrv_connect_internal(PGconn *conn, uint32 wait_event_info);
52+
static inline PGresult *libpqsrv_get_result_last(PGconn *conn, uint32 wait_event_info);
53+
static inline PGresult *libpqsrv_get_result(PGconn *conn, uint32 wait_event_info);
5254

5355

5456
/*
@@ -239,4 +241,129 @@ libpqsrv_connect_internal(PGconn *conn, uint32 wait_event_info)
239241
PG_END_TRY();
240242
}
241243

244+
/*
245+
* PQexec() wrapper that processes interrupts.
246+
*
247+
* Unless PQsetnonblocking(conn, 1) is in effect, this can't process
248+
* interrupts while pushing the query text to the server. Consider that
249+
* setting if query strings can be long relative to TCP buffer size.
250+
*
251+
* This has the preconditions of PQsendQuery(), not those of PQexec(). Most
252+
* notably, PQexec() would silently discard any prior query results.
253+
*/
254+
static inline PGresult *
255+
libpqsrv_exec(PGconn *conn, const char *query, uint32 wait_event_info)
256+
{
257+
if (!PQsendQuery(conn, query))
258+
return NULL;
259+
return libpqsrv_get_result_last(conn, wait_event_info);
260+
}
261+
262+
/*
263+
* PQexecParams() wrapper that processes interrupts.
264+
*
265+
* See notes at libpqsrv_exec().
266+
*/
267+
static inline PGresult *
268+
libpqsrv_exec_params(PGconn *conn,
269+
const char *command,
270+
int nParams,
271+
const Oid *paramTypes,
272+
const char *const *paramValues,
273+
const int *paramLengths,
274+
const int *paramFormats,
275+
int resultFormat,
276+
uint32 wait_event_info)
277+
{
278+
if (!PQsendQueryParams(conn, command, nParams, paramTypes, paramValues,
279+
paramLengths, paramFormats, resultFormat))
280+
return NULL;
281+
return libpqsrv_get_result_last(conn, wait_event_info);
282+
}
283+
284+
/*
285+
* Like PQexec(), loop over PQgetResult() until it returns NULL or another
286+
* terminal state. Return the last non-NULL result or the terminal state.
287+
*/
288+
static inline PGresult *
289+
libpqsrv_get_result_last(PGconn *conn, uint32 wait_event_info)
290+
{
291+
PGresult *volatile lastResult = NULL;
292+
293+
/* In what follows, do not leak any PGresults on an error. */
294+
PG_TRY();
295+
{
296+
for (;;)
297+
{
298+
/* Wait for, and collect, the next PGresult. */
299+
PGresult *result;
300+
301+
result = libpqsrv_get_result(conn, wait_event_info);
302+
if (result == NULL)
303+
break; /* query is complete, or failure */
304+
305+
/*
306+
* Emulate PQexec()'s behavior of returning the last result when
307+
* there are many.
308+
*/
309+
PQclear(lastResult);
310+
lastResult = result;
311+
312+
if (PQresultStatus(lastResult) == PGRES_COPY_IN ||
313+
PQresultStatus(lastResult) == PGRES_COPY_OUT ||
314+
PQresultStatus(lastResult) == PGRES_COPY_BOTH ||
315+
PQstatus(conn) == CONNECTION_BAD)
316+
break;
317+
}
318+
}
319+
PG_CATCH();
320+
{
321+
PQclear(lastResult);
322+
PG_RE_THROW();
323+
}
324+
PG_END_TRY();
325+
326+
return lastResult;
327+
}
328+
329+
/*
330+
* Perform the equivalent of PQgetResult(), but watch for interrupts.
331+
*/
332+
static inline PGresult *
333+
libpqsrv_get_result(PGconn *conn, uint32 wait_event_info)
334+
{
335+
/*
336+
* Collect data until PQgetResult is ready to get the result without
337+
* blocking.
338+
*/
339+
while (PQisBusy(conn))
340+
{
341+
int rc;
342+
343+
rc = WaitLatchOrSocket(MyLatch,
344+
WL_EXIT_ON_PM_DEATH | WL_LATCH_SET |
345+
WL_SOCKET_READABLE,
346+
PQsocket(conn),
347+
0,
348+
wait_event_info);
349+
350+
/* Interrupted? */
351+
if (rc & WL_LATCH_SET)
352+
{
353+
ResetLatch(MyLatch);
354+
CHECK_FOR_INTERRUPTS();
355+
}
356+
357+
/* Consume whatever data is available from the socket */
358+
if (PQconsumeInput(conn) == 0)
359+
{
360+
/* trouble; expect PQgetResult() to return NULL */
361+
break;
362+
}
363+
}
364+
365+
/* Now we can collect and return the next PGresult */
366+
return PQgetResult(conn);
367+
}
368+
242369
#endif /* LIBPQ_BE_FE_HELPERS_H */

0 commit comments

Comments
 (0)