15
15
#include <string.h>
16
16
#include <signal.h>
17
17
#include <errno.h>
18
+ #include <assert.h>
18
19
19
20
#include "sockhub.h"
20
21
@@ -70,11 +71,11 @@ static void close_socket(Shub* shub, int fd)
70
71
FD_CLR (fd , & shub -> inset );
71
72
}
72
73
73
- static int read_socket_ex (int sd , char * buf , int min_size , int max_size )
74
+ int ShubReadSocketEx (int sd , void * buf , int min_size , int max_size )
74
75
{
75
76
int received = 0 ;
76
77
while (received < min_size ) {
77
- int n = recv (sd , buf + received , max_size - received , 0 );
78
+ int n = recv (sd , ( char * ) buf + received , max_size - received , 0 );
78
79
if (n <= 0 ) {
79
80
break ;
80
81
}
@@ -83,20 +84,21 @@ static int read_socket_ex(int sd, char* buf, int min_size, int max_size)
83
84
return received ;
84
85
}
85
86
86
- static int read_socket (int sd , char * buf , int size )
87
+ int ShubReadSocket (int sd , void * buf , int size )
87
88
{
88
- return read_socket_ex (sd , buf , size , size ) == size ;
89
+ return ShubReadSocketEx (sd , buf , size , size ) == size ;
89
90
}
90
91
91
- static int write_socket (int sd , char const * buf , int size )
92
+ int ShubWriteSocket (int sd , void const * buf , int size )
92
93
{
94
+ char * src = (char * )buf ;
93
95
while (size != 0 ) {
94
- int n = send (sd , buf , size , 0 );
96
+ int n = send (sd , src , size , 0 );
95
97
if (n <= 0 ) {
96
98
return 0 ;
97
99
}
98
100
size -= n ;
99
- buf += n ;
101
+ src += n ;
100
102
}
101
103
return 1 ;
102
104
}
@@ -180,7 +182,7 @@ static void notify_disconnect(Shub* shub, int chan)
180
182
hdr -> code = MSG_DISCONNECT ;
181
183
shub -> in_buffer_used += sizeof (ShubMessageHdr );
182
184
if (shub -> in_buffer_used + sizeof (ShubMessageHdr ) > shub -> params -> buffer_size ) {
183
- while (!write_socket (shub -> output , shub -> in_buffer , shub -> in_buffer_used )) {
185
+ while (!ShubWriteSocket (shub -> output , shub -> in_buffer , shub -> in_buffer_used )) {
184
186
shub -> params -> error_handler ("Failed to write to inet socket" , SHUB_RECOVERABLE_ERROR );
185
187
reconnect (shub );
186
188
}
@@ -291,7 +293,7 @@ void ShubLoop(Shub* shub)
291
293
}
292
294
} else if (i == shub -> output ) { /* receive response from server */
293
295
/* try to read as much as possible */
294
- int available = read_socket_ex (shub -> output , shub -> out_buffer + shub -> out_buffer_used , sizeof (ShubMessageHdr ), buffer_size - shub -> out_buffer_used );
296
+ int available = ShubReadSocketEx (shub -> output , shub -> out_buffer + shub -> out_buffer_used , sizeof (ShubMessageHdr ), buffer_size - shub -> out_buffer_used );
295
297
int pos = 0 ;
296
298
if (available < sizeof (ShubMessageHdr )) {
297
299
shub -> params -> error_handler ("Failed to read inet socket" , SHUB_RECOVERABLE_ERROR );
@@ -307,7 +309,7 @@ void ShubLoop(Shub* shub)
307
309
unsigned int n = pos + sizeof (ShubMessageHdr ) + hdr -> size <= shub -> out_buffer_used
308
310
? hdr -> size + sizeof (ShubMessageHdr )
309
311
: shub -> out_buffer_used - pos ;
310
- if (!write_socket (chan , (char * )hdr , n )) {
312
+ if (!ShubWriteSocket (chan , (char * )hdr , n )) {
311
313
shub -> params -> error_handler ("Failed to write to local socket" , SHUB_RECOVERABLE_ERROR );
312
314
close_socket (shub , chan );
313
315
notify_disconnect (shub , chan );
@@ -318,12 +320,12 @@ void ShubLoop(Shub* shub)
318
320
int tail = hdr -> size + sizeof (ShubMessageHdr ) - n ;
319
321
do {
320
322
n = tail < buffer_size ? tail : buffer_size ;
321
- if (!read_socket (shub -> output , shub -> out_buffer , n )) {
323
+ if (!ShubReadSocket (shub -> output , shub -> out_buffer , n )) {
322
324
shub -> params -> error_handler ("Failed to read inet socket" , SHUB_RECOVERABLE_ERROR );
323
325
reconnect (shub );
324
326
continue ;
325
327
}
326
- if (chan >= 0 && !write_socket (chan , shub -> out_buffer , n )) {
328
+ if (chan >= 0 && !ShubWriteSocket (chan , shub -> out_buffer , n )) {
327
329
shub -> params -> error_handler ("Failed to write to local socket" , SHUB_RECOVERABLE_ERROR );
328
330
close_socket (shub , chan );
329
331
notify_disconnect (shub , chan );
@@ -344,7 +346,8 @@ void ShubLoop(Shub* shub)
344
346
int chan = i ;
345
347
int available = 0 ;
346
348
while (1 ) {
347
- available += read_socket_ex (chan , & shub -> in_buffer [shub -> in_buffer_used + available ], sizeof (ShubMessageHdr ) - available , buffer_size - shub -> in_buffer_used - available );
349
+ assert (sizeof (ShubMessageHdr ) > available );
350
+ available += ShubReadSocketEx (chan , & shub -> in_buffer [shub -> in_buffer_used + available ], sizeof (ShubMessageHdr ) - available , buffer_size - shub -> in_buffer_used - available );
348
351
if (available < sizeof (ShubMessageHdr )) {
349
352
shub -> params -> error_handler ("Failed to read local socket" , SHUB_RECOVERABLE_ERROR );
350
353
close_socket (shub , i );
@@ -366,12 +369,12 @@ void ShubLoop(Shub* shub)
366
369
/* message doesn't completely fit in buffer */
367
370
if (shub -> in_buffer_used != 0 ) { /* if buffer is not empty...*/
368
371
/* ... then send it */
369
- while (!write_socket (shub -> output , shub -> in_buffer , shub -> in_buffer_used )) {
372
+ while (!ShubWriteSocket (shub -> output , shub -> in_buffer , shub -> in_buffer_used )) {
370
373
shub -> params -> error_handler ("Failed to write to inet socket" , SHUB_RECOVERABLE_ERROR );
371
374
reconnect (shub );
372
375
}
373
376
/* move received message header to the beginning of the buffer */
374
- memcpy (shub -> in_buffer , shub -> in_buffer + shub -> in_buffer_used , buffer_size - shub -> in_buffer_used );
377
+ memcpy (shub -> in_buffer , shub -> in_buffer + shub -> in_buffer_used , available - shub -> in_buffer_used );
375
378
shub -> in_buffer_used = 0 ;
376
379
}
377
380
}
@@ -381,7 +384,7 @@ void ShubLoop(Shub* shub)
381
384
do {
382
385
unsigned int n = size + shub -> in_buffer_used > buffer_size ? buffer_size - shub -> in_buffer_used : size ;
383
386
/* fetch rest of message body */
384
- if (chan >= 0 && !read_socket (chan , shub -> in_buffer + shub -> in_buffer_used , n )) {
387
+ if (chan >= 0 && !ShubReadSocket (chan , shub -> in_buffer + shub -> in_buffer_used , n )) {
385
388
shub -> params -> error_handler ("Failed to read local socket" , SHUB_RECOVERABLE_ERROR );
386
389
close_socket (shub , chan );
387
390
if (hdr != NULL ) { /* if message header is not yet sent to the server... */
@@ -398,7 +401,7 @@ void ShubLoop(Shub* shub)
398
401
/* if there is no more free space in the buffer to receive new message header... */
399
402
if (shub -> in_buffer_used + sizeof (ShubMessageHdr ) > buffer_size ) {
400
403
/* ... then send buffer to the server */
401
- while (!write_socket (shub -> output , shub -> in_buffer , shub -> in_buffer_used )) {
404
+ while (!ShubWriteSocket (shub -> output , shub -> in_buffer , shub -> in_buffer_used )) {
402
405
shub -> params -> error_handler ("Failed to write to inet socket" , SHUB_RECOVERABLE_ERROR );
403
406
reconnect (shub );
404
407
}
@@ -417,7 +420,7 @@ void ShubLoop(Shub* shub)
417
420
if (chan >= 0 && pos != available ) { /* partly fetched message header */
418
421
if (shub -> in_buffer_used + sizeof (ShubMessageHdr ) > buffer_size ) {
419
422
/* message doesn't completely fit in buffer */
420
- while (!write_socket (shub -> output , shub -> in_buffer , shub -> in_buffer_used )) {
423
+ while (!ShubWriteSocket (shub -> output , shub -> in_buffer , shub -> in_buffer_used )) {
421
424
shub -> params -> error_handler ("Failed to write to inet socket" , SHUB_RECOVERABLE_ERROR );
422
425
reconnect (shub );
423
426
}
@@ -448,7 +451,7 @@ void ShubLoop(Shub* shub)
448
451
printf ("Average sent buffer size: %ld\n" , total_sent /total_count );
449
452
}
450
453
#endif
451
- while (!write_socket (shub -> output , shub -> in_buffer , shub -> in_buffer_used )) {
454
+ while (!ShubWriteSocket (shub -> output , shub -> in_buffer , shub -> in_buffer_used )) {
452
455
shub -> params -> error_handler ("Failed to write to inet socket" , SHUB_RECOVERABLE_ERROR );
453
456
reconnect (shub );
454
457
}
0 commit comments