Skip to content

Commit bd824c1

Browse files
committed
Switch to epoll in sockhub
1 parent ac1796d commit bd824c1

File tree

4 files changed

+66
-24
lines changed

4 files changed

+66
-24
lines changed

contrib/multimaster/dtmd/src/main.c

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -195,7 +195,6 @@ static void gen_snapshot(Snapshot *s) {
195195
Transaction* t;
196196
int n = 0;
197197
s->times_sent = 0;
198-
#if 0
199198
for (t = (Transaction*)active_transactions.prev; t != (Transaction*)&active_transactions; t = (Transaction*)t->elem.prev) {
200199
/*
201200
if (t->xid < s->xmin) {
@@ -207,12 +206,6 @@ static void gen_snapshot(Snapshot *s) {
207206
*/
208207
s->active[n++] = t->xid;
209208
}
210-
#else
211-
if (!l2_list_is_empty(&active_transactions)) {
212-
s->active[0] = ((Transaction*)active_transactions.prev)->xid;
213-
n = 1;
214-
}
215-
#endif
216209
s->nactive = n;
217210
if (n > 0) {
218211
s->xmin = s->active[0];

contrib/multimaster/sockhub/Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
CC = gcc
2-
CFLAGS = -c -I. -Wall -O3 -g -fPIC
2+
CFLAGS = -c -I. -Wall -O3 -g -fPIC -DUSE_EPOLL
33
LD = $(CC)
44
LDFLAGS = -g
55
AR = ar

contrib/multimaster/sockhub/sockhub.c

Lines changed: 54 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,24 @@
1919

2020
#include "sockhub.h"
2121

22+
inline void ShubAddSocket(Shub* shub, int fd)
23+
{
24+
#ifdef USE_EPOLL
25+
struct epoll_event ev;
26+
ev.events = EPOLLIN;
27+
ev.data.fd = fd;
28+
if (epoll_ctl(shub->epollfd, EPOLL_CTL_ADD, fd, &ev) < 0) {
29+
shub->params->error_handler("Failed to add socket to epoll set", SHUB_FATAL_ERROR);
30+
}
31+
#else
32+
FD_SET(fd, &shub->inset);
33+
if (fd > shub->max_fd) {
34+
shub->max_fd = fd;
35+
}
36+
#endif
37+
}
38+
39+
2240
static void default_error_handler(char const* msg, ShubErrorSeverity severity)
2341
{
2442
perror(msg);
@@ -68,7 +86,13 @@ static int resolve_host_by_name(const char *hostname, unsigned* addrs, unsigned*
6886
static void close_socket(Shub* shub, int fd)
6987
{
7088
close(fd);
89+
#ifdef USE_EPOLL
90+
if (epoll_ctl(shub->epollfd, EPOLL_CTL_DEL, fd, NULL) < 0) {
91+
shub->params->error_handler("Failed to add socket to epoll set", SHUB_RECOVERABLE_ERROR);
92+
}
93+
#else
7194
FD_CLR(fd, &shub->inset);
95+
#endif
7296
}
7397

7498
int ShubReadSocketEx(int sd, void* buf, int min_size, int max_size)
@@ -163,7 +187,8 @@ static void reconnect(Shub* shub)
163187
} else {
164188
int optval = 1;
165189
setsockopt(shub->output, IPPROTO_TCP, TCP_NODELAY, (char const*)&optval, sizeof(optval));
166-
FD_SET(shub->output, &shub->inset);
190+
191+
ShubAddSocket(shub, shub->output);
167192
if (sep != NULL) {
168193
*sep = ',';
169194
}
@@ -196,6 +221,7 @@ static void notify_disconnect(Shub* shub, int chan)
196221

197222
static void recovery(Shub* shub)
198223
{
224+
#ifndef USE_EPOLL
199225
int i, max_fd;
200226

201227
for (i = 0, max_fd = shub->max_fd; i <= max_fd; i++) {
@@ -212,6 +238,7 @@ static void recovery(Shub* shub)
212238
}
213239
}
214240
}
241+
#endif
215242
}
216243

217244
void ShubInitialize(Shub* shub, ShubParams* params)
@@ -233,11 +260,14 @@ void ShubInitialize(Shub* shub, ShubParams* params)
233260
if (listen(shub->input, params->queue_size) < 0) {
234261
shub->params->error_handler("Failed to listen local socket", SHUB_FATAL_ERROR);
235262
}
236-
FD_ZERO(&shub->inset);
237-
FD_SET(shub->input, &shub->inset);
238-
239263
shub->output = -1;
240-
shub->max_fd = shub->input;
264+
#ifdef USE_EPOLL
265+
shub->epollfd = epoll_create(MAX_EVENTS);
266+
#else
267+
FD_ZERO(&shub->inset);
268+
shub->max_fd = 0;
269+
#endif
270+
ShubAddSocket(shub, shub->input);
241271
reconnect(shub);
242272

243273
shub->in_buffer = malloc(params->buffer_size);
@@ -266,34 +296,42 @@ void ShubLoop(Shub* shub)
266296
sigprocmask(SIG_UNBLOCK, &sset, NULL);
267297

268298
while (!stop) {
299+
int i, rc;
300+
#ifdef USE_EPOLL
301+
struct epoll_event events[MAX_EVENTS];
302+
rc = epoll_wait(shub->epollfd, events, MAX_EVENTS, shub->params->delay);
303+
#else
269304
fd_set events;
270305
struct timeval tm;
271-
int i, rc;
272306
int max_fd = shub->max_fd;
273307

274308
tm.tv_sec = shub->params->delay/1000;
275309
tm.tv_usec = shub->params->delay % 1000 * 1000;
276310

277-
events = shub->inset;
278311
rc = select(max_fd+1, &events, NULL, NULL, shub->in_buffer_used == 0 ? NULL : &tm);
312+
#endif
279313
if (rc < 0) {
280314
if (errno != EINTR) {
281315
shub->params->error_handler("Select failed", SHUB_RECOVERABLE_ERROR);
282316
recovery(shub);
283317
}
284318
} else {
285319
if (rc > 0) {
286-
for (i = 0; i <= max_fd; i++) {
320+
#ifdef USE_EPOLL
321+
int j;
322+
for (j = 0; j < rc; j++) {
323+
{
324+
i = events[j].data.fd;
325+
#else
326+
for (i = 0; i <= max_fd; i++) {
287327
if (FD_ISSET(i, &events)) {
328+
#endif
288329
if (i == shub->input) { /* accept incomming connection */
289330
int s = accept(i, NULL, NULL);
290331
if (s < 0) {
291332
shub->params->error_handler("Failed to accept socket", SHUB_RECOVERABLE_ERROR);
292333
} else {
293-
if (s > shub->max_fd) {
294-
shub->max_fd = s;
295-
}
296-
FD_SET(s, &shub->inset);
334+
ShubAddSocket(shub, i);
297335
}
298336
} else if (i == shub->output) { /* receive response from server */
299337
/* try to read as much as possible */
@@ -420,10 +458,10 @@ void ShubLoop(Shub* shub)
420458
do {
421459
unsigned int n = processed + size > buffer_size ? buffer_size - processed : size;
422460
if (chan >= 0 && !ShubReadSocket(chan, shub->in_buffer + processed, n)) {
423-
char buf[1024];
424-
sprintf(buf, "Failed to read local socket rc=%d, len=%d, errno=%d", rc, n, errno);
425-
shub->params->error_handler(buf, SHUB_RECOVERABLE_ERROR);
426-
//shub->params->error_handler("Failed to read local socket", SHUB_RECOVERABLE_ERROR);
461+
char buf[1024];
462+
sprintf(buf, "Failed to read local socket rc=%d, len=%d, errno=%d", rc, n, errno);
463+
shub->params->error_handler(buf, SHUB_RECOVERABLE_ERROR);
464+
//shub->params->error_handler("Failed to read local socket", SHUB_RECOVERABLE_ERROR);
427465
close_socket(shub, chan);
428466
if (hdr != NULL) { /* if message header is not yet sent to the server... */
429467
/* ... then skip this message */

contrib/multimaster/sockhub/sockhub.h

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,14 @@
11
#ifndef __SOCKHUB_H__
22
#define __SOCKHUB_H__
33

4+
5+
#ifdef USE_EPOLL
6+
#include <sys/epoll.h>
7+
#define MAX_EVENTS 1024
8+
#else
49
#include <sys/select.h>
10+
#endif
11+
512

613
typedef struct {
714
unsigned int size : 24; /* size of message without header */
@@ -40,8 +47,12 @@ typedef struct
4047
{
4148
int output;
4249
int input;
50+
#ifdef USE_EPOLL
51+
int epollfd;
52+
#else
4353
int max_fd;
4454
fd_set inset;
55+
#endif
4556
char* in_buffer;
4657
char* out_buffer;
4758
int in_buffer_used;

0 commit comments

Comments
 (0)