Skip to content

Commit ab09679

Browse files
committed
libpq: Fix sending queries in pipeline aborted state
When sending queries in pipeline mode, we were careless about leaving the connection in the right state so that PQgetResult would behave correctly; trying to read further results after sending a query after having read a result with an error would sometimes hang. Fix by ensuring internal libpq state is changed properly. All the state changes were being done by the callers of pqAppendCmdQueueEntry(); it would have become too repetitious to have this logic in each of them, so instead put it all in that function and relieve callers of the responsibility. Add a test to verify this case. Without the code fix, this new test hangs sometimes. Also, document that PQisBusy() would return false when no queries are pending result. This is not intuitively obvious, and NULL would be obtained by calling PQgetResult() at that point, which is confusing. Wording by Boris Kolpackov. In passing, fix bogus use of "false" to mean "0", per Ranier Vilela. Backpatch to 14. Author: Álvaro Herrera <alvherre@alvh.no-ip.org> Reported-by: Boris Kolpackov <boris@codesynthesis.com> Discussion: https://postgr.es/m/boris.20210624103805@codesynthesis.com
1 parent 8e7811e commit ab09679

File tree

3 files changed

+274
-14
lines changed

3 files changed

+274
-14
lines changed

doc/src/sgml/libpq.sgml

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5171,7 +5171,10 @@ int PQflush(PGconn *conn);
51715171

51725172
<para>
51735173
<function>PQisBusy</function>, <function>PQconsumeInput</function>, etc
5174-
operate as normal when processing pipeline results.
5174+
operate as normal when processing pipeline results. In particular,
5175+
a call to <function>PQisBusy</function> in the middle of a pipeline
5176+
returns 0 if the results for all the queries issued so far have been
5177+
consumed.
51755178
</para>
51765179

51775180
<para>

src/interfaces/libpq/fe-exec.c

Lines changed: 44 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1223,7 +1223,8 @@ pqAllocCmdQueueEntry(PGconn *conn)
12231223

12241224
/*
12251225
* pqAppendCmdQueueEntry
1226-
* Append a caller-allocated command queue entry to the queue.
1226+
* Append a caller-allocated entry to the command queue, and update
1227+
* conn->asyncStatus to account for it.
12271228
*
12281229
* The query itself must already have been put in the output buffer by the
12291230
* caller.
@@ -1239,6 +1240,38 @@ pqAppendCmdQueueEntry(PGconn *conn, PGcmdQueueEntry *entry)
12391240
conn->cmd_queue_tail->next = entry;
12401241

12411242
conn->cmd_queue_tail = entry;
1243+
1244+
switch (conn->pipelineStatus)
1245+
{
1246+
case PQ_PIPELINE_OFF:
1247+
case PQ_PIPELINE_ON:
1248+
1249+
/*
1250+
* When not in pipeline aborted state, if there's a result ready
1251+
* to be consumed, let it be so (that is, don't change away from
1252+
* READY or READY_MORE); otherwise set us busy to wait for
1253+
* something to arrive from the server.
1254+
*/
1255+
if (conn->asyncStatus == PGASYNC_IDLE)
1256+
conn->asyncStatus = PGASYNC_BUSY;
1257+
break;
1258+
1259+
case PQ_PIPELINE_ABORTED:
1260+
1261+
/*
1262+
* In aborted pipeline state, we don't expect anything from the
1263+
* server (since we don't send any queries that are queued).
1264+
* Therefore, if IDLE then do what PQgetResult would do to let
1265+
* itself consume commands from the queue; if we're in any other
1266+
* state, we don't have to do anything.
1267+
*/
1268+
if (conn->asyncStatus == PGASYNC_IDLE)
1269+
{
1270+
resetPQExpBuffer(&conn->errorMessage);
1271+
pqPipelineProcessQueue(conn);
1272+
}
1273+
break;
1274+
}
12421275
}
12431276

12441277
/*
@@ -1375,7 +1408,6 @@ PQsendQueryInternal(PGconn *conn, const char *query, bool newQuery)
13751408

13761409
/* OK, it's launched! */
13771410
pqAppendCmdQueueEntry(conn, entry);
1378-
conn->asyncStatus = PGASYNC_BUSY;
13791411
return 1;
13801412

13811413
sendFailed:
@@ -1510,10 +1542,6 @@ PQsendPrepare(PGconn *conn,
15101542
/* if insufficient memory, query just winds up NULL */
15111543
entry->query = strdup(query);
15121544

1513-
pqAppendCmdQueueEntry(conn, entry);
1514-
1515-
conn->asyncStatus = PGASYNC_BUSY;
1516-
15171545
/*
15181546
* Give the data a push (in pipeline mode, only if we're past the size
15191547
* threshold). In nonblock mode, don't complain if we're unable to send
@@ -1522,6 +1550,9 @@ PQsendPrepare(PGconn *conn,
15221550
if (pqPipelineFlush(conn) < 0)
15231551
goto sendFailed;
15241552

1553+
/* OK, it's launched! */
1554+
pqAppendCmdQueueEntry(conn, entry);
1555+
15251556
return 1;
15261557

15271558
sendFailed:
@@ -1815,7 +1846,7 @@ PQsendQueryGuts(PGconn *conn,
18151846

18161847
/* OK, it's launched! */
18171848
pqAppendCmdQueueEntry(conn, entry);
1818-
conn->asyncStatus = PGASYNC_BUSY;
1849+
18191850
return 1;
18201851

18211852
sendFailed:
@@ -2445,7 +2476,7 @@ PQsendDescribe(PGconn *conn, char desc_type, const char *desc_target)
24452476

24462477
/* OK, it's launched! */
24472478
pqAppendCmdQueueEntry(conn, entry);
2448-
conn->asyncStatus = PGASYNC_BUSY;
2479+
24492480
return 1;
24502481

24512482
sendFailed:
@@ -2948,7 +2979,7 @@ pqCommandQueueAdvance(PGconn *conn)
29482979
* pqPipelineProcessQueue: subroutine for PQgetResult
29492980
* In pipeline mode, start processing the results of the next query in the queue.
29502981
*/
2951-
void
2982+
static void
29522983
pqPipelineProcessQueue(PGconn *conn)
29532984
{
29542985
switch (conn->asyncStatus)
@@ -3072,15 +3103,15 @@ PQpipelineSync(PGconn *conn)
30723103
pqPutMsgEnd(conn) < 0)
30733104
goto sendFailed;
30743105

3075-
pqAppendCmdQueueEntry(conn, entry);
3076-
30773106
/*
30783107
* Give the data a push. In nonblock mode, don't complain if we're unable
30793108
* to send it all; PQgetResult() will do any additional flushing needed.
30803109
*/
30813110
if (PQflush(conn) < 0)
30823111
goto sendFailed;
3083-
conn->asyncStatus = PGASYNC_BUSY;
3112+
3113+
/* OK, it's launched! */
3114+
pqAppendCmdQueueEntry(conn, entry);
30843115

30853116
return 1;
30863117

@@ -3115,7 +3146,7 @@ PQsendFlushRequest(PGconn *conn)
31153146
{
31163147
appendPQExpBufferStr(&conn->errorMessage,
31173148
libpq_gettext("another command is already in progress\n"));
3118-
return false;
3149+
return 0;
31193150
}
31203151

31213152
if (pqPutMsgStart('H', conn) < 0 ||

src/test/modules/libpq_pipeline/libpq_pipeline.c

Lines changed: 226 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,8 @@
2828

2929

3030
static void exit_nicely(PGconn *conn);
31+
static bool process_result(PGconn *conn, PGresult *res, int results,
32+
int numsent);
3133

3234
const char *const progname = "libpq_pipeline";
3335

@@ -1307,6 +1309,227 @@ test_transaction(PGconn *conn)
13071309
fprintf(stderr, "ok\n");
13081310
}
13091311

1312+
/*
1313+
* In this test mode we send a stream of queries, with one in the middle
1314+
* causing an error. Verify that we can still send some more after the
1315+
* error and have libpq work properly.
1316+
*/
1317+
static void
1318+
test_uniqviol(PGconn *conn)
1319+
{
1320+
int sock = PQsocket(conn);
1321+
PGresult *res;
1322+
Oid paramTypes[2] = {INT8OID, INT8OID};
1323+
const char *paramValues[2];
1324+
char paramValue0[MAXINT8LEN];
1325+
char paramValue1[MAXINT8LEN];
1326+
int ctr = 0;
1327+
int numsent = 0;
1328+
int results = 0;
1329+
bool read_done = false;
1330+
bool write_done = false;
1331+
bool error_sent = false;
1332+
bool got_error = false;
1333+
int switched = 0;
1334+
int socketful = 0;
1335+
fd_set in_fds;
1336+
fd_set out_fds;
1337+
1338+
fprintf(stderr, "uniqviol ...");
1339+
1340+
PQsetnonblocking(conn, 1);
1341+
1342+
paramValues[0] = paramValue0;
1343+
paramValues[1] = paramValue1;
1344+
sprintf(paramValue1, "42");
1345+
1346+
res = PQexec(conn, "drop table if exists ppln_uniqviol;"
1347+
"create table ppln_uniqviol(id bigint primary key, idata bigint)");
1348+
if (PQresultStatus(res) != PGRES_COMMAND_OK)
1349+
pg_fatal("failed to create table: %s", PQerrorMessage(conn));
1350+
1351+
res = PQexec(conn, "begin");
1352+
if (PQresultStatus(res) != PGRES_COMMAND_OK)
1353+
pg_fatal("failed to begin transaction: %s", PQerrorMessage(conn));
1354+
1355+
res = PQprepare(conn, "insertion",
1356+
"insert into ppln_uniqviol values ($1, $2) returning id",
1357+
2, paramTypes);
1358+
if (res == NULL || PQresultStatus(res) != PGRES_COMMAND_OK)
1359+
pg_fatal("failed to prepare query: %s", PQerrorMessage(conn));
1360+
1361+
if (PQenterPipelineMode(conn) != 1)
1362+
pg_fatal("failed to enter pipeline mode");
1363+
1364+
while (!read_done)
1365+
{
1366+
/*
1367+
* Avoid deadlocks by reading everything the server has sent before
1368+
* sending anything. (Special precaution is needed here to process
1369+
* PQisBusy before testing the socket for read-readiness, because the
1370+
* socket does not turn read-ready after "sending" queries in aborted
1371+
* pipeline mode.)
1372+
*/
1373+
while (PQisBusy(conn) == 0)
1374+
{
1375+
bool new_error;
1376+
1377+
if (results >= numsent)
1378+
{
1379+
if (write_done)
1380+
read_done = true;
1381+
break;
1382+
}
1383+
1384+
res = PQgetResult(conn);
1385+
new_error = process_result(conn, res, results, numsent);
1386+
if (new_error && got_error)
1387+
pg_fatal("got two errors");
1388+
got_error |= new_error;
1389+
if (results++ >= numsent - 1)
1390+
{
1391+
if (write_done)
1392+
read_done = true;
1393+
break;
1394+
}
1395+
}
1396+
1397+
if (read_done)
1398+
break;
1399+
1400+
FD_ZERO(&out_fds);
1401+
FD_SET(sock, &out_fds);
1402+
1403+
FD_ZERO(&in_fds);
1404+
FD_SET(sock, &in_fds);
1405+
1406+
if (select(sock + 1, &in_fds, write_done ? NULL : &out_fds, NULL, NULL) == -1)
1407+
{
1408+
if (errno == EINTR)
1409+
continue;
1410+
pg_fatal("select() failed: %m");
1411+
}
1412+
1413+
if (FD_ISSET(sock, &in_fds) && PQconsumeInput(conn) == 0)
1414+
pg_fatal("PQconsumeInput failed: %s", PQerrorMessage(conn));
1415+
1416+
/*
1417+
* If the socket is writable and we haven't finished sending queries,
1418+
* send some.
1419+
*/
1420+
if (!write_done && FD_ISSET(sock, &out_fds))
1421+
{
1422+
for (;;)
1423+
{
1424+
int flush;
1425+
1426+
/*
1427+
* provoke uniqueness violation exactly once after having
1428+
* switched to read mode.
1429+
*/
1430+
if (switched >= 1 && !error_sent && ctr % socketful >= socketful / 2)
1431+
{
1432+
sprintf(paramValue0, "%d", numsent / 2);
1433+
fprintf(stderr, "E");
1434+
error_sent = true;
1435+
}
1436+
else
1437+
{
1438+
fprintf(stderr, ".");
1439+
sprintf(paramValue0, "%d", ctr++);
1440+
}
1441+
1442+
if (PQsendQueryPrepared(conn, "insertion", 2, paramValues, NULL, NULL, 0) != 1)
1443+
pg_fatal("failed to execute prepared query: %s", PQerrorMessage(conn));
1444+
numsent++;
1445+
1446+
/* Are we done writing? */
1447+
if (socketful != 0 && numsent % socketful == 42 && error_sent)
1448+
{
1449+
if (PQsendFlushRequest(conn) != 1)
1450+
pg_fatal("failed to send flush request");
1451+
write_done = true;
1452+
fprintf(stderr, "\ndone writing\n");
1453+
PQflush(conn);
1454+
break;
1455+
}
1456+
1457+
/* is the outgoing socket full? */
1458+
flush = PQflush(conn);
1459+
if (flush == -1)
1460+
pg_fatal("failed to flush: %s", PQerrorMessage(conn));
1461+
if (flush == 1)
1462+
{
1463+
if (socketful == 0)
1464+
socketful = numsent;
1465+
fprintf(stderr, "\nswitch to reading\n");
1466+
switched++;
1467+
break;
1468+
}
1469+
}
1470+
}
1471+
}
1472+
1473+
if (!got_error)
1474+
pg_fatal("did not get expected error");
1475+
1476+
fprintf(stderr, "ok\n");
1477+
}
1478+
1479+
/*
1480+
* Subroutine for test_uniqviol; given a PGresult, print it out and consume
1481+
* the expected NULL that should follow it.
1482+
*
1483+
* Returns true if we read a fatal error message, otherwise false.
1484+
*/
1485+
static bool
1486+
process_result(PGconn *conn, PGresult *res, int results, int numsent)
1487+
{
1488+
PGresult *res2;
1489+
bool got_error = false;
1490+
1491+
if (res == NULL)
1492+
pg_fatal("got unexpected NULL");
1493+
1494+
switch (PQresultStatus(res))
1495+
{
1496+
case PGRES_FATAL_ERROR:
1497+
got_error = true;
1498+
fprintf(stderr, "result %d/%d (error): %s\n", results, numsent, PQerrorMessage(conn));
1499+
PQclear(res);
1500+
1501+
res2 = PQgetResult(conn);
1502+
if (res2 != NULL)
1503+
pg_fatal("expected NULL, got %s",
1504+
PQresStatus(PQresultStatus(res2)));
1505+
break;
1506+
1507+
case PGRES_TUPLES_OK:
1508+
fprintf(stderr, "result %d/%d: %s\n", results, numsent, PQgetvalue(res, 0, 0));
1509+
PQclear(res);
1510+
1511+
res2 = PQgetResult(conn);
1512+
if (res2 != NULL)
1513+
pg_fatal("expected NULL, got %s",
1514+
PQresStatus(PQresultStatus(res2)));
1515+
break;
1516+
1517+
case PGRES_PIPELINE_ABORTED:
1518+
fprintf(stderr, "result %d/%d: pipeline aborted\n", results, numsent);
1519+
res2 = PQgetResult(conn);
1520+
if (res2 != NULL)
1521+
pg_fatal("expected NULL, got %s",
1522+
PQresStatus(PQresultStatus(res2)));
1523+
break;
1524+
1525+
default:
1526+
pg_fatal("got unexpected %s", PQresStatus(PQresultStatus(res)));
1527+
}
1528+
1529+
return got_error;
1530+
}
1531+
1532+
13101533
static void
13111534
usage(const char *progname)
13121535
{
@@ -1331,6 +1554,7 @@ print_test_list(void)
13311554
printf("simple_pipeline\n");
13321555
printf("singlerow\n");
13331556
printf("transaction\n");
1557+
printf("uniqviol\n");
13341558
}
13351559

13361560
int
@@ -1436,6 +1660,8 @@ main(int argc, char **argv)
14361660
test_singlerowmode(conn);
14371661
else if (strcmp(testname, "transaction") == 0)
14381662
test_transaction(conn);
1663+
else if (strcmp(testname, "uniqviol") == 0)
1664+
test_uniqviol(conn);
14391665
else
14401666
{
14411667
fprintf(stderr, "\"%s\" is not a recognized test name\n", testname);

0 commit comments

Comments
 (0)