Skip to content

Commit 32eaf15

Browse files
committed
merge
2 parents a404eed + f7cf59c commit 32eaf15

File tree

3 files changed

+102
-63
lines changed

3 files changed

+102
-63
lines changed

contrib/pg_dtm/libdtm.c

Lines changed: 50 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -74,37 +74,57 @@ static DTMConn DtmConnect(char *host, int port)
7474
hint.ai_family = AF_INET;
7575
snprintf(portstr, 6, "%d", port);
7676
hint.ai_protocol = getprotobyname("tcp")->p_proto;
77-
if (getaddrinfo(host, portstr, &hint, &addrs))
78-
{
79-
perror("failed to resolve address");
80-
return NULL;
81-
}
8277

83-
for (a = addrs; a != NULL; a = a->ai_next)
84-
{
85-
int one = 1;
86-
sd = socket(a->ai_family, a->ai_socktype, a->ai_protocol);
87-
if (sd == -1)
88-
{
89-
perror("failed to create a socket");
90-
continue;
91-
}
92-
setsockopt(sd, IPPROTO_TCP, TCP_NODELAY, &one, sizeof(one));
93-
94-
if (connect(sd, a->ai_addr, a->ai_addrlen) == -1)
95-
{
96-
perror("failed to connect to an address");
97-
close(sd);
98-
continue;
99-
}
100-
101-
// success
102-
freeaddrinfo(addrs);
103-
dtm = malloc(sizeof(DTMConnData));
104-
dtm->sock = sd;
105-
return dtm;
106-
}
107-
freeaddrinfo(addrs);
78+
while (1) {
79+
char* sep = strchr(host, ',');
80+
if (sep != NULL) {
81+
*sep = '\0';
82+
}
83+
if (getaddrinfo(host, portstr, &hint, &addrs))
84+
{
85+
perror("failed to resolve address");
86+
if (sep == NULL) {
87+
return NULL;
88+
} else {
89+
goto TryNextHost;
90+
}
91+
}
92+
93+
for (a = addrs; a != NULL; a = a->ai_next)
94+
{
95+
int one = 1;
96+
sd = socket(a->ai_family, a->ai_socktype, a->ai_protocol);
97+
if (sd == -1)
98+
{
99+
perror("failed to create a socket");
100+
goto TryNextHost;
101+
}
102+
setsockopt(sd, IPPROTO_TCP, TCP_NODELAY, &one, sizeof(one));
103+
104+
if (connect(sd, a->ai_addr, a->ai_addrlen) == -1)
105+
{
106+
perror("failed to connect to an address");
107+
close(sd);
108+
goto TryNextHost;
109+
}
110+
111+
// success
112+
freeaddrinfo(addrs);
113+
dtm = malloc(sizeof(DTMConnData));
114+
dtm->sock = sd;
115+
if (sep != NULL) {
116+
*sep = ',';
117+
}
118+
return dtm;
119+
}
120+
freeaddrinfo(addrs);
121+
TryNextHost:
122+
if (sep == NULL) {
123+
break;
124+
}
125+
*sep = ',';
126+
host = sep + 1;
127+
}
108128
}
109129
fprintf(stderr, "could not connect\n");
110130
return NULL;

contrib/pg_dtm/sockhub/sockhub.c

Lines changed: 50 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -107,47 +107,66 @@ static void reconnect(Shub* shub)
107107
unsigned addrs[128];
108108
unsigned i, n_addrs = sizeof(addrs) / sizeof(addrs[0]);
109109
int max_attempts = shub->params->max_attempts;
110-
110+
char* host = (char*)shub->params->host;
111111
if (shub->output >= 0) {
112112
close_socket(shub, shub->output);
113113
}
114114

115115
sock_inet.sin_family = AF_INET;
116116
sock_inet.sin_port = htons(shub->params->port);
117-
if (!resolve_host_by_name(shub->params->host, addrs, &n_addrs)) {
118-
shub->params->error_handler("Failed to resolve host by name", SHUB_FATAL_ERROR);
119-
}
120-
shub->output = socket(AF_INET, SOCK_STREAM, 0);
121-
if (shub->output < 0) {
122-
shub->params->error_handler("Failed to create inet socket", SHUB_FATAL_ERROR);
123-
}
117+
124118
while (1) {
125-
int rc = -1;
126-
for (i = 0; i < n_addrs; ++i) {
127-
memcpy(&sock_inet.sin_addr, &addrs[i], sizeof sock_inet.sin_addr);
128-
do {
129-
rc = connect(shub->output, (struct sockaddr*)&sock_inet, sizeof(sock_inet));
130-
} while (rc < 0 && errno == EINTR);
131-
132-
if (rc >= 0 || errno == EINPROGRESS) {
133-
break;
134-
}
119+
char* sep = strchr(host, ',');
120+
ShubErrorSeverity severity = SHUB_FATAL_ERROR;
121+
if (sep != NULL) {
122+
*sep = '\0';
123+
severity = SHUB_RECOVERABLE_ERROR;
124+
}
125+
if (!resolve_host_by_name(host, addrs, &n_addrs)) {
126+
shub->params->error_handler("Failed to resolve host by name", severity);
127+
goto TryNextHost;
128+
}
129+
shub->output = socket(AF_INET, SOCK_STREAM, 0);
130+
if (shub->output < 0) {
131+
shub->params->error_handler("Failed to create inet socket", severity);
132+
goto TryNextHost;
135133
}
136-
if (rc < 0) {
137-
if (errno != ENOENT && errno != ECONNREFUSED && errno != EINPROGRESS) {
138-
shub->params->error_handler("Connection can not be establish", SHUB_FATAL_ERROR);
134+
while (1) {
135+
int rc = -1;
136+
for (i = 0; i < n_addrs; ++i) {
137+
memcpy(&sock_inet.sin_addr, &addrs[i], sizeof sock_inet.sin_addr);
138+
do {
139+
rc = connect(shub->output, (struct sockaddr*)&sock_inet, sizeof(sock_inet));
140+
} while (rc < 0 && errno == EINTR);
141+
142+
if (rc >= 0 || errno == EINPROGRESS) {
143+
break;
144+
}
145+
}
146+
if (rc < 0) {
147+
if (errno != ENOENT && errno != ECONNREFUSED && errno != EINPROGRESS) {
148+
shub->params->error_handler("Connection can not be establish", severity);
149+
goto TryNextHost;
150+
}
151+
if (max_attempts-- != 0) {
152+
sleep(1);
153+
} else {
154+
shub->params->error_handler("Failed to connect to host", severity);
155+
goto TryNextHost;
156+
}
157+
} else {
158+
int optval = 1;
159+
setsockopt(shub->output, IPPROTO_TCP, TCP_NODELAY, (char const*)&optval, sizeof(optval));
160+
FD_SET(shub->output, &shub->inset);
161+
if (sep != NULL) {
162+
*sep = ',';
163+
}
164+
return;
139165
}
140-
if (max_attempts-- != 0) {
141-
sleep(1);
142-
} else {
143-
shub->params->error_handler("Failed to connect to host", SHUB_FATAL_ERROR);
144-
}
145-
} else {
146-
int optval = 1;
147-
setsockopt(shub->output, IPPROTO_TCP, TCP_NODELAY, (char const*)&optval, sizeof(optval));
148-
FD_SET(shub->output, &shub->inset);
149-
break;
150166
}
167+
TryNextHost:
168+
*sep = ',';
169+
host = sep + 1;
151170
}
152171
}
153172

contrib/pg_dtm/tests/transfers.sh

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,6 @@ go run transfers.go \
55
-d 'dbname=postgres port=5433' \
66
-v \
77
-m \
8-
-u 10000 \
9-
-w 3 \
8+
-u 100000 \
9+
-w 10 \
1010
-g

0 commit comments

Comments
 (0)