22
22
#define SOCKHUB_BUFFER_SIZE (1024*1024)
23
23
#define ERR_BUF_SIZE 1024
24
24
25
+ #define SHUB_TRACE (fmt , ...)
26
+ /* #define SHUB_TRACE(fmt, ...) fprintf(stderr, fmt, ## __VA_ARGS__) */
27
+
25
28
void ShubAddSocket (Shub * shub , int fd );
26
29
27
30
inline void ShubAddSocket (Shub * shub , int fd )
@@ -63,21 +66,23 @@ void ShubInitParams(ShubParams* params)
63
66
params -> leader = NULL ;
64
67
}
65
68
66
- void ShubParamsSetHosts (ShubParams * params , char * hoststring )
69
+ int ShubParamsSetHosts (ShubParams * params , char * hoststring )
67
70
{
68
71
char * hstate , * pstate ;
69
72
char * hostport , * host , * portstr ;
70
73
int port ;
74
+ int ok = 1 ;
71
75
72
76
char * hosts = strdup (hoststring );
73
- fprintf ( stderr , "sockhub parsing hosts = '%s'\n" , hosts );
77
+ SHUB_TRACE ( "sockhub parsing hosts = '%s'\n" , hosts );
74
78
hostport = strtok_r (hosts , "," , & hstate );
75
79
76
80
while (hostport ) {
77
- fprintf ( stderr , "hostport = '%s'\n" , hostport );
81
+ SHUB_TRACE ( "hostport = '%s'\n" , hostport );
78
82
host = strtok_r (hostport , ":" , & pstate );
79
83
if (!host ) {
80
- fprintf (stderr , "wrong host in host list\n" );
84
+ SHUB_TRACE ("wrong host in host list\n" );
85
+ ok = 0 ;
81
86
break ;
82
87
}
83
88
@@ -88,20 +93,20 @@ void ShubParamsSetHosts(ShubParams* params, char* hoststring)
88
93
port = 5431 ;
89
94
}
90
95
91
- fprintf ( stderr , "adding host %s:%d\n" , host , port );
96
+ SHUB_TRACE ( "adding host %s:%d\n" , host , port );
92
97
host_t * h = malloc (sizeof (host_t ));
93
98
h -> host = strdup (host );
94
99
h -> port = port ;
95
100
if (params -> leader ) {
96
- // update pointers from
101
+ /* update pointers from */
97
102
h -> prev = params -> leader -> prev ;
98
103
h -> next = params -> leader ;
99
104
100
- // update pointers to
105
+ /* update pointers to */
101
106
h -> prev -> next = h ;
102
107
h -> next -> prev = h ;
103
108
} else {
104
- // the list is empty
109
+ /* the list is empty */
105
110
params -> leader = h ;
106
111
h -> prev = h ;
107
112
h -> next = h ;
@@ -111,6 +116,7 @@ void ShubParamsSetHosts(ShubParams* params, char* hoststring)
111
116
}
112
117
113
118
free (hosts );
119
+ return ok ;
114
120
}
115
121
116
122
static int resolve_host_by_name (const char * hostname , unsigned * addrs , unsigned * n_addrs )
@@ -202,20 +208,20 @@ static void reconnect(Shub* shub)
202
208
char * host = shub -> params -> leader -> host ;
203
209
int port = shub -> params -> leader -> port ;
204
210
205
- fprintf ( stderr , "shub leader = %s:%d\n" , host , port );
211
+ SHUB_TRACE ( "shub leader = %s:%d\n" , host , port );
206
212
207
213
shub -> params -> leader = shub -> params -> leader -> next ;
208
214
209
215
ShubErrorSeverity severity = SHUB_RECOVERABLE_ERROR ;
210
216
sock_inet .sin_port = htons (port );
211
217
212
218
if (!resolve_host_by_name (host , addrs , & n_addrs )) {
213
- shub -> params -> error_handler ("Failed to resolve host by name" , severity );
219
+ shub -> params -> error_handler ("Sockhub failed to resolve host by name" , severity );
214
220
continue ;
215
221
}
216
222
shub -> output = socket (AF_INET , SOCK_STREAM , 0 );
217
223
if (shub -> output < 0 ) {
218
- shub -> params -> error_handler ("Failed to create inet socket" , severity );
224
+ shub -> params -> error_handler ("Sockhub failed to create inet socket" , severity );
219
225
continue ;
220
226
}
221
227
while (1 ) {
@@ -231,16 +237,16 @@ static void reconnect(Shub* shub)
231
237
}
232
238
}
233
239
if (rc < 0 ) {
234
- if (errno != ENOENT && errno != ECONNREFUSED && errno != EINPROGRESS ) {
235
- shub -> params -> error_handler ( "Connection can not be establish" , severity ) ;
236
- continue ;
237
- }
238
- if ( max_attempts -- != 0 ) {
239
- sleep ( 1 );
240
- } else {
241
- shub -> params -> error_handler ( "Failed to connect to host" , severity );
242
- continue ;
243
- }
240
+ if (( errno != ENOENT && errno != ECONNREFUSED && errno != EINPROGRESS ) || max_attempts == 0 ) {
241
+ char buf [ ERR_BUF_SIZE ] ;
242
+ sprintf ( buf , "Sockhub failed to connect to %s:%d: %d" , host , port , errno ) ;
243
+ shub -> params -> error_handler ( buf , severity );
244
+ max_attempts = shub -> params -> max_attempts ;
245
+ } else {
246
+ max_attempts -= 1 ;
247
+ sleep ( 1 );
248
+ }
249
+ continue ;
244
250
} else {
245
251
int optval = 1 ;
246
252
setsockopt (shub -> output , IPPROTO_TCP , TCP_NODELAY , (char const * )& optval , sizeof (optval ));
@@ -493,7 +499,6 @@ void ShubLoop(Shub* shub)
493
499
char buf [ERR_BUF_SIZE ];
494
500
sprintf (buf , "Failed to read local socket chan=%d, rc=%d, min requested=%ld, max requested=%d, errno=%d" , chan , rc , sizeof (ShubMessageHdr ) - available , buffer_size - pos - available , errno );
495
501
shub -> params -> error_handler (buf , SHUB_RECOVERABLE_ERROR );
496
- //shub->params->error_handler("Failed to read local socket", SHUB_RECOVERABLE_ERROR);
497
502
close_socket (shub , i );
498
503
shub -> in_buffer_used = pos ;
499
504
notify_disconnect (shub , i );
@@ -532,7 +537,6 @@ void ShubLoop(Shub* shub)
532
537
char buf [ERR_BUF_SIZE ];
533
538
sprintf (buf , "Failed to read local socket rc=%d, len=%d, errno=%d" , rc , n , errno );
534
539
shub -> params -> error_handler (buf , SHUB_RECOVERABLE_ERROR );
535
- //shub->params->error_handler("Failed to read local socket", SHUB_RECOVERABLE_ERROR);
536
540
close_socket (shub , chan );
537
541
if (hdr != NULL ) { /* if message header is not yet sent to the server... */
538
542
/* ... then skip this message */
@@ -607,4 +611,3 @@ void ShubLoop(Shub* shub)
607
611
}
608
612
}
609
613
610
- // vim: sts=4 ts=4 sw=4 expandtab
0 commit comments