Skip to content

Commit edfb332

Browse files
committed
Make it possible to specify alternative DTMD addresses
1 parent 0360372 commit edfb332

File tree

3 files changed

+101
-62
lines changed

3 files changed

+101
-62
lines changed

contrib/pg_dtm/libdtm.c

Lines changed: 50 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -72,37 +72,57 @@ static DTMConn DtmConnect(char *host, int port)
7272
hint.ai_family = AF_INET;
7373
snprintf(portstr, 6, "%d", port);
7474
hint.ai_protocol = getprotobyname("tcp")->p_proto;
75-
if (getaddrinfo(host, portstr, &hint, &addrs))
76-
{
77-
perror("failed to resolve address");
78-
return NULL;
79-
}
8075

81-
for (a = addrs; a != NULL; a = a->ai_next)
82-
{
83-
int one = 1;
84-
sd = socket(a->ai_family, a->ai_socktype, a->ai_protocol);
85-
if (sd == -1)
86-
{
87-
perror("failed to create a socket");
88-
continue;
89-
}
90-
setsockopt(sd, IPPROTO_TCP, TCP_NODELAY, &one, sizeof(one));
91-
92-
if (connect(sd, a->ai_addr, a->ai_addrlen) == -1)
93-
{
94-
perror("failed to connect to an address");
95-
close(sd);
96-
continue;
97-
}
98-
99-
// success
100-
freeaddrinfo(addrs);
101-
dtm = malloc(sizeof(DTMConnData));
102-
dtm->sock = sd;
103-
return dtm;
104-
}
105-
freeaddrinfo(addrs);
76+
while (1) {
77+
char* sep = strchr(host, ',');
78+
if (sep != NULL) {
79+
*sep = '\0';
80+
}
81+
if (getaddrinfo(host, portstr, &hint, &addrs))
82+
{
83+
perror("failed to resolve address");
84+
if (sep == NULL) {
85+
return NULL;
86+
} else {
87+
goto TryNextHost;
88+
}
89+
}
90+
91+
for (a = addrs; a != NULL; a = a->ai_next)
92+
{
93+
int one = 1;
94+
sd = socket(a->ai_family, a->ai_socktype, a->ai_protocol);
95+
if (sd == -1)
96+
{
97+
perror("failed to create a socket");
98+
goto TryNextHost;
99+
}
100+
setsockopt(sd, IPPROTO_TCP, TCP_NODELAY, &one, sizeof(one));
101+
102+
if (connect(sd, a->ai_addr, a->ai_addrlen) == -1)
103+
{
104+
perror("failed to connect to an address");
105+
close(sd);
106+
goto TryNextHost;
107+
}
108+
109+
// success
110+
freeaddrinfo(addrs);
111+
dtm = malloc(sizeof(DTMConnData));
112+
dtm->sock = sd;
113+
if (sep != NULL) {
114+
*sep = ',';
115+
}
116+
return dtm;
117+
}
118+
freeaddrinfo(addrs);
119+
TryNextHost:
120+
if (sep == NULL) {
121+
break;
122+
}
123+
*sep = ',';
124+
host = sep + 1;
125+
}
106126
}
107127
fprintf(stderr, "could not connect\n");
108128
return NULL;

contrib/pg_dtm/sockhub/sockhub.c

Lines changed: 50 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -107,47 +107,66 @@ static void reconnect(Shub* shub)
107107
unsigned addrs[128];
108108
unsigned i, n_addrs = sizeof(addrs) / sizeof(addrs[0]);
109109
int max_attempts = shub->params->max_attempts;
110-
110+
char* host = (char*)shub->params->host;
111111
if (shub->output >= 0) {
112112
close_socket(shub, shub->output);
113113
}
114114

115115
sock_inet.sin_family = AF_INET;
116116
sock_inet.sin_port = htons(shub->params->port);
117-
if (!resolve_host_by_name(shub->params->host, addrs, &n_addrs)) {
118-
shub->params->error_handler("Failed to resolve host by name", SHUB_FATAL_ERROR);
119-
}
120-
shub->output = socket(AF_INET, SOCK_STREAM, 0);
121-
if (shub->output < 0) {
122-
shub->params->error_handler("Failed to create inet socket", SHUB_FATAL_ERROR);
123-
}
117+
124118
while (1) {
125-
int rc = -1;
126-
for (i = 0; i < n_addrs; ++i) {
127-
memcpy(&sock_inet.sin_addr, &addrs[i], sizeof sock_inet.sin_addr);
128-
do {
129-
rc = connect(shub->output, (struct sockaddr*)&sock_inet, sizeof(sock_inet));
130-
} while (rc < 0 && errno == EINTR);
131-
132-
if (rc >= 0 || errno == EINPROGRESS) {
133-
break;
134-
}
119+
char* sep = strchr(host, ',');
120+
ShubErrorSeverity severity = SHUB_FATAL_ERROR;
121+
if (sep != NULL) {
122+
*sep = '\0';
123+
severity = SHUB_RECOVERABLE_ERROR;
124+
}
125+
if (!resolve_host_by_name(host, addrs, &n_addrs)) {
126+
shub->params->error_handler("Failed to resolve host by name", severity);
127+
goto TryNextHost;
128+
}
129+
shub->output = socket(AF_INET, SOCK_STREAM, 0);
130+
if (shub->output < 0) {
131+
shub->params->error_handler("Failed to create inet socket", severity);
132+
goto TryNextHost;
135133
}
136-
if (rc < 0) {
137-
if (errno != ENOENT && errno != ECONNREFUSED && errno != EINPROGRESS) {
138-
shub->params->error_handler("Connection can not be establish", SHUB_FATAL_ERROR);
134+
while (1) {
135+
int rc = -1;
136+
for (i = 0; i < n_addrs; ++i) {
137+
memcpy(&sock_inet.sin_addr, &addrs[i], sizeof sock_inet.sin_addr);
138+
do {
139+
rc = connect(shub->output, (struct sockaddr*)&sock_inet, sizeof(sock_inet));
140+
} while (rc < 0 && errno == EINTR);
141+
142+
if (rc >= 0 || errno == EINPROGRESS) {
143+
break;
144+
}
145+
}
146+
if (rc < 0) {
147+
if (errno != ENOENT && errno != ECONNREFUSED && errno != EINPROGRESS) {
148+
shub->params->error_handler("Connection can not be establish", severity);
149+
goto TryNextHost;
150+
}
151+
if (max_attempts-- != 0) {
152+
sleep(1);
153+
} else {
154+
shub->params->error_handler("Failed to connect to host", severity);
155+
goto TryNextHost;
156+
}
157+
} else {
158+
int optval = 1;
159+
setsockopt(shub->output, IPPROTO_TCP, TCP_NODELAY, (char const*)&optval, sizeof(optval));
160+
FD_SET(shub->output, &shub->inset);
161+
if (sep != NULL) {
162+
*sep = ',';
163+
}
164+
return;
139165
}
140-
if (max_attempts-- != 0) {
141-
sleep(1);
142-
} else {
143-
shub->params->error_handler("Failed to connect to host", SHUB_FATAL_ERROR);
144-
}
145-
} else {
146-
int optval = 1;
147-
setsockopt(shub->output, IPPROTO_TCP, TCP_NODELAY, (char const*)&optval, sizeof(optval));
148-
FD_SET(shub->output, &shub->inset);
149-
break;
150166
}
167+
TryNextHost:
168+
*sep = ',';
169+
host = sep + 1;
151170
}
152171
}
153172

contrib/pg_dtm/tests/transfers.sh

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,6 @@ go run transfers.go \
55
-d 'dbname=postgres port=5433' \
66
-v \
77
-m \
8-
-u 10000 \
8+
-u 100000 \
99
-w 10 \
1010
-g

0 commit comments

Comments
 (0)