Skip to content

Commit acb7e4e

Browse files
committed
Implement pipeline mode in libpq
Pipeline mode in libpq lets an application avoid the Sync messages in the FE/BE protocol that are implicit in the old libpq API after each query. The application can then insert Sync at its leisure with a new libpq function PQpipelineSync. This can lead to substantial reductions in query latency. Co-authored-by: Craig Ringer <craig.ringer@enterprisedb.com> Co-authored-by: Matthieu Garrigues <matthieu.garrigues@gmail.com> Co-authored-by: Álvaro Herrera <alvherre@alvh.no-ip.org> Reviewed-by: Andres Freund <andres@anarazel.de> Reviewed-by: Aya Iwata <iwata.aya@jp.fujitsu.com> Reviewed-by: Daniel Vérité <daniel@manitou-mail.org> Reviewed-by: David G. Johnston <david.g.johnston@gmail.com> Reviewed-by: Justin Pryzby <pryzby@telsasoft.com> Reviewed-by: Kirk Jamison <k.jamison@fujitsu.com> Reviewed-by: Michael Paquier <michael.paquier@gmail.com> Reviewed-by: Nikhil Sontakke <nikhils@2ndquadrant.com> Reviewed-by: Vaishnavi Prabakaran <VaishnaviP@fast.au.fujitsu.com> Reviewed-by: Zhihong Yu <zyu@yugabyte.com> Discussion: https://postgr.es/m/CAMsr+YFUjJytRyV4J-16bEoiZyH=4nj+sQ7JP9ajwz=B4dMMZw@mail.gmail.com Discussion: https://postgr.es/m/CAJkzx4T5E-2cQe3dtv2R78dYFvz+in8PY7A8MArvLhs_pg75gg@mail.gmail.com
1 parent 146cb38 commit acb7e4e

File tree

18 files changed

+2706
-113
lines changed

18 files changed

+2706
-113
lines changed

doc/src/sgml/libpq.sgml

Lines changed: 520 additions & 2 deletions
Large diffs are not rendered by default.

doc/src/sgml/lobj.sgml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -130,6 +130,10 @@
130130
<application>libpq</application> library.
131131
</para>
132132

133+
<para>
134+
Client applications cannot use these functions while a libpq connection is in pipeline mode.
135+
</para>
136+
133137
<sect2 id="lo-create">
134138
<title>Creating a Large Object</title>
135139

src/backend/replication/libpqwalreceiver/libpqwalreceiver.c

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1019,6 +1019,12 @@ libpqrcv_exec(WalReceiverConn *conn, const char *query,
10191019
walres->err = _("empty query");
10201020
break;
10211021

1022+
case PGRES_PIPELINE_SYNC:
1023+
case PGRES_PIPELINE_ABORTED:
1024+
walres->status = WALRCV_ERROR;
1025+
walres->err = _("unexpected pipeline mode");
1026+
break;
1027+
10221028
case PGRES_NONFATAL_ERROR:
10231029
case PGRES_FATAL_ERROR:
10241030
case PGRES_BAD_RESPONSE:

src/bin/pg_amcheck/pg_amcheck.c

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -929,6 +929,8 @@ should_processing_continue(PGresult *res)
929929
case PGRES_COPY_IN:
930930
case PGRES_COPY_BOTH:
931931
case PGRES_SINGLE_TUPLE:
932+
case PGRES_PIPELINE_SYNC:
933+
case PGRES_PIPELINE_ABORTED:
932934
return false;
933935
}
934936
return true;

src/interfaces/libpq/exports.txt

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -179,3 +179,7 @@ PQgetgssctx 176
179179
PQsetSSLKeyPassHook_OpenSSL 177
180180
PQgetSSLKeyPassHook_OpenSSL 178
181181
PQdefaultSSLKeyPassHook_OpenSSL 179
182+
PQenterPipelineMode 180
183+
PQexitPipelineMode 181
184+
PQpipelineSync 182
185+
PQpipelineStatus 183

src/interfaces/libpq/fe-connect.c

Lines changed: 35 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -522,6 +522,23 @@ pqDropConnection(PGconn *conn, bool flushInput)
522522
}
523523
}
524524

525+
/*
526+
* pqFreeCommandQueue
527+
* Free all the entries of PGcmdQueueEntry queue passed.
528+
*/
529+
static void
530+
pqFreeCommandQueue(PGcmdQueueEntry *queue)
531+
{
532+
while (queue != NULL)
533+
{
534+
PGcmdQueueEntry *cur = queue;
535+
536+
queue = cur->next;
537+
if (cur->query)
538+
free(cur->query);
539+
free(cur);
540+
}
541+
}
525542

526543
/*
527544
* pqDropServerData
@@ -553,6 +570,12 @@ pqDropServerData(PGconn *conn)
553570
}
554571
conn->notifyHead = conn->notifyTail = NULL;
555572

573+
pqFreeCommandQueue(conn->cmd_queue_head);
574+
conn->cmd_queue_head = conn->cmd_queue_tail = NULL;
575+
576+
pqFreeCommandQueue(conn->cmd_queue_recycle);
577+
conn->cmd_queue_recycle = NULL;
578+
556579
/* Reset ParameterStatus data, as well as variables deduced from it */
557580
pstatus = conn->pstatus;
558581
while (pstatus != NULL)
@@ -2459,6 +2482,7 @@ PQconnectPoll(PGconn *conn)
24592482
/* Drop any PGresult we might have, too */
24602483
conn->asyncStatus = PGASYNC_IDLE;
24612484
conn->xactStatus = PQTRANS_IDLE;
2485+
conn->pipelineStatus = PQ_PIPELINE_OFF;
24622486
pqClearAsyncResult(conn);
24632487

24642488
/* Reset conn->status to put the state machine in the right state */
@@ -3917,6 +3941,7 @@ makeEmptyPGconn(void)
39173941

39183942
conn->status = CONNECTION_BAD;
39193943
conn->asyncStatus = PGASYNC_IDLE;
3944+
conn->pipelineStatus = PQ_PIPELINE_OFF;
39203945
conn->xactStatus = PQTRANS_IDLE;
39213946
conn->options_valid = false;
39223947
conn->nonblocking = false;
@@ -4084,8 +4109,6 @@ freePGconn(PGconn *conn)
40844109
if (conn->connip)
40854110
free(conn->connip);
40864111
/* Note that conn->Pfdebug is not ours to close or free */
4087-
if (conn->last_query)
4088-
free(conn->last_query);
40894112
if (conn->write_err_msg)
40904113
free(conn->write_err_msg);
40914114
if (conn->inBuffer)
@@ -4174,6 +4197,7 @@ closePGconn(PGconn *conn)
41744197
conn->status = CONNECTION_BAD; /* Well, not really _bad_ - just absent */
41754198
conn->asyncStatus = PGASYNC_IDLE;
41764199
conn->xactStatus = PQTRANS_IDLE;
4200+
conn->pipelineStatus = PQ_PIPELINE_OFF;
41774201
pqClearAsyncResult(conn); /* deallocate result */
41784202
resetPQExpBuffer(&conn->errorMessage);
41794203
release_conn_addrinfo(conn);
@@ -6726,6 +6750,15 @@ PQbackendPID(const PGconn *conn)
67266750
return conn->be_pid;
67276751
}
67286752

6753+
PGpipelineStatus
6754+
PQpipelineStatus(const PGconn *conn)
6755+
{
6756+
if (!conn)
6757+
return PQ_PIPELINE_OFF;
6758+
6759+
return conn->pipelineStatus;
6760+
}
6761+
67296762
int
67306763
PQconnectionNeedsPassword(const PGconn *conn)
67316764
{

0 commit comments

Comments
 (0)