Skip to content

Commit 06e1028

Browse files
committed
Add more sckhub tests
1 parent 582d9f5 commit 06e1028

File tree

7 files changed

+266
-27
lines changed

7 files changed

+266
-27
lines changed

contrib/pg_dtm/sockhub/Makefile

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
CC = gcc
2-
CFLAGS = -c -I. -Wall -O2 -g -fPIC
2+
CFLAGS = -c -I. -Wall -O0 -g -fPIC
33
LD = $(CC)
44
LDFLAGS = -g
55
AR = ar
@@ -9,7 +9,7 @@ all: sockhub library tests
99

1010
library: libsockhub.a
1111

12-
tests: test-client test-server
12+
tests: test-client test-async-client test-server
1313

1414
sockhup.o: sockhub.c sockhub.h
1515
$(CC) $(CFLAGS) sockhub.c
@@ -30,6 +30,12 @@ test-client.o: test-client.c sockhub.h
3030
test-client: test-client.o libsockhub.a
3131
$(LD) $(LDFLAGS) -o test-client test-client.o libsockhub.a
3232

33+
test-async-client.o: test-async-client.c sockhub.h
34+
$(CC) $(CFLAGS) test-async-client.c
35+
36+
test-async-client: test-async-client.o libsockhub.a
37+
$(LD) $(LDFLAGS) -o test-async-client test-async-client.o libsockhub.a
38+
3339
test-server.o: test-server.c sockhub.h
3440
$(CC) $(CFLAGS) test-server.c
3541

contrib/pg_dtm/sockhub/sockhub.c

Lines changed: 22 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
#include <string.h>
1616
#include <signal.h>
1717
#include <errno.h>
18+
#include <assert.h>
1819

1920
#include "sockhub.h"
2021

@@ -70,11 +71,11 @@ static void close_socket(Shub* shub, int fd)
7071
FD_CLR(fd, &shub->inset);
7172
}
7273

73-
static int read_socket_ex(int sd, char* buf, int min_size, int max_size)
74+
int ShubReadSocketEx(int sd, void* buf, int min_size, int max_size)
7475
{
7576
int received = 0;
7677
while (received < min_size) {
77-
int n = recv(sd, buf + received, max_size - received, 0);
78+
int n = recv(sd, (char*)buf + received, max_size - received, 0);
7879
if (n <= 0) {
7980
break;
8081
}
@@ -83,20 +84,21 @@ static int read_socket_ex(int sd, char* buf, int min_size, int max_size)
8384
return received;
8485
}
8586

86-
static int read_socket(int sd, char* buf, int size)
87+
int ShubReadSocket(int sd, void* buf, int size)
8788
{
88-
return read_socket_ex(sd, buf, size, size) == size;
89+
return ShubReadSocketEx(sd, buf, size, size) == size;
8990
}
9091

91-
static int write_socket(int sd, char const* buf, int size)
92+
int ShubWriteSocket(int sd, void const* buf, int size)
9293
{
94+
char* src = (char*)buf;
9395
while (size != 0) {
94-
int n = send(sd, buf, size, 0);
96+
int n = send(sd, src, size, 0);
9597
if (n <= 0) {
9698
return 0;
9799
}
98100
size -= n;
99-
buf += n;
101+
src += n;
100102
}
101103
return 1;
102104
}
@@ -180,7 +182,7 @@ static void notify_disconnect(Shub* shub, int chan)
180182
hdr->code = MSG_DISCONNECT;
181183
shub->in_buffer_used += sizeof(ShubMessageHdr);
182184
if (shub->in_buffer_used + sizeof(ShubMessageHdr) > shub->params->buffer_size) {
183-
while (!write_socket(shub->output, shub->in_buffer, shub->in_buffer_used)) {
185+
while (!ShubWriteSocket(shub->output, shub->in_buffer, shub->in_buffer_used)) {
184186
shub->params->error_handler("Failed to write to inet socket", SHUB_RECOVERABLE_ERROR);
185187
reconnect(shub);
186188
}
@@ -291,7 +293,7 @@ void ShubLoop(Shub* shub)
291293
}
292294
} else if (i == shub->output) { /* receive response from server */
293295
/* try to read as much as possible */
294-
int available = read_socket_ex(shub->output, shub->out_buffer + shub->out_buffer_used, sizeof(ShubMessageHdr), buffer_size - shub->out_buffer_used);
296+
int available = ShubReadSocketEx(shub->output, shub->out_buffer + shub->out_buffer_used, sizeof(ShubMessageHdr), buffer_size - shub->out_buffer_used);
295297
int pos = 0;
296298
if (available < sizeof(ShubMessageHdr)) {
297299
shub->params->error_handler("Failed to read inet socket", SHUB_RECOVERABLE_ERROR);
@@ -307,7 +309,7 @@ void ShubLoop(Shub* shub)
307309
unsigned int n = pos + sizeof(ShubMessageHdr) + hdr->size <= shub->out_buffer_used
308310
? hdr->size + sizeof(ShubMessageHdr)
309311
: shub->out_buffer_used - pos;
310-
if (!write_socket(chan, (char*)hdr, n)) {
312+
if (!ShubWriteSocket(chan, (char*)hdr, n)) {
311313
shub->params->error_handler("Failed to write to local socket", SHUB_RECOVERABLE_ERROR);
312314
close_socket(shub, chan);
313315
notify_disconnect(shub, chan);
@@ -318,12 +320,12 @@ void ShubLoop(Shub* shub)
318320
int tail = hdr->size + sizeof(ShubMessageHdr) - n;
319321
do {
320322
n = tail < buffer_size ? tail : buffer_size;
321-
if (!read_socket(shub->output, shub->out_buffer, n)) {
323+
if (!ShubReadSocket(shub->output, shub->out_buffer, n)) {
322324
shub->params->error_handler("Failed to read inet socket", SHUB_RECOVERABLE_ERROR);
323325
reconnect(shub);
324326
continue;
325327
}
326-
if (chan >= 0 && !write_socket(chan, shub->out_buffer, n)) {
328+
if (chan >= 0 && !ShubWriteSocket(chan, shub->out_buffer, n)) {
327329
shub->params->error_handler("Failed to write to local socket", SHUB_RECOVERABLE_ERROR);
328330
close_socket(shub, chan);
329331
notify_disconnect(shub, chan);
@@ -344,7 +346,8 @@ void ShubLoop(Shub* shub)
344346
int chan = i;
345347
int available = 0;
346348
while (1) {
347-
available += read_socket_ex(chan, &shub->in_buffer[shub->in_buffer_used + available], sizeof(ShubMessageHdr) - available, buffer_size - shub->in_buffer_used - available);
349+
assert(sizeof(ShubMessageHdr) > available);
350+
available += ShubReadSocketEx(chan, &shub->in_buffer[shub->in_buffer_used + available], sizeof(ShubMessageHdr) - available, buffer_size - shub->in_buffer_used - available);
348351
if (available < sizeof(ShubMessageHdr)) {
349352
shub->params->error_handler("Failed to read local socket", SHUB_RECOVERABLE_ERROR);
350353
close_socket(shub, i);
@@ -366,12 +369,12 @@ void ShubLoop(Shub* shub)
366369
/* message doesn't completely fit in buffer */
367370
if (shub->in_buffer_used != 0) { /* if buffer is not empty...*/
368371
/* ... then send it */
369-
while (!write_socket(shub->output, shub->in_buffer, shub->in_buffer_used)) {
372+
while (!ShubWriteSocket(shub->output, shub->in_buffer, shub->in_buffer_used)) {
370373
shub->params->error_handler("Failed to write to inet socket", SHUB_RECOVERABLE_ERROR);
371374
reconnect(shub);
372375
}
373376
/* move received message header to the beginning of the buffer */
374-
memcpy(shub->in_buffer, shub->in_buffer + shub->in_buffer_used, buffer_size - shub->in_buffer_used);
377+
memcpy(shub->in_buffer, shub->in_buffer + shub->in_buffer_used, available - shub->in_buffer_used);
375378
shub->in_buffer_used = 0;
376379
}
377380
}
@@ -381,7 +384,7 @@ void ShubLoop(Shub* shub)
381384
do {
382385
unsigned int n = size + shub->in_buffer_used > buffer_size ? buffer_size - shub->in_buffer_used : size;
383386
/* fetch rest of message body */
384-
if (chan >= 0 && !read_socket(chan, shub->in_buffer + shub->in_buffer_used, n)) {
387+
if (chan >= 0 && !ShubReadSocket(chan, shub->in_buffer + shub->in_buffer_used, n)) {
385388
shub->params->error_handler("Failed to read local socket", SHUB_RECOVERABLE_ERROR);
386389
close_socket(shub, chan);
387390
if (hdr != NULL) { /* if message header is not yet sent to the server... */
@@ -398,7 +401,7 @@ void ShubLoop(Shub* shub)
398401
/* if there is no more free space in the buffer to receive new message header... */
399402
if (shub->in_buffer_used + sizeof(ShubMessageHdr) > buffer_size) {
400403
/* ... then send buffer to the server */
401-
while (!write_socket(shub->output, shub->in_buffer, shub->in_buffer_used)) {
404+
while (!ShubWriteSocket(shub->output, shub->in_buffer, shub->in_buffer_used)) {
402405
shub->params->error_handler("Failed to write to inet socket", SHUB_RECOVERABLE_ERROR);
403406
reconnect(shub);
404407
}
@@ -417,7 +420,7 @@ void ShubLoop(Shub* shub)
417420
if (chan >= 0 && pos != available) { /* partly fetched message header */
418421
if (shub->in_buffer_used + sizeof(ShubMessageHdr) > buffer_size) {
419422
/* message doesn't completely fit in buffer */
420-
while (!write_socket(shub->output, shub->in_buffer, shub->in_buffer_used)) {
423+
while (!ShubWriteSocket(shub->output, shub->in_buffer, shub->in_buffer_used)) {
421424
shub->params->error_handler("Failed to write to inet socket", SHUB_RECOVERABLE_ERROR);
422425
reconnect(shub);
423426
}
@@ -448,7 +451,7 @@ void ShubLoop(Shub* shub)
448451
printf("Average sent buffer size: %ld\n", total_sent/total_count);
449452
}
450453
#endif
451-
while (!write_socket(shub->output, shub->in_buffer, shub->in_buffer_used)) {
454+
while (!ShubWriteSocket(shub->output, shub->in_buffer, shub->in_buffer_used)) {
452455
shub->params->error_handler("Failed to write to inet socket", SHUB_RECOVERABLE_ERROR);
453456
reconnect(shub);
454457
}

contrib/pg_dtm/sockhub/sockhub.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,10 @@ typedef struct
4949
ShubParams* params;
5050
} Shub;
5151

52+
int ShubReadSocketEx(int sd, void* buf, int min_size, int max_size);
53+
int ShubReadSocket(int sd, void* buf, int size);
54+
int ShubWriteSocket(int sd, void const* buf, int size);
55+
5256
void ShubInitParams(ShubParams* params);
5357
void ShubInitialize(Shub* shub, ShubParams* params);
5458
void ShubLoop(Shub* shub);
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
n_clients=10
2+
n_iters=1000
3+
pkill -9 sockub
4+
pkill -9 test-async-client
5+
./sockhub -h $1 -p 5001 -f /tmp/p5002 &
6+
for ((i=0;i<n_clients;i++))
7+
do
8+
./test-async-client -h localhost -p 5002 -i $n_iters &
9+
done

0 commit comments

Comments
 (0)