@@ -69,19 +69,24 @@ static void close_socket(Shub* shub, int fd)
69
69
FD_CLR (fd , & shub -> inset );
70
70
}
71
71
72
- static int read_socket (int sd , char * buf , int size )
72
+ static int read_socket_ex (int sd , char * buf , int min_size , int max_size )
73
73
{
74
- while (size != 0 ) {
75
- int n = recv (sd , buf , size , 0 );
74
+ int received = 0 ;
75
+ while (received < min_size ) {
76
+ int n = recv (sd , buf + received , max_size - received , 0 );
76
77
if (n <= 0 ) {
77
- return 0 ;
78
+ break ;
78
79
}
79
- size -= n ;
80
- buf += n ;
80
+ received += n ;
81
81
}
82
- return 1 ;
82
+ return received ;
83
83
}
84
84
85
+ static int read_socket (int sd , char * buf , int size )
86
+ {
87
+ return read_socket_ex (sd , buf , size , size ) == size ;
88
+ }
89
+
85
90
static int write_socket (int sd , char const * buf , int size )
86
91
{
87
92
while (size != 0 ) {
@@ -235,9 +240,9 @@ void ShubLoop(Shub* shub)
235
240
}
236
241
} else if (i == shub -> output ) { /* receive response from server */
237
242
/* try to read as much as possible */
238
- int available = recv (shub -> output , shub -> out_buffer + shub -> out_buffer_used , buffer_size - shub -> out_buffer_used , 0 );
243
+ int available = read_socket_ex (shub -> output , shub -> out_buffer + shub -> out_buffer_used , sizeof ( ShubMessageHdr ), buffer_size - shub -> out_buffer_used );
239
244
int pos = 0 ;
240
- if (available <= 0 ) {
245
+ if (available < sizeof ( ShubMessageHdr ) ) {
241
246
shub -> params -> error_handler ("Failed to read inet socket" , SHUB_RECOVERABLE_ERROR );
242
247
reconnect (shub );
243
248
continue ;
@@ -279,74 +284,116 @@ void ShubLoop(Shub* shub)
279
284
}
280
285
pos += n ;
281
286
}
282
- /* Move partly fetched message header (if any) to the beginning of byffer */
287
+ /* Move partly fetched message header (if any) to the beginning of buffer */
283
288
memcpy (shub -> out_buffer , shub -> out_buffer + pos , shub -> out_buffer_used - pos );
284
289
shub -> out_buffer_used -= pos ;
285
290
} else { /* receive request from client */
286
- ShubMessageHdr * hdr = (ShubMessageHdr * )& shub -> in_buffer [shub -> in_buffer_used ];
287
291
int chan = i ;
288
- if (!read_socket (chan , (char * )hdr , sizeof (ShubMessageHdr ))) { /* fetch message header */
289
- shub -> params -> error_handler ("Failed to read local socket" , SHUB_RECOVERABLE_ERROR );
290
- close_socket (shub , i );
291
- } else {
292
- unsigned int size = hdr -> size ;
293
- hdr -> chan = chan ; /* remember socket descriptor from which this message was read */
294
- if (size + shub -> in_buffer_used + sizeof (ShubMessageHdr ) > buffer_size ) {
295
- /* message doesn't completely fit in buffer */
296
- if (shub -> in_buffer_used != 0 ) { /* if buffer is not empty...*/
297
- /* ... then send it */
298
- while (!write_socket (shub -> output , shub -> in_buffer , shub -> in_buffer_used )) {
299
- shub -> params -> error_handler ("Failed to write to inet socket" , SHUB_RECOVERABLE_ERROR );
300
- reconnect (shub );
301
- }
302
- /* move received message header to the beginning of the buffer */
303
- memcpy (shub -> in_buffer , shub -> in_buffer + shub -> in_buffer_used , sizeof (ShubMessageHdr ));
304
- shub -> in_buffer_used = 0 ;
305
- }
306
- }
307
- shub -> in_buffer_used += sizeof (ShubMessageHdr );
308
-
309
- do {
310
- unsigned int n = size + shub -> in_buffer_used > buffer_size ? buffer_size - shub -> in_buffer_used : size ;
311
- /* fetch message body */
312
- if (chan >= 0 && !read_socket (chan , shub -> in_buffer + shub -> in_buffer_used , n )) {
313
- shub -> params -> error_handler ("Failed to read local socket" , SHUB_RECOVERABLE_ERROR );
314
- close_socket (shub , chan );
315
- if (hdr != NULL ) { /* if message header is not yet sent to the server... */
316
- /* ... then skip this message */
317
- shub -> in_buffer_used = (char * )hdr - shub -> in_buffer ;
318
- break ;
319
- } else { /* if message was partly sent to the server, we can not skip it, so we have to send garbage to the server */
320
- chan = -1 ; /* do not try to read rest of body of this message */
292
+ int available = 0 ;
293
+ while (1 ) {
294
+ available += read_socket_ex (chan , & shub -> in_buffer [shub -> in_buffer_used + available ], sizeof (ShubMessageHdr ) - available , buffer_size - shub -> in_buffer_used - available );
295
+ if (available < sizeof (ShubMessageHdr )) {
296
+ shub -> params -> error_handler ("Failed to read local socket" , SHUB_RECOVERABLE_ERROR );
297
+ close_socket (shub , i );
298
+ } else {
299
+ int pos = 0 ;
300
+ /* loop through all fetched messages */
301
+ while (pos + sizeof (ShubMessageHdr ) <= available ) {
302
+ ShubMessageHdr * hdr = (ShubMessageHdr * )& shub -> in_buffer [shub -> in_buffer_used ];
303
+ unsigned int size = hdr -> size ;
304
+ pos += sizeof (ShubMessageHdr ) + size ;
305
+ hdr -> chan = chan ; /* remember socket descriptor from which this message was read */
306
+ if (pos <= available ) {
307
+ shub -> in_buffer_used += sizeof (ShubMessageHdr ) + size ;
308
+ continue ;
321
309
}
322
- }
323
- shub -> in_buffer_used += n ;
324
- size -= n ;
325
- /* if there is no more free space in the buffer to receive new message header... */
326
- if (shub -> in_buffer_used + sizeof (ShubMessageHdr ) > buffer_size ) {
327
310
328
- /* ... then send buffer to the server */
329
- while (!write_socket (shub -> output , shub -> in_buffer , shub -> in_buffer_used )) {
330
- shub -> params -> error_handler ("Failed to write to inet socket" , SHUB_RECOVERABLE_ERROR );
331
- reconnect (shub );
332
- }
333
- hdr = NULL ; /* message is partly sent to the server: can not skip it any more */
334
- shub -> in_buffer_used = 0 ;
311
+ if (shub -> in_buffer_used + sizeof (ShubMessageHdr ) + size > buffer_size ) {
312
+ /* message doesn't completely fit in buffer */
313
+ if (shub -> in_buffer_used != 0 ) { /* if buffer is not empty...*/
314
+ /* ... then send it */
315
+ while (!write_socket (shub -> output , shub -> in_buffer , shub -> in_buffer_used )) {
316
+ shub -> params -> error_handler ("Failed to write to inet socket" , SHUB_RECOVERABLE_ERROR );
317
+ reconnect (shub );
318
+ }
319
+ /* move received message header to the beginning of the buffer */
320
+ memcpy (shub -> in_buffer , shub -> in_buffer + shub -> in_buffer_used , buffer_size - shub -> in_buffer_used );
321
+ shub -> in_buffer_used = 0 ;
322
+ }
323
+ }
324
+ shub -> in_buffer_used += sizeof (ShubMessageHdr ) + size - (pos - available );
325
+ size = pos - available ;
326
+
327
+ do {
328
+ unsigned int n = size + shub -> in_buffer_used > buffer_size ? buffer_size - shub -> in_buffer_used : size ;
329
+ /* fetch rest of message body */
330
+ if (chan >= 0 && !read_socket (chan , shub -> in_buffer + shub -> in_buffer_used , n )) {
331
+ shub -> params -> error_handler ("Failed to read local socket" , SHUB_RECOVERABLE_ERROR );
332
+ close_socket (shub , chan );
333
+ if (hdr != NULL ) { /* if message header is not yet sent to the server... */
334
+ /* ... then skip this message */
335
+ shub -> in_buffer_used = (char * )hdr - shub -> in_buffer ;
336
+ break ;
337
+ } else { /* if message was partly sent to the server, we can not skip it, so we have to send garbage to the server */
338
+ chan = -1 ; /* do not try to read rest of body of this message */
339
+ }
340
+ }
341
+ shub -> in_buffer_used += n ;
342
+ size -= n ;
343
+ /* if there is no more free space in the buffer to receive new message header... */
344
+ if (shub -> in_buffer_used + sizeof (ShubMessageHdr ) > buffer_size ) {
345
+ /* ... then send buffer to the server */
346
+ while (!write_socket (shub -> output , shub -> in_buffer , shub -> in_buffer_used )) {
347
+ shub -> params -> error_handler ("Failed to write to inet socket" , SHUB_RECOVERABLE_ERROR );
348
+ reconnect (shub );
349
+ }
350
+ hdr = NULL ; /* message is partly sent to the server: can not skip it any more */
351
+ shub -> in_buffer_used = 0 ;
352
+ }
353
+ } while (size != 0 ); /* repeat until all message body is received */
354
+
355
+ pos = available ;
356
+ break ;
335
357
}
336
- } while (size != 0 ); /* repeat until all message body is received */
337
- }
358
+ if (chan >= 0 && pos != available ) { /* partly fetched message header */
359
+ if (shub -> in_buffer_used + sizeof (ShubMessageHdr ) > buffer_size ) {
360
+ /* message doesn't completely fit in buffer */
361
+ while (!write_socket (shub -> output , shub -> in_buffer , shub -> in_buffer_used )) {
362
+ shub -> params -> error_handler ("Failed to write to inet socket" , SHUB_RECOVERABLE_ERROR );
363
+ reconnect (shub );
364
+ }
365
+ /* move received message header to the beginning of the buffer */
366
+ memcpy (shub -> in_buffer , shub -> in_buffer + shub -> in_buffer_used , available - pos );
367
+ shub -> in_buffer_used = 0 ;
368
+ }
369
+ available -= pos ;
370
+ continue ;
371
+ }
372
+ }
373
+ break ;
374
+ }
338
375
}
339
376
}
340
377
}
341
- } else { /* timeout expired */
342
- if (shub -> in_buffer_used != 0 ) { /* if buffer is not empty... */
343
- /* ...then send it */
344
- while (!write_socket (shub -> output , shub -> in_buffer , shub -> in_buffer_used )) {
345
- shub -> params -> error_handler ("Failed to write to inet socket" , SHUB_RECOVERABLE_ERROR );
346
- reconnect (shub );
347
- }
348
- shub -> in_buffer_used = 0 ;
349
- }
378
+ if (shub -> params -> delay != 0 ) {
379
+ continue ;
380
+ }
381
+ }
382
+ if (shub -> in_buffer_used != 0 ) { /* if buffer is not empty... */
383
+ /* ...then send it */
384
+ #if SHOW_SENT_STATISTIC
385
+ static size_t total_sent ;
386
+ static size_t total_count ;
387
+ total_sent += shub -> in_buffer_used ;
388
+ if (++ total_count % 1024 == 0 ) {
389
+ printf ("Average sent buffer size: %ld\n" , total_sent /total_count );
390
+ }
391
+ #endif
392
+ while (!write_socket (shub -> output , shub -> in_buffer , shub -> in_buffer_used )) {
393
+ shub -> params -> error_handler ("Failed to write to inet socket" , SHUB_RECOVERABLE_ERROR );
394
+ reconnect (shub );
395
+ }
396
+ shub -> in_buffer_used = 0 ;
350
397
}
351
398
}
352
399
}
0 commit comments