Skip to content

Commit 9aa491a

Browse files
committed
Add libpq pipeline mode support to pgbench
New metacommands \startpipeline and \endpipeline allow the user to run queries in libpq pipeline mode. Author: Daniel Vérité <daniel@manitou-mail.org> Reviewed-by: Álvaro Herrera <alvherre@alvh.no-ip.org> Discussion: https://postgr.es/m/b4e34135-2bd9-4b8a-94ca-27d760da26d7@manitou-mail.org
1 parent acb7e4e commit 9aa491a

File tree

3 files changed

+216
-16
lines changed

3 files changed

+216
-16
lines changed

doc/src/sgml/ref/pgbench.sgml

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1110,6 +1110,12 @@ pgbench <optional> <replaceable>options</replaceable> </optional> <replaceable>d
11101110
row, the last value is kept.
11111111
</para>
11121112

1113+
<para>
1114+
<literal>\gset</literal> and <literal>\aset</literal> cannot be used in
1115+
pipeline mode, since the query results are not yet available by the time
1116+
the commands would need them.
1117+
</para>
1118+
11131119
<para>
11141120
The following example puts the final account balance from the first query
11151121
into variable <replaceable>abalance</replaceable>, and fills variables
@@ -1270,6 +1276,22 @@ SELECT 4 AS four \; SELECT 5 AS five \aset
12701276
</programlisting></para>
12711277
</listitem>
12721278
</varlistentry>
1279+
1280+
<varlistentry id='pgbench-metacommand-pipeline'>
1281+
<term><literal>\startpipeline</literal></term>
1282+
<term><literal>\endpipeline</literal></term>
1283+
1284+
<listitem>
1285+
<para>
1286+
These commands delimit the start and end of a pipeline of SQL
1287+
statements. In pipeline mode, statements are sent to the server
1288+
without waiting for the results of previous statements. See
1289+
<xref linkend="libpq-pipeline-mode"/> for more details.
1290+
Pipeline mode requires the use of extended query protocol.
1291+
</para>
1292+
</listitem>
1293+
</varlistentry>
1294+
12731295
</variablelist>
12741296
</refsect2>
12751297

src/bin/pgbench/pgbench.c

Lines changed: 116 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -395,10 +395,11 @@ typedef enum
395395
*
396396
* CSTATE_START_COMMAND starts the execution of a command. On a SQL
397397
* command, the command is sent to the server, and we move to
398-
* CSTATE_WAIT_RESULT state. On a \sleep meta-command, the timer is set,
399-
* and we enter the CSTATE_SLEEP state to wait for it to expire. Other
400-
* meta-commands are executed immediately. If the command about to start
401-
* is actually beyond the end of the script, advance to CSTATE_END_TX.
398+
* CSTATE_WAIT_RESULT state unless in pipeline mode. On a \sleep
399+
* meta-command, the timer is set, and we enter the CSTATE_SLEEP state to
400+
* wait for it to expire. Other meta-commands are executed immediately. If
401+
* the command about to start is actually beyond the end of the script,
402+
* advance to CSTATE_END_TX.
402403
*
403404
* CSTATE_WAIT_RESULT waits until we get a result set back from the server
404405
* for the current command.
@@ -530,7 +531,9 @@ typedef enum MetaCommand
530531
META_IF, /* \if */
531532
META_ELIF, /* \elif */
532533
META_ELSE, /* \else */
533-
META_ENDIF /* \endif */
534+
META_ENDIF, /* \endif */
535+
META_STARTPIPELINE, /* \startpipeline */
536+
META_ENDPIPELINE /* \endpipeline */
534537
} MetaCommand;
535538

536539
typedef enum QueryMode
@@ -2568,6 +2571,10 @@ getMetaCommand(const char *cmd)
25682571
mc = META_GSET;
25692572
else if (pg_strcasecmp(cmd, "aset") == 0)
25702573
mc = META_ASET;
2574+
else if (pg_strcasecmp(cmd, "startpipeline") == 0)
2575+
mc = META_STARTPIPELINE;
2576+
else if (pg_strcasecmp(cmd, "endpipeline") == 0)
2577+
mc = META_ENDPIPELINE;
25712578
else
25722579
mc = META_NONE;
25732580
return mc;
@@ -2757,11 +2764,25 @@ sendCommand(CState *st, Command *command)
27572764
if (commands[j]->type != SQL_COMMAND)
27582765
continue;
27592766
preparedStatementName(name, st->use_file, j);
2760-
res = PQprepare(st->con, name,
2761-
commands[j]->argv[0], commands[j]->argc - 1, NULL);
2762-
if (PQresultStatus(res) != PGRES_COMMAND_OK)
2763-
pg_log_error("%s", PQerrorMessage(st->con));
2764-
PQclear(res);
2767+
if (PQpipelineStatus(st->con) == PQ_PIPELINE_OFF)
2768+
{
2769+
res = PQprepare(st->con, name,
2770+
commands[j]->argv[0], commands[j]->argc - 1, NULL);
2771+
if (PQresultStatus(res) != PGRES_COMMAND_OK)
2772+
pg_log_error("%s", PQerrorMessage(st->con));
2773+
PQclear(res);
2774+
}
2775+
else
2776+
{
2777+
/*
2778+
* In pipeline mode, we use asynchronous functions. If a
2779+
* server-side error occurs, it will be processed later
2780+
* among the other results.
2781+
*/
2782+
if (!PQsendPrepare(st->con, name,
2783+
commands[j]->argv[0], commands[j]->argc - 1, NULL))
2784+
pg_log_error("%s", PQerrorMessage(st->con));
2785+
}
27652786
}
27662787
st->prepared[st->use_file] = true;
27672788
}
@@ -2802,10 +2823,11 @@ readCommandResponse(CState *st, MetaCommand meta, char *varprefix)
28022823
int qrynum = 0;
28032824

28042825
/*
2805-
* varprefix should be set only with \gset or \aset, and SQL commands do
2806-
* not need it.
2826+
* varprefix should be set only with \gset or \aset, and \endpipeline and
2827+
* SQL commands do not need it.
28072828
*/
28082829
Assert((meta == META_NONE && varprefix == NULL) ||
2830+
((meta == META_ENDPIPELINE) && varprefix == NULL) ||
28092831
((meta == META_GSET || meta == META_ASET) && varprefix != NULL));
28102832

28112833
res = PQgetResult(st->con);
@@ -2874,6 +2896,13 @@ readCommandResponse(CState *st, MetaCommand meta, char *varprefix)
28742896
/* otherwise the result is simply thrown away by PQclear below */
28752897
break;
28762898

2899+
case PGRES_PIPELINE_SYNC:
2900+
pg_log_debug("client %d pipeline ending", st->id);
2901+
if (PQexitPipelineMode(st->con) != 1)
2902+
pg_log_error("client %d failed to exit pipeline mode: %s", st->id,
2903+
PQerrorMessage(st->con));
2904+
break;
2905+
28772906
default:
28782907
/* anything else is unexpected */
28792908
pg_log_error("client %d script %d aborted in command %d query %d: %s",
@@ -3127,13 +3156,36 @@ advanceConnectionState(TState *thread, CState *st, StatsData *agg)
31273156
/* Execute the command */
31283157
if (command->type == SQL_COMMAND)
31293158
{
3159+
/* disallow \aset and \gset in pipeline mode */
3160+
if (PQpipelineStatus(st->con) != PQ_PIPELINE_OFF)
3161+
{
3162+
if (command->meta == META_GSET)
3163+
{
3164+
commandFailed(st, "gset", "\\gset is not allowed in pipeline mode");
3165+
st->state = CSTATE_ABORTED;
3166+
break;
3167+
}
3168+
else if (command->meta == META_ASET)
3169+
{
3170+
commandFailed(st, "aset", "\\aset is not allowed in pipeline mode");
3171+
st->state = CSTATE_ABORTED;
3172+
break;
3173+
}
3174+
}
3175+
31303176
if (!sendCommand(st, command))
31313177
{
31323178
commandFailed(st, "SQL", "SQL command send failed");
31333179
st->state = CSTATE_ABORTED;
31343180
}
31353181
else
3136-
st->state = CSTATE_WAIT_RESULT;
3182+
{
3183+
/* Wait for results, unless in pipeline mode */
3184+
if (PQpipelineStatus(st->con) == PQ_PIPELINE_OFF)
3185+
st->state = CSTATE_WAIT_RESULT;
3186+
else
3187+
st->state = CSTATE_END_COMMAND;
3188+
}
31373189
}
31383190
else if (command->type == META_COMMAND)
31393191
{
@@ -3273,7 +3325,15 @@ advanceConnectionState(TState *thread, CState *st, StatsData *agg)
32733325
if (readCommandResponse(st,
32743326
sql_script[st->use_file].commands[st->command]->meta,
32753327
sql_script[st->use_file].commands[st->command]->varprefix))
3276-
st->state = CSTATE_END_COMMAND;
3328+
{
3329+
/*
3330+
* outside of pipeline mode: stop reading results.
3331+
* pipeline mode: continue reading results until an
3332+
* end-of-pipeline response.
3333+
*/
3334+
if (PQpipelineStatus(st->con) != PQ_PIPELINE_ON)
3335+
st->state = CSTATE_END_COMMAND;
3336+
}
32773337
else
32783338
st->state = CSTATE_ABORTED;
32793339
break;
@@ -3516,6 +3576,45 @@ executeMetaCommand(CState *st, pg_time_usec_t *now)
35163576
return CSTATE_ABORTED;
35173577
}
35183578
}
3579+
else if (command->meta == META_STARTPIPELINE)
3580+
{
3581+
/*
3582+
* In pipeline mode, we use a workflow based on libpq pipeline
3583+
* functions.
3584+
*/
3585+
if (querymode == QUERY_SIMPLE)
3586+
{
3587+
commandFailed(st, "startpipeline", "cannot use pipeline mode with the simple query protocol");
3588+
return CSTATE_ABORTED;
3589+
}
3590+
3591+
if (PQpipelineStatus(st->con) != PQ_PIPELINE_OFF)
3592+
{
3593+
commandFailed(st, "startpipeline", "already in pipeline mode");
3594+
return CSTATE_ABORTED;
3595+
}
3596+
if (PQenterPipelineMode(st->con) == 0)
3597+
{
3598+
commandFailed(st, "startpipeline", "failed to enter pipeline mode");
3599+
return CSTATE_ABORTED;
3600+
}
3601+
}
3602+
else if (command->meta == META_ENDPIPELINE)
3603+
{
3604+
if (PQpipelineStatus(st->con) != PQ_PIPELINE_ON)
3605+
{
3606+
commandFailed(st, "endpipeline", "not in pipeline mode");
3607+
return CSTATE_ABORTED;
3608+
}
3609+
if (!PQpipelineSync(st->con))
3610+
{
3611+
commandFailed(st, "endpipeline", "failed to send a pipeline sync");
3612+
return CSTATE_ABORTED;
3613+
}
3614+
/* Now wait for the PGRES_PIPELINE_SYNC and exit pipeline mode there */
3615+
/* collect pending results before getting out of pipeline mode */
3616+
return CSTATE_WAIT_RESULT;
3617+
}
35193618

35203619
/*
35213620
* executing the expression or shell command might have taken a
@@ -4725,7 +4824,9 @@ process_backslash_command(PsqlScanState sstate, const char *source)
47254824
syntax_error(source, lineno, my_command->first_line, my_command->argv[0],
47264825
"missing command", NULL, -1);
47274826
}
4728-
else if (my_command->meta == META_ELSE || my_command->meta == META_ENDIF)
4827+
else if (my_command->meta == META_ELSE || my_command->meta == META_ENDIF ||
4828+
my_command->meta == META_STARTPIPELINE ||
4829+
my_command->meta == META_ENDPIPELINE)
47294830
{
47304831
if (my_command->argc != 1)
47314832
syntax_error(source, lineno, my_command->first_line, my_command->argv[0],

src/bin/pgbench/t/001_pgbench_with_server.pl

Lines changed: 78 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ sub pgbench
4141
# filenames are expected to be unique on a test
4242
if (-e $filename)
4343
{
44-
ok(0, "$filename must not already exists");
44+
ok(0, "$filename must not already exist");
4545
unlink $filename or die "cannot unlink $filename: $!";
4646
}
4747
append_to_file($filename, $$files{$fn});
@@ -755,6 +755,83 @@ sub pgbench
755755
}
756756
});
757757

758+
# Working \startpipeline
759+
pgbench(
760+
'-t 1 -n -M extended',
761+
0,
762+
[ qr{type: .*/001_pgbench_pipeline}, qr{actually processed: 1/1} ],
763+
[],
764+
'working \startpipeline',
765+
{
766+
'001_pgbench_pipeline' => q{
767+
-- test startpipeline
768+
\startpipeline
769+
} . "select 1;\n" x 10 . q{
770+
\endpipeline
771+
}
772+
});
773+
774+
# Working \startpipeline in prepared query mode
775+
pgbench(
776+
'-t 1 -n -M prepared',
777+
0,
778+
[ qr{type: .*/001_pgbench_pipeline_prep}, qr{actually processed: 1/1} ],
779+
[],
780+
'working \startpipeline',
781+
{
782+
'001_pgbench_pipeline_prep' => q{
783+
-- test startpipeline
784+
\startpipeline
785+
} . "select 1;\n" x 10 . q{
786+
\endpipeline
787+
}
788+
});
789+
790+
# Try \startpipeline twice
791+
pgbench(
792+
'-t 1 -n -M extended',
793+
2,
794+
[],
795+
[qr{already in pipeline mode}],
796+
'error: call \startpipeline twice',
797+
{
798+
'001_pgbench_pipeline_2' => q{
799+
-- startpipeline twice
800+
\startpipeline
801+
\startpipeline
802+
}
803+
});
804+
805+
# Try to end a pipeline that hasn't started
806+
pgbench(
807+
'-t 1 -n -M extended',
808+
2,
809+
[],
810+
[qr{not in pipeline mode}],
811+
'error: \endpipeline with no start',
812+
{
813+
'001_pgbench_pipeline_3' => q{
814+
-- pipeline not started
815+
\endpipeline
816+
}
817+
});
818+
819+
# Try \gset in pipeline mode
820+
pgbench(
821+
'-t 1 -n -M extended',
822+
2,
823+
[],
824+
[qr{gset is not allowed in pipeline mode}],
825+
'error: \gset not allowed in pipeline mode',
826+
{
827+
'001_pgbench_pipeline_4' => q{
828+
\startpipeline
829+
select 1 \gset f
830+
\endpipeline
831+
}
832+
});
833+
834+
758835
# trigger many expression errors
759836
my @errors = (
760837

0 commit comments

Comments
 (0)