Skip to content

Commit 054325c

Browse files
alvherrehoriguti
andcommitted
libpq: Improve idle state handling in pipeline mode
We were going into IDLE state too soon when executing queries via PQsendQuery in pipeline mode, causing several scenarios to misbehave in different ways -- most notably, as reported by Daniele Varrazzo, that a warning message is produced by libpq: message type 0x33 arrived from server while idle But it is also possible, if queries are sent and results consumed not in lockstep, for the expected mediating NULL result values from PQgetResult to be lost (a problem which has not been reported, but which is more serious). Fix this by introducing two new concepts: one is a command queue element PGQUERY_CLOSE to tell libpq to wait for the CloseComplete server response to the Close message that is sent by PQsendQuery. Because the application is not expecting any PGresult from this, the mechanism to consume it is a bit hackish. The other concept, authored by Horiguchi-san, is a PGASYNC_PIPELINE_IDLE state for libpq's state machine to differentiate "really idle" from merely "the idle state that occurs in between reading results from the server for elements in the pipeline". This makes libpq not go fully IDLE when the libpq command queue contains entries; in normal cases, we only go IDLE once at the end of the pipeline, when the server response to the final SYNC message is received. (However, there are corner cases it doesn't fix, such as terminating the query sequence by PQsendFlushRequest instead of PQpipelineSync; this sort of scenario is what requires PGQUERY_CLOSE bit above.) This last bit helps make the libpq state machine clearer; in particular we can get rid of an ugly hack in pqParseInput3 to avoid considering IDLE as such when the command queue contains entries. A new test mode is added to libpq_pipeline.c to tickle some related problematic cases. Reported-by: Daniele Varrazzo <daniele.varrazzo@gmail.com> Co-authored-by: Kyotaro Horiguchi <horikyota.ntt@gmail.com> Discussion: https://postgr.es/m/CA+mi_8bvD0_CW3sumgwPvWdNzXY32itoG_16tDYRu_1S2gV2iw@mail.gmail.com
1 parent 5001b44 commit 054325c

File tree

6 files changed

+424
-35
lines changed

6 files changed

+424
-35
lines changed

src/interfaces/libpq/fe-exec.c

Lines changed: 96 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1376,7 +1376,8 @@ pqAppendCmdQueueEntry(PGconn *conn, PGcmdQueueEntry *entry)
13761376
* itself consume commands from the queue; if we're in any other
13771377
* state, we don't have to do anything.
13781378
*/
1379-
if (conn->asyncStatus == PGASYNC_IDLE)
1379+
if (conn->asyncStatus == PGASYNC_IDLE ||
1380+
conn->asyncStatus == PGASYNC_PIPELINE_IDLE)
13801381
pqPipelineProcessQueue(conn);
13811382
break;
13821383
}
@@ -1432,6 +1433,7 @@ static int
14321433
PQsendQueryInternal(PGconn *conn, const char *query, bool newQuery)
14331434
{
14341435
PGcmdQueueEntry *entry = NULL;
1436+
PGcmdQueueEntry *entry2 = NULL;
14351437

14361438
if (!PQsendQueryStart(conn, newQuery))
14371439
return 0;
@@ -1447,6 +1449,12 @@ PQsendQueryInternal(PGconn *conn, const char *query, bool newQuery)
14471449
entry = pqAllocCmdQueueEntry(conn);
14481450
if (entry == NULL)
14491451
return 0; /* error msg already set */
1452+
if (conn->pipelineStatus != PQ_PIPELINE_OFF)
1453+
{
1454+
entry2 = pqAllocCmdQueueEntry(conn);
1455+
if (entry2 == NULL)
1456+
goto sendFailed;
1457+
}
14501458

14511459
/* Send the query message(s) */
14521460
if (conn->pipelineStatus == PQ_PIPELINE_OFF)
@@ -1516,6 +1524,20 @@ PQsendQueryInternal(PGconn *conn, const char *query, bool newQuery)
15161524

15171525
/* OK, it's launched! */
15181526
pqAppendCmdQueueEntry(conn, entry);
1527+
1528+
/*
1529+
* When pipeline mode is in use, we need a second entry in the command
1530+
* queue to represent Close Portal message. This allows us later to wait
1531+
* for the CloseComplete message to be received before getting in IDLE
1532+
* state.
1533+
*/
1534+
if (conn->pipelineStatus != PQ_PIPELINE_OFF)
1535+
{
1536+
entry2->queryclass = PGQUERY_CLOSE;
1537+
entry2->query = NULL;
1538+
pqAppendCmdQueueEntry(conn, entry2);
1539+
}
1540+
15191541
return 1;
15201542

15211543
sendFailed:
@@ -1763,11 +1785,13 @@ PQsendQueryStart(PGconn *conn, bool newQuery)
17631785
switch (conn->asyncStatus)
17641786
{
17651787
case PGASYNC_IDLE:
1788+
case PGASYNC_PIPELINE_IDLE:
17661789
case PGASYNC_READY:
17671790
case PGASYNC_READY_MORE:
17681791
case PGASYNC_BUSY:
17691792
/* ok to queue */
17701793
break;
1794+
17711795
case PGASYNC_COPY_IN:
17721796
case PGASYNC_COPY_OUT:
17731797
case PGASYNC_COPY_BOTH:
@@ -2140,16 +2164,21 @@ PQgetResult(PGconn *conn)
21402164
{
21412165
case PGASYNC_IDLE:
21422166
res = NULL; /* query is complete */
2143-
if (conn->pipelineStatus != PQ_PIPELINE_OFF)
2144-
{
2145-
/*
2146-
* We're about to return the NULL that terminates the round of
2147-
* results from the current query; prepare to send the results
2148-
* of the next query when we're called next.
2149-
*/
2150-
pqPipelineProcessQueue(conn);
2151-
}
21522167
break;
2168+
case PGASYNC_PIPELINE_IDLE:
2169+
Assert(conn->pipelineStatus != PQ_PIPELINE_OFF);
2170+
2171+
/*
2172+
* We're about to return the NULL that terminates the round of
2173+
* results from the current query; prepare to send the results
2174+
* of the next query, if any, when we're called next. If there's
2175+
* no next element in the command queue, this gets us in IDLE
2176+
* state.
2177+
*/
2178+
pqPipelineProcessQueue(conn);
2179+
res = NULL; /* query is complete */
2180+
break;
2181+
21532182
case PGASYNC_READY:
21542183

21552184
/*
@@ -2170,7 +2199,7 @@ PQgetResult(PGconn *conn)
21702199
* We're about to send the results of the current query. Set
21712200
* us idle now, and ...
21722201
*/
2173-
conn->asyncStatus = PGASYNC_IDLE;
2202+
conn->asyncStatus = PGASYNC_PIPELINE_IDLE;
21742203

21752204
/*
21762205
* ... in cases when we're sending a pipeline-sync result,
@@ -2216,6 +2245,22 @@ PQgetResult(PGconn *conn)
22162245
break;
22172246
}
22182247

2248+
/* If the next command we expect is CLOSE, read and consume it */
2249+
if (conn->asyncStatus == PGASYNC_PIPELINE_IDLE &&
2250+
conn->cmd_queue_head &&
2251+
conn->cmd_queue_head->queryclass == PGQUERY_CLOSE)
2252+
{
2253+
if (res && res->resultStatus != PGRES_FATAL_ERROR)
2254+
{
2255+
conn->asyncStatus = PGASYNC_BUSY;
2256+
parseInput(conn);
2257+
conn->asyncStatus = PGASYNC_PIPELINE_IDLE;
2258+
}
2259+
else
2260+
/* we won't ever see the Close */
2261+
pqCommandQueueAdvance(conn);
2262+
}
2263+
22192264
/* Time to fire PGEVT_RESULTCREATE events, if there are any */
22202265
if (res && res->nEvents > 0)
22212266
(void) PQfireResultCreateEvents(conn, res);
@@ -3009,7 +3054,10 @@ PQexitPipelineMode(PGconn *conn)
30093054
if (!conn)
30103055
return 0;
30113056

3012-
if (conn->pipelineStatus == PQ_PIPELINE_OFF)
3057+
if (conn->pipelineStatus == PQ_PIPELINE_OFF &&
3058+
(conn->asyncStatus == PGASYNC_IDLE ||
3059+
conn->asyncStatus == PGASYNC_PIPELINE_IDLE) &&
3060+
conn->cmd_queue_head == NULL)
30133061
return 1;
30143062

30153063
switch (conn->asyncStatus)
@@ -3026,9 +3074,16 @@ PQexitPipelineMode(PGconn *conn)
30263074
libpq_gettext("cannot exit pipeline mode while busy\n"));
30273075
return 0;
30283076

3029-
default:
3077+
case PGASYNC_IDLE:
3078+
case PGASYNC_PIPELINE_IDLE:
30303079
/* OK */
30313080
break;
3081+
3082+
case PGASYNC_COPY_IN:
3083+
case PGASYNC_COPY_OUT:
3084+
case PGASYNC_COPY_BOTH:
3085+
appendPQExpBufferStr(&conn->errorMessage,
3086+
libpq_gettext("cannot exit pipeline mode while in COPY\n"));
30323087
}
30333088

30343089
/* still work to process */
@@ -3065,6 +3120,10 @@ pqCommandQueueAdvance(PGconn *conn)
30653120
prevquery = conn->cmd_queue_head;
30663121
conn->cmd_queue_head = conn->cmd_queue_head->next;
30673122

3123+
/* If the queue is now empty, reset the tail too */
3124+
if (conn->cmd_queue_head == NULL)
3125+
conn->cmd_queue_tail = NULL;
3126+
30683127
/* and make it recyclable */
30693128
prevquery->next = NULL;
30703129
pqRecycleCmdQueueEntry(conn, prevquery);
@@ -3087,15 +3146,35 @@ pqPipelineProcessQueue(PGconn *conn)
30873146
case PGASYNC_BUSY:
30883147
/* client still has to process current query or results */
30893148
return;
3149+
30903150
case PGASYNC_IDLE:
3151+
/*
3152+
* If we're in IDLE mode and there's some command in the queue,
3153+
* get us into PIPELINE_IDLE mode and process normally. Otherwise
3154+
* there's nothing for us to do.
3155+
*/
3156+
if (conn->cmd_queue_head != NULL)
3157+
{
3158+
conn->asyncStatus = PGASYNC_PIPELINE_IDLE;
3159+
break;
3160+
}
3161+
return;
3162+
3163+
case PGASYNC_PIPELINE_IDLE:
3164+
Assert(conn->pipelineStatus != PQ_PIPELINE_OFF);
30913165
/* next query please */
30923166
break;
30933167
}
30943168

3095-
/* Nothing to do if not in pipeline mode, or queue is empty */
3096-
if (conn->pipelineStatus == PQ_PIPELINE_OFF ||
3097-
conn->cmd_queue_head == NULL)
3169+
/*
3170+
* If there are no further commands to process in the queue, get us in
3171+
* "real idle" mode now.
3172+
*/
3173+
if (conn->cmd_queue_head == NULL)
3174+
{
3175+
conn->asyncStatus = PGASYNC_IDLE;
30983176
return;
3177+
}
30993178

31003179
/*
31013180
* Reset the error state. This and the next couple of steps correspond to
@@ -3188,6 +3267,7 @@ PQpipelineSync(PGconn *conn)
31883267
case PGASYNC_READY_MORE:
31893268
case PGASYNC_BUSY:
31903269
case PGASYNC_IDLE:
3270+
case PGASYNC_PIPELINE_IDLE:
31913271
/* OK to send sync */
31923272
break;
31933273
}

src/interfaces/libpq/fe-protocol3.c

Lines changed: 17 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -158,18 +158,6 @@ pqParseInput3(PGconn *conn)
158158
if (conn->asyncStatus != PGASYNC_IDLE)
159159
return;
160160

161-
/*
162-
* We're also notionally not-IDLE when in pipeline mode the state
163-
* says "idle" (so we have completed receiving the results of one
164-
* query from the server and dispatched them to the application)
165-
* but another query is queued; yield back control to caller so
166-
* that they can initiate processing of the next query in the
167-
* queue.
168-
*/
169-
if (conn->pipelineStatus != PQ_PIPELINE_OFF &&
170-
conn->cmd_queue_head != NULL)
171-
return;
172-
173161
/*
174162
* Unexpected message in IDLE state; need to recover somehow.
175163
* ERROR messages are handled using the notice processor;
@@ -296,8 +284,24 @@ pqParseInput3(PGconn *conn)
296284
}
297285
break;
298286
case '2': /* Bind Complete */
287+
/* Nothing to do for this message type */
288+
break;
299289
case '3': /* Close Complete */
300-
/* Nothing to do for these message types */
290+
/*
291+
* If we get CloseComplete when waiting for it, consume
292+
* the queue element and keep going. A result is not
293+
* expected from this message; it is just there so that
294+
* we know to wait for it when PQsendQuery is used in
295+
* pipeline mode, before going in IDLE state. Failing to
296+
* do this makes us receive CloseComplete when IDLE, which
297+
* creates problems.
298+
*/
299+
if (conn->cmd_queue_head &&
300+
conn->cmd_queue_head->queryclass == PGQUERY_CLOSE)
301+
{
302+
pqCommandQueueAdvance(conn);
303+
}
304+
301305
break;
302306
case 'S': /* parameter status */
303307
if (getParameterStatus(conn))

src/interfaces/libpq/libpq-int.h

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -225,7 +225,8 @@ typedef enum
225225
* query */
226226
PGASYNC_COPY_IN, /* Copy In data transfer in progress */
227227
PGASYNC_COPY_OUT, /* Copy Out data transfer in progress */
228-
PGASYNC_COPY_BOTH /* Copy In/Out data transfer in progress */
228+
PGASYNC_COPY_BOTH, /* Copy In/Out data transfer in progress */
229+
PGASYNC_PIPELINE_IDLE, /* "Idle" between commands in pipeline mode */
229230
} PGAsyncStatusType;
230231

231232
/* Target server type (decoded value of target_session_attrs) */
@@ -311,7 +312,8 @@ typedef enum
311312
PGQUERY_EXTENDED, /* full Extended protocol (PQexecParams) */
312313
PGQUERY_PREPARE, /* Parse only (PQprepare) */
313314
PGQUERY_DESCRIBE, /* Describe Statement or Portal */
314-
PGQUERY_SYNC /* Sync (at end of a pipeline) */
315+
PGQUERY_SYNC, /* Sync (at end of a pipeline) */
316+
PGQUERY_CLOSE
315317
} PGQueryClass;
316318

317319
/*

0 commit comments

Comments
 (0)