Skip to content

Commit 517d806

Browse files
committed
Make sockhub working again.
1 parent 0dc4435 commit 517d806

File tree

5 files changed

+81
-45
lines changed

5 files changed

+81
-45
lines changed

contrib/pg_dtm/dtmd/src/clog.c

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -207,5 +207,8 @@ xid_t clog_find_last_used(clog_t clog) {
207207
last_used = xid;
208208
}
209209
}
210+
if (last_used < MIN_XID) {
211+
last_used = MIN_XID;
212+
}
210213
return last_used;
211214
}

contrib/pg_dtm/libdtm.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -310,7 +310,7 @@ static DTMConn GetConnection()
310310
}
311311
else
312312
{
313-
elog(WARNING, "Failed to connect to DTMD at unix %d", c->port);
313+
elog(WARNING, "Failed to connect to DTMD at unix socket");
314314
}
315315
}
316316
}

contrib/pg_dtm/pg_dtm.c

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,7 @@ static TransactionManager DtmTM = {
120120
};
121121

122122
static char *DtmServers;
123+
static char *DtmServersCopy;
123124
static int DtmBufferSize;
124125

125126
static BackgroundWorker DtmWorker = {
@@ -836,7 +837,7 @@ _PG_init(void)
836837

837838
DefineCustomStringVariable(
838839
"dtm.servers",
839-
"The space separated host:port pairs where DTM daemons reside",
840+
"The comma separated host:port pairs where DTM daemons reside",
840841
NULL,
841842
&DtmServers,
842843
"127.0.0.1:5431",
@@ -847,6 +848,7 @@ _PG_init(void)
847848
NULL // GucShowHook show_hook
848849
);
849850

851+
DtmServersCopy = strdup(DtmServers);
850852
if (DtmBufferSize != 0)
851853
{
852854
DtmGlobalConfig(DtmServers, Unix_socket_directories);
@@ -956,7 +958,7 @@ void DtmBackgroundWorker(Datum arg)
956958

957959
ShubInitParams(&params);
958960

959-
params.hosts = DtmServers;
961+
ShubParamsSetHosts(&params, DtmServersCopy);
960962
params.file = unix_sock_path;
961963
params.buffer_size = DtmBufferSize;
962964

contrib/pg_dtm/sockhub/sockhub.c

Lines changed: 61 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -35,8 +35,58 @@ void ShubInitParams(ShubParams* params)
3535
params->queue_size = 100;
3636
params->max_attempts = 10;
3737
params->error_handler = default_error_handler;
38+
params->leader = NULL;
3839
}
3940

41+
void ShubParamsSetHosts(ShubParams* params, char* hoststring)
42+
{
43+
char *hstate, *pstate;
44+
char *hostport, *host, *portstr;
45+
int port;
46+
47+
char *hosts = strdup(hoststring);
48+
fprintf(stderr, "sockhub parsing hosts = '%s'\n", hosts);
49+
hostport = strtok_r(hosts, ",", &hstate);
50+
51+
while (hostport) {
52+
fprintf(stderr, "hostport = '%s'\n", hostport);
53+
host = strtok_r(hostport, ":", &pstate);
54+
if (!host) {
55+
fprintf(stderr, "wrong host in host list\n");
56+
break;
57+
}
58+
59+
portstr = strtok_r(NULL, ":", &pstate);
60+
if (portstr) {
61+
port = atoi(portstr);
62+
} else {
63+
port = 5431;
64+
}
65+
66+
fprintf(stderr, "adding host %s:%d\n", host, port);
67+
host_t *h = malloc(sizeof(host_t));
68+
h->host = strdup(host);
69+
h->port = port;
70+
if (params->leader) {
71+
// update pointers from
72+
h->prev = params->leader->prev;
73+
h->next = params->leader;
74+
75+
// update pointers to
76+
h->prev->next = h;
77+
h->next->prev = h;
78+
} else {
79+
// the list is empty
80+
params->leader = h;
81+
h->prev = h;
82+
h->next = h;
83+
}
84+
85+
hostport = strtok_r(NULL, ",", &hstate);
86+
}
87+
88+
free(hosts);
89+
}
4090

4191
static int resolve_host_by_name(const char *hostname, unsigned* addrs, unsigned* n_addrs)
4292
{
@@ -103,57 +153,37 @@ int ShubWriteSocket(int sd, void const* buf, int size)
103153
return 1;
104154
}
105155

106-
107156
static void reconnect(Shub* shub)
108157
{
109-
static int skip_hosts = 0;
110-
printf("will connect to host #%d\n", skip_hosts);
111-
112158
struct sockaddr_in sock_inet;
113159
unsigned addrs[128];
114160
unsigned i, n_addrs = sizeof(addrs) / sizeof(addrs[0]);
115161
int max_attempts = shub->params->max_attempts;
116-
char *hosts = strdup(shub->params->hosts);
117162
if (shub->output >= 0) {
118163
close_socket(shub, shub->output);
119164
}
120165

121166
sock_inet.sin_family = AF_INET;
122167

123-
char *hstate, *pstate;
124-
char *hostport, *host, *portstr;
125-
int port;
126-
hostport = strtok_r(hosts, ",", &hstate);
127-
int hosti = 0;
128-
while (hostport) {
129-
ShubErrorSeverity severity = SHUB_RECOVERABLE_ERROR;
168+
while (shub->params->leader) {
169+
char *host = shub->params->leader->host;
170+
int port = shub->params->leader->port;
130171

131-
if (hosti < skip_hosts) {
132-
goto trynext;
133-
}
172+
fprintf(stderr, "shub leader = %s:%d\n", host, port);
134173

135-
host = strtok_r(hostport, ":", &pstate);
136-
if (!host) {
137-
severity = SHUB_FATAL_ERROR;
138-
break;
139-
}
174+
shub->params->leader = shub->params->leader->next;
140175

141-
portstr = strtok_r(NULL, ":", &pstate);
142-
if (portstr) {
143-
port = atoi(portstr);
144-
} else {
145-
port = 5431;
146-
}
176+
ShubErrorSeverity severity = SHUB_RECOVERABLE_ERROR;
147177
sock_inet.sin_port = htons(port);
148178

149179
if (!resolve_host_by_name(host, addrs, &n_addrs)) {
150180
shub->params->error_handler("Failed to resolve host by name", severity);
151-
goto trynext;
181+
continue;
152182
}
153183
shub->output = socket(AF_INET, SOCK_STREAM, 0);
154184
if (shub->output < 0) {
155185
shub->params->error_handler("Failed to create inet socket", severity);
156-
goto trynext;
186+
continue;
157187
}
158188
while (1) {
159189
int rc = -1;
@@ -170,32 +200,22 @@ static void reconnect(Shub* shub)
170200
if (rc < 0) {
171201
if (errno != ENOENT && errno != ECONNREFUSED && errno != EINPROGRESS) {
172202
shub->params->error_handler("Connection can not be establish", severity);
173-
goto trynext;
203+
continue;
174204
}
175205
if (max_attempts-- != 0) {
176206
sleep(1);
177207
} else {
178208
shub->params->error_handler("Failed to connect to host", severity);
179-
goto trynext;
209+
continue;
180210
}
181211
} else {
182212
int optval = 1;
183213
setsockopt(shub->output, IPPROTO_TCP, TCP_NODELAY, (char const*)&optval, sizeof(optval));
184214
FD_SET(shub->output, &shub->inset);
185-
goto finish;
215+
return;
186216
}
187217
}
188-
trynext:
189-
hostport = strtok_r(NULL, ",", &hstate);
190-
hosti++;
191-
}
192-
finish:
193-
if (hosti < skip_hosts) {
194-
skip_hosts = 0;
195-
} else {
196-
skip_hosts++;
197218
}
198-
free(hosts);
199219
}
200220

201221
static void notify_disconnect(Shub* shub, int chan)

contrib/pg_dtm/sockhub/sockhub.h

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,14 +24,22 @@ typedef enum
2424

2525
typedef void(*ShubErrorHandler)(char const* msg, ShubErrorSeverity severity);
2626

27+
typedef struct host_t
28+
{
29+
char *host;
30+
int port;
31+
struct host_t *next;
32+
struct host_t *prev;
33+
} host_t;
34+
2735
typedef struct
2836
{
2937
int buffer_size;
3038
int delay;
3139
int queue_size;
3240
int max_attempts;
3341
char const* file;
34-
char const* hosts;
42+
host_t *leader;
3543
ShubErrorHandler error_handler;
3644
} ShubParams;
3745

@@ -53,7 +61,10 @@ int ShubReadSocket(int sd, void* buf, int size);
5361
int ShubWriteSocket(int sd, void const* buf, int size);
5462

5563
void ShubInitParams(ShubParams* params);
64+
void ShubParamsSetHosts(ShubParams* params, char* hoststring);
5665
void ShubInitialize(Shub* shub, ShubParams* params);
5766
void ShubLoop(Shub* shub);
5867

5968
#endif
69+
70+
// vim: sts=4 ts=4 sw=4 expandtab

0 commit comments

Comments
 (0)