Skip to content

Commit 99090e9

Browse files
committed
Fix low-probability leaks of PGresult objects in the backend.
We had three occurrences of essentially the same coding pattern wherein we tried to retrieve a query result from a libpq connection without blocking. In the case where PQconsumeInput failed (typically indicating a lost connection), all three loops simply gave up and returned, forgetting to clear any previously-collected PGresult object. Since those are malloc'd not palloc'd, the oversight results in a process-lifespan memory leak. One instance, in libpqwalreceiver, is of little significance because the walreceiver process would just quit anyway if its connection fails. But we might as well fix it. The other two instances, in postgres_fdw, are somewhat more worrisome because at least in principle the scenario could be repeated, allowing the amount of memory leaked to build up to something worth worrying about. Moreover, in these cases the loops contain CHECK_FOR_INTERRUPTS calls, as well as other calls that could potentially elog(ERROR), providing another way to exit without having cleared the PGresult. Here we need to add PG_TRY logic similar to what exists in quite a few other places in postgres_fdw. Coverity noted the libpqwalreceiver bug; I found the other two cases by checking all calls of PQconsumeInput. Back-patch to all supported versions as appropriate (9.2 lacks postgres_fdw, so this is really quite unexciting for that branch). Discussion: https://postgr.es/m/22620.1497486981@sss.pgh.pa.us
1 parent ae58c15 commit 99090e9

File tree

2 files changed

+102
-66
lines changed

2 files changed

+102
-66
lines changed

contrib/postgres_fdw/connection.c

Lines changed: 92 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -498,48 +498,58 @@ pgfdw_exec_query(PGconn *conn, const char *query)
498498
*
499499
* This function offers quick responsiveness by checking for any interruptions.
500500
*
501-
* This function emulates the PQexec()'s behavior of returning the last result
501+
* This function emulates PQexec()'s behavior of returning the last result
502502
* when there are many.
503503
*
504504
* Caller is responsible for the error handling on the result.
505505
*/
506506
PGresult *
507507
pgfdw_get_result(PGconn *conn, const char *query)
508508
{
509-
PGresult *last_res = NULL;
509+
PGresult *volatile last_res = NULL;
510510

511-
for (;;)
511+
/* In what follows, do not leak any PGresults on an error. */
512+
PG_TRY();
512513
{
513-
PGresult *res;
514-
515-
while (PQisBusy(conn))
514+
for (;;)
516515
{
517-
int wc;
516+
PGresult *res;
518517

519-
/* Sleep until there's something to do */
520-
wc = WaitLatchOrSocket(&MyProc->procLatch,
521-
WL_LATCH_SET | WL_SOCKET_READABLE,
522-
PQsocket(conn),
523-
-1L);
524-
ResetLatch(&MyProc->procLatch);
518+
while (PQisBusy(conn))
519+
{
520+
int wc;
525521

526-
CHECK_FOR_INTERRUPTS();
522+
/* Sleep until there's something to do */
523+
wc = WaitLatchOrSocket(&MyProc->procLatch,
524+
WL_LATCH_SET | WL_SOCKET_READABLE,
525+
PQsocket(conn),
526+
-1L);
527+
ResetLatch(&MyProc->procLatch);
527528

528-
/* Data available in socket */
529-
if (wc & WL_SOCKET_READABLE)
530-
{
531-
if (!PQconsumeInput(conn))
532-
pgfdw_report_error(ERROR, NULL, conn, false, query);
529+
CHECK_FOR_INTERRUPTS();
530+
531+
/* Data available in socket? */
532+
if (wc & WL_SOCKET_READABLE)
533+
{
534+
if (!PQconsumeInput(conn))
535+
pgfdw_report_error(ERROR, NULL, conn, false, query);
536+
}
533537
}
534-
}
535538

536-
res = PQgetResult(conn);
537-
if (res == NULL)
538-
break; /* query is complete */
539+
res = PQgetResult(conn);
540+
if (res == NULL)
541+
break; /* query is complete */
539542

543+
PQclear(last_res);
544+
last_res = res;
545+
}
546+
}
547+
PG_CATCH();
548+
{
540549
PQclear(last_res);
541-
last_res = res;
550+
PG_RE_THROW();
542551
}
552+
PG_END_TRY();
543553

544554
return last_res;
545555
}
@@ -1007,6 +1017,7 @@ pgfdw_exec_cleanup_query(PGconn *conn, const char *query, bool ignore_errors)
10071017
pgfdw_report_error(WARNING, result, conn, true, query);
10081018
return ignore_errors;
10091019
}
1020+
PQclear(result);
10101021

10111022
return true;
10121023
}
@@ -1029,56 +1040,75 @@ pgfdw_exec_cleanup_query(PGconn *conn, const char *query, bool ignore_errors)
10291040
static bool
10301041
pgfdw_get_cleanup_result(PGconn *conn, TimestampTz endtime, PGresult **result)
10311042
{
1032-
PGresult *last_res = NULL;
1043+
volatile bool timed_out = false;
1044+
PGresult *volatile last_res = NULL;
10331045

1034-
for (;;)
1046+
/* In what follows, do not leak any PGresults on an error. */
1047+
PG_TRY();
10351048
{
1036-
PGresult *res;
1037-
1038-
while (PQisBusy(conn))
1049+
for (;;)
10391050
{
1040-
int wc;
1041-
TimestampTz now = GetCurrentTimestamp();
1042-
long secs;
1043-
int microsecs;
1044-
long cur_timeout;
1045-
1046-
/* If timeout has expired, give up, else get sleep time. */
1047-
if (now >= endtime)
1048-
return true;
1049-
TimestampDifference(now, endtime, &secs, &microsecs);
1050-
1051-
/* To protect against clock skew, limit sleep to one minute. */
1052-
cur_timeout = Min(60000, secs * USECS_PER_SEC + microsecs);
1053-
1054-
/* Sleep until there's something to do */
1055-
wc = WaitLatchOrSocket(&MyProc->procLatch,
1051+
PGresult *res;
1052+
1053+
while (PQisBusy(conn))
1054+
{
1055+
int wc;
1056+
TimestampTz now = GetCurrentTimestamp();
1057+
long secs;
1058+
int microsecs;
1059+
long cur_timeout;
1060+
1061+
/* If timeout has expired, give up, else get sleep time. */
1062+
if (now >= endtime)
1063+
{
1064+
timed_out = true;
1065+
goto exit;
1066+
}
1067+
TimestampDifference(now, endtime, &secs, &microsecs);
1068+
1069+
/* To protect against clock skew, limit sleep to one minute. */
1070+
cur_timeout = Min(60000, secs * USECS_PER_SEC + microsecs);
1071+
1072+
/* Sleep until there's something to do */
1073+
wc = WaitLatchOrSocket(&MyProc->procLatch,
10561074
WL_LATCH_SET | WL_SOCKET_READABLE | WL_TIMEOUT,
1057-
PQsocket(conn),
1058-
cur_timeout);
1059-
ResetLatch(&MyProc->procLatch);
1075+
PQsocket(conn),
1076+
cur_timeout);
1077+
ResetLatch(&MyProc->procLatch);
10601078

1061-
CHECK_FOR_INTERRUPTS();
1079+
CHECK_FOR_INTERRUPTS();
10621080

1063-
/* Data available in socket */
1064-
if (wc & WL_SOCKET_READABLE)
1065-
{
1066-
if (!PQconsumeInput(conn))
1081+
/* Data available in socket? */
1082+
if (wc & WL_SOCKET_READABLE)
10671083
{
1068-
*result = NULL;
1069-
return false;
1084+
if (!PQconsumeInput(conn))
1085+
{
1086+
/* connection trouble; treat the same as a timeout */
1087+
timed_out = true;
1088+
goto exit;
1089+
}
10701090
}
10711091
}
1072-
}
10731092

1074-
res = PQgetResult(conn);
1075-
if (res == NULL)
1076-
break; /* query is complete */
1093+
res = PQgetResult(conn);
1094+
if (res == NULL)
1095+
break; /* query is complete */
10771096

1097+
PQclear(last_res);
1098+
last_res = res;
1099+
}
1100+
exit: ;
1101+
}
1102+
PG_CATCH();
1103+
{
10781104
PQclear(last_res);
1079-
last_res = res;
1105+
PG_RE_THROW();
10801106
}
1107+
PG_END_TRY();
10811108

1082-
*result = last_res;
1083-
return false;
1109+
if (timed_out)
1110+
PQclear(last_res);
1111+
else
1112+
*result = last_res;
1113+
return timed_out;
10841114
}

src/backend/replication/libpqwalreceiver/libpqwalreceiver.c

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -410,14 +410,20 @@ libpqrcv_PQexec(const char *query)
410410
*/
411411
if (!libpq_select(-1))
412412
continue; /* interrupted */
413+
414+
/* Consume whatever data is available from the socket */
413415
if (PQconsumeInput(streamConn) == 0)
414-
return NULL; /* trouble */
416+
{
417+
/* trouble; drop whatever we had and return NULL */
418+
PQclear(lastResult);
419+
return NULL;
420+
}
415421
}
416422

417423
/*
418-
* Emulate the PQexec()'s behavior of returning the last result when
419-
* there are many. Since walsender will never generate multiple
420-
* results, we skip the concatenation of error messages.
424+
* Emulate PQexec()'s behavior of returning the last result when there
425+
* are many. Since walsender will never generate multiple results, we
426+
* skip the concatenation of error messages.
421427
*/
422428
result = PQgetResult(streamConn);
423429
if (result == NULL)

0 commit comments

Comments
 (0)