Skip to content

Commit b1d6a8f

Browse files
committed
pgbench: Refactor thread portability support.
Instead of maintaining an incomplete emulation of POSIX threads for Windows, let's use an extremely minimalist macro-based abstraction for now. A later patch will extend this, without the need to supply more complicated pthread emulation code. (There may be a need for a more serious portable thread abstraction in later projects, but this is not it.) Minor incidental problems fixed: it wasn't OK to use (pthread_t) 0 as a special value, it wasn't OK to compare thread_t values with ==, and we incorrectly assumed that pthread functions set errno. Discussion: https://postgr.es/m/20200227180100.zyvjwzcpiokfsqm2%40alap3.anarazel.de
1 parent e4e87a3 commit b1d6a8f

File tree

1 file changed

+31
-95
lines changed

1 file changed

+31
-95
lines changed

src/bin/pgbench/pgbench.c

+31-95
Original file line numberDiff line numberDiff line change
@@ -111,22 +111,36 @@ typedef struct socket_set
111111
#endif /* POLL_USING_SELECT */
112112

113113
/*
114-
* Multi-platform pthread implementations
114+
* Multi-platform thread implementations
115115
*/
116116

117117
#ifdef WIN32
118-
/* Use native win32 threads on Windows */
119-
typedef struct win32_pthread *pthread_t;
120-
typedef int pthread_attr_t;
121-
122-
static int pthread_create(pthread_t *thread, pthread_attr_t *attr, void *(*start_routine) (void *), void *arg);
123-
static int pthread_join(pthread_t th, void **thread_return);
118+
/* Use Windows threads */
119+
#include <windows.h>
120+
#define GETERRNO() (_dosmaperr(GetLastError()), errno)
121+
#define THREAD_T HANDLE
122+
#define THREAD_FUNC_RETURN_TYPE unsigned
123+
#define THREAD_FUNC_RETURN return 0
124+
#define THREAD_CREATE(handle, function, arg) \
125+
((*(handle) = (HANDLE) _beginthreadex(NULL, 0, (function), (arg), 0, NULL)) == 0 ? errno : 0)
126+
#define THREAD_JOIN(handle) \
127+
(WaitForSingleObject(handle, INFINITE) != WAIT_OBJECT_0 ? \
128+
GETERRNO() : CloseHandle(handle) ? 0 : GETERRNO())
124129
#elif defined(ENABLE_THREAD_SAFETY)
125-
/* Use platform-dependent pthread capability */
130+
/* Use POSIX threads */
126131
#include <pthread.h>
132+
#define THREAD_T pthread_t
133+
#define THREAD_FUNC_RETURN_TYPE void *
134+
#define THREAD_FUNC_RETURN return NULL
135+
#define THREAD_CREATE(handle, function, arg) \
136+
pthread_create((handle), NULL, (function), (arg))
137+
#define THREAD_JOIN(handle) \
138+
pthread_join((handle), NULL)
127139
#else
128140
/* No threads implementation, use none (-j 1) */
129-
#define pthread_t void *
141+
#define THREAD_T void *
142+
#define THREAD_FUNC_RETURN_TYPE void *
143+
#define THREAD_FUNC_RETURN return NULL
130144
#endif
131145

132146

@@ -436,7 +450,7 @@ typedef struct
436450
typedef struct
437451
{
438452
int tid; /* thread id */
439-
pthread_t thread; /* thread handle */
453+
THREAD_T thread; /* thread handle */
440454
CState *state; /* array of CState */
441455
int nstate; /* length of state[] */
442456

@@ -459,8 +473,6 @@ typedef struct
459473
int64 latency_late; /* executed but late transactions */
460474
} TState;
461475

462-
#define INVALID_THREAD ((pthread_t) 0)
463-
464476
/*
465477
* queries read from files
466478
*/
@@ -604,7 +616,7 @@ static void doLog(TState *thread, CState *st,
604616
static void processXactStats(TState *thread, CState *st, instr_time *now,
605617
bool skipped, StatsData *agg);
606618
static void addScript(ParsedScript script);
607-
static void *threadRun(void *arg);
619+
static THREAD_FUNC_RETURN_TYPE threadRun(void *arg);
608620
static void finishCon(CState *st);
609621
static void setalarm(int seconds);
610622
static socket_set *alloc_socket_set(int count);
@@ -6142,26 +6154,21 @@ main(int argc, char **argv)
61426154
/* the first thread (i = 0) is executed by main thread */
61436155
if (i > 0)
61446156
{
6145-
int err = pthread_create(&thread->thread, NULL, threadRun, thread);
6157+
errno = THREAD_CREATE(&thread->thread, threadRun, thread);
61466158

6147-
if (err != 0 || thread->thread == INVALID_THREAD)
6159+
if (errno != 0)
61486160
{
61496161
pg_log_fatal("could not create thread: %m");
61506162
exit(1);
61516163
}
61526164
}
6153-
else
6154-
{
6155-
thread->thread = INVALID_THREAD;
6156-
}
61576165
}
61586166
#else
61596167
INSTR_TIME_SET_CURRENT(threads[0].start_time);
61606168
/* compute when to stop */
61616169
if (duration > 0)
61626170
end_time = INSTR_TIME_GET_MICROSEC(threads[0].start_time) +
61636171
(int64) 1000000 * duration;
6164-
threads[0].thread = INVALID_THREAD;
61656172
#endif /* ENABLE_THREAD_SAFETY */
61666173

61676174
/* wait for threads and accumulate results */
@@ -6172,12 +6179,12 @@ main(int argc, char **argv)
61726179
TState *thread = &threads[i];
61736180

61746181
#ifdef ENABLE_THREAD_SAFETY
6175-
if (threads[i].thread == INVALID_THREAD)
6182+
if (i == 0)
61766183
/* actually run this thread directly in the main thread */
61776184
(void) threadRun(thread);
61786185
else
61796186
/* wait of other threads. should check that 0 is returned? */
6180-
pthread_join(thread->thread, NULL);
6187+
THREAD_JOIN(thread->thread);
61816188
#else
61826189
(void) threadRun(thread);
61836190
#endif /* ENABLE_THREAD_SAFETY */
@@ -6216,7 +6223,7 @@ main(int argc, char **argv)
62166223
return exit_code;
62176224
}
62186225

6219-
static void *
6226+
static THREAD_FUNC_RETURN_TYPE
62206227
threadRun(void *arg)
62216228
{
62226229
TState *thread = (TState *) arg;
@@ -6501,7 +6508,7 @@ threadRun(void *arg)
65016508
thread->logfile = NULL;
65026509
}
65036510
free_socket_set(sockets);
6504-
return NULL;
6511+
THREAD_FUNC_RETURN;
65056512
}
65066513

65076514
static void
@@ -6732,74 +6739,3 @@ socket_has_input(socket_set *sa, int fd, int idx)
67326739
}
67336740

67346741
#endif /* POLL_USING_SELECT */
6735-
6736-
6737-
/* partial pthread implementation for Windows */
6738-
6739-
#ifdef WIN32
6740-
6741-
typedef struct win32_pthread
6742-
{
6743-
HANDLE handle;
6744-
void *(*routine) (void *);
6745-
void *arg;
6746-
void *result;
6747-
} win32_pthread;
6748-
6749-
static unsigned __stdcall
6750-
win32_pthread_run(void *arg)
6751-
{
6752-
win32_pthread *th = (win32_pthread *) arg;
6753-
6754-
th->result = th->routine(th->arg);
6755-
6756-
return 0;
6757-
}
6758-
6759-
static int
6760-
pthread_create(pthread_t *thread,
6761-
pthread_attr_t *attr,
6762-
void *(*start_routine) (void *),
6763-
void *arg)
6764-
{
6765-
int save_errno;
6766-
win32_pthread *th;
6767-
6768-
th = (win32_pthread *) pg_malloc(sizeof(win32_pthread));
6769-
th->routine = start_routine;
6770-
th->arg = arg;
6771-
th->result = NULL;
6772-
6773-
th->handle = (HANDLE) _beginthreadex(NULL, 0, win32_pthread_run, th, 0, NULL);
6774-
if (th->handle == NULL)
6775-
{
6776-
save_errno = errno;
6777-
free(th);
6778-
return save_errno;
6779-
}
6780-
6781-
*thread = th;
6782-
return 0;
6783-
}
6784-
6785-
static int
6786-
pthread_join(pthread_t th, void **thread_return)
6787-
{
6788-
if (th == NULL || th->handle == NULL)
6789-
return errno = EINVAL;
6790-
6791-
if (WaitForSingleObject(th->handle, INFINITE) != WAIT_OBJECT_0)
6792-
{
6793-
_dosmaperr(GetLastError());
6794-
return errno;
6795-
}
6796-
6797-
if (thread_return)
6798-
*thread_return = th->result;
6799-
6800-
CloseHandle(th->handle);
6801-
free(th);
6802-
return 0;
6803-
}
6804-
6805-
#endif /* WIN32 */

0 commit comments

Comments
 (0)