@@ -74,7 +74,7 @@ static int pthread_join(pthread_t th, void **thread_return);
74
74
#include <pthread.h>
75
75
#else
76
76
/* Use emulation with fork. Rename pthread identifiers to avoid conflicts */
77
-
77
+ #define PTHREAD_FORK_EMULATION
78
78
#include <sys/wait.h>
79
79
80
80
#define pthread_t pg_pthread_t
@@ -164,6 +164,8 @@ bool use_log; /* log transaction latencies to a file */
164
164
bool use_quiet ; /* quiet logging onto stderr */
165
165
int agg_interval ; /* log aggregates instead of individual
166
166
* transactions */
167
+ int progress = 0 ; /* thread progress report every this seconds */
168
+ int progress_nclients = 0 ; /* number of clients for progress report */
167
169
bool is_connect ; /* establish connection for each transaction */
168
170
bool is_latencies ; /* report per-command latencies */
169
171
int main_pid ; /* main process id used in log filename */
@@ -352,6 +354,7 @@ usage(void)
352
354
"(default: simple)\n"
353
355
" -n, --no-vacuum do not run VACUUM before tests\n"
354
356
" -N, --skip-some-updates skip updates of pgbench_tellers and pgbench_branches\n"
357
+ " -P, --progress NUM show thread progress report every NUM seconds\n"
355
358
" -r, --report-latencies report average latency per command\n"
356
359
" -s, --scale=NUM report this scale factor in output\n"
357
360
" -S, --select-only perform SELECT-only transactions\n"
@@ -2119,6 +2122,7 @@ main(int argc, char **argv)
2119
2122
{"log" , no_argument , NULL , 'l' },
2120
2123
{"no-vacuum" , no_argument , NULL , 'n' },
2121
2124
{"port" , required_argument , NULL , 'p' },
2125
+ {"progress" , required_argument , NULL , 'P' },
2122
2126
{"protocol" , required_argument , NULL , 'M' },
2123
2127
{"quiet" , no_argument , NULL , 'q' },
2124
2128
{"report-latencies" , no_argument , NULL , 'r' },
@@ -2202,7 +2206,7 @@ main(int argc, char **argv)
2202
2206
state = (CState * ) pg_malloc (sizeof (CState ));
2203
2207
memset (state , 0 , sizeof (CState ));
2204
2208
2205
- while ((c = getopt_long (argc , argv , "ih:nvp:dqSNc:j:Crs:t:T:U:lf:D:F:M:" , long_options , & optindex )) != -1 )
2209
+ while ((c = getopt_long (argc , argv , "ih:nvp:dqSNc:j:Crs:t:T:U:lf:D:F:M:P: " , long_options , & optindex )) != -1 )
2206
2210
{
2207
2211
switch (c )
2208
2212
{
@@ -2357,6 +2361,16 @@ main(int argc, char **argv)
2357
2361
exit (1 );
2358
2362
}
2359
2363
break ;
2364
+ case 'P' :
2365
+ progress = atoi (optarg );
2366
+ if (progress <= 0 )
2367
+ {
2368
+ fprintf (stderr ,
2369
+ "thread progress delay (-P) must be positive (%s)\n" ,
2370
+ optarg );
2371
+ exit (1 );
2372
+ }
2373
+ break ;
2360
2374
case 0 :
2361
2375
/* This covers long options which take no argument. */
2362
2376
break ;
@@ -2482,6 +2496,7 @@ main(int argc, char **argv)
2482
2496
* changed after fork.
2483
2497
*/
2484
2498
main_pid = (int ) getpid ();
2499
+ progress_nclients = nclients ;
2485
2500
2486
2501
if (nclients > 1 )
2487
2502
{
@@ -2733,6 +2748,11 @@ threadRun(void *arg)
2733
2748
int nstate = thread -> nstate ;
2734
2749
int remains = nstate ; /* number of remaining clients */
2735
2750
int i ;
2751
+ /* for reporting progress: */
2752
+ int64 thread_start = INSTR_TIME_GET_MICROSEC (thread -> start_time );
2753
+ int64 last_report = thread_start ;
2754
+ int64 next_report = last_report + progress * 1000000 ;
2755
+ int64 last_count = 0 ;
2736
2756
2737
2757
AggVals aggs ;
2738
2758
@@ -2896,6 +2916,68 @@ threadRun(void *arg)
2896
2916
st -> con = NULL ;
2897
2917
}
2898
2918
}
2919
+
2920
+ #ifdef PTHREAD_FORK_EMULATION
2921
+ /* each process reports its own progression */
2922
+ if (progress )
2923
+ {
2924
+ instr_time now_time ;
2925
+ int64 now ;
2926
+ INSTR_TIME_SET_CURRENT (now_time );
2927
+ now = INSTR_TIME_GET_MICROSEC (now_time );
2928
+ if (now >= next_report )
2929
+ {
2930
+ /* generate and show report */
2931
+ int64 count = 0 ;
2932
+ int64 run = now - last_report ;
2933
+ float tps , total_run , latency ;
2934
+
2935
+ for (i = 0 ; i < nstate ; i ++ )
2936
+ count += state [i ].cnt ;
2937
+
2938
+ total_run = (now - thread_start ) / 1000000.0 ;
2939
+ tps = 1000000.0 * (count - last_count ) / run ;
2940
+ latency = 1000.0 * nstate / tps ;
2941
+
2942
+ fprintf (stderr , "progress %d: %.1f s, %.1f tps, %.3f ms lat\n" ,
2943
+ thread -> tid , total_run , tps , latency );
2944
+
2945
+ last_count = count ;
2946
+ last_report = now ;
2947
+ next_report += progress * 1000000 ;
2948
+ }
2949
+ }
2950
+ #else
2951
+ /* progress report by thread 0 for all threads */
2952
+ if (progress && thread -> tid == 0 )
2953
+ {
2954
+ instr_time now_time ;
2955
+ int64 now ;
2956
+ INSTR_TIME_SET_CURRENT (now_time );
2957
+ now = INSTR_TIME_GET_MICROSEC (now_time );
2958
+ if (now >= next_report )
2959
+ {
2960
+ /* generate and show report */
2961
+ int64 count = 0 ;
2962
+ int64 run = now - last_report ;
2963
+ float tps , total_run , latency ;
2964
+
2965
+ for (i = 0 ; i < progress_nclients ; i ++ )
2966
+ count += state [i ].cnt ;
2967
+
2968
+ total_run = (now - thread_start ) / 1000000.0 ;
2969
+ tps = 1000000.0 * (count - last_count ) / run ;
2970
+ latency = 1000.0 * progress_nclients / tps ;
2971
+
2972
+ fprintf (stderr , "progress: %.1f s, %.1f tps, %.3f ms lat\n" ,
2973
+ total_run , tps , latency );
2974
+
2975
+ last_count = count ;
2976
+ last_report = now ;
2977
+ next_report += progress * 1000000 ;
2978
+ }
2979
+ }
2980
+ #endif /* PTHREAD_FORK_EMULATION */
2899
2981
}
2900
2982
2901
2983
done :
0 commit comments