15
15
#include <string.h>
16
16
#include <errno.h>
17
17
18
+ #include "sockhub.h"
19
+
18
20
static void default_error_handler (char const * msg , ShubErrorSeverity severity )
19
21
{
20
22
perror (msg );
@@ -24,7 +26,7 @@ static void default_error_handler(char const* msg, ShubErrorSeverity severity)
24
26
}
25
27
26
28
27
- void ShubInitPrams (ShubParams * params )
29
+ void ShubInitParams (ShubParams * params )
28
30
{
29
31
memset (params , 0 , sizeof params );
30
32
params -> buffer_size = 64 * 1025 ;
@@ -107,27 +109,27 @@ static void reconnect(Shub* shub)
107
109
struct sockaddr_in sock_inet ;
108
110
unsigned addrs [128 ];
109
111
unsigned i , n_addrs = sizeof (addrs ) / sizeof (addrs [0 ]);
110
- int max_attemtps = shub -> params -> max_attempts ;
112
+ int max_attempts = shub -> params -> max_attempts ;
111
113
112
114
if (shub -> output >= 0 ) {
113
115
close_socket (shub , shub -> output );
114
116
}
115
117
116
118
sock_inet .sin_family = AF_INET ;
117
- sock_inet .sin_port = htons (port );
118
- if (!resolve_host_by_name (host , addrs , & n_addrs )) {
119
- shub -> error_handler ("Failed to resolve host by name" , SHUB_FATAL_ERROR );
119
+ sock_inet .sin_port = htons (shub -> params -> port );
120
+ if (!resolve_host_by_name (shub -> params -> host , addrs , & n_addrs )) {
121
+ shub -> params -> error_handler ("Failed to resolve host by name" , SHUB_FATAL_ERROR );
120
122
}
121
123
shub -> output = socket (AF_INET , SOCK_STREAM , 0 );
122
124
if (shub -> output < 0 ) {
123
- shub -> error_handler ("Failed to create inet socket" , SHUB_FATAL_ERROR );
125
+ shub -> params -> error_handler ("Failed to create inet socket" , SHUB_FATAL_ERROR );
124
126
}
125
127
while (1 ) {
126
128
int rc = -1 ;
127
129
for (i = 0 ; i < n_addrs ; ++ i ) {
128
130
memcpy (& sock_inet .sin_addr , & addrs [i ], sizeof sock_inet .sin_addr );
129
131
do {
130
- rc = connect (output , (struct sockaddr * )& sock_inet , sizeof (sock_inet ));
132
+ rc = connect (shub -> output , (struct sockaddr * )& sock_inet , sizeof (sock_inet ));
131
133
} while (rc < 0 && errno == EINTR );
132
134
133
135
if (rc >= 0 || errno == EINPROGRESS ) {
@@ -138,17 +140,17 @@ static void reconnect(Shub* shub)
138
140
}
139
141
if (rc < 0 ) {
140
142
if (errno != ENOENT && errno != ECONNREFUSED ) {
141
- shub -> error_handler ("Connection can not be establish" , SHUB_FATAL_ERROR );
143
+ shub -> params -> error_handler ("Connection can not be establish" , SHUB_FATAL_ERROR );
142
144
}
143
145
if (max_attempts -- != 0 ) {
144
146
sleep (1 );
145
147
} else {
146
- shub -> error_handler ("Failed to connect to host" , SHUB_FATAL_ERROR );
148
+ shub -> params -> error_handler ("Failed to connect to host" , SHUB_FATAL_ERROR );
147
149
}
148
150
} else {
149
151
int optval = 1 ;
150
152
setsockopt (shub -> output , IPPROTO_TCP , TCP_NODELAY , (char const * )& optval , sizeof (int ));
151
- FD_SET (shub -> output , & inset );
153
+ FD_SET (shub -> output , & shub -> inset );
152
154
break ;
153
155
}
154
156
}
@@ -161,7 +163,7 @@ static void recovery(Shub* shub)
161
163
fd_set tryset ;
162
164
163
165
for (i = 0 , max_fd = shub -> max_fd ; i <= max_fd ; i ++ ) {
164
- if (FD_ISSET (i , & inset )) {
166
+ if (FD_ISSET (i , & shub -> inset )) {
165
167
struct timeval tm = {0 ,0 };
166
168
FD_ZERO (& tryset );
167
169
FD_SET (i , & tryset );
@@ -173,7 +175,7 @@ static void recovery(Shub* shub)
173
175
FD_COPY (& okset , & shub -> inset );
174
176
}
175
177
176
- void ShubIntialize (Shub * shub , ShubParams * params )
178
+ void ShubInitialize (Shub * shub , ShubParams * params )
177
179
{
178
180
struct sockaddr sock ;
179
181
@@ -183,13 +185,13 @@ void ShubIntialize(Shub* shub, ShubParams* params)
183
185
unlink (params -> file );
184
186
shub -> input = socket (AF_UNIX , SOCK_STREAM , 0 );
185
187
if (shub -> input < 0 ) {
186
- shub -> error_handler ("Failed to create local socket" , SHUB_FATAL_ERROR );
188
+ shub -> params -> error_handler ("Failed to create local socket" , SHUB_FATAL_ERROR );
187
189
}
188
190
if (bind (shub -> input , & sock , ((char * )sock .sa_data - (char * )& sock ) + strlen (params -> file )) < 0 ) {
189
- shub -> error_handler ("Failed to bind local socket" , SHUB_FATAL_ERROR );
191
+ shub -> params -> error_handler ("Failed to bind local socket" , SHUB_FATAL_ERROR );
190
192
}
191
193
if (listen (shub -> input , params -> queue_size ) < 0 ) {
192
- shub -> error_handler ("Failed to listen local socket" );
194
+ shub -> params -> error_handler ("Failed to listen local socket" , SHUB_FATAL_ERROR );
193
195
}
194
196
FD_ZERO (& shub -> inset );
195
197
FD_SET (shub -> input , & shub -> inset );
@@ -199,7 +201,7 @@ void ShubIntialize(Shub* shub, ShubParams* params)
199
201
shub -> in_buffer = malloc (params -> buffer_size );
200
202
shub -> out_buffer = malloc (params -> buffer_size );
201
203
if (shub -> in_buffer == NULL || shub -> out_buffer == NULL ) {
202
- shub -> error_handler ("Failed to allocate buffer" , SHUB_FATAL_ERROR );
204
+ shub -> params -> error_handler ("Failed to allocate buffer" , SHUB_FATAL_ERROR );
203
205
}
204
206
}
205
207
@@ -214,15 +216,15 @@ void ShubLoop(Shub* shub)
214
216
int i , max_fd , rc ;
215
217
unsigned int n , size ;
216
218
217
- tm .tv_sec = delay /1000 ;
218
- tm .tv_usec = delay % 1000 * 1000 ;
219
+ tm .tv_sec = shub -> params -> delay /1000 ;
220
+ tm .tv_usec = shub -> params -> delay % 1000 * 1000 ;
219
221
220
222
221
223
FD_COPY (& shub -> inset , & events );
222
224
rc = select (shub -> max_fd + 1 , & events , NULL , NULL , shub -> in_buffer_used == 0 ? NULL : & tm );
223
225
if (rc < 0 ) {
224
226
if (errno != EINTR ) {
225
- shub -> error_handler ("Select failed" , SHUB_RECOVERABLE_ERROR );
227
+ shub -> params -> error_handler ("Select failed" , SHUB_RECOVERABLE_ERROR );
226
228
recovery (shub );
227
229
}
228
230
} else {
@@ -232,7 +234,7 @@ void ShubLoop(Shub* shub)
232
234
if (i == shub -> input ) {
233
235
int s = accept (i , NULL , NULL );
234
236
if (s < 0 ) {
235
- shub -> error_handler ("Failed to accept socket" , SHUB_RECOVERABLE_ERROR );
237
+ shub -> params -> error_handler ("Failed to accept socket" , SHUB_RECOVERABLE_ERROR );
236
238
} else {
237
239
if (s > max_fd ) {
238
240
shub -> max_fd = s ;
@@ -243,7 +245,7 @@ void ShubLoop(Shub* shub)
243
245
int available = recv (shub -> output , shub -> out_buffer + shub -> out_buffer_used , buffer_size - shub -> out_buffer_used , 0 );
244
246
int pos = 0 ;
245
247
if (available <= 0 ) {
246
- pshub -> error_handler ("Failed to read inet socket" , SHUB_RECOVERABLE_ERROR );
248
+ shub -> params -> error_handler ("Failed to read inet socket" , SHUB_RECOVERABLE_ERROR );
247
249
reconnect (shub );
248
250
}
249
251
shub -> out_buffer_used += available ;
@@ -252,22 +254,22 @@ void ShubLoop(Shub* shub)
252
254
int chan = hdr -> chan ;
253
255
pos += sizeof (ShubMessageHdr );
254
256
n = pos + hdr -> size <= shub -> out_buffer_used ? hdr -> size + sizeof (ShubMessageHdr ) : shub -> out_buffer_used - pos ;
255
- if (!write_socket (chan , hdr , n )) {
256
- shub -> error_handler ("Failed to write to local socket" , SHUB_RECOVERABLE_ERROR );
257
+ if (!write_socket (chan , ( char * ) hdr , n )) {
258
+ shub -> params -> error_handler ("Failed to write to local socket" , SHUB_RECOVERABLE_ERROR );
257
259
close_socket (shub , chan );
258
260
chan = -1 ;
259
261
}
260
262
if (n != hdr -> size + sizeof (ShubMessageHdr )) {
261
263
int tail = hdr -> size + sizeof (ShubMessageHdr ) - n ;
262
264
do {
263
- n = tail < shub -> out_buffer_size ? tail : shub -> out_buffer_size ;
264
- if (!read_socket (output , out_buffer , n )) {
265
- shub -> error_handler ("Failed to read inet socket" , SHUB_RECOVERABLE_ERROR );
265
+ n = tail < buffer_size ? tail : buffer_size ;
266
+ if (!read_socket (shub -> output , shub -> out_buffer , n )) {
267
+ shub -> params -> error_handler ("Failed to read inet socket" , SHUB_RECOVERABLE_ERROR );
266
268
reconnect (shub );
267
269
continue ;
268
270
}
269
- if (chan >= 0 && !write_socket (chan , out_buffer , n )) {
270
- shub -> error_handler ("Failed to write to local socket" , SHUB_RECOVERABLE_ERROR );
271
+ if (chan >= 0 && !write_socket (chan , shub -> out_buffer , n )) {
272
+ shub -> params -> error_handler ("Failed to write to local socket" , SHUB_RECOVERABLE_ERROR );
271
273
close_socket (shub , chan );
272
274
chan = -1 ;
273
275
}
@@ -278,17 +280,17 @@ void ShubLoop(Shub* shub)
278
280
memcpy (shub -> out_buffer , shub -> out_buffer + pos , shub -> out_buffer_used - pos );
279
281
shub -> out_buffer_used -= pos ;
280
282
} else {
281
- ShubMessageHdr * hdr = (MessgeHdr * )& shub -> in_buffer [shub -> in_buffer_used ];
282
- if (!read_socket (i , hdr , sizeof (ShubMessageHdr ))) {
283
- shub -> error_handler ("Failed to read local socket" , SHUB_RECOVERABLE_ERROR );
283
+ ShubMessageHdr * hdr = (ShubMessageHdr * )& shub -> in_buffer [shub -> in_buffer_used ];
284
+ if (!read_socket (i , ( char * ) hdr , sizeof (ShubMessageHdr ))) {
285
+ shub -> params -> error_handler ("Failed to read local socket" , SHUB_RECOVERABLE_ERROR );
284
286
close_socket (shub , i );
285
287
} else {
286
- unsigned int size = hdr -> size ;
288
+ size = hdr -> size ;
287
289
hdr -> chan = i ;
288
290
if (size + shub -> in_buffer_used + sizeof (ShubMessageHdr ) > buffer_size ) {
289
291
if (shub -> in_buffer_used != 0 ) {
290
292
while (!write_socket (shub -> output , shub -> in_buffer , shub -> in_buffer_used )) {
291
- shub -> error_handler ("Failed to write to inet socket" , SHUB_RECOVERABLE_ERROR );
293
+ shub -> params -> error_handler ("Failed to write to inet socket" , SHUB_RECOVERABLE_ERROR );
292
294
reconnect (shub );
293
295
}
294
296
memcpy (shub -> in_buffer , shub -> in_buffer + shub -> in_buffer_used , sizeof (ShubMessageHdr ));
@@ -300,13 +302,13 @@ void ShubLoop(Shub* shub)
300
302
while (1 ) {
301
303
unsigned int n = size + shub -> in_buffer_used > buffer_size ? buffer_size - shub -> in_buffer_used : size ;
302
304
if (!read_socket (i , shub -> in_buffer + shub -> in_buffer_used , n )) {
303
- shub -> error_handler ("Failed to read local socket" , SHUB_RECOVERABLE_ERROR );
305
+ shub -> params -> error_handler ("Failed to read local socket" , SHUB_RECOVERABLE_ERROR );
304
306
close_socket (shub , i );
305
307
break ;
306
308
} else {
307
309
if (n != size ) {
308
- while (!write_socket (output , in_buffer , n )) {
309
- shub -> error_handler ("Failed to write to inet socket" , SHUB_RECOVERABLE_ERROR );
310
+ while (!write_socket (shub -> output , shub -> in_buffer , n )) {
311
+ shub -> params -> error_handler ("Failed to write to inet socket" , SHUB_RECOVERABLE_ERROR );
310
312
reconnect (shub );
311
313
}
312
314
size -= n ;
@@ -323,7 +325,7 @@ void ShubLoop(Shub* shub)
323
325
}
324
326
} else if (shub -> in_buffer_used != 0 ) {
325
327
while (!write_socket (shub -> output , shub -> in_buffer , shub -> in_buffer_used )) {
326
- shub -> error_handler ("Failed to write to inet socket" , SHUB_RECOVERABLE_ERROR );
328
+ shub -> params -> error_handler ("Failed to write to inet socket" , SHUB_RECOVERABLE_ERROR );
327
329
reconnect (shub );
328
330
}
329
331
}
0 commit comments