Skip to content

Commit e518a0e

Browse files
committed
2 parents 5c44b4e + 269d8dd commit e518a0e

File tree

21 files changed

+568
-171
lines changed

21 files changed

+568
-171
lines changed

contrib/mmts/Makefile

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,3 +22,7 @@ include $(top_builddir)/src/Makefile.global
2222
include $(top_srcdir)/contrib/contrib-global.mk
2323
endif
2424

25+
check:
26+
env DESTDIR='$(abs_top_builddir)'/tmp_install make install
27+
$(prove_check)
28+

contrib/mmts/TODO

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
TODO
2+
3+
* Disallow or do not replicate tables without pkeys.
4+
* Automate extension creation (?)
5+
* Database itn't usable right after pg_ctl -w. There are still several second before db will switch to operational mode.
6+
+ Statements without tx.
7+
* Disallow user-created MTM-* gid's.
8+
* Check configuration sanity for mm before actual startup: max_wal_senders, max_worker_processes, max_wal_senders, wal_level, max_replication_slots
9+
* Handle SIGQUIT
10+
* Move arbiter host/port to connstring
11+
12+
13+
14+
15+
16+
17+
18+
19+
20+
21+
22+
23+

contrib/mmts/arbiter.c

Lines changed: 50 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -230,6 +230,51 @@ static int MtmReadSocket(int sd, void* buf, int buf_size)
230230

231231

232232

233+
static void MtmSetSocketOptions(int sd)
234+
{
235+
#ifdef TCP_NODELAY
236+
int optval = 1;
237+
if (setsockopt(sd, IPPROTO_TCP, TCP_NODELAY, (char const*)&optval, sizeof(optval)) < 0) {
238+
elog(WARNING, "Failed to set TCP_NODELAY: %m");
239+
}
240+
#endif
241+
if (tcp_keepalives_idle) {
242+
#ifdef TCP_KEEPIDLE
243+
if (setsockopt(sd, IPPROTO_TCP, TCP_KEEPIDLE,
244+
(char *) &tcp_keepalives_idle, sizeof(tcp_keepalives_idle)) < 0)
245+
{
246+
elog(WARNING, "Failed to set TCP_KEEPIDLE: %m");
247+
}
248+
#else
249+
#ifdef TCP_KEEPALIVE
250+
if (setsockopt(sd, IPPROTO_TCP, TCP_KEEPALIVE,
251+
(char *) &tcp_keepalives_idle, sizeof(tcp_keepalives_idle)) < 0)
252+
{
253+
elog(WARNING, "Failed to set TCP_KEEPALIVE: %m");
254+
}
255+
#endif
256+
#endif
257+
}
258+
#ifdef TCP_KEEPINTVL
259+
if (tcp_keepalives_interval) {
260+
if (setsockopt(sd, IPPROTO_TCP, TCP_KEEPINTVL,
261+
(char *) &tcp_keepalives_interval, sizeof(tcp_keepalives_interval)) < 0)
262+
{
263+
elog(WARNING, "Failed to set TCP_KEEPINTVL: %m");
264+
}
265+
}
266+
#endif
267+
#ifdef TCP_KEEPCNT
268+
if (tcp_keepalives_count) {
269+
if (setsockopt(sd, IPPROTO_TCP, TCP_KEEPCNT,
270+
(char *) &tcp_keepalives_count, sizeof(tcp_keepalives_count)) < 0)
271+
{
272+
elog(WARNING, "Failed to set TCP_KEEPCNT: %m");
273+
}
274+
}
275+
#endif
276+
}
277+
233278
static int MtmConnectSocket(char const* host, int port, int max_attempts)
234279
{
235280
struct sockaddr_in sock_inet;
@@ -274,12 +319,9 @@ static int MtmConnectSocket(char const* host, int port, int max_attempts)
274319
}
275320
continue;
276321
} else {
277-
int optval = 1;
278322
MtmHandshakeMessage req;
279323
MtmArbiterMessage resp;
280-
setsockopt(sd, IPPROTO_TCP, TCP_NODELAY, (char const*)&optval, sizeof(optval));
281-
setsockopt(sd, SOL_SOCKET, SO_KEEPALIVE, (char const*)&optval, sizeof(optval));
282-
324+
MtmSetSocketOptions(sd);
283325
req.hdr.code = MSG_HANDSHAKE;
284326
req.hdr.node = MtmNodeId;
285327
req.hdr.dxid = HANDSHAKE_MAGIC;
@@ -306,10 +348,9 @@ static int MtmConnectSocket(char const* host, int port, int max_attempts)
306348
/* Some node considered that I am dead, so switch to recovery mode */
307349
if (BIT_CHECK(resp.disabledNodeMask, MtmNodeId-1)) {
308350
elog(WARNING, "Node %d think that I am dead", resp.node);
351+
BIT_SET(Mtm->disabledNodeMask, MtmNodeId-1);
309352
MtmSwitchClusterMode(MTM_RECOVERY);
310353
}
311-
/* Combine disable masks from all node. Is it actually correct or we should better check availability of nodes ourselves? */
312-
Mtm->disabledNodeMask |= resp.disabledNodeMask;
313354
return sd;
314355
}
315356
}
@@ -335,7 +376,7 @@ static void MtmOpenConnections()
335376
}
336377
if (Mtm->nNodes < MtmNodes/2+1) { /* no quorum */
337378
elog(WARNING, "Node is out of quorum: only %d nodes from %d are accssible", Mtm->nNodes, MtmNodes);
338-
Mtm->status = MTM_OFFLINE;
379+
Mtm->status = MTM_IN_MINORITY;
339380
} else if (Mtm->status == MTM_INITIALIZATION) {
340381
MtmSwitchClusterMode(MTM_CONNECTED);
341382
}
@@ -389,6 +430,7 @@ static void MtmAcceptOneConnection()
389430
resp.dxid = HANDSHAKE_MAGIC;
390431
resp.sxid = ShmemVariableCache->nextXid;
391432
resp.csn = MtmGetCurrentTime();
433+
resp.node = MtmNodeId;
392434
MtmUpdateNodeConnectionInfo(&Mtm->nodes[req.hdr.node-1].con, req.connStr);
393435
if (!MtmWriteSocket(fd, &resp, sizeof resp)) {
394436
elog(WARNING, "Arbiter failed to write response for handshake message to node %d", resp.node);
@@ -605,7 +647,7 @@ static void MtmTransReceiver(Datum arg)
605647
} while (n < 0 && errno == EINTR);
606648
} while (n < 0 && MtmRecovery());
607649

608-
if (rc < 0) {
650+
if (n < 0) {
609651
elog(ERROR, "Arbiter failed to select sockets: %d", errno);
610652
}
611653
for (i = 0; i < nNodes; i++) {

0 commit comments

Comments
 (0)