Skip to content

Commit cb6377b

Browse files
committed
Update sockhub
1 parent e9335ab commit cb6377b

File tree

2 files changed

+16
-21
lines changed

2 files changed

+16
-21
lines changed

contrib/pg_xtm/sockhub/sockhub.c

Lines changed: 15 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ static void default_error_handler(char const* msg, ShubErrorSeverity severity)
2828

2929
void ShubInitParams(ShubParams* params)
3030
{
31-
memset(params, 0, sizeof params);
31+
memset(params, 0, sizeof(*params));
3232
params->buffer_size = 64*1025;
3333
params->port = 54321;
3434
params->queue_size = 100;
@@ -65,22 +65,14 @@ static int resolve_host_by_name(const char *hostname, unsigned* addrs, unsigned*
6565

6666
static void close_socket(Shub* shub, int fd)
6767
{
68-
int i, max_fd;
69-
fd_set copy;
70-
FD_ZERO(&copy);
7168
close(fd);
72-
for (i = 0, max_fd = shub->max_fd; i <= max_fd; i++) {
73-
if (i != fd && FD_ISSET(i, &shub->inset)) {
74-
FD_SET(i, &copy);
75-
}
76-
}
77-
FD_COPY(&copy, &shub->inset);
69+
FD_CLR(fd, &shub->inset);
7870
}
7971

8072
static int read_socket(int sd, char* buf, int size)
8173
{
8274
while (size != 0) {
83-
int n = recv(sd, buf, size , 0);
75+
int n = recv(sd, buf, size, 0);
8476
if (n <= 0) {
8577
return 0;
8678
}
@@ -159,27 +151,26 @@ static void reconnect(Shub* shub)
159151
static void recovery(Shub* shub)
160152
{
161153
int i, max_fd;
162-
fd_set okset;
163-
fd_set tryset;
164154

165155
for (i = 0, max_fd = shub->max_fd; i <= max_fd; i++) {
166156
if (FD_ISSET(i, &shub->inset)) {
167157
struct timeval tm = {0,0};
158+
fd_set tryset;
168159
FD_ZERO(&tryset);
169160
FD_SET(i, &tryset);
170-
if (select(i+1, &tryset, NULL, NULL, &tm) >= 0) {
171-
FD_SET(i, &okset);
161+
if (select(i+1, &tryset, NULL, NULL, &tm) < 0) {
162+
close_socket(shub, i);
172163
}
173164
}
174165
}
175-
FD_COPY(&okset, &shub->inset);
176166
}
177167

178168
void ShubInitialize(Shub* shub, ShubParams* params)
179169
{
180170
struct sockaddr sock;
181171

182172
shub->params = params;
173+
183174
sock.sa_family = AF_UNIX;
184175
strcpy(sock.sa_data, params->file);
185176
unlink(params->file);
@@ -203,6 +194,9 @@ void ShubInitialize(Shub* shub, ShubParams* params)
203194
if (shub->in_buffer == NULL || shub->out_buffer == NULL) {
204195
shub->params->error_handler("Failed to allocate buffer", SHUB_FATAL_ERROR);
205196
}
197+
shub->in_buffer_used = 0;
198+
shub->out_buffer_used = 0;
199+
shub->max_fd = -1;
206200
}
207201

208202

@@ -219,8 +213,7 @@ void ShubLoop(Shub* shub)
219213
tm.tv_sec = shub->params->delay/1000;
220214
tm.tv_usec = shub->params->delay % 1000 * 1000;
221215

222-
223-
FD_COPY(&shub->inset, &events);
216+
events = shub->inset;
224217
rc = select(shub->max_fd+1, &events, NULL, NULL, shub->in_buffer_used == 0 ? NULL : &tm);
225218
if (rc < 0) {
226219
if (errno != EINTR) {
@@ -250,15 +243,16 @@ void ShubLoop(Shub* shub)
250243
}
251244
shub->out_buffer_used += available;
252245
while (pos + sizeof(ShubMessageHdr) <= shub->out_buffer_used) {
253-
ShubMessageHdr* hdr = (ShubMessageHdr*)shub->out_buffer;
246+
ShubMessageHdr* hdr = (ShubMessageHdr*)(shub->out_buffer + pos);
254247
int chan = hdr->chan;
248+
n = pos + sizeof(ShubMessageHdr) + hdr->size <= shub->out_buffer_used ? hdr->size + sizeof(ShubMessageHdr) : shub->out_buffer_used - pos;
255249
pos += sizeof(ShubMessageHdr);
256-
n = pos + hdr->size <= shub->out_buffer_used ? hdr->size + sizeof(ShubMessageHdr) : shub->out_buffer_used - pos;
257250
if (!write_socket(chan, (char*)hdr, n)) {
258251
shub->params->error_handler("Failed to write to local socket", SHUB_RECOVERABLE_ERROR);
259252
close_socket(shub, chan);
260253
chan = -1;
261254
}
255+
/* read rest of message if it doesn't fit in buffer */
262256
if (n != hdr->size + sizeof(ShubMessageHdr)) {
263257
int tail = hdr->size + sizeof(ShubMessageHdr) - n;
264258
do {
@@ -275,6 +269,7 @@ void ShubLoop(Shub* shub)
275269
}
276270
tail -= n;
277271
} while (tail != 0);
272+
pos =;
278273
}
279274
}
280275
memcpy(shub->out_buffer, shub->out_buffer + pos, shub->out_buffer_used - pos);

contrib/pg_xtm/sockhub/sockhub.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,8 +37,8 @@ typedef struct
3737
int max_fd;
3838
fd_set inset;
3939
char* in_buffer;
40-
int in_buffer_used;
4140
char* out_buffer;
41+
int in_buffer_used;
4242
int out_buffer_used;
4343
ShubParams* params;
4444
} Shub;

0 commit comments

Comments
 (0)