@@ -125,13 +125,11 @@ static void reconnect(Shub* shub)
125
125
} while (rc < 0 && errno == EINTR );
126
126
127
127
if (rc >= 0 || errno == EINPROGRESS ) {
128
- if (rc >= 0 ) {
129
- }
130
128
break ;
131
129
}
132
130
}
133
131
if (rc < 0 ) {
134
- if (errno != ENOENT && errno != ECONNREFUSED ) {
132
+ if (errno != ENOENT && errno != ECONNREFUSED && errno != EINPROGRESS ) {
135
133
shub -> params -> error_handler ("Connection can not be establish" , SHUB_FATAL_ERROR );
136
134
}
137
135
if (max_attempts -- != 0 ) {
@@ -187,6 +185,7 @@ void ShubInitialize(Shub* shub, ShubParams* params)
187
185
FD_ZERO (& shub -> inset );
188
186
FD_SET (shub -> input , & shub -> inset );
189
187
188
+ shub -> output = -1 ;
190
189
reconnect (shub );
191
190
192
191
shub -> in_buffer = malloc (params -> buffer_size );
@@ -207,52 +206,58 @@ void ShubLoop(Shub* shub)
207
206
while (1 ) {
208
207
fd_set events ;
209
208
struct timeval tm ;
210
- int i , max_fd , rc ;
211
- unsigned int n , size ;
209
+ int i , rc ;
210
+ int max_fd = shub -> max_fd ;
212
211
213
212
tm .tv_sec = shub -> params -> delay /1000 ;
214
213
tm .tv_usec = shub -> params -> delay % 1000 * 1000 ;
215
214
216
215
events = shub -> inset ;
217
- rc = select (shub -> max_fd + 1 , & events , NULL , NULL , shub -> in_buffer_used == 0 ? NULL : & tm );
216
+ rc = select (max_fd + 1 , & events , NULL , NULL , shub -> in_buffer_used == 0 ? NULL : & tm );
218
217
if (rc < 0 ) {
219
218
if (errno != EINTR ) {
220
219
shub -> params -> error_handler ("Select failed" , SHUB_RECOVERABLE_ERROR );
221
220
recovery (shub );
222
221
}
223
222
} else {
224
223
if (rc > 0 ) {
225
- for (i = 0 , max_fd = shub -> max_fd ; i <= max_fd ; i ++ ) {
224
+ for (i = 0 ; i <= max_fd ; i ++ ) {
226
225
if (FD_ISSET (i , & events )) {
227
- if (i == shub -> input ) {
226
+ if (i == shub -> input ) { /* accept incomming connection */
228
227
int s = accept (i , NULL , NULL );
229
228
if (s < 0 ) {
230
229
shub -> params -> error_handler ("Failed to accept socket" , SHUB_RECOVERABLE_ERROR );
231
230
} else {
232
- if (s > max_fd ) {
231
+ if (s > shub -> max_fd ) {
233
232
shub -> max_fd = s ;
234
233
}
235
234
FD_SET (s , & shub -> inset );
236
235
}
237
- } else if (i == shub -> output ) {
236
+ } else if (i == shub -> output ) { /* receive response from server */
237
+ /* try to read as much as possible */
238
238
int available = recv (shub -> output , shub -> out_buffer + shub -> out_buffer_used , buffer_size - shub -> out_buffer_used , 0 );
239
239
int pos = 0 ;
240
240
if (available <= 0 ) {
241
241
shub -> params -> error_handler ("Failed to read inet socket" , SHUB_RECOVERABLE_ERROR );
242
242
reconnect (shub );
243
+ continue ;
243
244
}
244
245
shub -> out_buffer_used += available ;
246
+
247
+ /* loop through all received responses */
245
248
while (pos + sizeof (ShubMessageHdr ) <= shub -> out_buffer_used ) {
246
- ShubMessageHdr * hdr = (ShubMessageHdr * )( shub -> out_buffer + pos ) ;
249
+ ShubMessageHdr * hdr = (ShubMessageHdr * )& shub -> out_buffer [ pos ] ;
247
250
int chan = hdr -> chan ;
248
- n = pos + sizeof (ShubMessageHdr ) + hdr -> size <= shub -> out_buffer_used ? hdr -> size + sizeof (ShubMessageHdr ) : shub -> out_buffer_used - pos ;
251
+ unsigned int n = pos + sizeof (ShubMessageHdr ) + hdr -> size <= shub -> out_buffer_used
252
+ ? hdr -> size + sizeof (ShubMessageHdr )
253
+ : shub -> out_buffer_used - pos ;
249
254
if (!write_socket (chan , (char * )hdr , n )) {
250
255
shub -> params -> error_handler ("Failed to write to local socket" , SHUB_RECOVERABLE_ERROR );
251
256
close_socket (shub , chan );
252
257
chan = -1 ;
253
258
}
254
- /* read rest of message if it doesn't fit in buffer */
255
259
if (n != hdr -> size + sizeof (ShubMessageHdr )) {
260
+ /* read rest of message if it doesn't fit in the buffer */
256
261
int tail = hdr -> size + sizeof (ShubMessageHdr ) - n ;
257
262
do {
258
263
n = tail < buffer_size ? tail : buffer_size ;
@@ -274,56 +279,73 @@ void ShubLoop(Shub* shub)
274
279
}
275
280
pos += n ;
276
281
}
282
+ /* Move partly fetched message header (if any) to the beginning of byffer */
277
283
memcpy (shub -> out_buffer , shub -> out_buffer + pos , shub -> out_buffer_used - pos );
278
284
shub -> out_buffer_used -= pos ;
279
- } else {
285
+ } else { /* receive request from client */
280
286
ShubMessageHdr * hdr = (ShubMessageHdr * )& shub -> in_buffer [shub -> in_buffer_used ];
281
- if (!read_socket (i , (char * )hdr , sizeof (ShubMessageHdr ))) {
287
+ int chan = i ;
288
+ if (!read_socket (chan , (char * )hdr , sizeof (ShubMessageHdr ))) { /* fetch message header */
282
289
shub -> params -> error_handler ("Failed to read local socket" , SHUB_RECOVERABLE_ERROR );
283
290
close_socket (shub , i );
284
291
} else {
285
- size = hdr -> size ;
286
- hdr -> chan = i ;
292
+ unsigned int size = hdr -> size ;
293
+ hdr -> chan = chan ; /* remember socket descriptor from which this message was read */
287
294
if (size + shub -> in_buffer_used + sizeof (ShubMessageHdr ) > buffer_size ) {
288
- if (shub -> in_buffer_used != 0 ) {
295
+ /* message doesn't completely fit in buffer */
296
+ if (shub -> in_buffer_used != 0 ) { /* if buffer is not empty...*/
297
+ /* ... then send it */
289
298
while (!write_socket (shub -> output , shub -> in_buffer , shub -> in_buffer_used )) {
290
299
shub -> params -> error_handler ("Failed to write to inet socket" , SHUB_RECOVERABLE_ERROR );
291
300
reconnect (shub );
292
301
}
302
+ /* move received message header to the beginning of the buffer */
293
303
memcpy (shub -> in_buffer , shub -> in_buffer + shub -> in_buffer_used , sizeof (ShubMessageHdr ));
294
304
shub -> in_buffer_used = 0 ;
295
305
}
296
306
}
297
307
shub -> in_buffer_used += sizeof (ShubMessageHdr );
298
308
299
- while ( 1 ) {
309
+ do {
300
310
unsigned int n = size + shub -> in_buffer_used > buffer_size ? buffer_size - shub -> in_buffer_used : size ;
301
- if (!read_socket (i , shub -> in_buffer + shub -> in_buffer_used , n )) {
311
+ /* fetch message body */
312
+ if (chan >= 0 && !read_socket (chan , shub -> in_buffer + shub -> in_buffer_used , n )) {
302
313
shub -> params -> error_handler ("Failed to read local socket" , SHUB_RECOVERABLE_ERROR );
303
- close_socket (shub , i );
304
- break ;
305
- } else {
306
- if (n != size ) {
307
- while (!write_socket (shub -> output , shub -> in_buffer , n )) {
308
- shub -> params -> error_handler ("Failed to write to inet socket" , SHUB_RECOVERABLE_ERROR );
309
- reconnect (shub );
310
- }
311
- size -= n ;
312
- shub -> in_buffer_used = 0 ;
313
- } else {
314
- shub -> in_buffer_used += n ;
314
+ close_socket (shub , chan );
315
+ if (hdr != NULL ) { /* if message header is not yet sent to the server... */
316
+ /* ... then skip this message */
317
+ shub -> in_buffer_used = (char * )hdr - shub -> in_buffer ;
315
318
break ;
319
+ } else { /* if message was partly sent to the server, we can not skip it, so we have to send garbage to the server */
320
+ chan = -1 ; /* do not try to read rest of body of this message */
316
321
}
322
+ }
323
+ shub -> in_buffer_used += n ;
324
+ size -= n ;
325
+ /* if there is no more free space in the buffer to receive new message header... */
326
+ if (shub -> in_buffer_used + sizeof (ShubMessageHdr ) > buffer_size ) {
327
+
328
+ /* ... then send buffer to the server */
329
+ while (!write_socket (shub -> output , shub -> in_buffer , shub -> in_buffer_used )) {
330
+ shub -> params -> error_handler ("Failed to write to inet socket" , SHUB_RECOVERABLE_ERROR );
331
+ reconnect (shub );
332
+ }
333
+ hdr = NULL ; /* message is partly sent to the server: can not skip it any more */
334
+ shub -> in_buffer_used = 0 ;
317
335
}
318
- }
336
+ } while ( size != 0 ); /* repeat until all message body is received */
319
337
}
320
338
}
321
339
}
322
340
}
323
- } else if (shub -> in_buffer_used != 0 ) {
324
- while (!write_socket (shub -> output , shub -> in_buffer , shub -> in_buffer_used )) {
325
- shub -> params -> error_handler ("Failed to write to inet socket" , SHUB_RECOVERABLE_ERROR );
326
- reconnect (shub );
341
+ } else { /* timeout expired */
342
+ if (shub -> in_buffer_used != 0 ) { /* if buffer is not empty... */
343
+ /* ...then send it */
344
+ while (!write_socket (shub -> output , shub -> in_buffer , shub -> in_buffer_used )) {
345
+ shub -> params -> error_handler ("Failed to write to inet socket" , SHUB_RECOVERABLE_ERROR );
346
+ reconnect (shub );
347
+ }
348
+ shub -> in_buffer_used = 0 ;
327
349
}
328
350
}
329
351
}
0 commit comments