Skip to content

Commit 108a22b

Browse files
committed
pgbench: Prepare commands in pipelines in advance
Failing to do so results in an error when a pgbench script tries to start a serializable transaction inside a pipeline, because by the time BEGIN ISOLATION LEVEL SERIALIZABLE is executed, we're already in a transaction that has acquired a snapshot, so the server rightfully complains. We can work around that by preparing all commands in the pipeline before actually starting the pipeline. This changes the existing code in two respects: first, we now prepare each command individually at the point where that command is about to be executed; previously, we would prepare all commands in a script as soon as the first command of that script was executed. It's hard to see that this would make much of a difference (particularly since it only affects the first execution of each script in a client), but I didn't actually try to measure it. Secondly, we no longer use PQsendPrepare() in pipeline mode, but only PQprepare(). There's no specific reason for this change other than no longer needing to behave differently in pipeline mode. (Previously we had no choice, because in pipeline mode PQprepare() could not be used.) Backpatch to 14, where pgbench got support for pipeline mode. Reported-by: Yugo NAGATA <nagata@sraoss.co.jp> Discussion: https://postgr.es/m/20210716153013.fc53b1c780b06fccc07a7f0d@sraoss.co.jp
1 parent ded5ede commit 108a22b

File tree

2 files changed

+127
-52
lines changed

2 files changed

+127
-52
lines changed

src/bin/pgbench/pgbench.c

Lines changed: 107 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -631,7 +631,8 @@ typedef struct
631631
pg_time_usec_t txn_begin; /* used for measuring schedule lag times */
632632
pg_time_usec_t stmt_begin; /* used for measuring statement latencies */
633633

634-
bool prepared[MAX_SCRIPTS]; /* whether client prepared the script */
634+
/* whether client prepared each command of each script */
635+
bool **prepared;
635636

636637
/*
637638
* For processing failures and repeating transactions with serialization
@@ -736,7 +737,8 @@ static const char *QUERYMODE[] = {"simple", "extended", "prepared"};
736737
* argv Command arguments, the first of which is the command or SQL
737738
* string itself. For SQL commands, after post-processing
738739
* argv[0] is the same as 'lines' with variables substituted.
739-
* varprefix SQL commands terminated with \gset or \aset have this set
740+
* prepname The name that this command is prepared under, in prepare mode
741+
* varprefix SQL commands terminated with \gset or \aset have this set
740742
* to a non NULL value. If nonempty, it's used to prefix the
741743
* variable name that receives the value.
742744
* aset do gset on all possible queries of a combined query (\;).
@@ -754,6 +756,7 @@ typedef struct Command
754756
MetaCommand meta;
755757
int argc;
756758
char *argv[MAX_ARGS];
759+
char *prepname;
757760
char *varprefix;
758761
PgBenchExpr *expr;
759762
SimpleStats stats;
@@ -3027,13 +3030,6 @@ runShellCommand(Variables *variables, char *variable, char **argv, int argc)
30273030
return true;
30283031
}
30293032

3030-
#define MAX_PREPARE_NAME 32
3031-
static void
3032-
preparedStatementName(char *buffer, int file, int state)
3033-
{
3034-
sprintf(buffer, "P%d_%d", file, state);
3035-
}
3036-
30373033
/*
30383034
* Report the abortion of the client when processing SQL commands.
30393035
*/
@@ -3074,6 +3070,87 @@ chooseScript(TState *thread)
30743070
return i - 1;
30753071
}
30763072

3073+
/*
3074+
* Prepare the SQL command from st->use_file at command_num.
3075+
*/
3076+
static void
3077+
prepareCommand(CState *st, int command_num)
3078+
{
3079+
Command *command = sql_script[st->use_file].commands[command_num];
3080+
3081+
/* No prepare for non-SQL commands */
3082+
if (command->type != SQL_COMMAND)
3083+
return;
3084+
3085+
/*
3086+
* If not already done, allocate space for 'prepared' flags: one boolean
3087+
* for each command of each script.
3088+
*/
3089+
if (!st->prepared)
3090+
{
3091+
st->prepared = pg_malloc(sizeof(bool *) * num_scripts);
3092+
for (int i = 0; i < num_scripts; i++)
3093+
{
3094+
ParsedScript *script = &sql_script[i];
3095+
int numcmds;
3096+
3097+
for (numcmds = 0; script->commands[numcmds] != NULL; numcmds++)
3098+
;
3099+
st->prepared[i] = pg_malloc0(sizeof(bool) * numcmds);
3100+
}
3101+
}
3102+
3103+
if (!st->prepared[st->use_file][command_num])
3104+
{
3105+
PGresult *res;
3106+
3107+
pg_log_debug("client %d preparing %s", st->id, command->prepname);
3108+
res = PQprepare(st->con, command->prepname,
3109+
command->argv[0], command->argc - 1, NULL);
3110+
if (PQresultStatus(res) != PGRES_COMMAND_OK)
3111+
pg_log_error("%s", PQerrorMessage(st->con));
3112+
PQclear(res);
3113+
st->prepared[st->use_file][command_num] = true;
3114+
}
3115+
}
3116+
3117+
/*
3118+
* Prepare all the commands in the script that come after the \startpipeline
3119+
* that's at position st->command, and the first \endpipeline we find.
3120+
*
3121+
* This sets the ->prepared flag for each relevant command as well as the
3122+
* \startpipeline itself, but doesn't move the st->command counter.
3123+
*/
3124+
static void
3125+
prepareCommandsInPipeline(CState *st)
3126+
{
3127+
int j;
3128+
Command **commands = sql_script[st->use_file].commands;
3129+
3130+
Assert(commands[st->command]->type == META_COMMAND &&
3131+
commands[st->command]->meta == META_STARTPIPELINE);
3132+
3133+
/*
3134+
* We set the 'prepared' flag on the \startpipeline itself to flag that we
3135+
* don't need to do this next time without calling prepareCommand(), even
3136+
* though we don't actually prepare this command.
3137+
*/
3138+
if (st->prepared &&
3139+
st->prepared[st->use_file][st->command])
3140+
return;
3141+
3142+
for (j = st->command + 1; commands[j] != NULL; j++)
3143+
{
3144+
if (commands[j]->type == META_COMMAND &&
3145+
commands[j]->meta == META_ENDPIPELINE)
3146+
break;
3147+
3148+
prepareCommand(st, j);
3149+
}
3150+
3151+
st->prepared[st->use_file][st->command] = true;
3152+
}
3153+
30773154
/* Send a SQL command, using the chosen querymode */
30783155
static bool
30793156
sendCommand(CState *st, Command *command)
@@ -3104,50 +3181,13 @@ sendCommand(CState *st, Command *command)
31043181
}
31053182
else if (querymode == QUERY_PREPARED)
31063183
{
3107-
char name[MAX_PREPARE_NAME];
31083184
const char *params[MAX_ARGS];
31093185

3110-
if (!st->prepared[st->use_file])
3111-
{
3112-
int j;
3113-
Command **commands = sql_script[st->use_file].commands;
3114-
3115-
for (j = 0; commands[j] != NULL; j++)
3116-
{
3117-
PGresult *res;
3118-
char name[MAX_PREPARE_NAME];
3119-
3120-
if (commands[j]->type != SQL_COMMAND)
3121-
continue;
3122-
preparedStatementName(name, st->use_file, j);
3123-
if (PQpipelineStatus(st->con) == PQ_PIPELINE_OFF)
3124-
{
3125-
res = PQprepare(st->con, name,
3126-
commands[j]->argv[0], commands[j]->argc - 1, NULL);
3127-
if (PQresultStatus(res) != PGRES_COMMAND_OK)
3128-
pg_log_error("%s", PQerrorMessage(st->con));
3129-
PQclear(res);
3130-
}
3131-
else
3132-
{
3133-
/*
3134-
* In pipeline mode, we use asynchronous functions. If a
3135-
* server-side error occurs, it will be processed later
3136-
* among the other results.
3137-
*/
3138-
if (!PQsendPrepare(st->con, name,
3139-
commands[j]->argv[0], commands[j]->argc - 1, NULL))
3140-
pg_log_error("%s", PQerrorMessage(st->con));
3141-
}
3142-
}
3143-
st->prepared[st->use_file] = true;
3144-
}
3145-
3186+
prepareCommand(st, st->command);
31463187
getQueryParams(&st->variables, command, params);
3147-
preparedStatementName(name, st->use_file, st->command);
31483188

3149-
pg_log_debug("client %d sending %s", st->id, name);
3150-
r = PQsendQueryPrepared(st->con, name, command->argc - 1,
3189+
pg_log_debug("client %d sending %s", st->id, command->prepname);
3190+
r = PQsendQueryPrepared(st->con, command->prepname, command->argc - 1,
31513191
params, NULL, NULL, 0);
31523192
}
31533193
else /* unknown sql mode */
@@ -3620,7 +3660,8 @@ advanceConnectionState(TState *thread, CState *st, StatsData *agg)
36203660
thread->conn_duration += now - start;
36213661

36223662
/* Reset session-local state */
3623-
memset(st->prepared, 0, sizeof(st->prepared));
3663+
pg_free(st->prepared);
3664+
st->prepared = NULL;
36243665
}
36253666

36263667
/*
@@ -4387,6 +4428,16 @@ executeMetaCommand(CState *st, pg_time_usec_t *now)
43874428
return CSTATE_ABORTED;
43884429
}
43894430

4431+
/*
4432+
* If we're in prepared-query mode, we need to prepare all the
4433+
* commands that are inside the pipeline before we actually start the
4434+
* pipeline itself. This solves the problem that running BEGIN
4435+
* ISOLATION LEVEL SERIALIZABLE in a pipeline would fail due to a
4436+
* snapshot having been acquired by the prepare within the pipeline.
4437+
*/
4438+
if (querymode == QUERY_PREPARED)
4439+
prepareCommandsInPipeline(st);
4440+
43904441
if (PQpipelineStatus(st->con) != PQ_PIPELINE_OFF)
43914442
{
43924443
commandFailed(st, "startpipeline", "already in pipeline mode");
@@ -5466,6 +5517,7 @@ create_sql_command(PQExpBuffer buf, const char *source)
54665517
my_command->varprefix = NULL; /* allocated later, if needed */
54675518
my_command->expr = NULL;
54685519
initSimpleStats(&my_command->stats);
5520+
my_command->prepname = NULL; /* set later, if needed */
54695521

54705522
return my_command;
54715523
}
@@ -5497,6 +5549,7 @@ static void
54975549
postprocess_sql_command(Command *my_command)
54985550
{
54995551
char buffer[128];
5552+
static int prepnum = 0;
55005553

55015554
Assert(my_command->type == SQL_COMMAND);
55025555

@@ -5505,15 +5558,17 @@ postprocess_sql_command(Command *my_command)
55055558
buffer[strcspn(buffer, "\n\r")] = '\0';
55065559
my_command->first_line = pg_strdup(buffer);
55075560

5508-
/* parse query if necessary */
5561+
/* Parse query and generate prepared statement name, if necessary */
55095562
switch (querymode)
55105563
{
55115564
case QUERY_SIMPLE:
55125565
my_command->argv[0] = my_command->lines.data;
55135566
my_command->argc++;
55145567
break;
5515-
case QUERY_EXTENDED:
55165568
case QUERY_PREPARED:
5569+
my_command->prepname = psprintf("P_%d", prepnum++);
5570+
/* fall through */
5571+
case QUERY_EXTENDED:
55175572
if (!parseQuery(my_command))
55185573
exit(1);
55195574
break;

src/bin/pgbench/t/001_pgbench_with_server.pl

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -839,6 +839,26 @@
839839
}
840840
});
841841

842+
# Working \startpipeline in prepared query mode with serializable
843+
$node->pgbench(
844+
'-c4 -j2 -t 10 -n -M prepared',
845+
0,
846+
[
847+
qr{type: .*/001_pgbench_pipeline_serializable},
848+
qr{actually processed: (\d+)/\1}
849+
],
850+
[],
851+
'working \startpipeline with serializable',
852+
{
853+
'001_pgbench_pipeline_serializable' => q{
854+
-- test startpipeline with serializable
855+
\startpipeline
856+
BEGIN ISOLATION LEVEL SERIALIZABLE;
857+
} . "select 1;\n" x 10 . q{
858+
END;
859+
\endpipeline
860+
}
861+
});
842862

843863
# trigger many expression errors
844864
my @errors = (

0 commit comments

Comments
 (0)