@@ -35,8 +35,58 @@ void ShubInitParams(ShubParams* params)
35
35
params -> queue_size = 100 ;
36
36
params -> max_attempts = 10 ;
37
37
params -> error_handler = default_error_handler ;
38
+ params -> leader = NULL ;
38
39
}
39
40
41
+ void ShubParamsSetHosts (ShubParams * params , char * hoststring )
42
+ {
43
+ char * hstate , * pstate ;
44
+ char * hostport , * host , * portstr ;
45
+ int port ;
46
+
47
+ char * hosts = strdup (hoststring );
48
+ fprintf (stderr , "sockhub parsing hosts = '%s'\n" , hosts );
49
+ hostport = strtok_r (hosts , "," , & hstate );
50
+
51
+ while (hostport ) {
52
+ fprintf (stderr , "hostport = '%s'\n" , hostport );
53
+ host = strtok_r (hostport , ":" , & pstate );
54
+ if (!host ) {
55
+ fprintf (stderr , "wrong host in host list\n" );
56
+ break ;
57
+ }
58
+
59
+ portstr = strtok_r (NULL , ":" , & pstate );
60
+ if (portstr ) {
61
+ port = atoi (portstr );
62
+ } else {
63
+ port = 5431 ;
64
+ }
65
+
66
+ fprintf (stderr , "adding host %s:%d\n" , host , port );
67
+ host_t * h = malloc (sizeof (host_t ));
68
+ h -> host = strdup (host );
69
+ h -> port = port ;
70
+ if (params -> leader ) {
71
+ // update pointers from
72
+ h -> prev = params -> leader -> prev ;
73
+ h -> next = params -> leader ;
74
+
75
+ // update pointers to
76
+ h -> prev -> next = h ;
77
+ h -> next -> prev = h ;
78
+ } else {
79
+ // the list is empty
80
+ params -> leader = h ;
81
+ h -> prev = h ;
82
+ h -> next = h ;
83
+ }
84
+
85
+ hostport = strtok_r (NULL , "," , & hstate );
86
+ }
87
+
88
+ free (hosts );
89
+ }
40
90
41
91
static int resolve_host_by_name (const char * hostname , unsigned * addrs , unsigned * n_addrs )
42
92
{
@@ -103,57 +153,37 @@ int ShubWriteSocket(int sd, void const* buf, int size)
103
153
return 1 ;
104
154
}
105
155
106
-
107
156
static void reconnect (Shub * shub )
108
157
{
109
- static int skip_hosts = 0 ;
110
- printf ("will connect to host #%d\n" , skip_hosts );
111
-
112
158
struct sockaddr_in sock_inet ;
113
159
unsigned addrs [128 ];
114
160
unsigned i , n_addrs = sizeof (addrs ) / sizeof (addrs [0 ]);
115
161
int max_attempts = shub -> params -> max_attempts ;
116
- char * hosts = strdup (shub -> params -> hosts );
117
162
if (shub -> output >= 0 ) {
118
163
close_socket (shub , shub -> output );
119
164
}
120
165
121
166
sock_inet .sin_family = AF_INET ;
122
167
123
- char * hstate , * pstate ;
124
- char * hostport , * host , * portstr ;
125
- int port ;
126
- hostport = strtok_r (hosts , "," , & hstate );
127
- int hosti = 0 ;
128
- while (hostport ) {
129
- ShubErrorSeverity severity = SHUB_RECOVERABLE_ERROR ;
168
+ while (shub -> params -> leader ) {
169
+ char * host = shub -> params -> leader -> host ;
170
+ int port = shub -> params -> leader -> port ;
130
171
131
- if (hosti < skip_hosts ) {
132
- goto trynext ;
133
- }
172
+ fprintf (stderr , "shub leader = %s:%d\n" , host , port );
134
173
135
- host = strtok_r (hostport , ":" , & pstate );
136
- if (!host ) {
137
- severity = SHUB_FATAL_ERROR ;
138
- break ;
139
- }
174
+ shub -> params -> leader = shub -> params -> leader -> next ;
140
175
141
- portstr = strtok_r (NULL , ":" , & pstate );
142
- if (portstr ) {
143
- port = atoi (portstr );
144
- } else {
145
- port = 5431 ;
146
- }
176
+ ShubErrorSeverity severity = SHUB_RECOVERABLE_ERROR ;
147
177
sock_inet .sin_port = htons (port );
148
178
149
179
if (!resolve_host_by_name (host , addrs , & n_addrs )) {
150
180
shub -> params -> error_handler ("Failed to resolve host by name" , severity );
151
- goto trynext ;
181
+ continue ;
152
182
}
153
183
shub -> output = socket (AF_INET , SOCK_STREAM , 0 );
154
184
if (shub -> output < 0 ) {
155
185
shub -> params -> error_handler ("Failed to create inet socket" , severity );
156
- goto trynext ;
186
+ continue ;
157
187
}
158
188
while (1 ) {
159
189
int rc = -1 ;
@@ -170,32 +200,22 @@ static void reconnect(Shub* shub)
170
200
if (rc < 0 ) {
171
201
if (errno != ENOENT && errno != ECONNREFUSED && errno != EINPROGRESS ) {
172
202
shub -> params -> error_handler ("Connection can not be establish" , severity );
173
- goto trynext ;
203
+ continue ;
174
204
}
175
205
if (max_attempts -- != 0 ) {
176
206
sleep (1 );
177
207
} else {
178
208
shub -> params -> error_handler ("Failed to connect to host" , severity );
179
- goto trynext ;
209
+ continue ;
180
210
}
181
211
} else {
182
212
int optval = 1 ;
183
213
setsockopt (shub -> output , IPPROTO_TCP , TCP_NODELAY , (char const * )& optval , sizeof (optval ));
184
214
FD_SET (shub -> output , & shub -> inset );
185
- goto finish ;
215
+ return ;
186
216
}
187
217
}
188
- trynext :
189
- hostport = strtok_r (NULL , "," , & hstate );
190
- hosti ++ ;
191
- }
192
- finish :
193
- if (hosti < skip_hosts ) {
194
- skip_hosts = 0 ;
195
- } else {
196
- skip_hosts ++ ;
197
218
}
198
- free (hosts );
199
219
}
200
220
201
221
static void notify_disconnect (Shub * shub , int chan )
0 commit comments