Skip to content

Commit 3824b90

Browse files
committed
Add SocketHUB library
1 parent 0f1b88d commit 3824b90

File tree

3 files changed

+441
-0
lines changed

3 files changed

+441
-0
lines changed

contrib/pg_xtm/sockhub.c

Lines changed: 333 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,333 @@
1+
#include <sys/ioctl.h>
2+
#include <fcntl.h>
3+
#include <sys/time.h>
4+
#include <sys/types.h>
5+
#include <sys/socket.h>
6+
#include <sys/utsname.h>
7+
#include <sys/select.h>
8+
#include <netinet/in.h>
9+
#include <netinet/tcp.h>
10+
#include <arpa/inet.h>
11+
#include <stdio.h>
12+
#include <netdb.h>
13+
#include <stdlib.h>
14+
#include <unistd.h>
15+
#include <string.h>
16+
#include <errno.h>
17+
18+
static void default_error_handler(char const* msg, ShubErrorSeverity severity)
19+
{
20+
perror(msg);
21+
if (severity == SHUB_FATAL_ERROR) {
22+
exit(1);
23+
}
24+
}
25+
26+
27+
void ShubInitPrams(ShubParams* params)
28+
{
29+
memset(params, 0, sizeof params);
30+
params->buffer_size = 64*1025;
31+
params->port = 54321;
32+
params->queue_size = 100;
33+
params->max_attempts = 10;
34+
params->error_handler = default_error_handler;
35+
}
36+
37+
38+
static int resolve_host_by_name(const char *hostname, unsigned* addrs, unsigned* n_addrs)
39+
{
40+
struct sockaddr_in sin;
41+
struct hostent* hp;
42+
unsigned i;
43+
44+
sin.sin_addr.s_addr = inet_addr(hostname);
45+
if (sin.sin_addr.s_addr != INADDR_NONE) {
46+
memcpy(&addrs[0], &sin.sin_addr.s_addr, sizeof(sin.sin_addr.s_addr));
47+
*n_addrs = 1;
48+
return 1;
49+
}
50+
51+
hp = gethostbyname(hostname);
52+
if (hp == NULL || hp->h_addrtype != AF_INET) {
53+
return 0;
54+
}
55+
for (i = 0; hp->h_addr_list[i] != NULL && i < *n_addrs; i++) {
56+
memcpy(&addrs[i], hp->h_addr_list[i], sizeof(addrs[i]));
57+
}
58+
*n_addrs = i;
59+
return 1;
60+
}
61+
62+
63+
64+
static void close_socket(Shub* shub, int fd)
65+
{
66+
int i, max_fd;
67+
fd_set copy;
68+
FD_ZERO(&copy);
69+
close(fd);
70+
for (i = 0, max_fd = shub->max_fd; i <= max_fd; i++) {
71+
if (i != fd && FD_ISSET(i, &shub->inset)) {
72+
FD_SET(i, &copy);
73+
}
74+
}
75+
FD_COPY(&copy, &shub->inset);
76+
}
77+
78+
static int read_socket(int sd, char* buf, int size)
79+
{
80+
while (size != 0) {
81+
int n = recv(sd, buf, size , 0);
82+
if (n <= 0) {
83+
return 0;
84+
}
85+
size -= n;
86+
buf += n;
87+
}
88+
return 1;
89+
}
90+
91+
static int write_socket(int sd, char const* buf, int size)
92+
{
93+
while (size != 0) {
94+
int n = send(sd, buf, size, 0);
95+
if (n <= 0) {
96+
return 0;
97+
}
98+
size -= n;
99+
buf += n;
100+
}
101+
return 1;
102+
}
103+
104+
105+
static void reconnect(Shub* shub)
106+
{
107+
struct sockaddr_in sock_inet;
108+
unsigned addrs[128];
109+
unsigned i, n_addrs = sizeof(addrs) / sizeof(addrs[0]);
110+
int max_attemtps = shub->params->max_attempts;
111+
112+
if (shub->output >= 0) {
113+
close_socket(shub, shub->output);
114+
}
115+
116+
sock_inet.sin_family = AF_INET;
117+
sock_inet.sin_port = htons(port);
118+
if (!resolve_host_by_name(host, addrs, &n_addrs)) {
119+
shub->error_handler("Failed to resolve host by name", SHUB_FATAL_ERROR);
120+
}
121+
shub->output = socket(AF_INET, SOCK_STREAM, 0);
122+
if (shub->output < 0) {
123+
shub->error_handler("Failed to create inet socket", SHUB_FATAL_ERROR);
124+
}
125+
while (1) {
126+
int rc = -1;
127+
for (i = 0; i < n_addrs; ++i) {
128+
memcpy(&sock_inet.sin_addr, &addrs[i], sizeof sock_inet.sin_addr);
129+
do {
130+
rc = connect(output, (struct sockaddr*)&sock_inet, sizeof(sock_inet));
131+
} while (rc < 0 && errno == EINTR);
132+
133+
if (rc >= 0 || errno == EINPROGRESS) {
134+
if (rc >= 0) {
135+
}
136+
break;
137+
}
138+
}
139+
if (rc < 0) {
140+
if (errno != ENOENT && errno != ECONNREFUSED) {
141+
shub->error_handler("Connection can not be establish", SHUB_FATAL_ERROR);
142+
}
143+
if (max_attempts-- != 0) {
144+
sleep(1);
145+
} else {
146+
shub->error_handler("Failed to connect to host", SHUB_FATAL_ERROR);
147+
}
148+
} else {
149+
int optval = 1;
150+
setsockopt(shub->output, IPPROTO_TCP, TCP_NODELAY, (char const*)&optval, sizeof(int));
151+
FD_SET(shub->output, &inset);
152+
break;
153+
}
154+
}
155+
}
156+
157+
static void recovery(Shub* shub)
158+
{
159+
int i, max_fd;
160+
fd_set okset;
161+
fd_set tryset;
162+
163+
for (i = 0, max_fd = shub->max_fd; i <= max_fd; i++) {
164+
if (FD_ISSET(i, &inset)) {
165+
struct timeval tm = {0,0};
166+
FD_ZERO(&tryset);
167+
FD_SET(i, &tryset);
168+
if (select(i+1, &tryset, NULL, NULL, &tm) >= 0) {
169+
FD_SET(i, &okset);
170+
}
171+
}
172+
}
173+
FD_COPY(&okset, &shub->inset);
174+
}
175+
176+
void ShubIntialize(Shub* shub, ShubParams* params)
177+
{
178+
struct sockaddr sock;
179+
180+
shub->params = params;
181+
sock.sa_family = AF_UNIX;
182+
strcpy(sock.sa_data, params->file);
183+
unlink(params->file);
184+
shub->input = socket(AF_UNIX, SOCK_STREAM, 0);
185+
if (shub->input < 0) {
186+
shub->error_handler("Failed to create local socket", SHUB_FATAL_ERROR);
187+
}
188+
if (bind(shub->input, &sock, ((char*)sock.sa_data - (char*)&sock) + strlen(params->file)) < 0) {
189+
shub->error_handler("Failed to bind local socket", SHUB_FATAL_ERROR);
190+
}
191+
if (listen(shub->input, params->queue_size) < 0) {
192+
shub->error_handler("Failed to listen local socket");
193+
}
194+
FD_ZERO(&shub->inset);
195+
FD_SET(shub->input, &shub->inset);
196+
197+
reconnect(shub);
198+
199+
shub->in_buffer = malloc(params->buffer_size);
200+
shub->out_buffer = malloc(params->buffer_size);
201+
if (shub->in_buffer == NULL || shub->out_buffer == NULL) {
202+
shub->error_handler("Failed to allocate buffer", SHUB_FATAL_ERROR);
203+
}
204+
}
205+
206+
207+
void ShubLoop(Shub* shub)
208+
{
209+
int buffer_size = shub->params->buffer_size;
210+
211+
while (1) {
212+
fd_set events;
213+
struct timeval tm;
214+
int i, max_fd, rc;
215+
unsigned int n, size;
216+
217+
tm.tv_sec = delay/1000;
218+
tm.tv_usec = delay % 1000 * 1000;
219+
220+
221+
FD_COPY(&shub->inset, &events);
222+
rc = select(shub->max_fd+1, &events, NULL, NULL, shub->in_buffer_used == 0 ? NULL : &tm);
223+
if (rc < 0) {
224+
if (errno != EINTR) {
225+
shub->error_handler("Select failed", SHUB_RECOVERABLE_ERROR);
226+
recovery(shub);
227+
}
228+
} else {
229+
if (rc > 0) {
230+
for (i = 0, max_fd = shub->max_fd; i <= max_fd; i++) {
231+
if (FD_ISSET(i, &events)) {
232+
if (i == shub->input) {
233+
int s = accept(i, NULL, NULL);
234+
if (s < 0) {
235+
shub->error_handler("Failed to accept socket", SHUB_RECOVERABLE_ERROR);
236+
} else {
237+
if (s > max_fd) {
238+
shub->max_fd = s;
239+
}
240+
FD_SET(s, &shub->inset);
241+
}
242+
} else if (i == shub->output) {
243+
int available = recv(shub->output, shub->out_buffer + shub->out_buffer_used, buffer_size - shub->out_buffer_used, 0);
244+
int pos = 0;
245+
if (available <= 0) {
246+
pshub->error_handler("Failed to read inet socket", SHUB_RECOVERABLE_ERROR);
247+
reconnect(shub);
248+
}
249+
shub->out_buffer_used += available;
250+
while (pos + sizeof(ShubMessageHdr) <= shub->out_buffer_used) {
251+
ShubMessageHdr* hdr = (ShubMessageHdr*)shub->out_buffer;
252+
int chan = hdr->chan;
253+
pos += sizeof(ShubMessageHdr);
254+
n = pos + hdr->size <= shub->out_buffer_used ? hdr->size + sizeof(ShubMessageHdr) : shub->out_buffer_used - pos;
255+
if (!write_socket(chan, hdr, n)) {
256+
shub->error_handler("Failed to write to local socket", SHUB_RECOVERABLE_ERROR);
257+
close_socket(shub, chan);
258+
chan = -1;
259+
}
260+
if (n != hdr->size + sizeof(ShubMessageHdr)) {
261+
int tail = hdr->size + sizeof(ShubMessageHdr) - n;
262+
do {
263+
n = tail < shub->out_buffer_size ? tail : shub->out_buffer_size;
264+
if (!read_socket(output, out_buffer, n)) {
265+
shub->error_handler("Failed to read inet socket", SHUB_RECOVERABLE_ERROR);
266+
reconnect(shub);
267+
continue;
268+
}
269+
if (chan >= 0 && !write_socket(chan, out_buffer, n)) {
270+
shub->error_handler("Failed to write to local socket", SHUB_RECOVERABLE_ERROR);
271+
close_socket(shub, chan);
272+
chan = -1;
273+
}
274+
tail -= n;
275+
} while (tail != 0);
276+
}
277+
}
278+
memcpy(shub->out_buffer, shub->out_buffer + pos, shub->out_buffer_used - pos);
279+
shub->out_buffer_used -= pos;
280+
} else {
281+
ShubMessageHdr* hdr = (MessgeHdr*)&shub->in_buffer[shub->in_buffer_used];
282+
if (!read_socket(i, hdr, sizeof(ShubMessageHdr))) {
283+
shub->error_handler("Failed to read local socket", SHUB_RECOVERABLE_ERROR);
284+
close_socket(shub, i);
285+
} else {
286+
unsigned int size = hdr->size;
287+
hdr->chan = i;
288+
if (size + shub->in_buffer_used + sizeof(ShubMessageHdr) > buffer_size) {
289+
if (shub->in_buffer_used != 0) {
290+
while (!write_socket(shub->output, shub->in_buffer, shub->in_buffer_used)) {
291+
shub->error_handler("Failed to write to inet socket", SHUB_RECOVERABLE_ERROR);
292+
reconnect(shub);
293+
}
294+
memcpy(shub->in_buffer, shub->in_buffer + shub->in_buffer_used, sizeof(ShubMessageHdr));
295+
shub->in_buffer_used = 0;
296+
}
297+
}
298+
shub->in_buffer_used += sizeof(ShubMessageHdr);
299+
300+
while (1) {
301+
unsigned int n = size + shub->in_buffer_used > buffer_size ? buffer_size - shub->in_buffer_used : size;
302+
if (!read_socket(i, shub->in_buffer + shub->in_buffer_used, n)) {
303+
shub->error_handler("Failed to read local socket", SHUB_RECOVERABLE_ERROR);
304+
close_socket(shub, i);
305+
break;
306+
} else {
307+
if (n != size) {
308+
while (!write_socket(output, in_buffer, n)) {
309+
shub->error_handler("Failed to write to inet socket", SHUB_RECOVERABLE_ERROR);
310+
reconnect(shub);
311+
}
312+
size -= n;
313+
shub->in_buffer_used = 0;
314+
} else {
315+
shub->in_buffer_used += n;
316+
break;
317+
}
318+
}
319+
}
320+
}
321+
}
322+
}
323+
}
324+
} else if (shub->in_buffer_used != 0) {
325+
while (!write_socket(shub->output, shub->in_buffer, shub->in_buffer_used)) {
326+
shub->error_handler("Failed to write to inet socket", SHUB_RECOVERABLE_ERROR);
327+
reconnect(shub);
328+
}
329+
}
330+
}
331+
}
332+
}
333+

contrib/pg_xtm/sockhub.h

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
#ifndef __SOCKHUB_H__
2+
#define __SOCKHUB_H__
3+
4+
#include <sys/select.h>
5+
6+
typedef struct {
7+
unsigned int size : 24; /* size of message without header */
8+
unsigned int code : 8; /* any user defined code */
9+
unsigned int chan; /* local socket: set by SockHUB */
10+
} ShubMessageHdr;
11+
12+
typedef enum
13+
{
14+
SHUB_FATAL_ERROR,
15+
SHUB_RECOVERABLE_ERROR,
16+
SHUB_MINOR_ERROR,
17+
} ShubErrorSeverity;
18+
19+
typedef void(*ShubErrorHandler)(char const* msg, ShubErrorSeverity severity);
20+
21+
typedef struct
22+
{
23+
int buffer_size;
24+
int delay;
25+
int port;
26+
int queue_size;
27+
int max_attempts;
28+
char const* file;
29+
char const* host;
30+
ShubErrorHandler error_handler;
31+
} ShubParms;
32+
33+
typedef struct
34+
{
35+
int output;
36+
int input;
37+
int max_fd;
38+
fd_set inset;
39+
char* in_buffer;
40+
int in_buffer_used;
41+
char* out_buffer;
42+
int out_buffer_used;
43+
ShubParams* params;
44+
} Shub;
45+
46+
void ShubInitParams(ShubParams* params);
47+
void ShubIntialize(Shub* shub, ShubParams* params);
48+
void ShubLoop(Shub* shub);
49+
50+
#endif

0 commit comments

Comments
 (0)