Skip to content

Commit 7c1f426

Browse files
alvherrehoriguti
andcommitted
libpq: Improve idle state handling in pipeline mode
We were going into IDLE state too soon when executing queries via PQsendQuery in pipeline mode, causing several scenarios to misbehave in different ways -- most notably, as reported by Daniele Varrazzo, that a warning message is produced by libpq: message type 0x33 arrived from server while idle But it is also possible, if queries are sent and results consumed not in lockstep, for the expected mediating NULL result values from PQgetResult to be lost (a problem which has not been reported, but which is more serious). Fix this by introducing two new concepts: one is a command queue element PGQUERY_CLOSE to tell libpq to wait for the CloseComplete server response to the Close message that is sent by PQsendQuery. Because the application is not expecting any PGresult from this, the mechanism to consume it is a bit hackish. The other concept, authored by Horiguchi-san, is a PGASYNC_PIPELINE_IDLE state for libpq's state machine to differentiate "really idle" from merely "the idle state that occurs in between reading results from the server for elements in the pipeline". This makes libpq not go fully IDLE when the libpq command queue contains entries; in normal cases, we only go IDLE once at the end of the pipeline, when the server response to the final SYNC message is received. (However, there are corner cases it doesn't fix, such as terminating the query sequence by PQsendFlushRequest instead of PQpipelineSync; this sort of scenario is what requires PGQUERY_CLOSE bit above.) This last bit helps make the libpq state machine clearer; in particular we can get rid of an ugly hack in pqParseInput3 to avoid considering IDLE as such when the command queue contains entries. A new test mode is added to libpq_pipeline.c to tickle some related problematic cases. Reported-by: Daniele Varrazzo <daniele.varrazzo@gmail.com> Co-authored-by: Kyotaro Horiguchi <horikyota.ntt@gmail.com> Discussion: https://postgr.es/m/CA+mi_8bvD0_CW3sumgwPvWdNzXY32itoG_16tDYRu_1S2gV2iw@mail.gmail.com
1 parent 0b71e43 commit 7c1f426

File tree

6 files changed

+425
-38
lines changed

6 files changed

+425
-38
lines changed

src/interfaces/libpq/fe-exec.c

Lines changed: 97 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1279,7 +1279,8 @@ pqAppendCmdQueueEntry(PGconn *conn, PGcmdQueueEntry *entry)
12791279
* itself consume commands from the queue; if we're in any other
12801280
* state, we don't have to do anything.
12811281
*/
1282-
if (conn->asyncStatus == PGASYNC_IDLE)
1282+
if (conn->asyncStatus == PGASYNC_IDLE ||
1283+
conn->asyncStatus == PGASYNC_PIPELINE_IDLE)
12831284
{
12841285
resetPQExpBuffer(&conn->errorMessage);
12851286
pqPipelineProcessQueue(conn);
@@ -1338,6 +1339,7 @@ static int
13381339
PQsendQueryInternal(PGconn *conn, const char *query, bool newQuery)
13391340
{
13401341
PGcmdQueueEntry *entry = NULL;
1342+
PGcmdQueueEntry *entry2 = NULL;
13411343

13421344
if (!PQsendQueryStart(conn, newQuery))
13431345
return 0;
@@ -1353,6 +1355,12 @@ PQsendQueryInternal(PGconn *conn, const char *query, bool newQuery)
13531355
entry = pqAllocCmdQueueEntry(conn);
13541356
if (entry == NULL)
13551357
return 0; /* error msg already set */
1358+
if (conn->pipelineStatus != PQ_PIPELINE_OFF)
1359+
{
1360+
entry2 = pqAllocCmdQueueEntry(conn);
1361+
if (entry2 == NULL)
1362+
goto sendFailed;
1363+
}
13561364

13571365
/* Send the query message(s) */
13581366
if (conn->pipelineStatus == PQ_PIPELINE_OFF)
@@ -1422,6 +1430,20 @@ PQsendQueryInternal(PGconn *conn, const char *query, bool newQuery)
14221430

14231431
/* OK, it's launched! */
14241432
pqAppendCmdQueueEntry(conn, entry);
1433+
1434+
/*
1435+
* When pipeline mode is in use, we need a second entry in the command
1436+
* queue to represent Close Portal message. This allows us later to wait
1437+
* for the CloseComplete message to be received before getting in IDLE
1438+
* state.
1439+
*/
1440+
if (conn->pipelineStatus != PQ_PIPELINE_OFF)
1441+
{
1442+
entry2->queryclass = PGQUERY_CLOSE;
1443+
entry2->query = NULL;
1444+
pqAppendCmdQueueEntry(conn, entry2);
1445+
}
1446+
14251447
return 1;
14261448

14271449
sendFailed:
@@ -1667,11 +1689,13 @@ PQsendQueryStart(PGconn *conn, bool newQuery)
16671689
switch (conn->asyncStatus)
16681690
{
16691691
case PGASYNC_IDLE:
1692+
case PGASYNC_PIPELINE_IDLE:
16701693
case PGASYNC_READY:
16711694
case PGASYNC_READY_MORE:
16721695
case PGASYNC_BUSY:
16731696
/* ok to queue */
16741697
break;
1698+
16751699
case PGASYNC_COPY_IN:
16761700
case PGASYNC_COPY_OUT:
16771701
case PGASYNC_COPY_BOTH:
@@ -2047,19 +2071,22 @@ PQgetResult(PGconn *conn)
20472071
{
20482072
case PGASYNC_IDLE:
20492073
res = NULL; /* query is complete */
2050-
if (conn->pipelineStatus != PQ_PIPELINE_OFF)
2051-
{
2052-
/*
2053-
* We're about to return the NULL that terminates the round of
2054-
* results from the current query; prepare to send the results
2055-
* of the next query when we're called next. Also, since this
2056-
* is the start of the results of the next query, clear any
2057-
* prior error message.
2058-
*/
2059-
resetPQExpBuffer(&conn->errorMessage);
2060-
pqPipelineProcessQueue(conn);
2061-
}
20622074
break;
2075+
case PGASYNC_PIPELINE_IDLE:
2076+
Assert(conn->pipelineStatus != PQ_PIPELINE_OFF);
2077+
2078+
/*
2079+
* We're about to return the NULL that terminates the round of
2080+
* results from the current query; prepare to send the results
2081+
* of the next query, if any, when we're called next. If there's
2082+
* no next element in the command queue, this gets us in IDLE
2083+
* state.
2084+
*/
2085+
resetPQExpBuffer(&conn->errorMessage);
2086+
pqPipelineProcessQueue(conn);
2087+
res = NULL; /* query is complete */
2088+
break;
2089+
20632090
case PGASYNC_READY:
20642091

20652092
/*
@@ -2080,7 +2107,7 @@ PQgetResult(PGconn *conn)
20802107
* We're about to send the results of the current query. Set
20812108
* us idle now, and ...
20822109
*/
2083-
conn->asyncStatus = PGASYNC_IDLE;
2110+
conn->asyncStatus = PGASYNC_PIPELINE_IDLE;
20842111

20852112
/*
20862113
* ... in cases when we're sending a pipeline-sync result,
@@ -2124,6 +2151,22 @@ PQgetResult(PGconn *conn)
21242151
break;
21252152
}
21262153

2154+
/* If the next command we expect is CLOSE, read and consume it */
2155+
if (conn->asyncStatus == PGASYNC_PIPELINE_IDLE &&
2156+
conn->cmd_queue_head &&
2157+
conn->cmd_queue_head->queryclass == PGQUERY_CLOSE)
2158+
{
2159+
if (res && res->resultStatus != PGRES_FATAL_ERROR)
2160+
{
2161+
conn->asyncStatus = PGASYNC_BUSY;
2162+
parseInput(conn);
2163+
conn->asyncStatus = PGASYNC_PIPELINE_IDLE;
2164+
}
2165+
else
2166+
/* we won't ever see the Close */
2167+
pqCommandQueueAdvance(conn);
2168+
}
2169+
21272170
if (res)
21282171
{
21292172
int i;
@@ -2932,7 +2975,10 @@ PQexitPipelineMode(PGconn *conn)
29322975
if (!conn)
29332976
return 0;
29342977

2935-
if (conn->pipelineStatus == PQ_PIPELINE_OFF)
2978+
if (conn->pipelineStatus == PQ_PIPELINE_OFF &&
2979+
(conn->asyncStatus == PGASYNC_IDLE ||
2980+
conn->asyncStatus == PGASYNC_PIPELINE_IDLE) &&
2981+
conn->cmd_queue_head == NULL)
29362982
return 1;
29372983

29382984
switch (conn->asyncStatus)
@@ -2949,9 +2995,16 @@ PQexitPipelineMode(PGconn *conn)
29492995
libpq_gettext("cannot exit pipeline mode while busy\n"));
29502996
return 0;
29512997

2952-
default:
2998+
case PGASYNC_IDLE:
2999+
case PGASYNC_PIPELINE_IDLE:
29533000
/* OK */
29543001
break;
3002+
3003+
case PGASYNC_COPY_IN:
3004+
case PGASYNC_COPY_OUT:
3005+
case PGASYNC_COPY_BOTH:
3006+
appendPQExpBufferStr(&conn->errorMessage,
3007+
libpq_gettext("cannot exit pipeline mode while in COPY\n"));
29553008
}
29563009

29573010
/* still work to process */
@@ -2988,6 +3041,10 @@ pqCommandQueueAdvance(PGconn *conn)
29883041
prevquery = conn->cmd_queue_head;
29893042
conn->cmd_queue_head = conn->cmd_queue_head->next;
29903043

3044+
/* If the queue is now empty, reset the tail too */
3045+
if (conn->cmd_queue_head == NULL)
3046+
conn->cmd_queue_tail = NULL;
3047+
29913048
/* and make it recyclable */
29923049
prevquery->next = NULL;
29933050
pqRecycleCmdQueueEntry(conn, prevquery);
@@ -3010,15 +3067,35 @@ pqPipelineProcessQueue(PGconn *conn)
30103067
case PGASYNC_BUSY:
30113068
/* client still has to process current query or results */
30123069
return;
3070+
30133071
case PGASYNC_IDLE:
3072+
/*
3073+
* If we're in IDLE mode and there's some command in the queue,
3074+
* get us into PIPELINE_IDLE mode and process normally. Otherwise
3075+
* there's nothing for us to do.
3076+
*/
3077+
if (conn->cmd_queue_head != NULL)
3078+
{
3079+
conn->asyncStatus = PGASYNC_PIPELINE_IDLE;
3080+
break;
3081+
}
3082+
return;
3083+
3084+
case PGASYNC_PIPELINE_IDLE:
3085+
Assert(conn->pipelineStatus != PQ_PIPELINE_OFF);
30143086
/* next query please */
30153087
break;
30163088
}
30173089

3018-
/* Nothing to do if not in pipeline mode, or queue is empty */
3019-
if (conn->pipelineStatus == PQ_PIPELINE_OFF ||
3020-
conn->cmd_queue_head == NULL)
3090+
/*
3091+
* If there are no further commands to process in the queue, get us in
3092+
* "real idle" mode now.
3093+
*/
3094+
if (conn->cmd_queue_head == NULL)
3095+
{
3096+
conn->asyncStatus = PGASYNC_IDLE;
30213097
return;
3098+
}
30223099

30233100
/* Initialize async result-accumulation state */
30243101
pqClearAsyncResult(conn);
@@ -3105,6 +3182,7 @@ PQpipelineSync(PGconn *conn)
31053182
case PGASYNC_READY_MORE:
31063183
case PGASYNC_BUSY:
31073184
case PGASYNC_IDLE:
3185+
case PGASYNC_PIPELINE_IDLE:
31083186
/* OK to send sync */
31093187
break;
31103188
}

src/interfaces/libpq/fe-protocol3.c

Lines changed: 17 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -158,18 +158,6 @@ pqParseInput3(PGconn *conn)
158158
if (conn->asyncStatus != PGASYNC_IDLE)
159159
return;
160160

161-
/*
162-
* We're also notionally not-IDLE when in pipeline mode the state
163-
* says "idle" (so we have completed receiving the results of one
164-
* query from the server and dispatched them to the application)
165-
* but another query is queued; yield back control to caller so
166-
* that they can initiate processing of the next query in the
167-
* queue.
168-
*/
169-
if (conn->pipelineStatus != PQ_PIPELINE_OFF &&
170-
conn->cmd_queue_head != NULL)
171-
return;
172-
173161
/*
174162
* Unexpected message in IDLE state; need to recover somehow.
175163
* ERROR messages are handled using the notice processor;
@@ -296,8 +284,24 @@ pqParseInput3(PGconn *conn)
296284
}
297285
break;
298286
case '2': /* Bind Complete */
287+
/* Nothing to do for this message type */
288+
break;
299289
case '3': /* Close Complete */
300-
/* Nothing to do for these message types */
290+
/*
291+
* If we get CloseComplete when waiting for it, consume
292+
* the queue element and keep going. A result is not
293+
* expected from this message; it is just there so that
294+
* we know to wait for it when PQsendQuery is used in
295+
* pipeline mode, before going in IDLE state. Failing to
296+
* do this makes us receive CloseComplete when IDLE, which
297+
* creates problems.
298+
*/
299+
if (conn->cmd_queue_head &&
300+
conn->cmd_queue_head->queryclass == PGQUERY_CLOSE)
301+
{
302+
pqCommandQueueAdvance(conn);
303+
}
304+
301305
break;
302306
case 'S': /* parameter status */
303307
if (getParameterStatus(conn))

src/interfaces/libpq/libpq-int.h

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -224,7 +224,8 @@ typedef enum
224224
* query */
225225
PGASYNC_COPY_IN, /* Copy In data transfer in progress */
226226
PGASYNC_COPY_OUT, /* Copy Out data transfer in progress */
227-
PGASYNC_COPY_BOTH /* Copy In/Out data transfer in progress */
227+
PGASYNC_COPY_BOTH, /* Copy In/Out data transfer in progress */
228+
PGASYNC_PIPELINE_IDLE, /* "Idle" between commands in pipeline mode */
228229
} PGAsyncStatusType;
229230

230231
/* Target server type (decoded value of target_session_attrs) */
@@ -310,7 +311,8 @@ typedef enum
310311
PGQUERY_EXTENDED, /* full Extended protocol (PQexecParams) */
311312
PGQUERY_PREPARE, /* Parse only (PQprepare) */
312313
PGQUERY_DESCRIBE, /* Describe Statement or Portal */
313-
PGQUERY_SYNC /* Sync (at end of a pipeline) */
314+
PGQUERY_SYNC, /* Sync (at end of a pipeline) */
315+
PGQUERY_CLOSE
314316
} PGQueryClass;
315317

316318
/*

0 commit comments

Comments
 (0)