14
14
#include <unistd.h>
15
15
#include <string.h>
16
16
#include <errno.h>
17
+ #include <assert.h>
17
18
18
19
#include "sockhub.h"
19
20
@@ -151,6 +152,17 @@ static void reconnect(Shub* shub)
151
152
}
152
153
}
153
154
155
+ static void notify_disconnect (Shub * shub , int chan )
156
+ {
157
+ ShubMessageHdr * hdr ;
158
+ assert (shub -> in_buffer_used + sizeof (ShubMessageHdr ) < shub -> params -> buffer_size );
159
+ hdr = (ShubMessageHdr * )& shub -> in_buffer [shub -> in_buffer_used ];
160
+ hdr -> size = 0 ;
161
+ hdr -> chan = chan ;
162
+ hdr -> code = MSG_DISCONNECT ;
163
+ shub -> in_buffer_used += sizeof (ShubMessageHdr );
164
+ }
165
+
154
166
static void recovery (Shub * shub )
155
167
{
156
168
int i , max_fd ;
@@ -162,6 +174,9 @@ static void recovery(Shub* shub)
162
174
FD_ZERO (& tryset );
163
175
FD_SET (i , & tryset );
164
176
if (select (i + 1 , & tryset , NULL , NULL , & tm ) < 0 ) {
177
+ if (i != shub -> input && i != shub -> output ) {
178
+ notify_disconnect (shub , i );
179
+ }
165
180
close_socket (shub , i );
166
181
}
167
182
}
@@ -259,6 +274,7 @@ void ShubLoop(Shub* shub)
259
274
if (!write_socket (chan , (char * )hdr , n )) {
260
275
shub -> params -> error_handler ("Failed to write to local socket" , SHUB_RECOVERABLE_ERROR );
261
276
close_socket (shub , chan );
277
+ notify_disconnect (shub , chan );
262
278
chan = -1 ;
263
279
}
264
280
if (n != hdr -> size + sizeof (ShubMessageHdr )) {
@@ -274,6 +290,7 @@ void ShubLoop(Shub* shub)
274
290
if (chan >= 0 && !write_socket (chan , shub -> out_buffer , n )) {
275
291
shub -> params -> error_handler ("Failed to write to local socket" , SHUB_RECOVERABLE_ERROR );
276
292
close_socket (shub , chan );
293
+ notify_disconnect (shub , chan );
277
294
chan = -1 ;
278
295
}
279
296
tail -= n ;
@@ -295,6 +312,7 @@ void ShubLoop(Shub* shub)
295
312
if (available < sizeof (ShubMessageHdr )) {
296
313
shub -> params -> error_handler ("Failed to read local socket" , SHUB_RECOVERABLE_ERROR );
297
314
close_socket (shub , i );
315
+ notify_disconnect (shub , i );
298
316
} else {
299
317
int pos = 0 ;
300
318
/* loop through all fetched messages */
@@ -333,6 +351,7 @@ void ShubLoop(Shub* shub)
333
351
if (hdr != NULL ) { /* if message header is not yet sent to the server... */
334
352
/* ... then skip this message */
335
353
shub -> in_buffer_used = (char * )hdr - shub -> in_buffer ;
354
+ notify_disconnect (shub , chan );
336
355
break ;
337
356
} else { /* if message was partly sent to the server, we can not skip it, so we have to send garbage to the server */
338
357
chan = -1 ; /* do not try to read rest of body of this message */
@@ -351,6 +370,10 @@ void ShubLoop(Shub* shub)
351
370
shub -> in_buffer_used = 0 ;
352
371
}
353
372
} while (size != 0 ); /* repeat until all message body is received */
373
+
374
+ if (chan < 0 ) {
375
+ notify_disconnect (shub , i );
376
+ }
354
377
355
378
pos = available ;
356
379
break ;
0 commit comments