Skip to content

Commit ca8e910

Browse files
committed
Add arbiter.c
1 parent 6287e39 commit ca8e910

File tree

1 file changed

+237
-0
lines changed

1 file changed

+237
-0
lines changed

contrib/mmts/arbiter.c

Lines changed: 237 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,237 @@
1+
/*
2+
* multimaster.c
3+
*
4+
* Multimaster based on logical replication
5+
*
6+
*/
7+
8+
#include <unistd.h>
9+
#include <sys/time.h>
10+
#include <time.h>
11+
12+
#include "postgres.h"
13+
#include "fmgr.h"
14+
#include "miscadmin.h"
15+
#include "libpq-fe.h"
16+
#include "postmaster/postmaster.h"
17+
#include "postmaster/bgworker.h"
18+
#include "storage/s_lock.h"
19+
#include "storage/spin.h"
20+
#include "storage/lmgr.h"
21+
#include "storage/shmem.h"
22+
#include "storage/ipc.h"
23+
#include "access/xlogdefs.h"
24+
#include "access/xact.h"
25+
#include "access/xtm.h"
26+
#include "access/transam.h"
27+
#include "access/subtrans.h"
28+
#include "access/commit_ts.h"
29+
#include "access/xlog.h"
30+
#include "storage/proc.h"
31+
#include "storage/procarray.h"
32+
#include "executor/executor.h"
33+
#include "access/twophase.h"
34+
#include "utils/guc.h"
35+
#include "utils/hsearch.h"
36+
#include "utils/tqual.h"
37+
#include "utils/array.h"
38+
#include "utils/builtins.h"
39+
#include "utils/memutils.h"
40+
#include "commands/dbcommands.h"
41+
#include "miscadmin.h"
42+
#include "postmaster/autovacuum.h"
43+
#include "storage/pmsignal.h"
44+
#include "storage/proc.h"
45+
#include "utils/syscache.h"
46+
#include "replication/walsender.h"
47+
#include "replication/slot.h"
48+
#include "port/atomics.h"
49+
#include "tcop/utility.h"
50+
51+
#include "multimaster.h"
52+
53+
#define MAX_CONNECT_ATTEMPTS 10
54+
#define TX_BUFFER_SIZE 1024
55+
56+
typedef struct
57+
{
58+
TransactionId xid;
59+
csn_t csn;
60+
} DtmCommitMessage;
61+
62+
typedef struct
63+
{
64+
DtmCOmmitMessage buf[TX_BUFFER_SIZE];
65+
int used;
66+
} DtmTxBuffer;
67+
68+
static int* sockets;
69+
static DtmCommitMessage** txBuffers;
70+
71+
static BackgroundWorker DtmSender = {
72+
"mm-sender",
73+
0, /* do not need connection to the database */
74+
BgWorkerStart_PostmasterStart,
75+
1, /* restrart in one second (is it possible to restort immediately?) */
76+
DtmTransSender
77+
};
78+
79+
static BackgroundWorker DtmRecevier = {
80+
"mm-receiver",
81+
0, /* do not need connection to the database */
82+
BgWorkerStart_PostmasterStart,
83+
1, /* restrart in one second (is it possible to restort immediately?) */
84+
DtmTransReceiver
85+
};
86+
87+
void MMArbiterInitialize()
88+
{
89+
RegisterBackgroundWorker(&DtmSender);
90+
RegisterBackgroundWorker(&DtmRecevier);
91+
}
92+
93+
94+
static int resolve_host_by_name(const char *hostname, unsigned* addrs, unsigned* n_addrs)
95+
{
96+
struct sockaddr_in sin;
97+
struct hostent* hp;
98+
unsigned i;
99+
100+
sin.sin_addr.s_addr = inet_addr(hostname);
101+
if (sin.sin_addr.s_addr != INADDR_NONE) {
102+
memcpy(&addrs[0], &sin.sin_addr.s_addr, sizeof(sin.sin_addr.s_addr));
103+
*n_addrs = 1;
104+
return 1;
105+
}
106+
107+
hp = gethostbyname(hostname);
108+
if (hp == NULL || hp->h_addrtype != AF_INET) {
109+
return 0;
110+
}
111+
for (i = 0; hp->h_addr_list[i] != NULL && i < *n_addrs; i++) {
112+
memcpy(&addrs[i], hp->h_addr_list[i], sizeof(addrs[i]));
113+
}
114+
*n_addrs = i;
115+
return 1;
116+
}
117+
118+
static int connectSocket(char const* host, int port)
119+
{
120+
struct sockaddr_in sock_inet;
121+
unsigned addrs[128];
122+
unsigned i, n_addrs = sizeof(addrs) / sizeof(addrs[0]);
123+
int max_attempts = MAX_CONNECT_ATTEMPTS;
124+
int sd;
125+
126+
sock_inet.sin_family = AF_INET;
127+
sock_inet.sin_port = htons(port);
128+
129+
if (!resolve_host_by_name(host, addrs, &n_addrs)) {
130+
elog(ERROR, "Failed to resolve host '%s' by name", host);
131+
}
132+
sd = socket(AF_INET, SOCK_STREAM, 0);
133+
if (sd < 0) {
134+
elog(ERROR, "Failed to create socket: %d", errno);
135+
}
136+
while (1) {
137+
int rc = -1;
138+
for (i = 0; i < n_addrs; ++i) {
139+
memcpy(&sock_inet.sin_addr, &addrs[i], sizeof sock_inet.sin_addr);
140+
do {
141+
rc = connect(sd, (struct sockaddr*)&sock_inet, sizeof(sock_inet));
142+
} while (rc < 0 && errno == EINTR);
143+
144+
if (rc >= 0 || errno == EINPROGRESS) {
145+
break;
146+
}
147+
}
148+
if (rc < 0) {
149+
if ((errno != ENOENT && errno != ECONNREFUSED && errno != EINPROGRESS) || max_attempts == 0) {
150+
elog(ERROR, "Sockhub failed to connect to %s:%d: %d", host, port, errno);
151+
} else {
152+
max_attempts -= 1;
153+
sleep(1);
154+
}
155+
continue;
156+
} else {
157+
int optval = 1;
158+
setsockopt(shub->output, IPPROTO_TCP, TCP_NODELAY, (char const*)&optval, sizeof(optval));
159+
return sd;
160+
}
161+
}
162+
}
163+
164+
static void openConnections()
165+
{
166+
int nNodes = dtm->nNodes;
167+
int i;
168+
char* connStr = pstrdup(MMConnStrs);
169+
170+
sockets = (int*)palloc(sizeof(int)*nNodes);
171+
172+
for (i = 0; i < nNodes; i++) {
173+
char* host = strstr(connStr, "host=");
174+
char* end;
175+
if (host == NULL) {
176+
elog(ERROR, "Invalid connection string: '%s'", MMConnStrs);
177+
}
178+
for (end = host+5; *end != ' ' && *end != ',' && end != '\0'; end++);
179+
*end = '\0';
180+
connStr = end + 1;
181+
sockets[i] = i+1 != MMNodeId ? connectSocket(host, MMArbiterPort + i) : -1;
182+
}
183+
}
184+
185+
static void acceptConnections()
186+
{
187+
int nNodes = dtm->nNodes-1;
188+
sockaddr_in sock_inet;
189+
int i;
190+
int sd;
191+
int on = 1;
192+
193+
sockets = (int*)palloc(sizeof(int)*nNodes);
194+
195+
sock_inet.sin_family = AF_INET;
196+
sock_inet.sin_addr.s_addr = htonl(INADDR_ANY);
197+
sock_inet.sin_port = htons(MMArbiterPort + MMNodeId);
198+
199+
sd = socket(u.sock.sa_family, SOCK_STREAM, 0);
200+
if (sd < 0) {
201+
elog(ERROR, "Failed to create socket: %d", errno);
202+
}
203+
setsockopt(sd, SOL_SOCKET, SO_REUSEADDR, (char*)&on, sizeof on);
204+
205+
if (bind(fd, (sockaddr*)&sock_init, nNodes-1) < 0) {
206+
elog(ERROR, "Failed to bind socket: %d", errno);
207+
}
208+
209+
for (i = 0; i < nNodes-1; i++) {
210+
sockets[i] = accept(sd, NULL, NULL);
211+
if (sockets[i] < 0) {
212+
elog(ERROR, "Failed to accept socket: %d", errno);
213+
}
214+
}
215+
}
216+
217+
static void DtmTransSender(Datum arg)
218+
{
219+
txBuffer = (DtmCommitMessage*)
220+
openConnections();
221+
222+
while (true) {
223+
DtmTransState* ts;
224+
PGSemaphoreLock(&dtm->semphore);
225+
226+
LWLockAcquire(&dtm->hashLock, LW_EXCLUSIVE);
227+
for (ts = dtm->pendingTransactions; ts != NULL; ts = ts->nextPending) {
228+
int node = ts->gtid.node;
229+
Assert(node != MMNodeId);
230+
sockets
231+
}
232+
233+
static void DtmTransReceiver(Datum arg)
234+
{
235+
acceptConnections();
236+
}
237+

0 commit comments

Comments
 (0)