Commit 92785da

Add a "row processor" API to libpq for better handling of large results.

Traditionally libpq has collected an entire query result before passing it back to the application. That provides a simple and transactional API, but it's pretty inefficient for large result sets. This patch allows the application to process each row on-the-fly instead of accumulating the rows into the PGresult. Error recovery becomes a bit more complex, but often that tradeoff is well worth making.

Kyotaro Horiguchi, reviewed by Marko Kreen and Tom Lane

1 parent cb917e1 commit 92785da

10 files changed: +992 -202 lines changed

doc/src/sgml/libpq.sgml

Lines changed: 268 additions & 0 deletions
@@ -5581,6 +5581,274 @@ defaultNoticeProcessor(void *arg, const char *message)
 
  </sect1>
 
+ <sect1 id="libpq-row-processor">
+  <title>Custom Row Processing</title>
+
+  <indexterm zone="libpq-row-processor">
+   <primary>PQrowProcessor</primary>
+  </indexterm>
+
+  <indexterm zone="libpq-row-processor">
+   <primary>row processor</primary>
+   <secondary>in libpq</secondary>
+  </indexterm>
+
+  <para>
+   Ordinarily, when receiving a query result from the server,
+   <application>libpq</> adds each row value to the current
+   <type>PGresult</type> until the entire result set is received; then
+   the <type>PGresult</type> is returned to the application as a unit.
+   This approach is simple to work with, but becomes inefficient for large
+   result sets. To improve performance, an application can register a
+   custom <firstterm>row processor</> function that processes each row
+   as the data is received from the network. The custom row processor could
+   process the data fully, or store it into some application-specific data
+   structure for later processing.
+  </para>
+
+  <caution>
+   <para>
+    The row processor function sees the rows before it is known whether the
+    query will succeed overall, since the server might return some rows before
+    encountering an error. For proper transactional behavior, it must be
+    possible to discard or undo whatever the row processor has done, if the
+    query ultimately fails.
+   </para>
+  </caution>
+
+  <para>
+   When using a custom row processor, row data is not accumulated into the
+   <type>PGresult</type>, so the <type>PGresult</type> ultimately delivered to
+   the application will contain no rows (<function>PQntuples</> =
+   <literal>0</>). However, it still has <function>PQresultStatus</> =
+   <literal>PGRES_TUPLES_OK</>, and it contains correct information about the
+   set of columns in the query result. On the other hand, if the query fails
+   partway through, the returned <type>PGresult</type> has
+   <function>PQresultStatus</> = <literal>PGRES_FATAL_ERROR</>. The
+   application must be prepared to undo any actions of the row processor
+   whenever it gets a <literal>PGRES_FATAL_ERROR</> result.
+  </para>
+
+  <para>
+   A custom row processor is registered for a particular connection by
+   calling <function>PQsetRowProcessor</function>, described below.
+   This row processor will be used for all subsequent query results on that
+   connection until changed again. A row processor function must have a
+   signature matching
+
+<synopsis>
+typedef int (*PQrowProcessor) (PGresult *res, const PGdataValue *columns,
+                               const char **errmsgp, void *param);
+</synopsis>
+   where <type>PGdataValue</> is described by
+<synopsis>
+typedef struct pgDataValue
+{
+    int         len;            /* data length in bytes, or <0 if NULL */
+    const char *value;          /* data value, without zero-termination */
+} PGdataValue;
+</synopsis>
+  </para>
+
+  <para>
+   The <parameter>res</> parameter is the <literal>PGRES_TUPLES_OK</>
+   <type>PGresult</type> that will eventually be delivered to the calling
+   application (if no error intervenes). It contains information about
+   the set of columns in the query result, but no row data. In particular the
+   row processor must fetch <literal>PQnfields(res)</> to know the number of
+   data columns.
+  </para>
+
+  <para>
+   Immediately after <application>libpq</> has determined the result set's
+   column information, it will make a call to the row processor with
+   <parameter>columns</parameter> set to NULL, but the other parameters as
+   usual. The row processor can use this call to initialize for a new result
+   set; if it has nothing to do, it can just return <literal>1</>. In
+   subsequent calls, one per received row, <parameter>columns</parameter>
+   is non-NULL and points to an array of <type>PGdataValue</> structs, one per
+   data column.
+  </para>
+
+  <para>
+   <parameter>errmsgp</parameter> is an output parameter used only for error
+   reporting. If the row processor needs to report an error, it can set
+   <literal>*</><parameter>errmsgp</parameter> to point to a suitable message
+   string (and then return <literal>-1</>). As a special case, returning
+   <literal>-1</> without changing <literal>*</><parameter>errmsgp</parameter>
+   from its initial value of NULL is taken to mean <quote>out of memory</>.
+  </para>
+
+  <para>
+   The last parameter, <parameter>param</parameter>, is just a void pointer
+   passed through from <function>PQsetRowProcessor</function>. This can be
+   used for communication between the row processor function and the
+   surrounding application.
+  </para>
+
+  <para>
+   In the <type>PGdataValue</> array passed to a row processor, data values
+   cannot be assumed to be zero-terminated, whether the data format is text
+   or binary. A SQL NULL value is indicated by a negative length field.
+  </para>
+
+  <para>
+   The row processor <emphasis>must</> process the row data values
+   immediately, or else copy them into application-controlled storage.
+   The value pointers passed to the row processor point into
+   <application>libpq</>'s internal data input buffer, which will be
+   overwritten by the next packet fetch.
+  </para>
+
+  <para>
+   The row processor function must return either <literal>1</> or
+   <literal>-1</>.
+   <literal>1</> is the normal, successful result value; <application>libpq</>
+   will continue with receiving row values from the server and passing them to
+   the row processor. <literal>-1</> indicates that the row processor has
+   encountered an error. In that case,
+   <application>libpq</> will discard all remaining rows in the result set
+   and then return a <literal>PGRES_FATAL_ERROR</> <type>PGresult</type> to
+   the application (containing the specified error message, or <quote>out of
+   memory for query result</> if <literal>*</><parameter>errmsgp</parameter>
+   was left as NULL).
+  </para>
+
+  <para>
+   Another option for exiting a row processor is to throw an exception using
+   C's <function>longjmp()</> or C++'s <literal>throw</>. If this is done,
+   processing of the incoming data can be resumed later by calling
+   <function>PQgetResult</>; the row processor will be invoked as normal for
+   any remaining rows in the current result.
+   As with any usage of <function>PQgetResult</>, the application
+   should continue calling <function>PQgetResult</> until it gets a NULL
+   result before issuing any new query.
+  </para>
+
+  <para>
+   In some cases, an exception may mean that the remainder of the
+   query result is not interesting. In such cases the application can discard
+   the remaining rows with <function>PQskipResult</>, described below.
+   Another possible recovery option is to close the connection altogether with
+   <function>PQfinish</>.
+  </para>
+
+  <para>
+   <variablelist>
+    <varlistentry id="libpq-pqsetrowprocessor">
+     <term>
+      <function>PQsetRowProcessor</function>
+      <indexterm>
+       <primary>PQsetRowProcessor</primary>
+      </indexterm>
+     </term>
+
+     <listitem>
+      <para>
+       Sets a callback function to process each row.
+
+<synopsis>
+void PQsetRowProcessor(PGconn *conn, PQrowProcessor func, void *param);
+</synopsis>
+      </para>
+
+      <para>
+       The specified row processor function <parameter>func</> is installed as
+       the active row processor for the given connection <parameter>conn</>.
+       Also, <parameter>param</> is installed as the passthrough pointer to
+       pass to it. Alternatively, if <parameter>func</> is NULL, the standard
+       row processor is reinstalled on the given connection (and
+       <parameter>param</> is ignored).
+      </para>
+
+      <para>
+       Although the row processor can be changed at any time in the life of a
+       connection, it's generally unwise to do so while a query is active.
+       In particular, when using asynchronous mode, be aware that both
+       <function>PQisBusy</> and <function>PQgetResult</> can call the current
+       row processor.
+      </para>
+     </listitem>
+    </varlistentry>
+
+    <varlistentry id="libpq-pqgetrowprocessor">
+     <term>
+      <function>PQgetRowProcessor</function>
+      <indexterm>
+       <primary>PQgetRowProcessor</primary>
+      </indexterm>
+     </term>
+
+     <listitem>
+      <para>
+       Fetches the current row processor for the specified connection.
+
+<synopsis>
+PQrowProcessor PQgetRowProcessor(const PGconn *conn, void **param);
+</synopsis>
+      </para>
+
+      <para>
+       In addition to returning the row processor function pointer, the
+       current passthrough pointer will be returned at
+       <literal>*</><parameter>param</>, if <parameter>param</> is not NULL.
+      </para>
+     </listitem>
+    </varlistentry>
+
+    <varlistentry id="libpq-pqskipresult">
+     <term>
+      <function>PQskipResult</function>
+      <indexterm>
+       <primary>PQskipResult</primary>
+      </indexterm>
+     </term>
+
+     <listitem>
+      <para>
+       Discard all the remaining rows in the incoming result set.
+
+<synopsis>
+PGresult *PQskipResult(PGconn *conn);
+</synopsis>
+      </para>
+
+      <para>
+       This is a simple convenience function to discard incoming data after a
+       row processor has failed or it's determined that the rest of the result
+       set is not interesting. <function>PQskipResult</> is exactly
+       equivalent to <function>PQgetResult</> except that it transiently
+       installs a dummy row processor function that just discards data.
+       The returned <type>PGresult</> can be discarded without further ado
+       if it has status <literal>PGRES_TUPLES_OK</>; but other status values
+       should be handled normally. (In particular,
+       <literal>PGRES_FATAL_ERROR</> indicates a server-reported error that
+       will still need to be dealt with.)
+       As when using <function>PQgetResult</>, one should usually repeat the
+       call until NULL is returned to ensure the connection has reached an
+       idle state. Another possible usage is to call
+       <function>PQskipResult</> just once, and then resume using
+       <function>PQgetResult</> to process subsequent result sets normally.
+      </para>
+
+      <para>
+       Because <function>PQskipResult</> will wait for server input, it is not
+       very useful in asynchronous applications. In particular you should not
+       code a loop of <function>PQisBusy</> and <function>PQskipResult</>,
+       because that will result in the installed row processor being called
+       within <function>PQisBusy</>. To get the proper behavior in an
+       asynchronous application, you'll need to install a dummy row processor
+       (or set a flag to make your normal row processor do nothing) and leave
+       it that way until you have discarded all incoming data via your normal
+       <function>PQisBusy</> and <function>PQgetResult</> loop.
+      </para>
+     </listitem>
+    </varlistentry>
+   </variablelist>
+  </para>
+
+ </sect1>
+
  <sect1 id="libpq-events">
   <title>Event System</title>

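The callback contract documented in the hunk above (a setup call with `columns` set to NULL, values that are not zero-terminated, a negative `len` for SQL NULL, and the `1` / `-1` return convention with `*errmsgp`) might be exercised roughly as follows. This is only a sketch under stated assumptions: `PGresult` and `PGdataValue` are declared locally as stand-ins so the block compiles without libpq, and `RowSink`, `copy_value`, `sink_reset` and `sink_row_processor` are hypothetical names that do not appear in the patch. A real row processor would obtain the column count from `PQnfields(res)`; here it is supplied through the passthrough state instead.

```c
#include <stdlib.h>
#include <string.h>

/* Stand-ins so the sketch compiles without libpq (assumption); in a tree
 * containing this patch the real definitions come from libpq-fe.h. */
typedef struct pg_result PGresult;	/* opaque here */
typedef struct pgDataValue
{
	int			len;	/* data length in bytes, or <0 if NULL */
	const char *value;	/* data value, without zero-termination */
} PGdataValue;

/* Hypothetical application state passed through the "param" pointer. */
typedef struct RowSink
{
	int			ncols;		/* assumption: set by the caller; a real
							 * processor would use PQnfields(res) */
	int			discard;	/* nonzero: ignore incoming rows */
	size_t		nrows;		/* rows stored so far */
	size_t		cap;		/* allocated slots in first_col */
	char	  **first_col;	/* private copy of column 0, NULL for SQL NULL */
} RowSink;

/* Copy a value out of the input buffer, adding a terminating zero. */
static char *
copy_value(const PGdataValue *v)
{
	char	   *s = (char *) malloc((size_t) v->len + 1);

	if (s != NULL)
	{
		memcpy(s, v->value, (size_t) v->len);
		s[v->len] = '\0';
	}
	return s;
}

/* Undo everything the processor has stored, e.g. after PGRES_FATAL_ERROR. */
static void
sink_reset(RowSink *sink)
{
	for (size_t i = 0; i < sink->nrows; i++)
		free(sink->first_col[i]);
	sink->nrows = 0;
}

/* Has the PQrowProcessor signature shown in the synopsis above. */
static int
sink_row_processor(PGresult *res, const PGdataValue *columns,
				   const char **errmsgp, void *param)
{
	RowSink    *sink = (RowSink *) param;

	(void) res;					/* unused in this stand-alone sketch */

	if (columns == NULL)		/* setup call: a new result set begins */
	{
		sink_reset(sink);
		return 1;
	}
	if (sink->discard)			/* "do nothing" flag for async discarding */
		return 1;
	if (sink->ncols < 1)
	{
		*errmsgp = "result set has no columns";
		return -1;
	}
	if (sink->nrows == sink->cap)
	{
		size_t		newcap = sink->cap ? sink->cap * 2 : 16;
		char	  **p = (char **) realloc(sink->first_col,
										  newcap * sizeof(char *));

		if (p == NULL)
			return -1;			/* *errmsgp left NULL: "out of memory" */
		sink->first_col = p;
		sink->cap = newcap;
	}
	if (columns[0].len < 0)
		sink->first_col[sink->nrows] = NULL;	/* SQL NULL */
	else
	{
		char	   *s = copy_value(&columns[0]);

		if (s == NULL)
			return -1;			/* out of memory */
		sink->first_col[sink->nrows] = s;
	}
	sink->nrows++;
	return 1;
}
```

Against a libpq build that includes this patch, such a callback would presumably be registered with `PQsetRowProcessor(conn, sink_row_processor, &sink)`, following the synopsis above. Calling `sink_reset` when a `PGRES_FATAL_ERROR` result arrives is one way to satisfy the requirement that the row processor's work can be undone, and setting `discard` corresponds to the documented option of making the normal row processor do nothing while an asynchronous application drains unwanted rows.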
src/interfaces/libpq/exports.txt

Lines changed: 3 additions & 0 deletions
@@ -160,3 +160,6 @@ PQconnectStartParams 157
 PQping 158
 PQpingParams 159
 PQlibVersion 160
+PQsetRowProcessor 161
+PQgetRowProcessor 162
+PQskipResult 163

src/interfaces/libpq/fe-connect.c

Lines changed: 10 additions & 2 deletions
@@ -2425,7 +2425,7 @@ PQconnectPoll(PGconn *conn)
 				conn->status = CONNECTION_AUTH_OK;
 
 				/*
-				 * Set asyncStatus so that PQsetResult will think that
+				 * Set asyncStatus so that PQgetResult will think that
 				 * what comes back next is the result of a query. See
 				 * below.
 				 */
@@ -2686,8 +2686,11 @@ makeEmptyPGconn(void)
 	/* Zero all pointers and booleans */
 	MemSet(conn, 0, sizeof(PGconn));
 
+	/* install default row processor and notice hooks */
+	PQsetRowProcessor(conn, NULL, NULL);
 	conn->noticeHooks.noticeRec = defaultNoticeReceiver;
 	conn->noticeHooks.noticeProc = defaultNoticeProcessor;
+
 	conn->status = CONNECTION_BAD;
 	conn->asyncStatus = PGASYNC_IDLE;
 	conn->xactStatus = PQTRANS_IDLE;
@@ -2721,11 +2724,14 @@ makeEmptyPGconn(void)
 	conn->inBuffer = (char *) malloc(conn->inBufSize);
 	conn->outBufSize = 16 * 1024;
 	conn->outBuffer = (char *) malloc(conn->outBufSize);
+	conn->rowBufLen = 32;
+	conn->rowBuf = (PGdataValue *) malloc(conn->rowBufLen * sizeof(PGdataValue));
 	initPQExpBuffer(&conn->errorMessage);
 	initPQExpBuffer(&conn->workBuffer);
 
 	if (conn->inBuffer == NULL ||
 		conn->outBuffer == NULL ||
+		conn->rowBuf == NULL ||
 		PQExpBufferBroken(&conn->errorMessage) ||
 		PQExpBufferBroken(&conn->workBuffer))
 	{
@@ -2829,6 +2835,8 @@ freePGconn(PGconn *conn)
 		free(conn->inBuffer);
 	if (conn->outBuffer)
 		free(conn->outBuffer);
+	if (conn->rowBuf)
+		free(conn->rowBuf);
 	termPQExpBuffer(&conn->errorMessage);
 	termPQExpBuffer(&conn->workBuffer);
 
@@ -2888,7 +2896,7 @@ closePGconn(PGconn *conn)
 	conn->status = CONNECTION_BAD;		/* Well, not really _bad_ - just
 										 * absent */
 	conn->asyncStatus = PGASYNC_IDLE;
-	pqClearAsyncResult(conn);	/* deallocate result and curTuple */
+	pqClearAsyncResult(conn);	/* deallocate result */
 	pg_freeaddrinfo_all(conn->addrlist_family, conn->addrlist);
 	conn->addrlist = NULL;
 	conn->addr_cur = NULL;
