
Commit f80aef3

Merge branch 'master' of github.com:postgrespro/postgres_cluster
2 parents: aa7a583 + 82d2e2d

10 files changed (+423, -197 lines)

contrib/mmts/Makefile

Lines changed: 2 additions & 2 deletions
@@ -1,5 +1,5 @@
 MODULE_big = multimaster
-OBJS = multimaster.o raftable.o arbiter.o bytebuf.o bgwpool.o pglogical_output.o pglogical_proto.o pglogical_receiver.o pglogical_apply.o pglogical_hooks.o pglogical_config.o pglogical_relid_map.o ddd.o bkb.o
+OBJS = multimaster.o raftable.o arbiter.o bytebuf.o bgwpool.o pglogical_output.o pglogical_proto.o pglogical_receiver.o pglogical_apply.o pglogical_hooks.o pglogical_config.o pglogical_relid_map.o ddd.o bkb.o spill.o
 
 override CPPFLAGS += -I../raftable
 
@@ -15,7 +15,7 @@ all: multimaster.so
 tests/dtmbench:
 	make -C tests
 
-PG_CPPFLAGS = -I$(libpq_srcdir) -DUSE_PGLOGICAL_OUTPUT
+PG_CPPFLAGS = -I$(libpq_srcdir)
 SHLIB_LINK = $(libpq)
 
 ifdef USE_PGXS

contrib/mmts/README.md

Lines changed: 53 additions & 117 deletions
@@ -1,117 +1,53 @@
-# pg_dtm
-
-### Design
-
-This repo implements distributed transaction manager using Snapshot Sharing mechanism. General concepts and alternative approaches described in postgres wiki https://wiki.postgresql.org/wiki/DTM.
-
-Backend-DTM protocol description can be found in [dtmd/README](dtmd/README).
-
-### Installation
-
-* Patch postgres using xtm.patch. After that build and install postgres in usual way.
-```bash
-cd ~/code/postgres
-patch -p1 < ~/code/pg_dtm/xtm.patch
-```
-* Install pg_dtm extension.
-```bash
-export PATH=/path/to/pgsql/bin/:$PATH
-cd ~/code/pg_dtm
-make && make install
-```
-* Run dtmd.
-```bash
-cd ~/code/pg_dtm/dtmd
-make
-mkdir /tmp/clog
-./bin/dtmd &
-```
-* To run something meaningful you need at leat two postgres instances. Also pg_dtm requires presense in ```shared_preload_libraries```.
-```bash
-initdb -D ./install/data1
-initdb -D ./install/data2
-echo "port = 5433" >> ./install/data2/postgresql.conf
-echo "shared_preload_libraries = 'pg_dtm'" >> ./install/data1/postgresql.conf
-echo "shared_preload_libraries = 'pg_dtm'" >> ./install/data2/postgresql.conf
-pg_ctl -D ./install/data1 -l ./install/data1/log start
-pg_ctl -D ./install/data2 -l ./install/data2/log start
-```
-
-#### Automatic provisioning
-
-For a cluster-wide deploy we use ansible, more details in tests/deploy_layouts. (Ansible instructions will be later)
-
-### Usage
-
-Now cluster is running and you can use global tx between two nodes. Let's connect to postgres instances at different ports:
-
-```sql
-create extension pg_dtm; -- node1
-create table accounts(user_id int, amount int); -- node1
-insert into accounts (select 2*generate_series(1,100)-1, 0); -- node1, odd user_id's
-create extension pg_dtm; -- node2
-create table accounts(user_id int, amount int); -- node2
-insert into accounts (select 2*generate_series(1,100), 0); -- node2, even user_id's
-select dtm_begin_transaction(); -- node1, returns global xid, e.g. 42
-select dtm_join_transaction(42); -- node2, join global tx
-begin; -- node1
-begin; -- node2
-update accounts set amount=amount-100 where user_id=1; -- node1, transfer money from user#1
-update accounts set amount=amount+100 where user_id=2; -- node2, to user#2
-commit; -- node1, blocks until second commit happend
-commit; -- node2
-```
-
-### Consistency testing
-
-To ensure consistency we use simple bank test: perform a lot of simultaneous transfers between accounts on different servers, while constantly checking total amount of money on all accounts. This test can be found in tests/perf.
-
-```bash
-> go run ./tests/perf/*
--C value
-    	Connection string (repeat for multiple connections)
--a int
-    	The number of bank accounts (default 100000)
--b string
-    	Backend to use. Possible optinos: transfers, fdw, pgshard, readers. (default "transfers")
--g	Use DTM to keep global consistency
--i	Init database
--l	Use 'repeatable read' isolation level instead of 'read committed'
--n int
-    	The number updates each writer (reader in case of Reades backend) performs (default 10000)
--p	Use parallel execs
--r int
-    	The number of readers (default 1)
--s int
-    	StartID. Script will update rows starting from this value
--v	Show progress and other stuff for mortals
--w int
-    	The number of writers (default 8)
-```
-
-So previous installation can be initialized with:
-```
-go run ./tests/perf/*.go \
--C "dbname=postgres port=5432" \
--C "dbname=postgres port=5433" \
--g -i
-```
-and tested with:
-```
-go run ./tests/perf/*.go \
--C "dbname=postgres port=5432" \
--C "dbname=postgres port=5433" \
--g
-```
-
-### Using with postres_fdw.
-
-We also provide a patch, that enables support of global transactions with postres_fdw. After patching and installing postres_fdw it is possible to run same test via fdw usig key ```-b fdw```.
-
-### Using with pg_shard
-
-Citus Data have branch in their pg_shard repo, that interacts with transaction manager. https://github.com/citusdata/pg_shard/tree/transaction_manager_integration
-To use this feature one should have following line in postgresql.conf (or set it via GUC)
-```
-pg_shard.use_dtm_transactions = 1
-```
+# Postgres Multimaster
+
+## Testing
+
+The testing process involves multiple modules that perform different tasks. The
+modules and their APIs are listed below.
+
+### Modules
+
+#### `combineaux`
+
+Governs the whole testing process. Runs different workloads during different
+troubles.
+
+#### `stresseaux`
+
+Puts workloads against the database. Writes logs that are later used by
+`valideaux`.
+
+* `start(id, workload, cluster)` - starts a `workload` against the `cluster`
+and calls it `id`.
+* `stop(id)` - stops a previously started workload called `id`.
+
+#### `starteaux`
+
+Manages the database nodes.
+
+* `deploy(driver, ...)` - deploys a cluster using the specified `driver` and
+other parameters specific to that driver. Returns a `cluster` instance that is
+used in other methods.
+* `cluster->up(id)` - adds a node named `id` to the `cluster`.
+* `cluster->down(id)` - removes a node named `id` from the `cluster`.
+* `cluster->drop(src, dst, ratio)` - drops `ratio` of the packets flowing from node
+`src` to node `dst`.
+* `cluster->delay(src, dst, msec)` - delays packets flowing from node `src` to
+node `dst` by `msec` milliseconds.
+
+#### `troubleaux`
+
+This is the troublemaker that messes with the network, nodes and time.
+
+* `cause(cluster, trouble, ...)` - causes the specified `trouble` in the
+specified `cluster` with some trouble-specific parameters.
+* `fix(cluster)` - fixes all troubles caused in the `cluster`.
+
+#### `valideaux`
+
+Validates the logs of `stresseaux`.
+
+#### `reporteaux`
+
+Generates reports on the test results. This is usually a table with
+`trouble` vs `workload` axes.
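
As a usage illustration of the `stresseaux`/`starteaux` API documented above, here is a minimal sketch (not part of this commit). The node ids, the workload name `bank-transfers`, the drop ratio and the delay value are hypothetical, and the stubs in `Testeaux.pm` below currently only print what they would do.

```perl
use Testeaux;   # assumes Testeaux.pm is on @INC

# Bring up a two-node cluster with the lxc driver.
my $cluster = starteaux->deploy('lxc');
$cluster->up('node1');
$cluster->up('node2');

# Run a workload while the link between the nodes is degraded.
stresseaux::start('run1', 'bank-transfers', $cluster);  # hypothetical workload
$cluster->drop('node1', 'node2', 0.2);    # drop 20% of packets node1 -> node2
$cluster->delay('node2', 'node1', 100);   # add 100 ms of latency node2 -> node1
stresseaux::stop('run1');

# Tear down one node at the end.
$cluster->down('node2');
```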

contrib/mmts/Testeaux.pm

Lines changed: 85 additions & 0 deletions
@@ -0,0 +1,85 @@
+package Testeaux;
+
+package combineaux
+{
+	sub combine
+	{
+		my ($workloads, $troubles) = @_;
+
+		my $cluster = starteaux->deploy('lxc');
+
+		foreach my $workload (@$workloads)
+		{
+			foreach my $trouble (@$troubles)
+			{
+				print("run workload $workload during trouble $trouble\n");
+				# FIXME: generate proper id instead of 'hello'
+				stresseaux::start('hello', $workload, $cluster);
+				# FIXME: add a time gap here
+				troubleaux::cause($cluster, $trouble);
+				# FIXME: add a time gap here
+				stresseaux::stop('hello');
+				troubleaux::fix($cluster);
+			}
+		}
+	}
+}
+
+package stresseaux
+{
+	sub start
+	{
+		my ($id, $workload, $cluster) = @_;
+		print("start stress $id: workload $workload, cluster $cluster\n");
+		# fixme: implement
+	}
+
+	sub stop
+	{
+		my $id = shift;
+		print("stop stress $id\n");
+		# FIXME: implement
+	}
+}
+
+package starteaux
+{
+	sub deploy
+	{
+		my ($class, $driver, @args) = @_;
+		my $self = {};
+		print("deploy cluster using driver $driver\n");
+		# fixme: implement
+		return bless $self, 'starteaux';
+	}
+
+	sub up
+	{
+		my ($self, $id) = @_;
+		print("up node $id\n");
+		# FIXME: implement
+	}
+
+	sub down
+	{
+		my ($self, $id) = @_;
+		print("down node $id\n");
+		# FIXME: implement
+	}
+
+	sub drop
+	{
+		my ($self, $src, $dst, $ratio) = @_;
+		print("drop $ratio packets from $src to $dst\n");
+		# FIXME: implement
+	}
+
+	sub delay
+	{
+		my ($self, $src, $dst, $msec) = @_;
+		print("delay packets from $src to $dst by $msec msec\n");
+		# FIXME: implement
+	}
+}
+
+1;
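
For completeness, a hedged sketch of how `combineaux::combine` above might be invoked; the workload commands and trouble names are made up for illustration, and the `troubleaux` package that `combine` calls is not yet implemented in this file.

```perl
use Testeaux;

# One run per (workload, trouble) pair; run ids are currently hard-coded to
# 'hello' inside combine (see the FIXME above).
my @workloads = (
	'pgbench -c 8 -T 60',   # hypothetical workload command
	'bank-transfers',       # hypothetical consistency workload
);
my @troubles = (
	'split-brain',          # hypothetical trouble names
	'time-shift',
);

combineaux::combine(\@workloads, \@troubles);
```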

contrib/mmts/multimaster.c

Lines changed: 61 additions & 0 deletions
@@ -147,6 +147,8 @@ static bool MtmIsRecoverySession
 
 static MtmCurrentTrans MtmTx;
 
+static dlist_head MtmLsnMapping = DLIST_STATIC_INIT(MtmLsnMapping);
+
 static TransactionManager MtmTM = {
 	PgTransactionIdGetStatus,
 	PgTransactionIdSetTreeStatus,
@@ -183,6 +185,7 @@ int MtmConnectTimeout;
 int MtmKeepaliveTimeout;
 int MtmReconnectAttempts;
 int MtmNodeDisableDelay;
+int MtmTransSpillThreshold;
 bool MtmUseRaftable;
 bool MtmUseDtm;
 MtmConnectionInfo* MtmConnections;
@@ -1033,6 +1036,7 @@ void MtmHandleApplyError(void)
 			kill(PostmasterPid, SIGQUIT);
 			break;
 	}
+	FreeErrorData(edata);
 }
 
 
@@ -1243,7 +1247,9 @@ MtmBuildConnectivityMatrix(nodemask_t* matrix, bool nowait)
 	for (i = 0; i < n; i++) {
 		for (j = 0; j < i; j++) {
 			matrix[i] |= ((matrix[j] >> i) & 1) << j;
+			matrix[j] |= ((matrix[i] >> j) & 1) << i;
 		}
+		matrix[i] &= ~((nodemask_t)1 << i);
 	}
 	return true;
 }
@@ -1507,6 +1513,7 @@ static void MtmInitialize()
 			Mtm->nodes[i].transDelay = 0;
 			Mtm->nodes[i].lastStatusChangeTime = time(NULL);
 			Mtm->nodes[i].con = MtmConnections[i];
+			Mtm->nodes[i].flushPos = 0;
 		}
 		PGSemaphoreCreate(&Mtm->votingSemaphore);
 		PGSemaphoreReset(&Mtm->votingSemaphore);
@@ -1626,6 +1633,21 @@ _PG_init(void)
 	if (!process_shared_preload_libraries_in_progress)
 		return;
 
+	DefineCustomIntVariable(
+		"multimaster.trans_spill_threshold",
+		"Maximal size (Mb) of transaction after which transaction is written to the disk",
+		NULL,
+		&MtmTransSpillThreshold,
+		1000, /* 1Gb */
+		0,
+		INT_MAX,
+		PGC_BACKEND,
+		0,
+		NULL,
+		NULL,
+		NULL
+	);
+
 	DefineCustomIntVariable(
 		"multimaster.twopc_min_timeout",
 		"Minamal amount of time (milliseconds) to wait 2PC confirmation from all nodes",
@@ -2084,6 +2106,45 @@ MtmReplicationStartupHook(struct PGLogicalStartupHookArgs* args)
 	on_shmem_exit(MtmOnProcExit, 0);
 }
 
+XLogRecPtr MtmGetFlushPosition(int nodeId)
+{
+	return Mtm->nodes[nodeId-1].flushPos;
+}
+
+void MtmUpdateLsnMapping(int node_id, XLogRecPtr end_lsn)
+{
+	dlist_mutable_iter iter;
+	MtmFlushPosition* flushpos;
+	XLogRecPtr local_flush = GetFlushRecPtr();
+	MemoryContext old_context = MemoryContextSwitchTo(TopMemoryContext);
+
+	/* Track commit lsn */
+	flushpos = (MtmFlushPosition *) palloc(sizeof(MtmFlushPosition));
+	flushpos->node_id = node_id;
+	flushpos->local_end = XactLastCommitEnd;
+	flushpos->remote_end = end_lsn;
+	dlist_push_tail(&MtmLsnMapping, &flushpos->node);
+
+	MtmLock(LW_EXCLUSIVE);
+	dlist_foreach_modify(iter, &MtmLsnMapping)
+	{
+		flushpos = dlist_container(MtmFlushPosition, node, iter.cur);
+		if (flushpos->local_end <= local_flush)
+		{
+			if (Mtm->nodes[node_id-1].flushPos < local_flush) {
+				Mtm->nodes[node_id-1].flushPos = local_flush;
+			}
+			dlist_delete(iter.cur);
+			pfree(flushpos);
+		} else {
+			break;
+		}
+	}
+	MtmUnlock();
+	MemoryContextSwitchTo(old_context);
+}
+
+
 static void
 MtmReplicationShutdownHook(struct PGLogicalShutdownHookArgs* args)
 {
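
The new `multimaster.trans_spill_threshold` GUC registered above can be set in `postgresql.conf` like any other extension setting; the value below is only an example (the committed default is 1000, i.e. roughly 1 GB):

```
multimaster.trans_spill_threshold = 256   # spill transactions larger than 256 Mb to disk
```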
