Skip to content

Commit be47745

Browse files
committed
Merge branch 'master' of github.com:postgrespro/postgres_cluster
2 parents e728adb + a0be307 commit be47745

File tree

4 files changed

+707
-240
lines changed

4 files changed

+707
-240
lines changed

contrib/mmts/arbiter.c

Lines changed: 389 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,389 @@
1+
/*
2+
* arbiter.c
3+
*
4+
* Multimaster based on logical replication
5+
*
6+
*/
7+
8+
#include "postgres.h"

#include <unistd.h>
#include <errno.h>
#include <string.h>
#include <time.h>
#include <sys/time.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/select.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <arpa/inet.h>
#include <netdb.h>
#ifdef USE_EPOLL
#include <sys/epoll.h>
#endif

#include "fmgr.h"
#include "miscadmin.h"
#include "libpq-fe.h"
#include "postmaster/postmaster.h"
#include "postmaster/bgworker.h"
#include "storage/s_lock.h"
#include "storage/spin.h"
#include "storage/lmgr.h"
#include "storage/shmem.h"
#include "storage/ipc.h"
#include "access/xlogdefs.h"
#include "access/xact.h"
#include "access/xtm.h"
#include "access/transam.h"
#include "access/subtrans.h"
#include "access/commit_ts.h"
#include "access/xlog.h"
#include "storage/proc.h"
#include "storage/procarray.h"
#include "executor/executor.h"
#include "access/twophase.h"
#include "utils/guc.h"
#include "utils/hsearch.h"
#include "utils/tqual.h"
#include "utils/array.h"
#include "utils/builtins.h"
#include "utils/memutils.h"
#include "commands/dbcommands.h"
#include "postmaster/autovacuum.h"
#include "storage/pmsignal.h"
#include "utils/syscache.h"
#include "replication/walsender.h"
#include "replication/slot.h"
#include "port/atomics.h"
#include "tcop/utility.h"

#include "multimaster.h"
52+
53+
/* How many times connectSocket() pauses and retries before giving up. */
#define MAX_CONNECT_ATTEMPTS 10
/* Number of DtmCommitMessage entries held by one DtmBuffer. */
#define BUFFER_SIZE 1024
56+
57+
typedef struct
58+
{
59+
TransactionId xid;
60+
csn_t csn;
61+
} DtmCommitMessage;
62+
63+
typedef struct
64+
{
65+
DtmCommitMessage data[BUFFER_SIZE];
66+
int used;
67+
} DtmBuffer;
68+
69+
/* File-local array of socket descriptors; allocated with palloc() by openConnections() and acceptConnections(). */
static int *sockets;
70+
71+
static BackgroundWorker DtmSender = {
72+
"mm-sender",
73+
0, /* do not need connection to the database */
74+
BgWorkerStart_PostmasterStart,
75+
1, /* restrart in one second (is it possible to restort immediately?) */
76+
DtmTransSender
77+
};
78+
79+
static BackgroundWorker DtmRecevier = {
80+
"mm-receiver",
81+
0, /* do not need connection to the database */
82+
BgWorkerStart_PostmasterStart,
83+
1, /* restrart in one second (is it possible to restort immediately?) */
84+
DtmTransReceiver
85+
};
86+
87+
void MMArbiterInitialize()
88+
{
89+
RegisterBackgroundWorker(&DtmSender);
90+
RegisterBackgroundWorker(&DtmRecevier);
91+
}
92+
93+
94+
/*
 * Resolve `hostname` into a list of IPv4 addresses.
 *
 * hostname: numeric IPv4 literal accepted by inet_addr(), or a host name
 *           looked up with gethostbyname().
 * addrs:    output array; each entry receives one address exactly as the
 *           resolver returned it (network byte order).
 * n_addrs:  in: capacity of `addrs`; out (on success): number of entries
 *           stored.
 *
 * Returns 1 when at least one address was stored, 0 otherwise (bad
 * arguments, zero capacity, lookup failure, or a non-IPv4 result).  On
 * failure neither `addrs` nor `*n_addrs` is modified.
 */
static int resolve_host_by_name(const char *hostname, unsigned* addrs, unsigned* n_addrs)
{
    struct hostent* hp;
    in_addr_t literal;
    unsigned capacity;
    unsigned count;

    /* Nothing can be stored without a destination of non-zero capacity. */
    if (hostname == NULL || addrs == NULL || n_addrs == NULL || *n_addrs == 0) {
        return 0;
    }
    capacity = *n_addrs;

    /* Fast path: the string is already a numeric address. */
    literal = inet_addr(hostname);
    if (literal != INADDR_NONE) {
        memcpy(&addrs[0], &literal, sizeof(literal));
        *n_addrs = 1;
        return 1;
    }

    hp = gethostbyname(hostname);
    if (hp == NULL || hp->h_addrtype != AF_INET || hp->h_addr_list == NULL) {
        return 0;
    }
    /* Check the capacity first, then the NULL terminator of the list. */
    for (count = 0; count < capacity && hp->h_addr_list[count] != NULL; count++) {
        memcpy(&addrs[count], hp->h_addr_list[count], sizeof(addrs[count]));
    }
    if (count == 0) {
        return 0;
    }
    *n_addrs = count;
    return 1;
}
117+
118+
#ifdef USE_EPOLL
/* epoll descriptor; assigned in DtmTransReceiver() and used by registerSocket() */
static int epollfd;
#else
/* select() bookkeeping maintained by registerSocket() */
static fd_set inset;   /* descriptors watched for input */
static int    max_fd;  /* highest descriptor placed in inset */
#endif
124+
125+
inline void registerSocket(int fd, int i)
126+
{
127+
#ifdef USE_EPOLL
128+
struct epoll_event ev;
129+
ev.events = EPOLLIN;
130+
ev.data.u32 = i;
131+
if (epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &ev) < 0) {
132+
char buf[ERR_BUF_SIZE];
133+
sprintf(buf, "Failed to add socket %d to epoll set", fd);
134+
shub->params->error_handler(buf, SHUB_FATAL_ERROR);
135+
}
136+
#else
137+
FD_SET(fd, &inset);
138+
if (fd > max_fd) {
139+
max_fd = fd;
140+
}
141+
#endif
142+
}
143+
144+
145+
146+
static int connectSocket(char const* host, int port)
147+
{
148+
struct sockaddr_in sock_inet;
149+
unsigned addrs[128];
150+
unsigned i, n_addrs = sizeof(addrs) / sizeof(addrs[0]);
151+
int max_attempts = MAX_CONNECT_ATTEMPTS;
152+
int sd;
153+
154+
sock_inet.sin_family = AF_INET;
155+
sock_inet.sin_port = htons(port);
156+
157+
if (!resolve_host_by_name(host, addrs, &n_addrs)) {
158+
elog(ERROR, "Failed to resolve host '%s' by name", host);
159+
}
160+
sd = socket(AF_INET, SOCK_STREAM, 0);
161+
if (sd < 0) {
162+
elog(ERROR, "Failed to create socket: %d", errno);
163+
}
164+
while (1) {
165+
int rc = -1;
166+
for (i = 0; i < n_addrs; ++i) {
167+
memcpy(&sock_inet.sin_addr, &addrs[i], sizeof sock_inet.sin_addr);
168+
do {
169+
rc = connect(sd, (struct sockaddr*)&sock_inet, sizeof(sock_inet));
170+
} while (rc < 0 && errno == EINTR);
171+
172+
if (rc >= 0 || errno == EINPROGRESS) {
173+
break;
174+
}
175+
}
176+
if (rc < 0) {
177+
if ((errno != ENOENT && errno != ECONNREFUSED && errno != EINPROGRESS) || max_attempts == 0) {
178+
elog(ERROR, "Sockhub failed to connect to %s:%d: %d", host, port, errno);
179+
} else {
180+
max_attempts -= 1;
181+
sleep(1);
182+
}
183+
continue;
184+
} else {
185+
int optval = 1;
186+
setsockopt(shub->output, IPPROTO_TCP, TCP_NODELAY, (char const*)&optval, sizeof(optval));
187+
return sd;
188+
}
189+
}
190+
}
191+
192+
static void openConnections()
193+
{
194+
int nNodes = dtm->nNodes;
195+
int i;
196+
char* connStr = pstrdup(MMConnStrs);
197+
198+
sockets = (int*)palloc(sizeof(int)*nNodes);
199+
200+
for (i = 0; i < nNodes; i++) {
201+
char* host = strstr(connStr, "host=");
202+
char* end;
203+
if (host == NULL) {
204+
elog(ERROR, "Invalid connection string: '%s'", MMConnStrs);
205+
}
206+
for (end = host+5; *end != ' ' && *end != ',' && end != '\0'; end++);
207+
*end = '\0';
208+
connStr = end + 1;
209+
sockets[i] = i+1 != MMNodeId ? connectSocket(host, MMArbiterPort + i) : -1;
210+
}
211+
}
212+
213+
static void acceptConnections()
214+
{
215+
int nNodes = dtm->nNodes-1;
216+
sockaddr_in sock_inet;
217+
int i;
218+
int sd;
219+
int on = 1;
220+
221+
sockets = (int*)palloc(sizeof(int)*nNodes);
222+
223+
sock_inet.sin_family = AF_INET;
224+
sock_inet.sin_addr.s_addr = htonl(INADDR_ANY);
225+
sock_inet.sin_port = htons(MMArbiterPort + MMNodeId);
226+
227+
sd = socket(u.sock.sa_family, SOCK_STREAM, 0);
228+
if (sd < 0) {
229+
elog(ERROR, "Failed to create socket: %d", errno);
230+
}
231+
setsockopt(sd, SOL_SOCKET, SO_REUSEADDR, (char*)&on, sizeof on);
232+
233+
if (bind(fd, (sockaddr*)&sock_init, nNodes-1) < 0) {
234+
elog(ERROR, "Failed to bind socket: %d", errno);
235+
}
236+
237+
for (i = 0; i < nNodes; i++) {
238+
int fd = accept(sd, NULL, NULL);
239+
if (fd < 0) {
240+
elog(ERROR, "Failed to accept socket: %d", errno);
241+
}
242+
registerSocket(fd, i);
243+
sockets[i] = fd;
244+
}
245+
}
246+
247+
static void WriteSocket(int sd, void const* buf, int size)
248+
{
249+
char* src = (char*)buf;
250+
while (size != 0) {
251+
int n = send(sd, src, size, 0);
252+
if (n <= 0) {
253+
return 0;
254+
}
255+
size -= n;
256+
src += n;
257+
}
258+
}
259+
260+
static int ReadSocket(int sd, void* buf, int buf_size)
261+
{
262+
int rc = recv(sd, buf, buf_size, 0);
263+
if (rc <= 0) {
264+
elog(ERROR, "Arbiter failed to read socket: %d", rc);
265+
}
266+
return rc;
267+
}
268+
269+
270+
/*
 * Main function of the "mm-sender" background worker.
 *
 * arg: background worker argument; not used.
 *
 * Connects to the other nodes, then loops forever: it waits on
 * dtm->semphore, detaches the list dtm->pendingTransactions under
 * dtm->spinlock, and for each transaction appends an (xid, csn) message to
 * the buffer of node ts->gtid.node.  A buffer is written to its socket when
 * it becomes full, and all non-empty buffers are flushed once the detached
 * list has been processed.
 *
 * Here DtmBuffer.used counts messages.
 */
static void DtmTransSender(Datum arg)
{
    int nNodes = dtm->nNodes;
    int i;
    DtmBuffer* txBuffer = (DtmBuffer*)palloc(sizeof(DtmBuffer)*nNodes);

    /* openConnections() allocates and fills `sockets`. */
    openConnections();

    for (i = 0; i < nNodes; i++) {
        txBuffer[i].used = 0;
    }

    while (true) {
        DtmTransState* ts;
        PGSemaphoreLock(&dtm->semphore);
        CHECK_FOR_INTERRUPTS();

        /* Take over the whole pending list in one short critical section. */
        SpinLockAcquire(&dtm->spinlock);
        ts = dtm->pendingTransactions;
        dtm->pendingTransactions = NULL;
        SpinLockRelease(&dtm->spinlock);

        for (; ts != NULL; ts = ts->nextPending) {
            i = ts->gtid.node-1;
            Assert(i >= 0 && i < nNodes);
            /* Node ids are 1-based; this node's own slot in `sockets` holds -1. */
            Assert(ts->gtid.node != MMNodeId);
            if (txBuffer[i].used == BUFFER_SIZE) {
                WriteSocket(sockets[i], txBuffer[i].data, txBuffer[i].used*sizeof(DtmCommitMessage));
                txBuffer[i].used = 0;
            }
            txBuffer[i].data[txBuffer[i].used].xid = ts->xid;
            txBuffer[i].data[txBuffer[i].used].csn = ts->csn;
            txBuffer[i].used += 1;
        }
        for (i = 0; i < nNodes; i++) {
            if (txBuffer[i].used != 0) {
                WriteSocket(sockets[i], txBuffer[i].data, txBuffer[i].used*sizeof(DtmCommitMessage));
                txBuffer[i].used = 0;
            }
        }
    }
}
313+
314+
static void DtmTransReceiver(Datum arg)
315+
{
316+
int nNodes = dtm->nNodes-1;
317+
int i, j, rc;
318+
int rxBufPos = 0;
319+
DtmBuffer* rxBuffer = (DtmBuffer*)palloc(sizeof(DtmBuffer)*nNodes);
320+
HTAB* xid2state;
321+
322+
#ifdef USE_EPOLL
323+
struct epoll_event* events = (struct epoll_event*)palloc(SIZEOF(struct epoll_event)*nNodes);
324+
epollfd = epoll_create(nNodes);
325+
#else
326+
FD_ZERO(&inset);
327+
max_fd = 0;
328+
#endif
329+
330+
acceptConnections();
331+
xid2state = MMCreateHash();
332+
333+
for (i = 0; i < nNodes; i++) {
334+
txBuffer[i].used = 0;
335+
}
336+
337+
while (true) {
338+
#ifdef USE_EPOLL
339+
rc = epoll_wait(epollfd, events, MAX_EVENTS, shub->in_buffer_used == 0 ? -1 : shub->params->delay);
340+
if (rc < 0) {
341+
elog(ERROR, "epoll failed: %d", errno);
342+
}
343+
for (j = 0; j < rc; j++) {
344+
i = events[j].data.u32;
345+
if (events[j].events & EPOLLERR) {
346+
struct sockaddr_in insock;
347+
socklen_t len = sizeof(insock);
348+
getpeername(fd, (struct sockaddr*)&insock, &len);
349+
elog(WARNING, "Loose connection with %s", inet_ntoa(insock.sin_addr_));
350+
epoll_ctl(epollfd, EPOLL_CTL_DEL, fd, NULL);
351+
}
352+
else if (events[j].events & EPOLLIN)
353+
#else
354+
fd_set events;
355+
events = inset;
356+
rc = select(max_fd+1, &events, NULL, NULL, NULL);
357+
if (rc < 0) {
358+
elog(ERROR, "select failed: %d", errno);
359+
}
360+
for (i = 0; i < nNodes; i++) {
361+
if (FD_ISSET(sockets[i], &events))
362+
#endif
363+
{
364+
int nResponses;
365+
rxBuffer[i].used += ReadSocket(sockets[i], (char*)rxBuffer[i].data + rxBuffer[i].used, RX_BUFFER_SIZE-rxBufPos);
366+
nResponses = rxBuffer[i].used/sizeof(DtmCommitRequest);
367+
368+
LWLockAcquire(&dtm->hashLock, LW_SHARED);
369+
370+
for (j = 0; j < nResponses; j++) {
371+
DtmCommitRequest* req = &rxBuffer[i].data[j];
372+
DtmTransState* ts = (DtmTransState*)hash_search(xid2state, &req->xid, HASH_FIND, NULL);
373+
Assert(ts != NULL);
374+
if (req->csn > ts->csn) {
375+
ts->csn = req->csn;
376+
}
377+
if (ts->nVotes == dtm->nNodes-1) {
378+
SetLatch(&ProcGlobal->allProcs[ts->pid].procLatch);
379+
}
380+
}
381+
if (rxBuffer[i].used != nResponses*sizeof(DtmCommitRequest)) {
382+
rxBuffer[i].used -= nResponses*sizeof(DtmCommitRequest);
383+
memmove(rxBuffer[i].data, (char*)rxBuffer[i].data + nResponses*sizeof(DtmCommitRequest), rxBuffer[i].used);
384+
}
385+
}
386+
}
387+
}
388+
}
389+

0 commit comments

Comments
 (0)