
Commit e0de0b3

merge with pg_dtm

2 parents a8b2483 + fe5374d commit e0de0b3

29 files changed: +2905 -30 lines changed

.gitignore

Lines changed: 1 addition & 1 deletion
@@ -27,6 +27,7 @@ win32ver.rc
 *.exe
 lib*dll.def
 lib*.pc
+.DS_Store
 
 # Local excludes in root directory
 /GNUmakefile
@@ -38,4 +39,3 @@ lib*.pc
 /Debug/
 /Release/
 /tmp_install/
-

contrib/pg_dtm/Makefile

Lines changed: 7 additions & 0 deletions
@@ -17,3 +17,10 @@ top_builddir = ../..
 include $(top_builddir)/src/Makefile.global
 include $(top_srcdir)/contrib/contrib-global.mk
 endif
+
+# ifndef PG_CONFIG
+# PG_CONFIG = pg_config
+# endif
+
+# PGXS := $(shell $(PG_CONFIG) --pgxs)
+# include $(PGXS)

contrib/pg_dtm/README.md

Lines changed: 117 additions & 0 deletions
@@ -0,0 +1,117 @@
# pg_dtm

### Design

This repo implements a distributed transaction manager using the Snapshot Sharing mechanism. General concepts and alternative approaches are described in the postgres wiki: https://wiki.postgresql.org/wiki/DTM.

The Backend-DTM protocol description can be found in [dtmd/README](dtmd/README).

### Installation

* Patch postgres using xtm.patch. After that, build and install postgres in the usual way.
```bash
cd ~/code/postgres
patch -p1 < ~/code/pg_dtm/xtm.patch
```
* Install the pg_dtm extension.
```bash
export PATH=/path/to/pgsql/bin/:$PATH
cd ~/code/pg_dtm
make && make install
```
* Run dtmd.
```bash
cd ~/code/pg_dtm/dtmd
make
mkdir /tmp/clog
./bin/dtmd &
```
* To run something meaningful you need at least two postgres instances. pg_dtm also needs to be listed in `shared_preload_libraries`.
```bash
initdb -D ./install/data1
initdb -D ./install/data2
echo "port = 5433" >> ./install/data2/postgresql.conf
echo "shared_preload_libraries = 'pg_dtm'" >> ./install/data1/postgresql.conf
echo "shared_preload_libraries = 'pg_dtm'" >> ./install/data2/postgresql.conf
pg_ctl -D ./install/data1 -l ./install/data1/log start
pg_ctl -D ./install/data2 -l ./install/data2/log start
```

#### Automatic provisioning

For a cluster-wide deployment we use ansible; more details are in tests/deploy_layouts. (Ansible instructions will follow later.)

### Usage

Now the cluster is running and you can use global transactions across the two nodes. Let's connect to the postgres instances on their different ports:

```sql
create extension pg_dtm; -- node1
create table accounts(user_id int, amount int); -- node1
insert into accounts (select 2*generate_series(1,100)-1, 0); -- node1, odd user_id's
create extension pg_dtm; -- node2
create table accounts(user_id int, amount int); -- node2
insert into accounts (select 2*generate_series(1,100), 0); -- node2, even user_id's
select dtm_begin_transaction(); -- node1, returns global xid, e.g. 42
select dtm_join_transaction(42); -- node2, join global tx
begin; -- node1
begin; -- node2
update accounts set amount=amount-100 where user_id=1; -- node1, transfer money from user#1
update accounts set amount=amount+100 where user_id=2; -- node2, to user#2
commit; -- node1, blocks until the second commit happens
commit; -- node2
```

### Consistency testing

To ensure consistency we use a simple bank test: perform a lot of simultaneous transfers between accounts on different servers, while constantly checking the total amount of money across all accounts. This test can be found in tests/perf.

```bash
> go run ./tests/perf/*
-C value
    Connection string (repeat for multiple connections)
-a int
    The number of bank accounts (default 100000)
-b string
    Backend to use. Possible options: transfers, fdw, pgshard, readers. (default "transfers")
-g  Use DTM to keep global consistency
-i  Init database
-l  Use 'repeatable read' isolation level instead of 'read committed'
-n int
    The number of updates each writer (reader, in the case of the readers backend) performs (default 10000)
-p  Use parallel execs
-r int
    The number of readers (default 1)
-s int
    StartID. The script will update rows starting from this value
-v  Show progress and other stuff for mortals
-w int
    The number of writers (default 8)
```

So the previous installation can be initialized with:
```
go run ./tests/perf/*.go \
    -C "dbname=postgres port=5432" \
    -C "dbname=postgres port=5433" \
    -g -i
```
and tested with:
```
go run ./tests/perf/*.go \
    -C "dbname=postgres port=5432" \
    -C "dbname=postgres port=5433" \
    -g
```

### Using with postgres_fdw

We also provide a patch that enables support for global transactions with postgres_fdw. After patching and installing postgres_fdw it is possible to run the same test through fdw using the `-b fdw` key.
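
A purely illustrative invocation (not part of this commit), reusing the two instances from the setup above; with the fdw backend the connection strings may need to point at whichever node hosts the foreign tables, so adjust them to your postgres_fdw configuration:
```
go run ./tests/perf/*.go \
    -C "dbname=postgres port=5432" \
    -C "dbname=postgres port=5433" \
    -g -b fdw
```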

### Using with pg_shard

Citus Data has a branch in their pg_shard repo that interacts with the transaction manager: https://github.com/citusdata/pg_shard/tree/transaction_manager_integration
To use this feature one should have the following line in postgresql.conf (or set it via GUC):
```
pg_shard.use_dtm_transactions = 1
```

contrib/pg_dtm/dtmd/.gitignore

Lines changed: 2 additions & 0 deletions
@@ -0,0 +1,2 @@
bin/*
obj/*

contrib/pg_dtm/dtmd/README

Lines changed: 104 additions & 0 deletions
@@ -0,0 +1,104 @@
===
xtm
===

Distributed transaction management tools for PostgreSQL.

--------------------
Communication scheme
--------------------
              ┏━━━━━━━━━┓
     ┌────────┨ Backend ┠──────────┐
     │        ┗━━━━━━━━━┛          │
┏━━━━┷━━━━┓   ┏━━━━━━━━━┓   ┏━━━━━━┷━━━━━━┓
┃ Arbiter ┠───┨ Backend ┠───┨ Coordinator ┃
┗━━━━┯━━━━┛   ┗━━━━━━━━━┛   ┗━━━━━━┯━━━━━━┛
     │        ┏━━━━━━━━━┓          │
     └──┬─────┨ Backend ┠───────┬──┘
        ┆     ┗━━━━━━━━━┛       ┆
libdtm + libsockhub     libpq + xtm procs

-----------------------
Coordinator-Backend API
-----------------------

This API includes a set of postgres procedures that
the coordinator can call with a "select" statement.

FIXME: actualize the API
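
For illustration only (this code is not part of the commit), a coordinator built on libpq might drive two backends roughly as follows; the procedure names dtm_begin_transaction and dtm_join_transaction are taken from the pg_dtm README, and, per the FIXME above, the actual procedure set may differ:

```c
/* Hypothetical coordinator sketch using libpq; error handling omitted.
 * Procedure names follow the pg_dtm README and may not match the
 * current API (see the FIXME above). */
#include <stdio.h>
#include <libpq-fe.h>

int main(void)
{
    PGconn *node1 = PQconnectdb("dbname=postgres port=5432");
    PGconn *node2 = PQconnectdb("dbname=postgres port=5433");

    /* Ask node1 (and, through it, the arbiter) for a global xid. */
    PGresult *res = PQexec(node1, "SELECT dtm_begin_transaction()");
    char join[64];
    snprintf(join, sizeof(join), "SELECT dtm_join_transaction(%s)",
             PQgetvalue(res, 0, 0));
    PQclear(res);

    /* Tell node2 to join the same global transaction. */
    PQclear(PQexec(node2, join));

    /* Run the local parts of the transaction on both nodes. */
    PQclear(PQexec(node1, "BEGIN"));
    PQclear(PQexec(node2, "BEGIN"));
    /* ... updates on node1 and node2 ... */
    PQclear(PQexec(node1, "COMMIT"));  /* blocks until both nodes commit */
    PQclear(PQexec(node2, "COMMIT"));

    PQfinish(node1);
    PQfinish(node2);
    return 0;
}
```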

------------------------
Backend-Arbiter Protocol
------------------------

The underlying protocol (libsockhub) also transmits the message length, so
there is no need for 'argc'. Every command or reply is a series of int64
numbers.

The format of all commands:
[cmd, argv[0], argv[1], ...]

'cmd' is a command.
'argv[i]' are the arguments.
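
As a purely illustrative sketch (not part of the commit), the int64 payload of, say, a status(xid, wait) command could be laid out like this; the helper name is made up, and the actual length framing and sending are handled by libsockhub:

```c
#include <stddef.h>
#include <stdint.h>

/* Hypothetical helper: lay out a 'status' command as the int64 series
 * [cmd, argv[0], argv[1], ...] described above. No 'argc' is included,
 * because the libsockhub transport already carries the message length. */
static size_t pack_status(int64_t *buf, int64_t xid, int64_t wait)
{
    buf[0] = 's';   /* cmd: ask for the status of a global transaction */
    buf[1] = xid;   /* argv[0]: global transaction id */
    buf[2] = wait;  /* argv[1]: 1 = reply only when the transaction finishes */
    return 3;       /* number of int64 values to hand to the transport */
}
```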

The commands:

'r': reserve(minxid, minsize)
    Claims a sequence ≥ minsize of xids ≥ minxid for local usage. This will
    prevent the arbiter from using those values for global transactions.

    The arbiter replies with:
    [RES_OK, min, max] if it reserved the range [min, max]
    [RES_FAILED] on failure

'b': begin(size)
    Starts a global transaction and assigns a 'xid' to it. 'size' is used
    for vote results calculation. The arbiter also creates and returns the
    snapshot.

    The arbiter replies with:
    [RES_OK, xid, *snapshot] if the transaction started successfully
    [RES_FAILED] on failure

    See the 'snapshot' command description for the snapshot format.

's': status(xid, wait)
    Asks the arbiter about the status of the global transaction identified
    by the given 'xid'.

    If 'wait' is 1, the arbiter will not reply until it considers the
    transaction finished (all nodes voted, or one dead).

    The arbiter replies with:
    [RES_TRANSACTION_UNKNOWN] if not started
    [RES_TRANSACTION_COMMITTED] if committed
    [RES_TRANSACTION_ABORTED] if aborted
    [RES_TRANSACTION_INPROGRESS] if in progress
    [RES_FAILED] if failed

'y': for(xid, wait)
    Tells the arbiter that this node votes for commit of the global
    transaction identified by the given 'xid'.

    The reply and 'wait' logic is the same as for the 'status' command.

'n': against(xid, wait)
    Tells the arbiter that this node votes against commit of the global
    transaction identified by the given 'xid'.

    The reply and 'wait' logic is the same as for the 'status' command.

'h': snapshot(xid)
    Tells the arbiter to generate a snapshot for the global transaction
    identified by the given 'xid'. The arbiter will create a snapshot for
    every participant, so when each of them asks for the snapshot it will
    reply with the same snapshot. The arbiter generates a fresh version if
    the same client asks for a snapshot again for the same transaction.

    Joins the global transaction identified by the given 'xid', if not
    joined already.

    The arbiter replies with [RES_OK, gxmin, xmin, xmax, xcnt, xip[0], xip[1], ...],
    where 'gxmin' is the smallest xmin among all available snapshots.

    In case of a failure, the arbiter replies with [RES_FAILED].
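
To make the reply layout concrete, here is a hypothetical receiver-side sketch (not part of the commit); the numeric value of RES_OK is assumed, and the field names simply mirror the description above:

```c
#include <stdint.h>
#include <stdio.h>

#define RES_OK 0   /* assumed value; the real constant lives in the dtmd headers */

/* Hypothetical sketch: unpack [RES_OK, gxmin, xmin, xmax, xcnt, xip[0], ...]. */
static int parse_snapshot_reply(const int64_t *msg, size_t len)
{
    if (len < 5 || msg[0] != RES_OK)
        return -1;                        /* RES_FAILED or truncated reply */

    int64_t gxmin = msg[1];               /* smallest xmin among all snapshots */
    int64_t xmin  = msg[2];
    int64_t xmax  = msg[3];
    int64_t xcnt  = msg[4];               /* number of in-progress xids */

    if ((int64_t) len < 5 + xcnt)
        return -1;

    printf("gxmin=%lld xmin=%lld xmax=%lld xcnt=%lld\n",
           (long long) gxmin, (long long) xmin,
           (long long) xmax, (long long) xcnt);
    for (int64_t i = 0; i < xcnt; i++)
        printf("  xip[%lld] = %lld\n", (long long) i, (long long) msg[5 + i]);
    return 0;
}
```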

contrib/pg_dtm/dtmd/include/limits.h

Lines changed: 10 additions & 0 deletions
@@ -0,0 +1,10 @@
#ifndef LIMITS_H
#define LIMITS_H

#define MAX_TRANSACTIONS 4096

#define BUFFER_SIZE (256 * 1024)
#define LISTEN_QUEUE_SIZE 100
#define MAX_STREAMS 4096

#endif
Lines changed: 5 additions & 0 deletions
@@ -0,0 +1,5 @@
*.a
sockhub
test-client
test-async-client
test-server

contrib/pg_dtm/sockhub/tests/Makefile

Lines changed: 49 additions & 0 deletions
@@ -0,0 +1,49 @@
CC = gcc
CFLAGS = -c -I. -I.. -Wall -O3 -g -fPIC
LD = $(CC)
LDFLAGS = -g
AR = ar
ARFLAGS = -cru

all: sockhub library tests

library: libsockhub.a

tests: test-client test-async-client test-server

sockhub.o: ../sockhub.c ../sockhub.h
	$(CC) $(CFLAGS) ../sockhub.c

sockhub_main.o: sockhub_main.c sockhub.h
	$(CC) $(CFLAGS) sockhub_main.c

libsockhub.a: sockhub.o
	rm -f libsockhub.a
	$(AR) $(ARFLAGS) libsockhub.a sockhub.o

sockhub: sockhub_main.o libsockhub.a
	$(LD) $(LDFLAGS) -o sockhub sockhub_main.o libsockhub.a

test-client.o: test-client.c ../sockhub.h
	$(CC) $(CFLAGS) test-client.c

test-client: test-client.o libsockhub.a
	$(LD) $(LDFLAGS) -o test-client test-client.o libsockhub.a

test-async-client.o: test-async-client.c ../sockhub.h
	$(CC) $(CFLAGS) test-async-client.c

test-async-client: test-async-client.o libsockhub.a
	$(LD) $(LDFLAGS) -o test-async-client test-async-client.o libsockhub.a

test-server.o: test-server.c ../sockhub.h
	$(CC) $(CFLAGS) test-server.c

test-server: test-server.o libsockhub.a
	$(LD) $(LDFLAGS) -o test-server test-server.o libsockhub.a

clean:
	rm -f *.o *.a

tgz: clean
	cd .. ; tar cvzf sockhub.tgz sockhub
Lines changed: 59 additions & 0 deletions
@@ -0,0 +1,59 @@
#include <stdio.h>
#include <stdlib.h>
#include "sockhub.h"

int main(int argc, char* argv[])
{
    int i;
    Shub shub;
    ShubParams params;

    ShubInitParams(&params);

    for (i = 1; i < argc-1; i++) {
        if (argv[i][0] == '-') {
            switch (argv[i][1]) {
              case 'h':
                params.host = argv[++i];
                continue;
              case 'p':
                params.port = atoi(argv[++i]);
                continue;
              case 'f':
                params.file = argv[++i];
                continue;
              case 'd':
                params.delay = atoi(argv[++i]);
                continue;
              case 'b':
                params.buffer_size = atoi(argv[++i]);
                continue;
              case 'r':
                params.max_attempts = atoi(argv[++i]);
                continue;
            }
        }
      Usage:
        fprintf(stderr, "sockhub: combine several local unix socket connections into one inet socket\n"
                "Options:\n"
                "\t-h HOST\tremote host name\n"
                "\t-p PORT\tremote port\n"
                "\t-f FILE\tunix socket file name\n"
                "\t-d DELAY\tdelay for waiting for incoming requests (milliseconds)\n"
                "\t-b SIZE\tbuffer size\n"
                "\t-q SIZE\tlisten queue size\n"
                "\t-r N\tmaximum connect attempts\n"
        );

        return 1;
    }
    if (params.host == NULL || params.file == NULL || i != argc) {
        goto Usage;
    }

    ShubInitialize(&shub, &params);

    ShubLoop(&shub);

    return 0;
}
Lines changed: 9 additions & 0 deletions
@@ -0,0 +1,9 @@
n_clients=8
n_iters=10000
pkill -9 sockhub
pkill -9 test-async-client
./sockhub -h $1 -p 5001 -f /tmp/p5002 &
for ((i=0;i<n_clients;i++))
do
    ./test-async-client -h localhost -p 5002 -i $n_iters &
done
Lines changed: 7 additions & 0 deletions
@@ -0,0 +1,7 @@
n_clients=10
n_iters=100000
for ((i=0;i<n_clients;i++))
do
    ./test-client $1 5001 $n_iters &
done
wait
