Skip to content

Commit 7e10729

Browse files
committed
Merge branch 'xtm' of gitlab.postgrespro.ru:pgpro-dev/postgrespro into xtm
2 parents 970a1b1 + aa2c91a commit 7e10729

File tree

20 files changed

+1216
-889
lines changed

20 files changed

+1216
-889
lines changed

contrib/pg_dtm/README

Lines changed: 49 additions & 96 deletions
Original file line numberDiff line numberDiff line change
@@ -7,15 +7,16 @@ Distributed transaction management tools for PostgreSQL.
77
--------------------
88
Communication scheme
99
--------------------
10-
11-
.- Backend -.
12-
/ \
13-
/ \
14-
DTM ---- Backend ---- Coordinator
15-
\ /
16-
\ /
17-
`- Backend -´
18-
10+
┏━━━━━━━━━┓
11+
┌────────┨ Backend ┠──────────┐
12+
│ ┗━━━━━━━━━┛ │
13+
┏━━━━┷━━━━┓ ┏━━━━━━━━━┓ ┏━━━━━━┷━━━━━━┓
14+
┃ Arbiter ┠───┨ Backend ┠───┨ Coordinator ┃
15+
┗━━━━┯━━━━┛ ┗━━━━━━━━━┛ ┗━━━━━━┯━━━━━━┛
16+
│ ┏━━━━━━━━━┓ │
17+
└──┬─────┨ Backend ┠───────┬──┘
18+
┆ ┗━━━━━━━━━┛ ┆
19+
libdtm + libsockhub libpq + xtm procs
1920

2021
-----------------------
2122
Coordinator-Backend API
@@ -24,128 +25,80 @@ Coordinator-Backend API
2425
This API includes a set of postgres procedures that
2526
the coordinator can call with "select" statement.
2627

27-
-- Informs the DTM about a global transaction
28-
-- identified by the corresponding pairs of node:xid values.
29-
dtm_begin_transaction(nodes integer[], xids integer[]) RETURNS void
30-
31-
-- Causes the backend to get a snapshot from the DTM
32-
-- and merge it with the local snapshot.
33-
dtm_get_snapshot() RETURNS void
34-
35-
----------
36-
libdtm api
37-
----------
38-
39-
// Sets up the host and port for DTM connection.
40-
// The defaults are "127.0.0.1" and 5431.
41-
void TuneToDtm(char *host, int port);
42-
43-
void DtmInitSnapshot(Snapshot snapshot);
44-
45-
// Starts a new global transaction of nParticipants size. Returns the
46-
// transaction id, fills the 'snapshot' and 'gxmin' on success. 'gxmin' is the
47-
// smallest xmin among all snapshots known to DTM. Returns INVALID_XID
48-
// otherwise.
49-
TransactionId DtmGlobalStartTransaction(int nParticipants, Snapshot snapshot, TransactionId *gxmin);
50-
51-
// Asks the DTM for a fresh snapshot. Fills the 'snapshot' and 'gxmin' on
52-
// success. 'gxmin' is the smallest xmin among all snapshots known to DTM.
53-
void DtmGlobalGetSnapshot(TransactionId xid, Snapshot snapshot, TransactionId *gxmin);
54-
55-
// Commits transaction only once all participants have called this function,
56-
// does not change CLOG otherwise. Set 'wait' to 'true' if you want this call
57-
// to return only after the transaction is considered finished by the DTM.
58-
// Returns the status on success, or -1 otherwise.
59-
XidStatus DtmGlobalSetTransStatus(TransactionId xid, XidStatus status, bool wait);
60-
61-
// Gets the status of the transaction identified by 'xid'. Returns the status
62-
// on success, or -1 otherwise. If 'wait' is true, then it does not return
63-
// until the transaction is finished.
64-
XidStatus DtmGlobalGetTransStatus(TransactionId xid, bool wait);
65-
66-
// Reserves at least 'nXids' successive xids for local transactions. The xids
67-
// reserved are not less than 'xid' in value. Returns the actual number of xids
68-
// reserved, and sets the 'first' xid accordingly. The number of xids reserved
69-
// is guaranteed to be at least nXids.
70-
// In other words, *first ≥ xid and result ≥ nXids.
71-
// Also sets the 'active' snapshot, which is used as a container for the list
72-
// of active global transactions.
73-
int DtmGlobalReserve(TransactionId xid, int nXids, TransactionId *first, Snapshot active);
28+
FIXME: actualize the API
7429

75-
--------------------
76-
Backend-DTM Protocol
77-
--------------------
30+
------------------------
31+
Backend-Arbiter Protocol
32+
------------------------
7833

79-
The queries from backend to DTM should be formatted according to this syntax.
34+
The underlying protocol (libsockhub) also transmits the message length, so
35+
there is no need in 'argc'. Every command or reply is a series of int64
36+
numbers.
8037

81-
<char cmd><hex16 argc><hex16 argv[0]><hex16 argv[1]>...
38+
The format of all commands:
39+
[cmd, argv[0], argv[1], ...]
8240

83-
<cmd> is a character representing a command.
84-
<argc> is the number of arguments.
85-
<argv[i]> are the arguments.
41+
'cmd' is a command.
42+
'argv[i]' are the arguments.
8643

8744
The commands:
8845

8946
'r': reserve(minxid, minsize)
9047
Claims a sequence ≥ minsize of xids ≥ minxid for local usage. This will
91-
prevent DTM from using those values for global transactions. The
92-
'snapshot' represent the list of currently active global transactions.
48+
prevent the arbiter from using those values for global transactions.
9349

94-
The DTM replies with:
95-
'+'<hex16 min><hex16 max><snapshot> if reserved a range [min, max]
96-
'-' on failure
50+
The arbiter replies with:
51+
[RES_OK, min, max] if reserved a range [min, max]
52+
[RES_FAILED] on failure
9753

9854
'b': begin(size)
9955
Starts a global transaction and assign a 'xid' to it. 'size' is used
100-
for vote results calculation. The DTM also creates and returns the
56+
for vote results calculation. The arbiter also creates and returns the
10157
snapshot.
10258

103-
The DTM replies with:
104-
'+'<hex16 xid><snapshot> if transaction started successfully
105-
'-' on failure
59+
The arbiter replies with:
60+
[RES_OK, xid, *snapshot] if transaction started successfully
61+
[RES_FAILED] on failure
10662

10763
See the 'snapshot' command description for the snapshot format.
10864

10965
's': status(xid, wait)
110-
Asks the DTM about the status of the global transaction identified
66+
Asks the arbiter about the status of the global transaction identified
11167
by the given 'xid'.
11268

113-
If 'wait' is true, DTM will not reply until it considers the
69+
If 'wait' is 1, the arbiter will not reply until it considers the
11470
transaction finished (all nodes voted, or one dead).
11571

116-
The DTM replies with:
117-
"+0" if not started
118-
"+c" if committed
119-
"+a" if aborted
120-
"+?" if in progress
121-
'-' if failed
72+
The arbiter replies with:
73+
[RES_TRANSACTION_UNKNOWN] if not started
74+
[RES_TRANSACTION_COMMITTED] if committed
75+
[RES_TRANSACTION_ABORTED] if aborted
76+
[RES_TRANSACTION_INPROGRESS] if in progress
77+
[RES_FAILED] if failed
12278

12379
'y': for(xid, wait)
124-
Tells the DTM to vote for commit of the global transaction identified
125-
by the given 'xid'.
80+
Tells the arbiter that this node votes for commit of the global
81+
transaction identified by the given 'xid'.
12682

12783
The reply and 'wait' logic is the same as for the 'status' command.
12884

12985
'n': against(xid, wait)
130-
Tells the DTM to vote againts commit of the global transaction
131-
identified by the given 'xid'.
86+
Tells the arbiter that this node votes againts commit of the global
87+
transaction identified by the given 'xid'.
13288

13389
The reply and 'wait' logic is the same as for the 'status' command.
13490

13591
'h': snapshot(xid)
136-
Tells the DTM to generate a snapshot for the global transaction
137-
identified by the given 'xid'. The DTM will create a snapshot for every
138-
participant, so when each of them asks for the snapshot it will reply
139-
with the same snapshot. The DTM generates a fresh version if the same
140-
client asks for a snapshot again for the same transaction.
92+
Tells the arbiter to generate a snapshot for the global transaction
93+
identified by the given 'xid'. The arbiter will create a snapshot for
94+
every participant, so when each of them asks for the snapshot it will
95+
reply with the same snapshot. The arbiter generates a fresh version if
96+
the same client asks for a snapshot again for the same transaction.
14197

14298
Joins the global transaction identified by the given 'xid', if not
14399
joined already.
144100

145-
The DTM replies with '+' followed by a snapshot in the form:
146-
147-
<hex16 gxmin><hex16 xmin><hex16 xmax><hex16 xcnt><hex16 xip[0]>...
148-
149-
Where 'gxmin' is the smallest xmin among all available snapshots.
101+
The arbiter replies with [RES_OK, gxmin, xmin, xmax, xcnt, xip[0], xip[1]...],
102+
where 'gxmin' is the smallest xmin among all available snapshots.
150103

151-
In case of a failure, the DTM replies with '-'.
104+
In case of a failure, the arbiter replies with [RES_FAILED].

contrib/pg_dtm/dtmd/Makefile

Lines changed: 11 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
CC=gcc
22
CFLAGS=-g -O2 -Wall -Iinclude -D_LARGEFILE64_SOURCE # -DDEBUG
3-
LIBUV_PREFIX=$(HOME)/libuv-build
4-
LIBUV_CFLAGS=-I"$(LIBUV_PREFIX)/include" -L"$(LIBUV_PREFIX)/lib"
5-
LIBUV_LDFLAGS=-luv -pthread #-lrt
3+
SOCKHUB_PREFIX=../sockhub
4+
SOCKHUB_CFLAGS=-I"$(SOCKHUB_PREFIX)"
5+
SOCKHUB_LDFLAGS=-lsockhub -L"$(SOCKHUB_PREFIX)"
66

77
SYSTEM=$(shell uname -s)
88
ifeq ($(SYSTEM),Darwin)
@@ -15,18 +15,18 @@ all: bin/dtmd
1515
@echo Done.
1616
@echo Feel free to run the tests with \'make check\'.
1717

18-
bin/dtmd: obj/eventwrap.o obj/main.o obj/parser.o obj/clog.o obj/clogfile.o obj/util.o obj/transaction.o obj/snapshot.o | bindir objdir
19-
$(CC) -o bin/dtmd $(CFLAGS) $(LIBUV_CFLAGS) \
20-
obj/eventwrap.o obj/main.o obj/parser.o \
18+
bin/dtmd: obj/server.o obj/main.o obj/clog.o obj/clogfile.o obj/util.o obj/transaction.o obj/snapshot.o | bindir objdir
19+
$(CC) -o bin/dtmd $(CFLAGS) \
20+
obj/server.o obj/main.o \
2121
obj/clog.o obj/clogfile.o obj/util.o obj/transaction.o \
2222
obj/snapshot.o \
23-
$(LIBUV_LDFLAGS)
23+
$(SOCKHUB_LDFLAGS)
2424

25-
obj/eventwrap.o: src/eventwrap.c | objdir
26-
$(CC) -c -o obj/eventwrap.o $(CFLAGS) $(LIBUV_CFLAGS) src/eventwrap.c
25+
obj/server.o: src/server.c | objdir
26+
$(CC) -c -o obj/server.o $(CFLAGS) $(SOCKHUB_CFLAGS) src/server.c
2727

28-
check: bin/util-test bin/clog-test bin/parser-test
29-
./check.sh util parser clog
28+
check: bin/util-test bin/clog-test
29+
./check.sh util clog
3030

3131
obj/%.o: src/%.c | objdir
3232
$(CC) $(CFLAGS) -c -o $@ $<
@@ -37,9 +37,6 @@ bin/util-test: obj/util-test.o obj/util.o | bindir
3737
bin/clog-test: obj/clog-test.o obj/clog.o obj/clogfile.o obj/util.o | bindir
3838
$(CC) -o bin/clog-test $(CFLAGS) obj/clog-test.o obj/clog.o obj/clogfile.o obj/util.o
3939

40-
bin/parser-test: obj/parser-test.o obj/parser.o | bindir
41-
$(CC) -o bin/parser-test $(CFLAGS) obj/parser-test.o obj/parser.o
42-
4340
bindir:
4441
mkdir -p bin
4542

contrib/pg_dtm/dtmd/include/clog.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010

1111
#define INVALID_XID 0
1212
#define MIN_XID 42
13-
#define MAX_XID 0xdeadbeefcafebabe
13+
#define MAX_XID ~0
1414

1515
#define BLANK 0
1616
#define POSITIVE 1

contrib/pg_dtm/dtmd/include/int.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
#ifndef INT_H
22
#define INT_H
33

4-
typedef unsigned long long xid_t;
4+
typedef unsigned xid_t;
55

66
#endif

contrib/pg_dtm/dtmd/include/limits.h

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,8 @@
44
#define MAX_TRANSACTIONS_PER_CLIENT 1024
55
#define MAX_TRANSACTIONS 1024
66

7-
#define BITS_PER_NODE 4
8-
#define MAX_NODES (1 << BITS_PER_NODE)
7+
#define BUFFER_SIZE (64 * 1024)
8+
#define LISTEN_QUEUE_SIZE 100
9+
#define MAX_STREAMS 128
910

1011
#endif

contrib/pg_dtm/dtmd/include/parser.h

Lines changed: 0 additions & 47 deletions
This file was deleted.

contrib/pg_dtm/dtmd/include/proto.h

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
#ifndef PROTO_H
2+
#define PROTO_H
3+
4+
#define CMD_RESERVE 'r'
5+
#define CMD_BEGIN 'b'
6+
#define CMD_FOR 'y'
7+
#define CMD_AGAINST 'n'
8+
#define CMD_SNAPSHOT 'h'
9+
#define CMD_STATUS 's'
10+
11+
#define RES_FAILED 0xDEADBEEF
12+
#define RES_OK 0xC0FFEE
13+
#define RES_TRANSACTION_COMMITTED 1
14+
#define RES_TRANSACTION_ABORTED 2
15+
#define RES_TRANSACTION_INPROGRESS 3
16+
#define RES_TRANSACTION_UNKNOWN 4
17+
18+
#endif

0 commit comments

Comments
 (0)