@@ -395,10 +395,11 @@ typedef enum
395
395
*
396
396
* CSTATE_START_COMMAND starts the execution of a command. On a SQL
397
397
* command, the command is sent to the server, and we move to
398
- * CSTATE_WAIT_RESULT state. On a \sleep meta-command, the timer is set,
399
- * and we enter the CSTATE_SLEEP state to wait for it to expire. Other
400
- * meta-commands are executed immediately. If the command about to start
401
- * is actually beyond the end of the script, advance to CSTATE_END_TX.
398
+ * CSTATE_WAIT_RESULT state unless in pipeline mode. On a \sleep
399
+ * meta-command, the timer is set, and we enter the CSTATE_SLEEP state to
400
+ * wait for it to expire. Other meta-commands are executed immediately. If
401
+ * the command about to start is actually beyond the end of the script,
402
+ * advance to CSTATE_END_TX.
402
403
*
403
404
* CSTATE_WAIT_RESULT waits until we get a result set back from the server
404
405
* for the current command.
@@ -530,7 +531,9 @@ typedef enum MetaCommand
530
531
META_IF , /* \if */
531
532
META_ELIF , /* \elif */
532
533
META_ELSE , /* \else */
533
- META_ENDIF /* \endif */
534
+ META_ENDIF , /* \endif */
535
+ META_STARTPIPELINE , /* \startpipeline */
536
+ META_ENDPIPELINE /* \endpipeline */
534
537
} MetaCommand ;
535
538
536
539
typedef enum QueryMode
@@ -2568,6 +2571,10 @@ getMetaCommand(const char *cmd)
2568
2571
mc = META_GSET ;
2569
2572
else if (pg_strcasecmp (cmd , "aset" ) == 0 )
2570
2573
mc = META_ASET ;
2574
+ else if (pg_strcasecmp (cmd , "startpipeline" ) == 0 )
2575
+ mc = META_STARTPIPELINE ;
2576
+ else if (pg_strcasecmp (cmd , "endpipeline" ) == 0 )
2577
+ mc = META_ENDPIPELINE ;
2571
2578
else
2572
2579
mc = META_NONE ;
2573
2580
return mc ;
@@ -2757,11 +2764,25 @@ sendCommand(CState *st, Command *command)
2757
2764
if (commands [j ]-> type != SQL_COMMAND )
2758
2765
continue ;
2759
2766
preparedStatementName (name , st -> use_file , j );
2760
- res = PQprepare (st -> con , name ,
2761
- commands [j ]-> argv [0 ], commands [j ]-> argc - 1 , NULL );
2762
- if (PQresultStatus (res ) != PGRES_COMMAND_OK )
2763
- pg_log_error ("%s" , PQerrorMessage (st -> con ));
2764
- PQclear (res );
2767
+ if (PQpipelineStatus (st -> con ) == PQ_PIPELINE_OFF )
2768
+ {
2769
+ res = PQprepare (st -> con , name ,
2770
+ commands [j ]-> argv [0 ], commands [j ]-> argc - 1 , NULL );
2771
+ if (PQresultStatus (res ) != PGRES_COMMAND_OK )
2772
+ pg_log_error ("%s" , PQerrorMessage (st -> con ));
2773
+ PQclear (res );
2774
+ }
2775
+ else
2776
+ {
2777
+ /*
2778
+ * In pipeline mode, we use asynchronous functions. If a
2779
+ * server-side error occurs, it will be processed later
2780
+ * among the other results.
2781
+ */
2782
+ if (!PQsendPrepare (st -> con , name ,
2783
+ commands [j ]-> argv [0 ], commands [j ]-> argc - 1 , NULL ))
2784
+ pg_log_error ("%s" , PQerrorMessage (st -> con ));
2785
+ }
2765
2786
}
2766
2787
st -> prepared [st -> use_file ] = true;
2767
2788
}
@@ -2802,10 +2823,11 @@ readCommandResponse(CState *st, MetaCommand meta, char *varprefix)
2802
2823
int qrynum = 0 ;
2803
2824
2804
2825
/*
2805
- * varprefix should be set only with \gset or \aset, and SQL commands do
2806
- * not need it.
2826
+ * varprefix should be set only with \gset or \aset, and \endpipeline and
2827
+ * SQL commands do not need it.
2807
2828
*/
2808
2829
Assert ((meta == META_NONE && varprefix == NULL ) ||
2830
+ ((meta == META_ENDPIPELINE ) && varprefix == NULL ) ||
2809
2831
((meta == META_GSET || meta == META_ASET ) && varprefix != NULL ));
2810
2832
2811
2833
res = PQgetResult (st -> con );
@@ -2874,6 +2896,13 @@ readCommandResponse(CState *st, MetaCommand meta, char *varprefix)
2874
2896
/* otherwise the result is simply thrown away by PQclear below */
2875
2897
break ;
2876
2898
2899
+ case PGRES_PIPELINE_SYNC :
2900
+ pg_log_debug ("client %d pipeline ending" , st -> id );
2901
+ if (PQexitPipelineMode (st -> con ) != 1 )
2902
+ pg_log_error ("client %d failed to exit pipeline mode: %s" , st -> id ,
2903
+ PQerrorMessage (st -> con ));
2904
+ break ;
2905
+
2877
2906
default :
2878
2907
/* anything else is unexpected */
2879
2908
pg_log_error ("client %d script %d aborted in command %d query %d: %s" ,
@@ -3127,13 +3156,36 @@ advanceConnectionState(TState *thread, CState *st, StatsData *agg)
3127
3156
/* Execute the command */
3128
3157
if (command -> type == SQL_COMMAND )
3129
3158
{
3159
+ /* disallow \aset and \gset in pipeline mode */
3160
+ if (PQpipelineStatus (st -> con ) != PQ_PIPELINE_OFF )
3161
+ {
3162
+ if (command -> meta == META_GSET )
3163
+ {
3164
+ commandFailed (st , "gset" , "\\gset is not allowed in pipeline mode" );
3165
+ st -> state = CSTATE_ABORTED ;
3166
+ break ;
3167
+ }
3168
+ else if (command -> meta == META_ASET )
3169
+ {
3170
+ commandFailed (st , "aset" , "\\aset is not allowed in pipeline mode" );
3171
+ st -> state = CSTATE_ABORTED ;
3172
+ break ;
3173
+ }
3174
+ }
3175
+
3130
3176
if (!sendCommand (st , command ))
3131
3177
{
3132
3178
commandFailed (st , "SQL" , "SQL command send failed" );
3133
3179
st -> state = CSTATE_ABORTED ;
3134
3180
}
3135
3181
else
3136
- st -> state = CSTATE_WAIT_RESULT ;
3182
+ {
3183
+ /* Wait for results, unless in pipeline mode */
3184
+ if (PQpipelineStatus (st -> con ) == PQ_PIPELINE_OFF )
3185
+ st -> state = CSTATE_WAIT_RESULT ;
3186
+ else
3187
+ st -> state = CSTATE_END_COMMAND ;
3188
+ }
3137
3189
}
3138
3190
else if (command -> type == META_COMMAND )
3139
3191
{
@@ -3273,7 +3325,15 @@ advanceConnectionState(TState *thread, CState *st, StatsData *agg)
3273
3325
if (readCommandResponse (st ,
3274
3326
sql_script [st -> use_file ].commands [st -> command ]-> meta ,
3275
3327
sql_script [st -> use_file ].commands [st -> command ]-> varprefix ))
3276
- st -> state = CSTATE_END_COMMAND ;
3328
+ {
3329
+ /*
3330
+ * outside of pipeline mode: stop reading results.
3331
+ * pipeline mode: continue reading results until an
3332
+ * end-of-pipeline response.
3333
+ */
3334
+ if (PQpipelineStatus (st -> con ) != PQ_PIPELINE_ON )
3335
+ st -> state = CSTATE_END_COMMAND ;
3336
+ }
3277
3337
else
3278
3338
st -> state = CSTATE_ABORTED ;
3279
3339
break ;
@@ -3516,6 +3576,45 @@ executeMetaCommand(CState *st, pg_time_usec_t *now)
3516
3576
return CSTATE_ABORTED ;
3517
3577
}
3518
3578
}
3579
+ else if (command -> meta == META_STARTPIPELINE )
3580
+ {
3581
+ /*
3582
+ * In pipeline mode, we use a workflow based on libpq pipeline
3583
+ * functions.
3584
+ */
3585
+ if (querymode == QUERY_SIMPLE )
3586
+ {
3587
+ commandFailed (st , "startpipeline" , "cannot use pipeline mode with the simple query protocol" );
3588
+ return CSTATE_ABORTED ;
3589
+ }
3590
+
3591
+ if (PQpipelineStatus (st -> con ) != PQ_PIPELINE_OFF )
3592
+ {
3593
+ commandFailed (st , "startpipeline" , "already in pipeline mode" );
3594
+ return CSTATE_ABORTED ;
3595
+ }
3596
+ if (PQenterPipelineMode (st -> con ) == 0 )
3597
+ {
3598
+ commandFailed (st , "startpipeline" , "failed to enter pipeline mode" );
3599
+ return CSTATE_ABORTED ;
3600
+ }
3601
+ }
3602
+ else if (command -> meta == META_ENDPIPELINE )
3603
+ {
3604
+ if (PQpipelineStatus (st -> con ) != PQ_PIPELINE_ON )
3605
+ {
3606
+ commandFailed (st , "endpipeline" , "not in pipeline mode" );
3607
+ return CSTATE_ABORTED ;
3608
+ }
3609
+ if (!PQpipelineSync (st -> con ))
3610
+ {
3611
+ commandFailed (st , "endpipeline" , "failed to send a pipeline sync" );
3612
+ return CSTATE_ABORTED ;
3613
+ }
3614
+ /* Now wait for the PGRES_PIPELINE_SYNC and exit pipeline mode there */
3615
+ /* collect pending results before getting out of pipeline mode */
3616
+ return CSTATE_WAIT_RESULT ;
3617
+ }
3519
3618
3520
3619
/*
3521
3620
* executing the expression or shell command might have taken a
@@ -4725,7 +4824,9 @@ process_backslash_command(PsqlScanState sstate, const char *source)
4725
4824
syntax_error (source , lineno , my_command -> first_line , my_command -> argv [0 ],
4726
4825
"missing command" , NULL , -1 );
4727
4826
}
4728
- else if (my_command -> meta == META_ELSE || my_command -> meta == META_ENDIF )
4827
+ else if (my_command -> meta == META_ELSE || my_command -> meta == META_ENDIF ||
4828
+ my_command -> meta == META_STARTPIPELINE ||
4829
+ my_command -> meta == META_ENDPIPELINE )
4729
4830
{
4730
4831
if (my_command -> argc != 1 )
4731
4832
syntax_error (source , lineno , my_command -> first_line , my_command -> argv [0 ],
0 commit comments