Skip to content

Commit c9f37d6

Browse files
committed
Review sockhub library
1 parent 3bc578b commit c9f37d6

File tree

1 file changed

+59
-37
lines changed

1 file changed

+59
-37
lines changed

contrib/pg_xtm/sockhub/sockhub.c

Lines changed: 59 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -125,13 +125,11 @@ static void reconnect(Shub* shub)
125125
} while (rc < 0 && errno == EINTR);
126126

127127
if (rc >= 0 || errno == EINPROGRESS) {
128-
if (rc >= 0) {
129-
}
130128
break;
131129
}
132130
}
133131
if (rc < 0) {
134-
if (errno != ENOENT && errno != ECONNREFUSED) {
132+
if (errno != ENOENT && errno != ECONNREFUSED && errno != EINPROGRESS) {
135133
shub->params->error_handler("Connection can not be establish", SHUB_FATAL_ERROR);
136134
}
137135
if (max_attempts-- != 0) {
@@ -187,6 +185,7 @@ void ShubInitialize(Shub* shub, ShubParams* params)
187185
FD_ZERO(&shub->inset);
188186
FD_SET(shub->input, &shub->inset);
189187

188+
shub->output = -1;
190189
reconnect(shub);
191190

192191
shub->in_buffer = malloc(params->buffer_size);
@@ -207,52 +206,58 @@ void ShubLoop(Shub* shub)
207206
while (1) {
208207
fd_set events;
209208
struct timeval tm;
210-
int i, max_fd, rc;
211-
unsigned int n, size;
209+
int i, rc;
210+
int max_fd = shub->max_fd;
212211

213212
tm.tv_sec = shub->params->delay/1000;
214213
tm.tv_usec = shub->params->delay % 1000 * 1000;
215214

216215
events = shub->inset;
217-
rc = select(shub->max_fd+1, &events, NULL, NULL, shub->in_buffer_used == 0 ? NULL : &tm);
216+
rc = select(max_fd+1, &events, NULL, NULL, shub->in_buffer_used == 0 ? NULL : &tm);
218217
if (rc < 0) {
219218
if (errno != EINTR) {
220219
shub->params->error_handler("Select failed", SHUB_RECOVERABLE_ERROR);
221220
recovery(shub);
222221
}
223222
} else {
224223
if (rc > 0) {
225-
for (i = 0, max_fd = shub->max_fd; i <= max_fd; i++) {
224+
for (i = 0; i <= max_fd; i++) {
226225
if (FD_ISSET(i, &events)) {
227-
if (i == shub->input) {
226+
if (i == shub->input) { /* accept incomming connection */
228227
int s = accept(i, NULL, NULL);
229228
if (s < 0) {
230229
shub->params->error_handler("Failed to accept socket", SHUB_RECOVERABLE_ERROR);
231230
} else {
232-
if (s > max_fd) {
231+
if (s > shub->max_fd) {
233232
shub->max_fd = s;
234233
}
235234
FD_SET(s, &shub->inset);
236235
}
237-
} else if (i == shub->output) {
236+
} else if (i == shub->output) { /* receive response from server */
237+
/* try to read as much as possible */
238238
int available = recv(shub->output, shub->out_buffer + shub->out_buffer_used, buffer_size - shub->out_buffer_used, 0);
239239
int pos = 0;
240240
if (available <= 0) {
241241
shub->params->error_handler("Failed to read inet socket", SHUB_RECOVERABLE_ERROR);
242242
reconnect(shub);
243+
continue;
243244
}
244245
shub->out_buffer_used += available;
246+
247+
/* loop through all received responses */
245248
while (pos + sizeof(ShubMessageHdr) <= shub->out_buffer_used) {
246-
ShubMessageHdr* hdr = (ShubMessageHdr*)(shub->out_buffer + pos);
249+
ShubMessageHdr* hdr = (ShubMessageHdr*)&shub->out_buffer[pos];
247250
int chan = hdr->chan;
248-
n = pos + sizeof(ShubMessageHdr) + hdr->size <= shub->out_buffer_used ? hdr->size + sizeof(ShubMessageHdr) : shub->out_buffer_used - pos;
251+
unsigned int n = pos + sizeof(ShubMessageHdr) + hdr->size <= shub->out_buffer_used
252+
? hdr->size + sizeof(ShubMessageHdr)
253+
: shub->out_buffer_used - pos;
249254
if (!write_socket(chan, (char*)hdr, n)) {
250255
shub->params->error_handler("Failed to write to local socket", SHUB_RECOVERABLE_ERROR);
251256
close_socket(shub, chan);
252257
chan = -1;
253258
}
254-
/* read rest of message if it doesn't fit in buffer */
255259
if (n != hdr->size + sizeof(ShubMessageHdr)) {
260+
/* read rest of message if it doesn't fit in the buffer */
256261
int tail = hdr->size + sizeof(ShubMessageHdr) - n;
257262
do {
258263
n = tail < buffer_size ? tail : buffer_size;
@@ -274,56 +279,73 @@ void ShubLoop(Shub* shub)
274279
}
275280
pos += n;
276281
}
282+
/* Move partly fetched message header (if any) to the beginning of byffer */
277283
memcpy(shub->out_buffer, shub->out_buffer + pos, shub->out_buffer_used - pos);
278284
shub->out_buffer_used -= pos;
279-
} else {
285+
} else { /* receive request from client */
280286
ShubMessageHdr* hdr = (ShubMessageHdr*)&shub->in_buffer[shub->in_buffer_used];
281-
if (!read_socket(i, (char*)hdr, sizeof(ShubMessageHdr))) {
287+
int chan = i;
288+
if (!read_socket(chan, (char*)hdr, sizeof(ShubMessageHdr))) { /* fetch message header */
282289
shub->params->error_handler("Failed to read local socket", SHUB_RECOVERABLE_ERROR);
283290
close_socket(shub, i);
284291
} else {
285-
size = hdr->size;
286-
hdr->chan = i;
292+
unsigned int size = hdr->size;
293+
hdr->chan = chan; /* remember socket descriptor from which this message was read */
287294
if (size + shub->in_buffer_used + sizeof(ShubMessageHdr) > buffer_size) {
288-
if (shub->in_buffer_used != 0) {
295+
/* message doesn't completely fit in buffer */
296+
if (shub->in_buffer_used != 0) { /* if buffer is not empty...*/
297+
/* ... then send it */
289298
while (!write_socket(shub->output, shub->in_buffer, shub->in_buffer_used)) {
290299
shub->params->error_handler("Failed to write to inet socket", SHUB_RECOVERABLE_ERROR);
291300
reconnect(shub);
292301
}
302+
/* move received message header to the beginning of the buffer */
293303
memcpy(shub->in_buffer, shub->in_buffer + shub->in_buffer_used, sizeof(ShubMessageHdr));
294304
shub->in_buffer_used = 0;
295305
}
296306
}
297307
shub->in_buffer_used += sizeof(ShubMessageHdr);
298308

299-
while (1) {
309+
do {
300310
unsigned int n = size + shub->in_buffer_used > buffer_size ? buffer_size - shub->in_buffer_used : size;
301-
if (!read_socket(i, shub->in_buffer + shub->in_buffer_used, n)) {
311+
/* fetch message body */
312+
if (chan >= 0 && !read_socket(chan, shub->in_buffer + shub->in_buffer_used, n)) {
302313
shub->params->error_handler("Failed to read local socket", SHUB_RECOVERABLE_ERROR);
303-
close_socket(shub, i);
304-
break;
305-
} else {
306-
if (n != size) {
307-
while (!write_socket(shub->output, shub->in_buffer, n)) {
308-
shub->params->error_handler("Failed to write to inet socket", SHUB_RECOVERABLE_ERROR);
309-
reconnect(shub);
310-
}
311-
size -= n;
312-
shub->in_buffer_used = 0;
313-
} else {
314-
shub->in_buffer_used += n;
314+
close_socket(shub, chan);
315+
if (hdr != NULL) { /* if message header is not yet sent to the server... */
316+
/* ... then skip this message */
317+
shub->in_buffer_used = (char*)hdr - shub->in_buffer;
315318
break;
319+
} else { /* if message was partly sent to the server, we can not skip it, so we have to send garbage to the server */
320+
chan = -1; /* do not try to read rest of body of this message */
316321
}
322+
}
323+
shub->in_buffer_used += n;
324+
size -= n;
325+
/* if there is no more free space in the buffer to receive new message header... */
326+
if (shub->in_buffer_used + sizeof(ShubMessageHdr) > buffer_size) {
327+
328+
/* ... then send buffer to the server */
329+
while (!write_socket(shub->output, shub->in_buffer, shub->in_buffer_used)) {
330+
shub->params->error_handler("Failed to write to inet socket", SHUB_RECOVERABLE_ERROR);
331+
reconnect(shub);
332+
}
333+
hdr = NULL; /* message is partly sent to the server: can not skip it any more */
334+
shub->in_buffer_used = 0;
317335
}
318-
}
336+
} while (size != 0); /* repeat until all message body is received */
319337
}
320338
}
321339
}
322340
}
323-
} else if (shub->in_buffer_used != 0) {
324-
while (!write_socket(shub->output, shub->in_buffer, shub->in_buffer_used)) {
325-
shub->params->error_handler("Failed to write to inet socket", SHUB_RECOVERABLE_ERROR);
326-
reconnect(shub);
341+
} else { /* timeout expired */
342+
if (shub->in_buffer_used != 0) { /* if buffer is not empty... */
343+
/* ...then send it */
344+
while (!write_socket(shub->output, shub->in_buffer, shub->in_buffer_used)) {
345+
shub->params->error_handler("Failed to write to inet socket", SHUB_RECOVERABLE_ERROR);
346+
reconnect(shub);
347+
}
348+
shub->in_buffer_used = 0;
327349
}
328350
}
329351
}

0 commit comments

Comments
 (0)