Skip to content

Commit aeb57af

Browse files
committed
pgbench: Synchronize client threads.
Wait until all pgbench threads are connected before benchmarking begins. This fixes a problem where some connections could take a very long time to be established because of lock contention from earlier connections, making results unstable and bogus with high connection counts. Author: Andres Freund <andres@anarazel.de> Author: Fabien COELHO <coelho@cri.ensmp.fr> Reviewed-by: Marina Polyakova <m.polyakova@postgrespro.ru> Reviewed-by: Kyotaro Horiguchi <horikyota.ntt@gmail.com> Reviewed-by: Hayato Kuroda <kuroda.hayato@fujitsu.com> Reviewed-by: David Rowley <dgrowleyml@gmail.com> Discussion: https://postgr.es/m/20200227180100.zyvjwzcpiokfsqm2%40alap3.anarazel.de
1 parent 44bf3d5 commit aeb57af

File tree

1 file changed

+41
-1
lines changed

1 file changed

+41
-1
lines changed

src/bin/pgbench/pgbench.c

Lines changed: 41 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -126,21 +126,37 @@ typedef struct socket_set
126126
#define THREAD_JOIN(handle) \
127127
(WaitForSingleObject(handle, INFINITE) != WAIT_OBJECT_0 ? \
128128
GETERRNO() : CloseHandle(handle) ? 0 : GETERRNO())
129+
#define THREAD_BARRIER_T SYNCHRONIZATION_BARRIER
130+
#define THREAD_BARRIER_INIT(barrier, n) \
131+
(InitializeSynchronizationBarrier((barrier), (n), 0) ? 0 : GETERRNO())
132+
#define THREAD_BARRIER_WAIT(barrier) \
133+
EnterSynchronizationBarrier((barrier), \
134+
SYNCHRONIZATION_BARRIER_FLAGS_BLOCK_ONLY)
135+
#define THREAD_BARRIER_DESTROY(barrier)
129136
#elif defined(ENABLE_THREAD_SAFETY)
130137
/* Use POSIX threads */
131-
#include <pthread.h>
138+
#include "port/pg_pthread.h"
132139
#define THREAD_T pthread_t
133140
#define THREAD_FUNC_RETURN_TYPE void *
134141
#define THREAD_FUNC_RETURN return NULL
135142
#define THREAD_CREATE(handle, function, arg) \
136143
pthread_create((handle), NULL, (function), (arg))
137144
#define THREAD_JOIN(handle) \
138145
pthread_join((handle), NULL)
146+
#define THREAD_BARRIER_T pthread_barrier_t
147+
#define THREAD_BARRIER_INIT(barrier, n) \
148+
pthread_barrier_init((barrier), NULL, (n))
149+
#define THREAD_BARRIER_WAIT(barrier) pthread_barrier_wait((barrier))
150+
#define THREAD_BARRIER_DESTROY(barrier) pthread_barrier_destroy((barrier))
139151
#else
140152
/* No threads implementation, use none (-j 1) */
141153
#define THREAD_T void *
142154
#define THREAD_FUNC_RETURN_TYPE void *
143155
#define THREAD_FUNC_RETURN return NULL
156+
#define THREAD_BARRIER_T int
157+
#define THREAD_BARRIER_INIT(barrier, n) (*(barrier) = 0)
158+
#define THREAD_BARRIER_WAIT(barrier)
159+
#define THREAD_BARRIER_DESTROY(barrier)
144160
#endif
145161

146162

@@ -326,6 +342,9 @@ typedef struct RandomState
326342
/* Various random sequences are initialized from this one. */
327343
static RandomState base_random_sequence;
328344

345+
/* Synchronization barrier for start and connection */
346+
static THREAD_BARRIER_T barrier;
347+
329348
/*
330349
* Connection state machine states.
331350
*/
@@ -6121,6 +6140,10 @@ main(int argc, char **argv)
61216140
if (duration > 0)
61226141
setalarm(duration);
61236142

6143+
errno = THREAD_BARRIER_INIT(&barrier, nthreads);
6144+
if (errno != 0)
6145+
pg_log_fatal("could not initialize barrier: %m");
6146+
61246147
#ifdef ENABLE_THREAD_SAFETY
61256148
/* start all threads but thread 0 which is executed directly later */
61266149
for (i = 1; i < nthreads; i++)
@@ -6191,6 +6214,8 @@ main(int argc, char **argv)
61916214
printResults(&stats, pg_time_now() - bench_start, conn_total_duration,
61926215
bench_start - start_time, latency_late);
61936216

6217+
THREAD_BARRIER_DESTROY(&barrier);
6218+
61946219
if (exit_code != 0)
61956220
pg_log_fatal("Run was aborted; the above results are incomplete.");
61966221

@@ -6237,6 +6262,8 @@ threadRun(void *arg)
62376262
state[i].state = CSTATE_CHOOSE_SCRIPT;
62386263

62396264
/* READY */
6265+
THREAD_BARRIER_WAIT(&barrier);
6266+
62406267
thread_start = pg_time_now();
62416268
thread->started_time = thread_start;
62426269
last_report = thread_start;
@@ -6249,7 +6276,18 @@ threadRun(void *arg)
62496276
for (int i = 0; i < nstate; i++)
62506277
{
62516278
if ((state[i].con = doConnect()) == NULL)
6279+
{
6280+
/*
6281+
* On connection failure, we meet the barrier here in place of
6282+
* GO before proceeding to the "done" path which will cleanup,
6283+
* so as to avoid locking the process.
6284+
*
6285+
* It is unclear whether it is worth doing anything rather than
6286+
* coldly exiting with an error message.
6287+
*/
6288+
THREAD_BARRIER_WAIT(&barrier);
62526289
goto done;
6290+
}
62536291
}
62546292

62556293
/* compute connection delay */
@@ -6261,6 +6299,8 @@ threadRun(void *arg)
62616299
thread->conn_duration = 0;
62626300
}
62636301

6302+
/* GO */
6303+
THREAD_BARRIER_WAIT(&barrier);
62646304

62656305
start = pg_time_now();
62666306
thread->bench_start = start;

0 commit comments

Comments
 (0)