16
16
#include <netinet/tcp.h>
17
17
#include <netinet/in.h>
18
18
19
+ #ifdef USE_EPOLL
20
+ #include <sys/epoll.h>
21
+ #endif
22
+
19
23
#include "server.h"
20
24
#include "limits.h"
21
25
#include "util.h"
22
26
#include "sockhub.h"
23
27
24
28
typedef struct buffer_t {
25
- int ready ; // number of bytes that are ready to be sent/processed
29
+ int ready ; /* number of bytes that are ready to be sent/processed */
26
30
ShubMessageHdr * curmessage ;
27
- char * data ; // dynamically allocated buffer
31
+ char * data ; /* dynamically allocated buffer */
28
32
} buffer_t ;
29
33
30
34
typedef struct stream_data_t * stream_t ;
31
35
32
36
typedef struct client_data_t {
33
- stream_t stream ; // NULL: client value is empty
37
+ stream_t stream ; /* NULL: client value is empty */
34
38
void * userdata ;
35
39
unsigned int chan ;
36
40
} client_data_t ;
37
41
38
42
typedef struct stream_data_t {
39
43
int fd ;
40
- bool good ; // 'false': stop serving this stream and disconnect when possible
44
+ bool good ; /* 'false': stop serving this stream and disconnect when possible */
41
45
buffer_t input ;
42
46
buffer_t output ;
43
47
44
- // a map: 'chan' -> client_data_t
45
- // 'chan' is expected to be < MAX_FDS which is pretty low
46
- client_data_t * clients ; // dynamically allocated
48
+ /* a map: 'chan' -> client_data_t */
49
+ /* 'chan' is expected to be < MAX_FDS which is pretty low */
50
+ client_data_t * clients ; /* dynamically allocated */
51
+ struct stream_data_t * next ;
47
52
} stream_data_t ;
48
53
49
54
typedef struct server_data_t {
50
55
char * host ;
51
56
int port ;
52
57
53
- int listener ; // the listening socket
54
- fd_set all ; // all sockets including the listener
58
+ int listener ; /* the listening socket */
59
+ #ifdef USE_EPOLL
60
+ int epollfd ;
61
+ #else
62
+ fd_set all ; /* all sockets including the listener */
55
63
int maxfd ;
56
-
57
- int streamsnum ;
58
- stream_data_t streams [ MAX_STREAMS ] ;
64
+ #endif
65
+ stream_t used_chain ;
66
+ stream_t free_chain ;
59
67
60
68
onmessage_callback_t onmessage ;
61
69
onconnect_callback_t onconnect ;
62
70
ondisconnect_callback_t ondisconnect ;
63
71
} server_data_t ;
64
72
65
- // Returns the created socket, or -1 if failed.
73
+ /* Returns the created socket, or -1 if failed. */
66
74
static int create_listening_socket (const char * host , int port ) {
67
75
int s = socket (AF_INET , SOCK_STREAM , 0 );
68
76
if (s == -1 ) {
@@ -113,32 +121,56 @@ server_t server_init(
113
121
return server ;
114
122
}
115
123
124
+ bool register_socket (server_t server , int fd , stream_t stream )
125
+ {
126
+ #ifdef USE_EPOLL
127
+ struct epoll_event ev ;
128
+ ev .events = EPOLLIN ;
129
+ ev .data .ptr = (void * )stream ;
130
+ if (epoll_ctl (server -> epollfd , EPOLL_CTL_ADD , fd , & ev ) < 0 ) {
131
+ return false;
132
+ }
133
+ #else
134
+ FD_SET (fd , & server -> all );
135
+ if (fd > server -> maxfd ) {
136
+ server -> maxfd = fd ;
137
+ }
138
+ #endif
139
+ return true;
140
+ }
141
+
116
142
bool server_start (server_t server ) {
117
143
debug ("starting the server\n" );
118
- server -> streamsnum = 0 ;
119
-
144
+ server -> free_chain = NULL ;
145
+ server -> used_chain = NULL ;
146
+
120
147
server -> listener = create_listening_socket (server -> host , server -> port );
121
148
if (server -> listener == -1 ) {
122
149
return false;
123
150
}
124
151
152
+ #ifdef USE_EPOLL
153
+ server -> epollfd = epoll_create (MAX_EVENTS );
154
+ if (server -> epollfd < 0 ) {
155
+ return false;
156
+ }
157
+ #else
125
158
FD_ZERO (& server -> all );
126
- FD_SET (server -> listener , & server -> all );
127
- server -> maxfd = server -> listener ;
128
-
129
- return true;
159
+ server -> maxfd = 0 ;
160
+ #endif
161
+ return register_socket (server , server -> listener , NULL );
130
162
}
131
163
132
164
static bool stream_flush (stream_t stream ) {
133
165
int tosend = stream -> output .ready ;
134
166
if (tosend == 0 ) {
135
- // nothing to do
167
+ /* nothing to do */
136
168
return true;
137
169
}
138
170
139
171
char * cursor = stream -> output .data ;
140
172
while (tosend > 0 ) {
141
- // repeat sending until we send everything
173
+ /* repeat sending until we send everything */
142
174
int sent = send (stream -> fd , cursor , tosend , 0 );
143
175
if (sent == -1 ) {
144
176
shout ("failed to flush the stream\n" );
@@ -153,7 +185,7 @@ static bool stream_flush(stream_t stream) {
153
185
stream -> output .ready = 0 ;
154
186
ShubMessageHdr * msg = stream -> output .curmessage ;
155
187
if (msg ) {
156
- // move the unfinished message to the start of the buffer
188
+ /* move the unfinished message to the start of the buffer */
157
189
memmove (stream -> output .data , msg , msg -> size + sizeof (ShubMessageHdr ));
158
190
stream -> output .curmessage = (ShubMessageHdr * )stream -> output .data ;
159
191
}
@@ -163,10 +195,9 @@ static bool stream_flush(stream_t stream) {
163
195
164
196
static void server_flush (server_t server ) {
165
197
debug ("flushing the streams\n" );
166
- int i ;
167
- for (i = 0 ; i < server -> streamsnum ; i ++ ) {
168
- stream_t stream = server -> streams + i ;
169
- stream_flush (stream );
198
+ stream_t s ;
199
+ for (s = server -> used_chain ; s != NULL ; s = s -> next ) {
200
+ stream_flush (s );
170
201
}
171
202
}
172
203
@@ -187,7 +218,7 @@ static void stream_init(stream_t stream, int fd) {
187
218
188
219
stream -> clients = malloc (MAX_TRANSACTIONS * sizeof (client_data_t ));
189
220
assert (stream -> clients );
190
- // mark all clients as empty
221
+ /* mark all clients as empty */
191
222
for (i = 0 ; i < MAX_TRANSACTIONS ; i ++ ) {
192
223
stream -> clients [i ].stream = NULL ;
193
224
}
@@ -207,36 +238,28 @@ static void server_stream_destroy(server_t server, stream_t stream) {
207
238
}
208
239
}
209
240
}
210
-
211
- FD_CLR (stream -> fd , & server -> all );
241
+ #ifdef USE_EPOLL
242
+ epoll_ctl (server -> epollfd , EPOLL_CTL_DEL , stream -> fd , NULL );
243
+ #else
244
+ FD_CLR (stream -> fd , & server -> all );
245
+ #endif
212
246
close (stream -> fd );
213
247
free (stream -> clients );
214
248
free (stream -> input .data );
215
249
free (stream -> output .data );
216
250
}
217
251
218
- static void stream_move (stream_t dst , stream_t src ) {
219
- int i ;
220
- * dst = * src ;
221
- for (i = 0 ; i < MAX_TRANSACTIONS ; i ++ ) {
222
- if (dst -> clients [i ].stream ) {
223
- dst -> clients [i ].stream = dst ;
224
- }
225
- }
226
- }
227
-
228
252
static void server_close_bad_streams (server_t server ) {
229
- int i ;
230
- for (i = server -> streamsnum - 1 ; i >= 0 ; i -- ) {
231
- stream_t stream = server -> streams + i ;
232
- if (!stream -> good ) {
233
- server_stream_destroy (server , stream );
234
- if (i != server -> streamsnum - 1 ) {
235
- // move the last one here
236
- * stream = server -> streams [server -> streamsnum - 1 ];
237
- stream_move (stream , server -> streams + server -> streamsnum - 1 );
238
- }
239
- server -> streamsnum -- ;
253
+ stream_t s , next , * spp ;
254
+ for (spp = & server -> used_chain ; (s = * spp ) != NULL ; s = next ) {
255
+ next = s -> next ;
256
+ if (!s -> good ) {
257
+ server_stream_destroy (server , s );
258
+ * spp = next ;
259
+ s -> next = server -> free_chain ;
260
+ server -> free_chain = s ;
261
+ } else {
262
+ spp = & s -> next ;
240
263
}
241
264
}
242
265
}
@@ -279,7 +302,7 @@ static bool stream_message_append(stream_t stream, size_t len, void *data) {
279
302
280
303
int newsize = stream -> output .curmessage -> size + sizeof (ShubMessageHdr ) + len ;
281
304
if (newsize > BUFFER_SIZE ) {
282
- // the flushing will not help here
305
+ /* the flushing will not help here */
283
306
shout ("the message cannot be bigger than the buffer size\n" );
284
307
stream -> good = false;
285
308
return false;
@@ -326,7 +349,8 @@ bool client_message_finish(client_t client) {
326
349
return stream_message_finish (client -> stream );
327
350
}
328
351
329
- bool client_message_shortcut (client_t client , xid_t arg ) {
352
+ bool client_message_shortcut (client_t client , xid_t arg )
353
+ {
330
354
if (!stream_message_start (client -> stream , client -> chan )) {
331
355
return false;
332
356
}
@@ -348,36 +372,33 @@ static bool server_accept(server_t server) {
348
372
return false;
349
373
}
350
374
debug ("a new connection accepted\n" );
351
-
352
- if (server -> streamsnum >= MAX_STREAMS ) {
353
- shout ("streams limit hit, disconnecting the accepted connection\n" );
354
- close (fd );
355
- return false;
375
+
376
+ stream_t s = server -> free_chain ;
377
+ if (s == NULL ) {
378
+ s = malloc (sizeof (stream_data_t ));
379
+ } else {
380
+ server -> free_chain = s -> next ;
356
381
}
382
+ /* add new stream */
383
+ s -> next = server -> used_chain ;
384
+ server -> used_chain = s ;
357
385
358
- // add new stream
359
- stream_t s = server -> streams + server -> streamsnum ++ ;
360
386
stream_init (s , fd );
361
387
362
- FD_SET (fd , & server -> all );
363
- if (fd > server -> maxfd ) {
364
- server -> maxfd = fd ;
365
- }
366
-
367
- return true;
388
+ return register_socket (server , fd , s );
368
389
}
369
390
370
391
static client_t stream_get_client (stream_t stream , unsigned int chan , bool * isnew ) {
371
392
assert (chan < MAX_TRANSACTIONS );
372
393
client_t client = stream -> clients + chan ;
373
394
if (client -> stream == NULL ) {
374
- // client is new
395
+ /* client is new */
375
396
client -> stream = stream ;
376
397
client -> chan = chan ;
377
398
* isnew = true;
378
399
client -> userdata = NULL ;
379
400
} else {
380
- // collisions should not happen
401
+ /* collisions should not happen */
381
402
assert (client -> chan == chan );
382
403
* isnew = false;
383
404
}
@@ -412,7 +433,7 @@ static bool server_stream_handle(server_t server, stream_t stream) {
412
433
ShubMessageHdr * msg = (ShubMessageHdr * )cursor ;
413
434
int header_and_data = sizeof (ShubMessageHdr ) + msg -> size ;
414
435
if (header_and_data <= toprocess ) {
415
- // handle message
436
+ /* handle message */
416
437
bool isnew ;
417
438
client_t client = stream_get_client (stream , msg -> chan , & isnew );
418
439
if (isnew ) {
@@ -457,9 +478,30 @@ static bool server_stream_handle(server_t server, stream_t stream) {
457
478
void server_loop (server_t server ) {
458
479
while (1 ) {
459
480
int i ;
481
+ int numready ;
482
+ #ifdef USE_EPOLL
483
+ struct epoll_event events [MAX_EVENTS ];
484
+ numready = epoll_wait (server -> epollfd , events , MAX_EVENTS , -1 );
485
+ if (numready < 0 ) {
486
+ shout ("failed to select: %s\n" , strerror (errno ));
487
+ return ;
488
+ }
489
+ for (i = 0 ; i < numready ; i ++ ) {
490
+ stream_t stream = (stream_t )events [i ].data .ptr ;
491
+ if (stream == NULL ) {
492
+ server_accept (server );
493
+ } else {
494
+ if (events [i ].events & EPOLLERR ) {
495
+ stream -> good = false;
496
+ } else if (events [i ].events & EPOLLIN ) {
497
+ server_stream_handle (server , stream );
498
+ }
499
+ }
500
+ }
501
+ #else
460
502
fd_set readfds = server -> all ;
461
- debug ("selecting\n" );
462
503
int numready = select (server -> maxfd + 1 , & readfds , NULL , NULL , NULL );
504
+ stream_t s ;
463
505
if (numready == -1 ) {
464
506
shout ("failed to select: %s\n" , strerror (errno ));
465
507
return ;
@@ -470,14 +512,13 @@ void server_loop(server_t server) {
470
512
server_accept (server );
471
513
}
472
514
473
- for (i = 0 ; (i < server -> streamsnum ) && (numready > 0 ); i ++ ) {
474
- stream_t stream = server -> streams + i ;
475
- if (FD_ISSET (stream -> fd , & readfds )) {
476
- server_stream_handle (server , stream );
515
+ for (s = server_used_chain ; s != NULL && numready > 0 ; s = s -> next ) {
516
+ if (FD_ISSET (s -> fd , & readfds )) {
517
+ server_stream_handle (server , s );
477
518
numready -- ;
478
519
}
479
520
}
480
-
521
+ #endif
481
522
server_close_bad_streams (server );
482
523
server_flush (server );
483
524
}
@@ -501,7 +542,7 @@ unsigned client_get_ip_addr(client_t client)
501
542
}
502
543
503
544
#if 0
504
- // usage example
545
+ /* usage example */
505
546
506
547
void test_onconnect (client_t client ) {
507
548
char * name = "hello" ;
0 commit comments