Skip to content

Commit d82f023

Browse files
committed
Move sockhub to separate directory
1 parent 3824b90 commit d82f023

File tree

3 files changed

+43
-40
lines changed

3 files changed

+43
-40
lines changed

contrib/pg_xtm/sockhub.c renamed to contrib/pg_xtm/sockhub/sockhub.c

Lines changed: 39 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515
#include <string.h>
1616
#include <errno.h>
1717

18+
#include "sockhub.h"
19+
1820
static void default_error_handler(char const* msg, ShubErrorSeverity severity)
1921
{
2022
perror(msg);
@@ -24,7 +26,7 @@ static void default_error_handler(char const* msg, ShubErrorSeverity severity)
2426
}
2527

2628

27-
void ShubInitPrams(ShubParams* params)
29+
void ShubInitParams(ShubParams* params)
2830
{
2931
memset(params, 0, sizeof params);
3032
params->buffer_size = 64*1025;
@@ -107,27 +109,27 @@ static void reconnect(Shub* shub)
107109
struct sockaddr_in sock_inet;
108110
unsigned addrs[128];
109111
unsigned i, n_addrs = sizeof(addrs) / sizeof(addrs[0]);
110-
int max_attemtps = shub->params->max_attempts;
112+
int max_attempts = shub->params->max_attempts;
111113

112114
if (shub->output >= 0) {
113115
close_socket(shub, shub->output);
114116
}
115117

116118
sock_inet.sin_family = AF_INET;
117-
sock_inet.sin_port = htons(port);
118-
if (!resolve_host_by_name(host, addrs, &n_addrs)) {
119-
shub->error_handler("Failed to resolve host by name", SHUB_FATAL_ERROR);
119+
sock_inet.sin_port = htons(shub->params->port);
120+
if (!resolve_host_by_name(shub->params->host, addrs, &n_addrs)) {
121+
shub->params->error_handler("Failed to resolve host by name", SHUB_FATAL_ERROR);
120122
}
121123
shub->output = socket(AF_INET, SOCK_STREAM, 0);
122124
if (shub->output < 0) {
123-
shub->error_handler("Failed to create inet socket", SHUB_FATAL_ERROR);
125+
shub->params->error_handler("Failed to create inet socket", SHUB_FATAL_ERROR);
124126
}
125127
while (1) {
126128
int rc = -1;
127129
for (i = 0; i < n_addrs; ++i) {
128130
memcpy(&sock_inet.sin_addr, &addrs[i], sizeof sock_inet.sin_addr);
129131
do {
130-
rc = connect(output, (struct sockaddr*)&sock_inet, sizeof(sock_inet));
132+
rc = connect(shub->output, (struct sockaddr*)&sock_inet, sizeof(sock_inet));
131133
} while (rc < 0 && errno == EINTR);
132134

133135
if (rc >= 0 || errno == EINPROGRESS) {
@@ -138,17 +140,17 @@ static void reconnect(Shub* shub)
138140
}
139141
if (rc < 0) {
140142
if (errno != ENOENT && errno != ECONNREFUSED) {
141-
shub->error_handler("Connection can not be establish", SHUB_FATAL_ERROR);
143+
shub->params->error_handler("Connection can not be establish", SHUB_FATAL_ERROR);
142144
}
143145
if (max_attempts-- != 0) {
144146
sleep(1);
145147
} else {
146-
shub->error_handler("Failed to connect to host", SHUB_FATAL_ERROR);
148+
shub->params->error_handler("Failed to connect to host", SHUB_FATAL_ERROR);
147149
}
148150
} else {
149151
int optval = 1;
150152
setsockopt(shub->output, IPPROTO_TCP, TCP_NODELAY, (char const*)&optval, sizeof(int));
151-
FD_SET(shub->output, &inset);
153+
FD_SET(shub->output, &shub->inset);
152154
break;
153155
}
154156
}
@@ -161,7 +163,7 @@ static void recovery(Shub* shub)
161163
fd_set tryset;
162164

163165
for (i = 0, max_fd = shub->max_fd; i <= max_fd; i++) {
164-
if (FD_ISSET(i, &inset)) {
166+
if (FD_ISSET(i, &shub->inset)) {
165167
struct timeval tm = {0,0};
166168
FD_ZERO(&tryset);
167169
FD_SET(i, &tryset);
@@ -173,7 +175,7 @@ static void recovery(Shub* shub)
173175
FD_COPY(&okset, &shub->inset);
174176
}
175177

176-
void ShubIntialize(Shub* shub, ShubParams* params)
178+
void ShubInitialize(Shub* shub, ShubParams* params)
177179
{
178180
struct sockaddr sock;
179181

@@ -183,13 +185,13 @@ void ShubIntialize(Shub* shub, ShubParams* params)
183185
unlink(params->file);
184186
shub->input = socket(AF_UNIX, SOCK_STREAM, 0);
185187
if (shub->input < 0) {
186-
shub->error_handler("Failed to create local socket", SHUB_FATAL_ERROR);
188+
shub->params->error_handler("Failed to create local socket", SHUB_FATAL_ERROR);
187189
}
188190
if (bind(shub->input, &sock, ((char*)sock.sa_data - (char*)&sock) + strlen(params->file)) < 0) {
189-
shub->error_handler("Failed to bind local socket", SHUB_FATAL_ERROR);
191+
shub->params->error_handler("Failed to bind local socket", SHUB_FATAL_ERROR);
190192
}
191193
if (listen(shub->input, params->queue_size) < 0) {
192-
shub->error_handler("Failed to listen local socket");
194+
shub->params->error_handler("Failed to listen local socket", SHUB_FATAL_ERROR);
193195
}
194196
FD_ZERO(&shub->inset);
195197
FD_SET(shub->input, &shub->inset);
@@ -199,7 +201,7 @@ void ShubIntialize(Shub* shub, ShubParams* params)
199201
shub->in_buffer = malloc(params->buffer_size);
200202
shub->out_buffer = malloc(params->buffer_size);
201203
if (shub->in_buffer == NULL || shub->out_buffer == NULL) {
202-
shub->error_handler("Failed to allocate buffer", SHUB_FATAL_ERROR);
204+
shub->params->error_handler("Failed to allocate buffer", SHUB_FATAL_ERROR);
203205
}
204206
}
205207

@@ -214,15 +216,15 @@ void ShubLoop(Shub* shub)
214216
int i, max_fd, rc;
215217
unsigned int n, size;
216218

217-
tm.tv_sec = delay/1000;
218-
tm.tv_usec = delay % 1000 * 1000;
219+
tm.tv_sec = shub->params->delay/1000;
220+
tm.tv_usec = shub->params->delay % 1000 * 1000;
219221

220222

221223
FD_COPY(&shub->inset, &events);
222224
rc = select(shub->max_fd+1, &events, NULL, NULL, shub->in_buffer_used == 0 ? NULL : &tm);
223225
if (rc < 0) {
224226
if (errno != EINTR) {
225-
shub->error_handler("Select failed", SHUB_RECOVERABLE_ERROR);
227+
shub->params->error_handler("Select failed", SHUB_RECOVERABLE_ERROR);
226228
recovery(shub);
227229
}
228230
} else {
@@ -232,7 +234,7 @@ void ShubLoop(Shub* shub)
232234
if (i == shub->input) {
233235
int s = accept(i, NULL, NULL);
234236
if (s < 0) {
235-
shub->error_handler("Failed to accept socket", SHUB_RECOVERABLE_ERROR);
237+
shub->params->error_handler("Failed to accept socket", SHUB_RECOVERABLE_ERROR);
236238
} else {
237239
if (s > max_fd) {
238240
shub->max_fd = s;
@@ -243,7 +245,7 @@ void ShubLoop(Shub* shub)
243245
int available = recv(shub->output, shub->out_buffer + shub->out_buffer_used, buffer_size - shub->out_buffer_used, 0);
244246
int pos = 0;
245247
if (available <= 0) {
246-
pshub->error_handler("Failed to read inet socket", SHUB_RECOVERABLE_ERROR);
248+
shub->params->error_handler("Failed to read inet socket", SHUB_RECOVERABLE_ERROR);
247249
reconnect(shub);
248250
}
249251
shub->out_buffer_used += available;
@@ -252,22 +254,22 @@ void ShubLoop(Shub* shub)
252254
int chan = hdr->chan;
253255
pos += sizeof(ShubMessageHdr);
254256
n = pos + hdr->size <= shub->out_buffer_used ? hdr->size + sizeof(ShubMessageHdr) : shub->out_buffer_used - pos;
255-
if (!write_socket(chan, hdr, n)) {
256-
shub->error_handler("Failed to write to local socket", SHUB_RECOVERABLE_ERROR);
257+
if (!write_socket(chan, (char*)hdr, n)) {
258+
shub->params->error_handler("Failed to write to local socket", SHUB_RECOVERABLE_ERROR);
257259
close_socket(shub, chan);
258260
chan = -1;
259261
}
260262
if (n != hdr->size + sizeof(ShubMessageHdr)) {
261263
int tail = hdr->size + sizeof(ShubMessageHdr) - n;
262264
do {
263-
n = tail < shub->out_buffer_size ? tail : shub->out_buffer_size;
264-
if (!read_socket(output, out_buffer, n)) {
265-
shub->error_handler("Failed to read inet socket", SHUB_RECOVERABLE_ERROR);
265+
n = tail < buffer_size ? tail : buffer_size;
266+
if (!read_socket(shub->output, shub->out_buffer, n)) {
267+
shub->params->error_handler("Failed to read inet socket", SHUB_RECOVERABLE_ERROR);
266268
reconnect(shub);
267269
continue;
268270
}
269-
if (chan >= 0 && !write_socket(chan, out_buffer, n)) {
270-
shub->error_handler("Failed to write to local socket", SHUB_RECOVERABLE_ERROR);
271+
if (chan >= 0 && !write_socket(chan, shub->out_buffer, n)) {
272+
shub->params->error_handler("Failed to write to local socket", SHUB_RECOVERABLE_ERROR);
271273
close_socket(shub, chan);
272274
chan = -1;
273275
}
@@ -278,17 +280,17 @@ void ShubLoop(Shub* shub)
278280
memcpy(shub->out_buffer, shub->out_buffer + pos, shub->out_buffer_used - pos);
279281
shub->out_buffer_used -= pos;
280282
} else {
281-
ShubMessageHdr* hdr = (MessgeHdr*)&shub->in_buffer[shub->in_buffer_used];
282-
if (!read_socket(i, hdr, sizeof(ShubMessageHdr))) {
283-
shub->error_handler("Failed to read local socket", SHUB_RECOVERABLE_ERROR);
283+
ShubMessageHdr* hdr = (ShubMessageHdr*)&shub->in_buffer[shub->in_buffer_used];
284+
if (!read_socket(i, (char*)hdr, sizeof(ShubMessageHdr))) {
285+
shub->params->error_handler("Failed to read local socket", SHUB_RECOVERABLE_ERROR);
284286
close_socket(shub, i);
285287
} else {
286-
unsigned int size = hdr->size;
288+
size = hdr->size;
287289
hdr->chan = i;
288290
if (size + shub->in_buffer_used + sizeof(ShubMessageHdr) > buffer_size) {
289291
if (shub->in_buffer_used != 0) {
290292
while (!write_socket(shub->output, shub->in_buffer, shub->in_buffer_used)) {
291-
shub->error_handler("Failed to write to inet socket", SHUB_RECOVERABLE_ERROR);
293+
shub->params->error_handler("Failed to write to inet socket", SHUB_RECOVERABLE_ERROR);
292294
reconnect(shub);
293295
}
294296
memcpy(shub->in_buffer, shub->in_buffer + shub->in_buffer_used, sizeof(ShubMessageHdr));
@@ -300,13 +302,13 @@ void ShubLoop(Shub* shub)
300302
while (1) {
301303
unsigned int n = size + shub->in_buffer_used > buffer_size ? buffer_size - shub->in_buffer_used : size;
302304
if (!read_socket(i, shub->in_buffer + shub->in_buffer_used, n)) {
303-
shub->error_handler("Failed to read local socket", SHUB_RECOVERABLE_ERROR);
305+
shub->params->error_handler("Failed to read local socket", SHUB_RECOVERABLE_ERROR);
304306
close_socket(shub, i);
305307
break;
306308
} else {
307309
if (n != size) {
308-
while (!write_socket(output, in_buffer, n)) {
309-
shub->error_handler("Failed to write to inet socket", SHUB_RECOVERABLE_ERROR);
310+
while (!write_socket(shub->output, shub->in_buffer, n)) {
311+
shub->params->error_handler("Failed to write to inet socket", SHUB_RECOVERABLE_ERROR);
310312
reconnect(shub);
311313
}
312314
size -= n;
@@ -323,7 +325,7 @@ void ShubLoop(Shub* shub)
323325
}
324326
} else if (shub->in_buffer_used != 0) {
325327
while (!write_socket(shub->output, shub->in_buffer, shub->in_buffer_used)) {
326-
shub->error_handler("Failed to write to inet socket", SHUB_RECOVERABLE_ERROR);
328+
shub->params->error_handler("Failed to write to inet socket", SHUB_RECOVERABLE_ERROR);
327329
reconnect(shub);
328330
}
329331
}

contrib/pg_xtm/sockhub.h renamed to contrib/pg_xtm/sockhub/sockhub.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ typedef struct
2828
char const* file;
2929
char const* host;
3030
ShubErrorHandler error_handler;
31-
} ShubParms;
31+
} ShubParams;
3232

3333
typedef struct
3434
{
@@ -44,7 +44,7 @@ typedef struct
4444
} Shub;
4545

4646
void ShubInitParams(ShubParams* params);
47-
void ShubIntialize(Shub* shub, ShubParams* params);
47+
void ShubInitialize(Shub* shub, ShubParams* params);
4848
void ShubLoop(Shub* shub);
4949

5050
#endif

contrib/pg_xtm/sockhub_main.c renamed to contrib/pg_xtm/sockhub/sockhub_main.c

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
#include <stdio.h>
2+
#include <stdlib.h>
23
#include "sockhub.h"
34

45
int main(int argc, char* argv[])
@@ -25,7 +26,7 @@ int main(int argc, char* argv[])
2526
params.delay = atoi(argv[++i]);
2627
continue;
2728
case 'b':
28-
params.buf_size = atoi(argv[++i]);
29+
params.buffer_size = atoi(argv[++i]);
2930
continue;
3031
case 'r':
3132
params.max_attempts = atoi(argv[++i]);

0 commit comments

Comments
 (0)