@@ -28,7 +28,7 @@ static void default_error_handler(char const* msg, ShubErrorSeverity severity)
28
28
29
29
void ShubInitParams (ShubParams * params )
30
30
{
31
- memset (params , 0 , sizeof params );
31
+ memset (params , 0 , sizeof ( * params ) );
32
32
params -> buffer_size = 64 * 1025 ;
33
33
params -> port = 54321 ;
34
34
params -> queue_size = 100 ;
@@ -65,22 +65,14 @@ static int resolve_host_by_name(const char *hostname, unsigned* addrs, unsigned*
65
65
66
66
static void close_socket (Shub * shub , int fd )
67
67
{
68
- int i , max_fd ;
69
- fd_set copy ;
70
- FD_ZERO (& copy );
71
68
close (fd );
72
- for (i = 0 , max_fd = shub -> max_fd ; i <= max_fd ; i ++ ) {
73
- if (i != fd && FD_ISSET (i , & shub -> inset )) {
74
- FD_SET (i , & copy );
75
- }
76
- }
77
- FD_COPY (& copy , & shub -> inset );
69
+ FD_CLR (fd , & shub -> inset );
78
70
}
79
71
80
72
static int read_socket (int sd , char * buf , int size )
81
73
{
82
74
while (size != 0 ) {
83
- int n = recv (sd , buf , size , 0 );
75
+ int n = recv (sd , buf , size , 0 );
84
76
if (n <= 0 ) {
85
77
return 0 ;
86
78
}
@@ -159,27 +151,26 @@ static void reconnect(Shub* shub)
159
151
static void recovery (Shub * shub )
160
152
{
161
153
int i , max_fd ;
162
- fd_set okset ;
163
- fd_set tryset ;
164
154
165
155
for (i = 0 , max_fd = shub -> max_fd ; i <= max_fd ; i ++ ) {
166
156
if (FD_ISSET (i , & shub -> inset )) {
167
157
struct timeval tm = {0 ,0 };
158
+ fd_set tryset ;
168
159
FD_ZERO (& tryset );
169
160
FD_SET (i , & tryset );
170
- if (select (i + 1 , & tryset , NULL , NULL , & tm ) >= 0 ) {
171
- FD_SET ( i , & okset );
161
+ if (select (i + 1 , & tryset , NULL , NULL , & tm ) < 0 ) {
162
+ close_socket ( shub , i );
172
163
}
173
164
}
174
165
}
175
- FD_COPY (& okset , & shub -> inset );
176
166
}
177
167
178
168
void ShubInitialize (Shub * shub , ShubParams * params )
179
169
{
180
170
struct sockaddr sock ;
181
171
182
172
shub -> params = params ;
173
+
183
174
sock .sa_family = AF_UNIX ;
184
175
strcpy (sock .sa_data , params -> file );
185
176
unlink (params -> file );
@@ -203,6 +194,9 @@ void ShubInitialize(Shub* shub, ShubParams* params)
203
194
if (shub -> in_buffer == NULL || shub -> out_buffer == NULL ) {
204
195
shub -> params -> error_handler ("Failed to allocate buffer" , SHUB_FATAL_ERROR );
205
196
}
197
+ shub -> in_buffer_used = 0 ;
198
+ shub -> out_buffer_used = 0 ;
199
+ shub -> max_fd = -1 ;
206
200
}
207
201
208
202
@@ -219,8 +213,7 @@ void ShubLoop(Shub* shub)
219
213
tm .tv_sec = shub -> params -> delay /1000 ;
220
214
tm .tv_usec = shub -> params -> delay % 1000 * 1000 ;
221
215
222
-
223
- FD_COPY (& shub -> inset , & events );
216
+ events = shub -> inset ;
224
217
rc = select (shub -> max_fd + 1 , & events , NULL , NULL , shub -> in_buffer_used == 0 ? NULL : & tm );
225
218
if (rc < 0 ) {
226
219
if (errno != EINTR ) {
@@ -250,15 +243,16 @@ void ShubLoop(Shub* shub)
250
243
}
251
244
shub -> out_buffer_used += available ;
252
245
while (pos + sizeof (ShubMessageHdr ) <= shub -> out_buffer_used ) {
253
- ShubMessageHdr * hdr = (ShubMessageHdr * )shub -> out_buffer ;
246
+ ShubMessageHdr * hdr = (ShubMessageHdr * )( shub -> out_buffer + pos ) ;
254
247
int chan = hdr -> chan ;
248
+ n = pos + sizeof (ShubMessageHdr ) + hdr -> size <= shub -> out_buffer_used ? hdr -> size + sizeof (ShubMessageHdr ) : shub -> out_buffer_used - pos ;
255
249
pos += sizeof (ShubMessageHdr );
256
- n = pos + hdr -> size <= shub -> out_buffer_used ? hdr -> size + sizeof (ShubMessageHdr ) : shub -> out_buffer_used - pos ;
257
250
if (!write_socket (chan , (char * )hdr , n )) {
258
251
shub -> params -> error_handler ("Failed to write to local socket" , SHUB_RECOVERABLE_ERROR );
259
252
close_socket (shub , chan );
260
253
chan = -1 ;
261
254
}
255
+ /* read rest of message if it doesn't fit in buffer */
262
256
if (n != hdr -> size + sizeof (ShubMessageHdr )) {
263
257
int tail = hdr -> size + sizeof (ShubMessageHdr ) - n ;
264
258
do {
@@ -275,6 +269,7 @@ void ShubLoop(Shub* shub)
275
269
}
276
270
tail -= n ;
277
271
} while (tail != 0 );
272
+ pos = ;
278
273
}
279
274
}
280
275
memcpy (shub -> out_buffer , shub -> out_buffer + pos , shub -> out_buffer_used - pos );
0 commit comments