@@ -126,21 +126,37 @@ typedef struct socket_set
126
126
#define THREAD_JOIN (handle ) \
127
127
(WaitForSingleObject(handle, INFINITE) != WAIT_OBJECT_0 ? \
128
128
GETERRNO() : CloseHandle(handle) ? 0 : GETERRNO())
129
+ #define THREAD_BARRIER_T SYNCHRONIZATION_BARRIER
130
+ #define THREAD_BARRIER_INIT (barrier , n ) \
131
+ (InitializeSynchronizationBarrier((barrier), (n), 0) ? 0 : GETERRNO())
132
+ #define THREAD_BARRIER_WAIT (barrier ) \
133
+ EnterSynchronizationBarrier((barrier), \
134
+ SYNCHRONIZATION_BARRIER_FLAGS_BLOCK_ONLY)
135
+ #define THREAD_BARRIER_DESTROY (barrier )
129
136
#elif defined(ENABLE_THREAD_SAFETY )
130
137
/* Use POSIX threads */
131
- #include <pthread.h>
138
+ #include "port/pg_pthread.h"
132
139
#define THREAD_T pthread_t
133
140
#define THREAD_FUNC_RETURN_TYPE void *
134
141
#define THREAD_FUNC_RETURN return NULL
135
142
#define THREAD_CREATE (handle , function , arg ) \
136
143
pthread_create((handle), NULL, (function), (arg))
137
144
#define THREAD_JOIN (handle ) \
138
145
pthread_join((handle), NULL)
146
+ #define THREAD_BARRIER_T pthread_barrier_t
147
+ #define THREAD_BARRIER_INIT (barrier , n ) \
148
+ pthread_barrier_init((barrier), NULL, (n))
149
+ #define THREAD_BARRIER_WAIT (barrier ) pthread_barrier_wait((barrier))
150
+ #define THREAD_BARRIER_DESTROY (barrier ) pthread_barrier_destroy((barrier))
139
151
#else
140
152
/* No threads implementation, use none (-j 1) */
141
153
#define THREAD_T void *
142
154
#define THREAD_FUNC_RETURN_TYPE void *
143
155
#define THREAD_FUNC_RETURN return NULL
156
+ #define THREAD_BARRIER_T int
157
+ #define THREAD_BARRIER_INIT (barrier , n ) (*(barrier) = 0)
158
+ #define THREAD_BARRIER_WAIT (barrier )
159
+ #define THREAD_BARRIER_DESTROY (barrier )
144
160
#endif
145
161
146
162
@@ -326,6 +342,9 @@ typedef struct RandomState
326
342
/* Various random sequences are initialized from this one. */
327
343
static RandomState base_random_sequence ;
328
344
345
+ /* Synchronization barrier for start and connection */
346
+ static THREAD_BARRIER_T barrier ;
347
+
329
348
/*
330
349
* Connection state machine states.
331
350
*/
@@ -6121,6 +6140,10 @@ main(int argc, char **argv)
6121
6140
if (duration > 0 )
6122
6141
setalarm (duration );
6123
6142
6143
+ errno = THREAD_BARRIER_INIT (& barrier , nthreads );
6144
+ if (errno != 0 )
6145
+ pg_log_fatal ("could not initialize barrier: %m" );
6146
+
6124
6147
#ifdef ENABLE_THREAD_SAFETY
6125
6148
/* start all threads but thread 0 which is executed directly later */
6126
6149
for (i = 1 ; i < nthreads ; i ++ )
@@ -6191,6 +6214,8 @@ main(int argc, char **argv)
6191
6214
printResults (& stats , pg_time_now () - bench_start , conn_total_duration ,
6192
6215
bench_start - start_time , latency_late );
6193
6216
6217
+ THREAD_BARRIER_DESTROY (& barrier );
6218
+
6194
6219
if (exit_code != 0 )
6195
6220
pg_log_fatal ("Run was aborted; the above results are incomplete." );
6196
6221
@@ -6237,6 +6262,8 @@ threadRun(void *arg)
6237
6262
state [i ].state = CSTATE_CHOOSE_SCRIPT ;
6238
6263
6239
6264
/* READY */
6265
+ THREAD_BARRIER_WAIT (& barrier );
6266
+
6240
6267
thread_start = pg_time_now ();
6241
6268
thread -> started_time = thread_start ;
6242
6269
last_report = thread_start ;
@@ -6249,7 +6276,18 @@ threadRun(void *arg)
6249
6276
for (int i = 0 ; i < nstate ; i ++ )
6250
6277
{
6251
6278
if ((state [i ].con = doConnect ()) == NULL )
6279
+ {
6280
+ /*
6281
+ * On connection failure, we meet the barrier here in place of
6282
+ * GO before proceeding to the "done" path which will cleanup,
6283
+ * so as to avoid locking the process.
6284
+ *
6285
+ * It is unclear whether it is worth doing anything rather than
6286
+ * coldly exiting with an error message.
6287
+ */
6288
+ THREAD_BARRIER_WAIT (& barrier );
6252
6289
goto done ;
6290
+ }
6253
6291
}
6254
6292
6255
6293
/* compute connection delay */
@@ -6261,6 +6299,8 @@ threadRun(void *arg)
6261
6299
thread -> conn_duration = 0 ;
6262
6300
}
6263
6301
6302
+ /* GO */
6303
+ THREAD_BARRIER_WAIT (& barrier );
6264
6304
6265
6305
start = pg_time_now ();
6266
6306
thread -> bench_start = start ;
0 commit comments