Skip to content

Commit 61c5768

Browse files
committed
Add sockhub tests
1 parent 267b83c commit 61c5768

File tree

7 files changed

+317
-3
lines changed

7 files changed

+317
-3
lines changed

contrib/pg_dtm/sockhub/Makefile

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,11 @@ LDFLAGS = -g
55
AR = ar
66
ARFLAGS = -cru
77

8-
all: sockhub libsockhub.a
8+
all: sockhub library tests
9+
10+
library: libsockhub.a
11+
12+
tests: test-client test-server
913

1014
sockhup.o: sockhub.c sockhub.h
1115
$(CC) $(CFLAGS) sockhub.c
@@ -20,5 +24,20 @@ libsockhub.a: sockhub.o
2024
sockhub: sockhub_main.o libsockhub.a
2125
$(LD) $(LDFLAGS) -o sockhub sockhub_main.o libsockhub.a
2226

27+
test-client.o: test-client.c sockhub.h
28+
$(CC) $(CFLAGS) test-client.c
29+
30+
test-client: test-client.o libsockhub.a
31+
$(LD) $(LDFLAGS) -o test-client test-client.o libsockhub.a
32+
33+
test-server.o: test-server.c sockhub.h
34+
$(CC) $(CFLAGS) test-server.c
35+
36+
test-server: test-server.o libsockhub.a
37+
$(LD) $(LDFLAGS) -o test-server test-server.o libsockhub.a
38+
2339
clean:
24-
rm *.o *.a
40+
rm -f *.o *.a
41+
42+
tgz: clean
43+
cd .. ; tar cvzf sockhub.tgz sockhub

contrib/pg_dtm/sockhub/sockhub.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -139,7 +139,7 @@ static void reconnect(Shub* shub)
139139
}
140140
} else {
141141
int optval = 1;
142-
setsockopt(shub->output, IPPROTO_TCP, TCP_NODELAY, (char const*)&optval, sizeof(int));
142+
setsockopt(shub->output, IPPROTO_TCP, TCP_NODELAY, (char const*)&optval, sizeof(optval));
143143
FD_SET(shub->output, &shub->inset);
144144
break;
145145
}
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
n_clients=10
2+
n_iters=100000
3+
for ((i=0;i<n_clients;i++))
4+
do
5+
./test-client $1 5001 $n_iters &
6+
done
7+
wait
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
n_clients=10
2+
n_iters=100000
3+
./sockhub -h $1 -p 5001 -f /tmp/p5002 &
4+
for ((i=0;i<n_clients;i++))
5+
do
6+
./test-client localhost 5002 $n_iters &
7+
done
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
./test-server 5001

contrib/pg_dtm/sockhub/test-client.c

Lines changed: 173 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,173 @@
1+
#include <sys/ioctl.h>
2+
#include <fcntl.h>
3+
#include <time.h>
4+
#include <sys/time.h>
5+
#include <sys/types.h>
6+
#include <sys/socket.h>
7+
#include <sys/utsname.h>
8+
#include <sys/select.h>
9+
#include <netinet/in.h>
10+
#include <netinet/tcp.h>
11+
#include <arpa/inet.h>
12+
#include <stdio.h>
13+
#include <netdb.h>
14+
#include <stdlib.h>
15+
#include <unistd.h>
16+
#include <string.h>
17+
#include <errno.h>
18+
#include <stddef.h>
19+
#include <assert.h>
20+
21+
#include "sockhub.h"
22+
23+
#define MAX_CONNECT_ATTEMPTS 10
24+
25+
typedef struct
26+
{
27+
ShubMessageHdr hdr;
28+
int data;
29+
} Message;
30+
31+
static int resolve_host_by_name(const char *hostname, unsigned* addrs, unsigned* n_addrs)
32+
{
33+
struct sockaddr_in sin;
34+
struct hostent* hp;
35+
unsigned i;
36+
37+
sin.sin_addr.s_addr = inet_addr(hostname);
38+
if (sin.sin_addr.s_addr != INADDR_NONE) {
39+
memcpy(&addrs[0], &sin.sin_addr.s_addr, sizeof(sin.sin_addr.s_addr));
40+
*n_addrs = 1;
41+
return 1;
42+
}
43+
44+
hp = gethostbyname(hostname);
45+
if (hp == NULL || hp->h_addrtype != AF_INET) {
46+
return 0;
47+
}
48+
for (i = 0; hp->h_addr_list[i] != NULL && i < *n_addrs; i++) {
49+
memcpy(&addrs[i], hp->h_addr_list[i], sizeof(addrs[i]));
50+
}
51+
*n_addrs = i;
52+
return 1;
53+
}
54+
55+
int connect_to_server(char const* host, int port, int max_attempts)
56+
{
57+
struct sockaddr_in sock_inet;
58+
unsigned addrs[128];
59+
unsigned i, n_addrs = sizeof(addrs) / sizeof(addrs[0]);
60+
int rc;
61+
int sd;
62+
63+
if (strcmp(host, "localhost") == 0) {
64+
struct sockaddr sock;
65+
int len = offsetof(struct sockaddr, sa_data) + sprintf(sock.sa_data, "/tmp/p%u", port);
66+
sock.sa_family = AF_UNIX;
67+
sd = socket(sock.sa_family, SOCK_STREAM, 0);
68+
if (sd < 0) {
69+
return -1;
70+
}
71+
72+
while (1) {
73+
do {
74+
rc = connect(sd, &sock, len);
75+
} while (rc < 0 && EINTR);
76+
77+
if (rc < 0) {
78+
if (errno != ENOENT && errno != ECONNREFUSED && errno != EINPROGRESS) {
79+
return -1;
80+
}
81+
if (max_attempts-- != 0) {
82+
sleep(1);
83+
} else {
84+
return -1;
85+
}
86+
} else {
87+
break;
88+
}
89+
}
90+
} else {
91+
sock_inet.sin_family = AF_INET;
92+
sock_inet.sin_port = htons(port);
93+
if (!resolve_host_by_name(host, addrs, &n_addrs)) {
94+
return -1;
95+
}
96+
sd = socket(AF_INET, SOCK_STREAM, 0);
97+
if (sd < 0) {
98+
return -1;
99+
}
100+
while (1) {
101+
int rc = -1;
102+
for (i = 0; i < n_addrs; ++i) {
103+
memcpy(&sock_inet.sin_addr, &addrs[i], sizeof sock_inet.sin_addr);
104+
do {
105+
rc = connect(sd, (struct sockaddr*)&sock_inet, sizeof(sock_inet));
106+
} while (rc < 0 && errno == EINTR);
107+
108+
if (rc >= 0 || errno == EINPROGRESS) {
109+
break;
110+
}
111+
}
112+
if (rc < 0) {
113+
if (errno != ENOENT && errno != ECONNREFUSED && errno != EINPROGRESS) {
114+
return -1;
115+
}
116+
if (max_attempts-- != 0) {
117+
sleep(1);
118+
} else {
119+
return -1;
120+
}
121+
} else {
122+
int optval = 1;
123+
setsockopt(sd, IPPROTO_TCP, TCP_NODELAY, (char const*)&optval, sizeof(int));
124+
break;
125+
}
126+
}
127+
}
128+
return sd;
129+
}
130+
131+
132+
133+
int main(int argc, char* argv[])
134+
{
135+
int sd;
136+
int rc;
137+
int i;
138+
int n_iter = 10000;
139+
time_t start;
140+
141+
if (argc < 3) {
142+
fprintf(stderr, "Usage: ./test-client HOST PORT [N_ITERATIONS]\n");
143+
return 1;
144+
}
145+
146+
sd = connect_to_server(argv[1], atoi(argv[2]), MAX_CONNECT_ATTEMPTS);
147+
if (sd < 0) {
148+
perror("Failed to connect to socket");
149+
return 1;
150+
}
151+
152+
if (argc >= 3) {
153+
n_iter = atoi(argv[3]);
154+
}
155+
156+
start = time(NULL);
157+
158+
for (i = 0; i < n_iter; i++) {
159+
Message msg;
160+
msg.data = i;
161+
msg.hdr.size = sizeof(Message) - sizeof(ShubMessageHdr);
162+
rc = send(sd, &msg, sizeof(msg), 0);
163+
assert(rc == sizeof(msg));
164+
rc = recv(sd, &msg, sizeof(msg), 0);
165+
assert(rc == sizeof(msg) && msg.data == i+1);
166+
}
167+
168+
printf("Elapsed time for %d iterations: %d\n", n_iter, (int)(time(NULL) - start));
169+
return 0;
170+
}
171+
172+
173+

contrib/pg_dtm/sockhub/test-server.c

Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
#include <sys/ioctl.h>
2+
#include <fcntl.h>
3+
#include <sys/time.h>
4+
#include <sys/types.h>
5+
#include <sys/socket.h>
6+
#include <sys/utsname.h>
7+
#include <sys/select.h>
8+
#include <netinet/in.h>
9+
#include <netinet/tcp.h>
10+
#include <arpa/inet.h>
11+
#include <stdio.h>
12+
#include <netdb.h>
13+
#include <stdlib.h>
14+
#include <unistd.h>
15+
#include <string.h>
16+
#include <errno.h>
17+
#include <assert.h>
18+
19+
#include "sockhub.h"
20+
21+
#define BUFFER_SIZE 64*1024
22+
#define LISTEN_QUEUE_SIZE 100
23+
24+
typedef struct
25+
{
26+
ShubMessageHdr hdr;
27+
int data;
28+
} Message;
29+
30+
31+
int main(int argc, char* argv[])
32+
{
33+
int sd;
34+
int i;
35+
int max_fd;
36+
struct sockaddr_in sock;
37+
fd_set inset;
38+
int port;
39+
int optval = 1;
40+
char buf[BUFFER_SIZE];
41+
42+
if (argc < 2) {
43+
fprintf(stderr, "Usage: ./test-server PORT\n");
44+
return 1;
45+
}
46+
port = atoi(argv[1]);
47+
48+
sd = socket(AF_INET, SOCK_STREAM, 0);
49+
if (sd < 0) {
50+
perror("Failed to connect to socket");
51+
return 1;
52+
}
53+
setsockopt(sd, IPPROTO_TCP, TCP_NODELAY, (char const*)&optval, sizeof(optval));
54+
setsockopt(sd, SOL_SOCKET, SO_REUSEADDR, (char const*)&optval, sizeof(optval));
55+
56+
sock.sin_family = AF_INET;
57+
sock.sin_addr.s_addr = htonl(INADDR_ANY);
58+
sock.sin_port = htons(port);
59+
if (bind(sd, (struct sockaddr*)&sock, sizeof(sock))) {
60+
perror("Failed to bind socket");
61+
return 1;
62+
}
63+
if (listen(sd, LISTEN_QUEUE_SIZE) < 0) {
64+
perror("Failed to listen socket");
65+
return 1;
66+
}
67+
FD_ZERO(&inset);
68+
FD_SET(sd, &inset);
69+
max_fd = sd;
70+
71+
while (1) {
72+
fd_set events = inset;
73+
int rc = select(max_fd+1, &events, NULL, NULL, NULL);
74+
if (rc > 0) {
75+
for (i = 0; i <= max_fd; i++) {
76+
if (FD_ISSET(i, &events)) {
77+
if (i == sd) {
78+
int s = accept(sd, NULL, NULL);
79+
if (s < 0) {
80+
perror("Failed to accept socket");
81+
} else {
82+
FD_SET(s, &inset);
83+
if (s > max_fd) {
84+
max_fd = s;
85+
}
86+
}
87+
} else {
88+
rc = recv(i, buf, sizeof(buf), 0);
89+
if (rc > 0) {
90+
int pos;
91+
for (pos = 0; pos < rc; pos += sizeof(Message)) {
92+
Message* msg = (Message*)&buf[pos];
93+
msg->data += 1;
94+
assert(sizeof(ShubMessageHdr) + msg->hdr.size == sizeof(Message));
95+
}
96+
assert(pos == rc);
97+
rc = send(i, buf, pos, 0);
98+
assert(rc == pos);
99+
} else {
100+
FD_CLR(i, &inset);
101+
}
102+
}
103+
}
104+
}
105+
}
106+
}
107+
}

0 commit comments

Comments
 (0)