Skip to content

Commit 4381f69

Browse files
committed
2 parents 9f59790 + 4843134 commit 4381f69

24 files changed

+621
-340
lines changed

contrib/mmts/README.md

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,8 @@ cd ../../contrib/mmts && make install
6565

6666
### Docker
6767

68-
Directory contrib/mmts also includes docker-compose.yml that is capable of building multi-master and starting 3 node cluster.
68+
Directory contrib/mmts also includes docker-compose.yml that is capable of building multi-master and starting
69+
3 node cluster.
6970

7071
```sh
7172
cd contrib/mmts
@@ -136,7 +137,7 @@ Read description of all management functions at [functions](/contrib/mmts/doc/fu
136137

137138
* Commit latency.
138139
Current implementation of logical replication sends data to subscriber nodes only after local commit, so in case of
139-
heavy-write transaction user will wait for transaction processing two times: on local node and al other nodes
140+
heavy-write transaction user will wait for transaction processing two times: on local node and on all other nodes
140141
(simultaneously). We have plans to address this issue in the future.
141142

142143
* DDL replication.

contrib/mmts/arbiter.c

Lines changed: 20 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -360,7 +360,7 @@ static void MtmSendHeartbeat()
360360
msg.node = MtmNodeId;
361361
msg.csn = now;
362362
if (last_sent_heartbeat != 0 && last_sent_heartbeat + MSEC_TO_USEC(MtmHeartbeatSendTimeout)*2 < now) {
363-
MTM_LOG1("More than %ld microseconds since last heartbeat", now - last_sent_heartbeat);
363+
MTM_LOG1("More than %lld microseconds since last heartbeat", now - last_sent_heartbeat);
364364
}
365365
last_sent_heartbeat = now;
366366

@@ -377,7 +377,7 @@ static void MtmSendHeartbeat()
377377
elog(LOG, "Arbiter failed to send heartbeat to node %d", i+1);
378378
} else {
379379
if (last_heartbeat_to_node[i] + MSEC_TO_USEC(MtmHeartbeatSendTimeout)*2 < now) {
380-
MTM_LOG1("Last heartbeat to node %d was sent %ld microseconds ago", i+1, now - last_heartbeat_to_node[i]);
380+
MTM_LOG1("Last heartbeat to node %d was sent %lld microseconds ago", i+1, now - last_heartbeat_to_node[i]);
381381
}
382382
last_heartbeat_to_node[i] = now;
383383
/* Connectivity mask can be cleared by MtmWatchdog: in this case sockets[i] >= 0 */
@@ -388,7 +388,7 @@ static void MtmSendHeartbeat()
388388
MtmReconnectNode(i+1); /* set reconnect mask to force node reconnent */
389389
//MtmOnNodeConnect(i+1);
390390
}
391-
MTM_LOG4("Send heartbeat to node %d with timestamp %ld", i+1, now);
391+
MTM_LOG4("Send heartbeat to node %d with timestamp %lld", i+1, now);
392392
}
393393
} else {
394394
MTM_LOG2("Do not send heartbeat to node %d, busy mask %lld, status %s", i+1, (long long) busy_mask, MtmNodeStatusMnem[Mtm->status]);
@@ -940,7 +940,7 @@ static void MtmReceiver(Datum arg)
940940

941941
switch (msg->code) {
942942
case MSG_HEARTBEAT:
943-
MTM_LOG4("Receive HEARTBEAT from node %d with timestamp %ld delay %ld",
943+
MTM_LOG4("Receive HEARTBEAT from node %d with timestamp %lld delay %lld",
944944
node, msg->csn, USEC_TO_MSEC(MtmGetSystemTime() - msg->csn));
945945
continue;
946946
case MSG_POLL_REQUEST:
@@ -1017,13 +1017,13 @@ static void MtmReceiver(Datum arg)
10171017
}
10181018
ts = (MtmTransState*)hash_search(MtmXid2State, &msg->dxid, HASH_FIND, NULL);
10191019
if (ts == NULL) {
1020-
elog(WARNING, "Ignore response for unexisted transaction %d from node %d", msg->dxid, node);
1020+
elog(WARNING, "Ignore response for unexisted transaction %llu from node %d", (long64)msg->dxid, node);
10211021
continue;
10221022
}
10231023
Assert(msg->code == MSG_ABORTED || strcmp(msg->gid, ts->gid) == 0);
10241024
if (BIT_CHECK(ts->votedMask, node-1)) {
1025-
elog(WARNING, "Receive deteriorated %s response for transaction %d (%s) from node %d",
1026-
MtmMessageKindMnem[msg->code], ts->xid, ts->gid, node);
1025+
elog(WARNING, "Receive deteriorated %s response for transaction %s (%llu) from node %d",
1026+
MtmMessageKindMnem[msg->code], ts->gid, (long64)ts->xid, node);
10271027
continue;
10281028
}
10291029
BIT_SET(ts->votedMask, node-1);
@@ -1033,8 +1033,8 @@ static void MtmReceiver(Datum arg)
10331033
case MSG_PREPARED:
10341034
MTM_TXTRACE(ts, "MtmTransReceiver got MSG_PREPARED");
10351035
if (ts->status == TRANSACTION_STATUS_COMMITTED) {
1036-
elog(WARNING, "Receive PREPARED response for already committed transaction %d from node %d",
1037-
ts->xid, node);
1036+
elog(WARNING, "Receive PREPARED response for already committed transaction %llu from node %d",
1037+
(long64)ts->xid, node);
10381038
continue;
10391039
}
10401040
Mtm->nodes[node-1].transDelay += MtmGetCurrentTime() - ts->csn;
@@ -1043,8 +1043,8 @@ static void MtmReceiver(Datum arg)
10431043
if ((~msg->disabledNodeMask & Mtm->disabledNodeMask) != 0) {
10441044
/* Coordinator's disabled mask is wider than of this node: so reject such transaction to avoid
10451045
commit on smaller subset of nodes */
1046-
elog(WARNING, "Coordinator of distributed transaction %s (%d) see less nodes than node %d: %llx instead of %llx",
1047-
ts->gid, ts->xid, node, (long long) Mtm->disabledNodeMask, (long long) msg->disabledNodeMask);
1046+
elog(WARNING, "Coordinator of distributed transaction %s (%llu) see less nodes than node %d: %llx instead of %llx",
1047+
ts->gid, (long64)ts->xid, node, Mtm->disabledNodeMask, msg->disabledNodeMask);
10481048
MtmAbortTransaction(ts);
10491049
}
10501050
if ((ts->participantsMask & ~Mtm->disabledNodeMask & ~ts->votedMask) == 0) {
@@ -1053,7 +1053,7 @@ static void MtmReceiver(Datum arg)
10531053
MtmWakeUpBackend(ts);
10541054
} else {
10551055
Assert(ts->status == TRANSACTION_STATUS_IN_PROGRESS);
1056-
MTM_LOG2("Transaction %s is prepared (status=%s participants=%lx disabled=%lx, voted=%lx)",
1056+
MTM_LOG2("Transaction %s is prepared (status=%s participants=%llx disabled=%llx, voted=%llx)",
10571057
ts->gid, MtmTxnStatusMnem[ts->status], ts->participantsMask, Mtm->disabledNodeMask, ts->votedMask);
10581058
ts->isPrepared = true;
10591059
if (ts->isTwoPhase) {
@@ -1079,12 +1079,12 @@ static void MtmReceiver(Datum arg)
10791079
break;
10801080
case MSG_ABORTED:
10811081
if (ts->status == TRANSACTION_STATUS_COMMITTED) {
1082-
elog(WARNING, "Receive ABORTED response for already committed transaction %d (%s) from node %d",
1083-
ts->xid, ts->gid, node);
1082+
elog(WARNING, "Receive ABORTED response for already committed transaction %s (%llu) from node %d",
1083+
ts->gid, (long64)ts->xid, node);
10841084
continue;
10851085
}
10861086
if (ts->status != TRANSACTION_STATUS_ABORTED) {
1087-
MTM_LOG1("Arbiter receive abort message for transaction %s (%d)", ts->gid, ts->xid);
1087+
MTM_LOG1("Arbiter receive abort message for transaction %s (%llu)", ts->gid, (long64)ts->xid);
10881088
Assert(ts->status == TRANSACTION_STATUS_IN_PROGRESS);
10891089
MtmAbortTransaction(ts);
10901090
}
@@ -1095,8 +1095,8 @@ static void MtmReceiver(Datum arg)
10951095
case MSG_PRECOMMITTED:
10961096
MTM_TXTRACE(ts, "MtmTransReceiver got MSG_PRECOMMITTED");
10971097
if (ts->status == TRANSACTION_STATUS_COMMITTED) {
1098-
elog(WARNING, "Receive PRECOMMITTED response for already committed transaction %d (%s) from node %d",
1099-
ts->xid, ts->gid, node);
1098+
elog(WARNING, "Receive PRECOMMITTED response for already committed transaction %s (%llu) from node %d",
1099+
ts->gid, (long64)ts->xid, node);
11001100
continue;
11011101
}
11021102
if (ts->status == TRANSACTION_STATUS_IN_PROGRESS) {
@@ -1111,8 +1111,8 @@ static void MtmReceiver(Datum arg)
11111111
}
11121112
} else {
11131113
Assert(ts->status == TRANSACTION_STATUS_ABORTED);
1114-
elog(WARNING, "Receive PRECOMMITTED response for aborted transaction %d (%s) from node %d",
1115-
ts->xid, ts->gid, node); // How it can happen? SHould we use assert here?
1114+
elog(WARNING, "Receive PRECOMMITTED response for aborted transaction %s (%llu) from node %d",
1115+
ts->gid, (long64)ts->xid, node); // How it can happen? SHould we use assert here?
11161116
if ((ts->participantsMask & ~Mtm->disabledNodeMask & ~ts->votedMask) == 0) {
11171117
MtmWakeUpBackend(ts);
11181118
}
@@ -1160,7 +1160,7 @@ static void MtmReceiver(Datum arg)
11601160
if (!MtmWatchdog(now)) {
11611161
for (i = 0; i < nNodes; i++) {
11621162
if (Mtm->nodes[i].lastHeartbeat != 0 && sockets[i] >= 0) {
1163-
MTM_LOG1("Last heartbeat from node %d received %ld microseconds ago", i+1, now - Mtm->nodes[i].lastHeartbeat);
1163+
MTM_LOG1("Last heartbeat from node %d received %lld microseconds ago", i+1, now - Mtm->nodes[i].lastHeartbeat);
11641164
}
11651165
}
11661166
}

contrib/mmts/bgwpool.h

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,12 @@
44
#include "storage/s_lock.h"
55
#include "storage/spin.h"
66
#include "storage/pg_sema.h"
7+
#include "bkb.h"
78

89
typedef void(*BgwPoolExecutor)(void* work, size_t size);
910

10-
typedef uint64 timestamp_t;
11+
typedef ulong64 timestamp_t;
12+
1113

1214
#define MAX_DBNAME_LEN 30
1315
#define MAX_DBUSER_LEN 30

contrib/mmts/bkb.h

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,11 @@
66

77
#define MAX_NODES 64
88

9-
typedef uint64_t nodemask_t;
9+
typedef long long long64; /* we are not using int64 here because we want to use %lld format for this type */
10+
typedef unsigned long long ulong64; /* we are not using uint64 here because we want to use %lld format for this type */
11+
12+
typedef ulong64 nodemask_t;
13+
1014
#define BIT_CHECK(mask, bit) (((mask) & ((nodemask_t)1 << (bit))) != 0)
1115
#define BIT_CLEAR(mask, bit) (mask &= ~((nodemask_t)1 << (bit)))
1216
#define BIT_SET(mask, bit) (mask |= ((nodemask_t)1 << (bit)))

contrib/mmts/bkbtest.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ int main() {
1111
matrix[2] = 1;
1212
matrix[4] = 3;
1313
clique = MtmFindMaxClique(matrix, 64, &clique_size);
14-
printf("Clique=%lx\n", clique);
14+
printf("Clique=%llx\n", clique);
1515
return 0;
1616
}
1717

contrib/mmts/doc/architecture.md

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
# `Multi-master architecture`
2+
3+
## Intro
4+
5+
Multi-master consists of two major subsystems: synchronous logical replication and arbiter process that is
6+
responsible for health checks and cluster recovery automation.
7+
8+
## Replication
9+
10+
When postgres loads the multi-master shared library it sets up a [[logical replication|logrep doc link]] producer and consumer to each node in the cluster and hooks into the transaction commit pipeline. Since each server can accept writes it is possible that any server can abort a transaction due to a concurrent update - in the same way as it happens on a single server between different backends. The usual way of dealing with such situations is to perform the transaction in two steps: first try to ensure that commit is possible (PREPARE stage) and if all nodes acknowledged that then we can finally commit. Postgres supports such a [[two-phase commit|https://www.postgresql.org/docs/9.6/static/sql-prepare-transaction.html]] procedure. So multi-master captures each commit statement and implicitly transforms it to PREPARE, waits until the cohort (all nodes except ours) gets that transaction via the replication protocol and only after successful responses from the cohort finally commits it.
11+
12+
Also, ordinary two-phase commit (2PC) is insufficient to withstand node crashes and network failures. When a failure happens between PREPARE and COMMIT the surviving nodes may not have enough information to decide what to do with the prepared transaction -- the crashed node may have already committed or aborted that transaction, but didn't notify other nodes about that, and such a transaction will block resources (hold locks) until recovery of the crashed node. Otherwise, if we decide to commit/abort the transaction without knowing the failed node's decision, then we can end up with data inconsistencies in the database when the failed node is recovered (e.g. the failed node committed the transaction but a surviving node aborted it).
13+
14+
To be able to deal with crashes the E3PC commit protocol was used [1][2]. The main idea of 3PC-like protocols is to write the intention to commit a transaction before the actual commit, introducing a new message (PRECOMMIT) in the protocol between the PREPARE and COMMIT messages. That message is not used during normal work, but in case of failure all nodes have enough information to decide what to do with the transaction using a quorum-based voting procedure. For voting to complete the protocol requires a majority of nodes to be present, hence the rule that a cluster of 2N+1 nodes can tolerate N simultaneous failures.
15+
16+
This process is summarized in the following diagram:
17+
18+
![](https://cdn.rawgit.com/postgrespro/postgres_cluster/fac1e9fa/contrib/mmts/doc/mmts_commit.svg)
19+
20+
Here a user, connected to a backend (BE), decides to commit his transaction. The multi-master extension hooks that commit and changes it to a PREPARE statement. During transaction execution the walsender process (WS) has already started to decode the transaction to the "reorder buffer", and by the time the PREPARE statement happens WS starts sending our transaction to all neighbouring nodes (cohort). Then the cohort nodes apply that transaction in the walreceiver process (WR) and, after success, signal the arbiter process (Arb on diagram, custom background worker implemented in multimaster) to send a vote for the transaction (prepared) to the initiating node.
21+
The arbiter process on the initiating node waits until all nodes from the cohort have sent a vote for the transaction; after that it sends "precommit" messages and waits until all nodes respond to that with a "precommitted" message.
22+
When all participating sites have answered with a "precommitted" message the arbiter signals the backend to stop waiting and commit our prepared transaction.
23+
After that, the commit WAL record reaches the cohort nodes via walsender/walreceiver connections.
24+
25+
[1] Idit Keidar, Danny Dolev. Increasing the Resilience of Distributed and Replicated Database Systems. http://dx.doi.org/10.1006/jcss.1998.1566
26+
27+
[2] Tim Kempster, Colin Stirling, Peter Thanisch. A more committed quorum-based three phase commit protocol. http://dx.doi.org/10.1007/BFb0056487
28+
29+
30+
<!--
31+
32+
## DDL replication
33+
34+
Multi-master replicates such statements on statement-based level wrapping them as part of two-phase transaction.
35+
36+
## Sequences
37+
38+
-->
39+
40+
## Failure detection and recovery
41+
42+
While multi-master allows writes to each node it waits for responses about transaction acknowledgement from all other nodes, so without special actions in case of failure of any node each commit will wait until the failed node recovers. To deal with such situations multi-master periodically sends heartbeats to check health and connectivity between nodes. When several heartbeats to the node are lost in a row (see configuration parameters ```multimaster.heartbeat_recv_timeout``` and ```multimaster.heartbeat_send_timeout```) that node can be kicked out of the cluster to allow writes to alive nodes.
43+
44+
For alive nodes there is no way to distinguish between a failed node that stopped serving requests and a network-partitioned node that isn't reachable by other nodes, but can be reachable by database users. So to protect from split-brain situations (conflicting writes to nodes in different network partitions) in case of failure multi-master allows writes only to nodes that see a majority of other nodes. For example when a 5-node multi-master cluster experiences a failure that splits the network into two isolated subnets with 2 and 3 cluster nodes then multi-master, based on heartbeat propagation info, will continue to accept writes at each node in the bigger partition and deny all writes in the smaller one. Generally speaking, a cluster consisting of 2N+1 nodes can tolerate N node failures and will be alive if any N+1 nodes are alive and connected to each other. In case of partial network split, when different nodes have different connectivity (for example in a 3-node cluster when node B can't access node C, but node A can access both B and C) multi-master will find a fully-connected subset of nodes and switch off other nodes. Each node maintains a data structure that keeps the status of all nodes from this node's point of view, which is accessible through the ```mtm.get_nodes_state()``` system view.
45+
46+
When a failed node connects back to the cluster the recovery process is started. The recovering node will select one of the cluster nodes to apply changes that were made while the node was offline. That process will continue until the recovering node catches up to ```multimaster.min_recovery_lag``` WAL lag (default: 100kB). After that the whole cluster is locked for writes to allow the recovery process to finish. After recovery is done the returned node is promoted to online status and returned back to the replication scheme as it was before the failure. Such automatic recovery is only possible when the failed node's WAL lag behind the working ones is not more than ```multimaster.max_recovery_lag```. When the failed node's lag is bigger than ```multimaster.max_recovery_lag``` then the node should be manually recovered using pg_basebackup from one of the working nodes.

contrib/mmts/doc/mmts_commit.svg

Lines changed: 2 additions & 0 deletions
Loading

contrib/mmts/multimaster--1.0.sql

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -9,15 +9,15 @@ CREATE FUNCTION mtm.stop_replication() RETURNS void
99
AS 'MODULE_PATHNAME','mtm_stop_replication'
1010
LANGUAGE C;
1111

12-
CREATE FUNCTION mtm.drop_node(node integer, drop_slot bool default false) RETURNS void
13-
AS 'MODULE_PATHNAME','mtm_drop_node'
12+
CREATE FUNCTION mtm.stop_node(node integer, drop_slot bool default false) RETURNS void
13+
AS 'MODULE_PATHNAME','mtm_stop_node'
1414
LANGUAGE C;
1515

1616
CREATE FUNCTION mtm.add_node(conn_str text) RETURNS void
1717
AS 'MODULE_PATHNAME','mtm_add_node'
1818
LANGUAGE C;
1919

20-
-- Create replication slot for the node which was previously dropped together with it's slot
20+
-- Create replication slot for the node which was previously stopped
2121
CREATE FUNCTION mtm.recover_node(node integer) RETURNS void
2222
AS 'MODULE_PATHNAME','mtm_recover_node'
2323
LANGUAGE C;
@@ -27,7 +27,7 @@ CREATE FUNCTION mtm.get_snapshot() RETURNS bigint
2727
AS 'MODULE_PATHNAME','mtm_get_snapshot'
2828
LANGUAGE C;
2929

30-
CREATE FUNCTION mtm.get_csn(xid integer) RETURNS bigint
30+
CREATE FUNCTION mtm.get_csn(xid bigint) RETURNS bigint
3131
AS 'MODULE_PATHNAME','mtm_get_csn'
3232
LANGUAGE C;
3333

@@ -36,22 +36,22 @@ AS 'MODULE_PATHNAME','mtm_get_last_csn'
3636
LANGUAGE C;
3737

3838

39-
CREATE TYPE mtm.node_state AS ("id" integer, "disabled" bool, "disconnected" bool, "catchUp" bool, "slotLag" bigint, "avgTransDelay" bigint, "lastStatusChange" timestamp, "oldestSnapshot" bigint, "SenderPid" integer, "SenderStartTime" timestamp, "ReceiverPid" integer, "ReceiverStartTime" timestamp, "connStr" text, "connectivityMask" bigint);
39+
CREATE TYPE mtm.node_state AS ("id" integer, "disabled" bool, "disconnected" bool, "catchUp" bool, "slotLag" bigint, "avgTransDelay" bigint, "lastStatusChange" timestamp, "oldestSnapshot" bigint, "SenderPid" integer, "SenderStartTime" timestamp, "ReceiverPid" integer, "ReceiverStartTime" timestamp, "connStr" text, "connectivityMask" bigint, "stalled" bool, "stopped" bool);
4040

4141
CREATE FUNCTION mtm.get_nodes_state() RETURNS SETOF mtm.node_state
4242
AS 'MODULE_PATHNAME','mtm_get_nodes_state'
4343
LANGUAGE C;
4444

4545
CREATE TYPE mtm.cluster_state AS ("status" text, "disabledNodeMask" bigint, "disconnectedNodeMask" bigint, "catchUpNodeMask" bigint, "liveNodes" integer, "allNodes" integer, "nActiveQueries" integer, "nPendingQueries" integer, "queueSize" bigint, "transCount" bigint, "timeShift" bigint, "recoverySlot" integer,
46-
"xidHashSize" bigint, "gidHashSize" bigint, "oldestXid" integer, "configChanges" integer);
46+
"xidHashSize" bigint, "gidHashSize" bigint, "oldestXid" bigint, "configChanges" integer, "stalledNodeMask" bigint, "stoppedNodeMask" bigint);
4747

48-
CREATE TYPE mtm.trans_state AS ("status" text, "gid" text, "xid" integer, "coordinator" integer, "gxid" integer, "csn" timestamp, "snapshot" timestamp, "local" boolean, "prepared" boolean, "active" boolean, "twophase" boolean, "votingCompleted" boolean, "participants" bigint, "voted" bigint, "configChanges" integer);
48+
CREATE TYPE mtm.trans_state AS ("status" text, "gid" text, "xid" bigint, "coordinator" integer, "gxid" bigint, "csn" timestamp, "snapshot" timestamp, "local" boolean, "prepared" boolean, "active" boolean, "twophase" boolean, "votingCompleted" boolean, "participants" bigint, "voted" bigint, "configChanges" integer);
4949

5050
CREATE FUNCTION mtm.get_trans_by_gid(git text) RETURNS mtm.trans_state
5151
AS 'MODULE_PATHNAME','mtm_get_trans_by_gid'
5252
LANGUAGE C;
5353

54-
CREATE FUNCTION mtm.get_trans_by_xid(xid integer) RETURNS mtm.trans_state
54+
CREATE FUNCTION mtm.get_trans_by_xid(xid bigint) RETURNS mtm.trans_state
5555
AS 'MODULE_PATHNAME','mtm_get_trans_by_xid'
5656
LANGUAGE C;
5757

@@ -79,7 +79,7 @@ CREATE FUNCTION mtm.inject_2pc_error(stage integer) RETURNS void
7979
AS 'MODULE_PATHNAME','mtm_inject_2pc_error'
8080
LANGUAGE C;
8181

82-
CREATE FUNCTION mtm.check_deadlock(xid integer) RETURNS boolean
82+
CREATE FUNCTION mtm.check_deadlock(xid bigint) RETURNS boolean
8383
AS 'MODULE_PATHNAME','mtm_check_deadlock'
8484
LANGUAGE C;
8585

0 commit comments

Comments
 (0)