19
19
20
20
#include "sockhub.h"
21
21
22
+ inline void ShubAddSocket (Shub * shub , int fd )
23
+ {
24
+ #ifdef USE_EPOLL
25
+ struct epoll_event ev ;
26
+ ev .events = EPOLLIN ;
27
+ ev .data .fd = fd ;
28
+ if (epoll_ctl (shub -> epollfd , EPOLL_CTL_ADD , fd , & ev ) < 0 ) {
29
+ shub -> params -> error_handler ("Failed to add socket to epoll set" , SHUB_FATAL_ERROR );
30
+ }
31
+ #else
32
+ FD_SET (fd , & shub -> inset );
33
+ if (fd > shub -> max_fd ) {
34
+ shub -> max_fd = fd ;
35
+ }
36
+ #endif
37
+ }
38
+
39
+
22
40
static void default_error_handler (char const * msg , ShubErrorSeverity severity )
23
41
{
24
42
perror (msg );
@@ -68,7 +86,13 @@ static int resolve_host_by_name(const char *hostname, unsigned* addrs, unsigned*
68
86
static void close_socket (Shub * shub , int fd )
69
87
{
70
88
close (fd );
89
+ #ifdef USE_EPOLL
90
+ if (epoll_ctl (shub -> epollfd , EPOLL_CTL_DEL , fd , NULL ) < 0 ) {
91
+ shub -> params -> error_handler ("Failed to add socket to epoll set" , SHUB_RECOVERABLE_ERROR );
92
+ }
93
+ #else
71
94
FD_CLR (fd , & shub -> inset );
95
+ #endif
72
96
}
73
97
74
98
int ShubReadSocketEx (int sd , void * buf , int min_size , int max_size )
@@ -163,7 +187,8 @@ static void reconnect(Shub* shub)
163
187
} else {
164
188
int optval = 1 ;
165
189
setsockopt (shub -> output , IPPROTO_TCP , TCP_NODELAY , (char const * )& optval , sizeof (optval ));
166
- FD_SET (shub -> output , & shub -> inset );
190
+
191
+ ShubAddSocket (shub , shub -> output );
167
192
if (sep != NULL ) {
168
193
* sep = ',' ;
169
194
}
@@ -196,6 +221,7 @@ static void notify_disconnect(Shub* shub, int chan)
196
221
197
222
static void recovery (Shub * shub )
198
223
{
224
+ #ifndef USE_EPOLL
199
225
int i , max_fd ;
200
226
201
227
for (i = 0 , max_fd = shub -> max_fd ; i <= max_fd ; i ++ ) {
@@ -212,6 +238,7 @@ static void recovery(Shub* shub)
212
238
}
213
239
}
214
240
}
241
+ #endif
215
242
}
216
243
217
244
void ShubInitialize (Shub * shub , ShubParams * params )
@@ -233,11 +260,14 @@ void ShubInitialize(Shub* shub, ShubParams* params)
233
260
if (listen (shub -> input , params -> queue_size ) < 0 ) {
234
261
shub -> params -> error_handler ("Failed to listen local socket" , SHUB_FATAL_ERROR );
235
262
}
236
- FD_ZERO (& shub -> inset );
237
- FD_SET (shub -> input , & shub -> inset );
238
-
239
263
shub -> output = -1 ;
240
- shub -> max_fd = shub -> input ;
264
+ #ifdef USE_EPOLL
265
+ shub -> epollfd = epoll_create (MAX_EVENTS );
266
+ #else
267
+ FD_ZERO (& shub -> inset );
268
+ shub -> max_fd = 0 ;
269
+ #endif
270
+ ShubAddSocket (shub , shub -> input );
241
271
reconnect (shub );
242
272
243
273
shub -> in_buffer = malloc (params -> buffer_size );
@@ -266,34 +296,42 @@ void ShubLoop(Shub* shub)
266
296
sigprocmask (SIG_UNBLOCK , & sset , NULL );
267
297
268
298
while (!stop ) {
299
+ int i , rc ;
300
+ #ifdef USE_EPOLL
301
+ struct epoll_event events [MAX_EVENTS ];
302
+ rc = epoll_wait (shub -> epollfd , events , MAX_EVENTS , shub -> params -> delay );
303
+ #else
269
304
fd_set events ;
270
305
struct timeval tm ;
271
- int i , rc ;
272
306
int max_fd = shub -> max_fd ;
273
307
274
308
tm .tv_sec = shub -> params -> delay /1000 ;
275
309
tm .tv_usec = shub -> params -> delay % 1000 * 1000 ;
276
310
277
- events = shub -> inset ;
278
311
rc = select (max_fd + 1 , & events , NULL , NULL , shub -> in_buffer_used == 0 ? NULL : & tm );
312
+ #endif
279
313
if (rc < 0 ) {
280
314
if (errno != EINTR ) {
281
315
shub -> params -> error_handler ("Select failed" , SHUB_RECOVERABLE_ERROR );
282
316
recovery (shub );
283
317
}
284
318
} else {
285
319
if (rc > 0 ) {
286
- for (i = 0 ; i <= max_fd ; i ++ ) {
320
+ #ifdef USE_EPOLL
321
+ int j ;
322
+ for (j = 0 ; j < rc ; j ++ ) {
323
+ {
324
+ i = events [j ].data .fd ;
325
+ #else
326
+ for (i = 0 ; i <= max_fd ; i ++ ) {
287
327
if (FD_ISSET (i , & events )) {
328
+ #endif
288
329
if (i == shub -> input ) { /* accept incomming connection */
289
330
int s = accept (i , NULL , NULL );
290
331
if (s < 0 ) {
291
332
shub -> params -> error_handler ("Failed to accept socket" , SHUB_RECOVERABLE_ERROR );
292
333
} else {
293
- if (s > shub -> max_fd ) {
294
- shub -> max_fd = s ;
295
- }
296
- FD_SET (s , & shub -> inset );
334
+ ShubAddSocket (shub , i );
297
335
}
298
336
} else if (i == shub -> output ) { /* receive response from server */
299
337
/* try to read as much as possible */
@@ -420,10 +458,10 @@ void ShubLoop(Shub* shub)
420
458
do {
421
459
unsigned int n = processed + size > buffer_size ? buffer_size - processed : size ;
422
460
if (chan >= 0 && !ShubReadSocket (chan , shub -> in_buffer + processed , n )) {
423
- char buf [1024 ];
424
- sprintf (buf , "Failed to read local socket rc=%d, len=%d, errno=%d" , rc , n , errno );
425
- shub -> params -> error_handler (buf , SHUB_RECOVERABLE_ERROR );
426
- //shub->params->error_handler("Failed to read local socket", SHUB_RECOVERABLE_ERROR);
461
+ char buf [1024 ];
462
+ sprintf (buf , "Failed to read local socket rc=%d, len=%d, errno=%d" , rc , n , errno );
463
+ shub -> params -> error_handler (buf , SHUB_RECOVERABLE_ERROR );
464
+ //shub->params->error_handler("Failed to read local socket", SHUB_RECOVERABLE_ERROR);
427
465
close_socket (shub , chan );
428
466
if (hdr != NULL ) { /* if message header is not yet sent to the server... */
429
467
/* ... then skip this message */
0 commit comments