Skip to content

Commit 9a76754

Browse files
committed
Implement DTM with local range reserving and common XIDs.
1 parent 3b14aea commit 9a76754

File tree

10 files changed

+442
-490
lines changed

10 files changed

+442
-490
lines changed

contrib/pg_xtm/README

Lines changed: 59 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -36,40 +36,25 @@ dtm_get_snapshot() RETURNS void
3636
libdtm api
3737
----------
3838

39-
typedef unsigned long long xid_t;
39+
void DtmInitSnapshot(Snapshot snapshot);
4040

41-
typedef int NodeId;
41+
// Starts new global transaction
42+
TransactionId DtmGlobalStartTransaction(int nParticipants, Snapshot snapshot);
4243

43-
typedef struct {
44-
TransactionId* xids;
45-
NodeId* nodes;
46-
int nNodes;
47-
} GlobalTransactionId;
48-
49-
// Connects to the specified DTM.
50-
DTMConn DtmConnect(char *host, int port);
51-
52-
// Disconnects from the DTM. Do not use the 'dtm' pointer after this call, or
53-
// bad things will happen.
54-
void DtmDisconnect(DTMConn dtm);
55-
56-
// Creates an entry for a new global transaction. Returns 'true' on success, or
57-
// 'false' otherwise.
58-
bool DtmGlobalStartTransaction(DTMConn dtm, GlobalTransactionId* gtid);
59-
60-
// Asks DTM for a fresh snapshot. Returns 'true' on success, or 'false'
61-
// otherwise.
62-
bool DtmGlobalGetSnapshot(DTMConn dtm, NodeId nodeid, TransactionId xid, Snapshot snapshot);
44+
// Gets an existing DTM snapshot.
45+
void DtmGlobalGetSnapshot(TransactionId xid, Snapshot snapshot);
6346

6447
// Commits transaction only once all participants have called this function,
65-
// does not change CLOG otherwise. Returns 'true' on success, 'false' if
66-
// something failed on the daemon side.
67-
bool DtmGlobalSetTransStatus(DTMConn dtm, NodeId nodeid, TransactionId xid, XidStatus status);
48+
// does not change CLOG otherwise.
49+
void DtmGlobalSetTransStatus(TransactionId xid, XidStatus status, bool wait);
6850

6951
// Gets the status of the transaction identified by 'xid'. Returns the status
7052
// on success, or -1 otherwise. If 'wait' is true, then it does not return
7153
// until the transaction is finished.
72-
XidStatus DtmGlobalGetTransStatus(DTMConn dtm, NodeId nodeid, TransactionId xid, bool wait);
54+
XidStatus DtmGlobalGetTransStatus(TransactionId xid, bool wait);
55+
56+
// Reserve XIDs for local transaction
57+
TransactionId DtmGlobalReserve(int nXids);
7358

7459
--------------------
7560
Backend-DTM Protocol
@@ -85,56 +70,63 @@ The queries from backend to DTM should be formatted according to this syntax.
8570

8671
The commands:
8772

88-
'b': begin(size, node0, xid0, node1, xid1, ...)
89-
Starts a global transaction using 'xid0' on 'node0', 'xid1' on 'node1'
90-
and so on. The 'size' is the number of nodes, so for example if 'size'
91-
== 3 there are 6 values expected after it.
92-
93-
The DTM replies with '+' if transaction started, or '-' if failed.
73+
'r': reserve(minxid, minsize)
74+
Claims a sequence of at least 'minsize' xids, each ≥ 'minxid', for local usage. This will
75+
prevent DTM from using those values for global transactions.
9476

95-
'c': commit(node, xid, wait)
96-
Tells the DTM to vote for commit of the global transaction identified
97-
by the given 'node:xid' pair.
77+
The DTM replies with:
78+
'+'<hex16 min><hex16 max> if reserved a range [min, max]
79+
'-' on failure
9880

99-
The DTM replies with '+' if committed, or '-' if aborted or failed.
81+
'b': begin(size)
82+
Starts a global transaction and assigns an 'xid' to it. 'size' is used
83+
for vote results calculation. The DTM also creates and returns the
84+
snapshot.
85+
86+
The DTM replies with:
87+
'+'<hex16 xid><snapshot> if transaction started successfully
88+
'-' on failure
89+
90+
See the 'snapshot' command description for the snapshot format.
91+
92+
's': status(xid, wait)
93+
Asks the DTM about the status of the global transaction identified
94+
by the given 'xid'.
10095

10196
If 'wait' is true, DTM will not reply until it considers the
102-
transaction finished (all nodes committed, or at least one aborted).
97+
transaction finished (all nodes voted, or one dead).
98+
99+
The DTM replies with:
100+
"+0" if not started
101+
"+c" if committed
102+
"+a" if aborted
103+
"+?" if in progress
104+
'-' if failed
105+
106+
'y': for(xid, wait)
107+
Tells the DTM to vote for commit of the global transaction identified
108+
by the given 'xid'.
109+
110+
The reply and 'wait' logic is the same as for the 'status' command.
103111

104-
'a': abort(node, xid)
112+
'n': against(xid, wait)
105113
Tells the DTM to vote against commit of the global transaction
106-
identified by the given 'node:xid' pair. This query not have the 'wait'
107-
parameter, because the DTM will not wait for all votes if one is
108-
against the commit.
109-
110-
The DTM replies with '+' if aborted, or '-' if failed. The backend
111-
probably should ignore this reply anyway :)
112-
113-
'h': snapshot(node, xid)
114-
Tells the DTM to give a snapshot for the global transaction identified
115-
by the given 'node:xid' pair. The DTM will create a snapshot for every
116-
participant, so when they ask for the snapshot it will reply with the
117-
"same" snapshot. When a node asks for a snapshot once again, the DTM
118-
generates a fresh version for every participant. So be careful not to
119-
ask for a snapshot from the same node the second time, until all other
120-
nodes also ask for that snapshot.
114+
identified by the given 'xid'.
121115

122-
The DTM replies with '+' followed by a snapshot in the form:
116+
The reply and 'wait' logic is the same as for the 'status' command.
123117

124-
<hex16 xmin><hex16 xmax><hex16 xcnt><hex16 xip[0]>...
118+
'h': snapshot(xid)
119+
Tells the DTM to generate a snapshot for the global transaction
120+
identified by the given 'xid'. The DTM will create a snapshot for every
121+
participant, so when each of them asks for the snapshot it will reply
122+
with the same snapshot. The DTM generates a fresh version if the same
123+
client asks for a snapshot again for the same transaction.
125124

126-
In case of a failure, the DTM replies with '-'.
125+
Joins the global transaction identified by the given 'xid', if not
126+
joined already.
127127

128-
's': status(node, xid, wait)
129-
Asks the DTM about the status of the global transaction identified
130-
by the given 'node:xid' pair.
128+
The DTM replies with '+' followed by a snapshot in the form:
131129

132-
The DTM replies with:
133-
"+0" if not started;
134-
"+c" if committed;
135-
"+a" if aborted;
136-
"+?" if in progress;
137-
'-' if failed.
130+
<hex16 xmin><hex16 xmax><hex16 xcnt><hex16 xip[0]>...
138131

139-
If 'wait' is true, DTM will not reply until it considers the
140-
transaction finished (all nodes committed, or at least one aborted).
132+
In case of a failure, the DTM replies with '-'.

contrib/pg_xtm/dtmd/include/limits.h

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,4 @@
77
#define BITS_PER_NODE 4
88
#define MAX_NODES (1 << BITS_PER_NODE)
99

10-
#define MUX_XID(NODE, XID) (((XID) << (BITS_PER_NODE)) + NODE)
11-
1210
#endif

contrib/pg_xtm/dtmd/include/parser.h

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,10 @@
66
#include "int.h"
77
#include "limits.h"
88

9+
#define CMD_RESERVE 'r'
910
#define CMD_BEGIN 'b'
10-
#define CMD_COMMIT 'c'
11-
#define CMD_ABORT 'a'
11+
#define CMD_FOR 'y'
12+
#define CMD_AGAINST 'n'
1213
#define CMD_SNAPSHOT 'h'
1314
#define CMD_STATUS 's'
1415

contrib/pg_xtm/dtmd/include/snapshot.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ typedef struct Snapshot {
99
xid_t xmax;
1010
int nactive;
1111
xid_t active[MAX_TRANSACTIONS];
12+
int times_sent;
1213
} Snapshot;
1314

1415
char *snapshot_serialize(Snapshot *s);

contrib/pg_xtm/dtmd/include/transaction.h

Lines changed: 17 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -9,32 +9,29 @@
99

1010
#define MAX_SNAPSHOTS_PER_TRANS 8
1111

12+
#define CHAR_TO_INDEX(C) ((C) - 'a')
13+
1214
typedef struct Transaction {
13-
// true if the transaction was started on the node
14-
bool active;
15+
xid_t xid;
1516

16-
int client_id;
17-
int node;
18-
int vote;
17+
int size;
1918

20-
xid_t xid;
21-
Snapshot snapshot[MAX_SNAPSHOTS_PER_TRANS];
19+
// for + against ≤ size
20+
int votes_for;
21+
int votes_against;
2222

23-
// if this is equal to seqno, we need to generate a new snapshot (for each node)
24-
int snapshot_no;
25-
} Transaction;
23+
Snapshot snapshots[MAX_SNAPSHOTS_PER_TRANS];
24+
int snapshots_count; // will wrap around if exceeds max snapshots
2625

27-
#define CHAR_TO_INDEX(C) ((C) - 'a')
28-
typedef struct GlobalTransaction {
29-
int n_snapshots;
30-
Transaction participants[MAX_NODES];
3126
void *listeners[CHAR_TO_INDEX('z')]; // we are going to use 'a' to 'z' for indexing
32-
} GlobalTransaction;
27+
} Transaction;
3328

34-
int global_transaction_status(GlobalTransaction *gt);
35-
bool global_transaction_mark(clog_t clg, GlobalTransaction *gt, int status);
36-
void global_transaction_clear(GlobalTransaction *gt);
37-
void global_transaction_push_listener(GlobalTransaction *gt, char cmd, void *listener);
38-
void *global_transaction_pop_listener(GlobalTransaction *gt, char cmd);
29+
Snapshot *transaction_latest_snapshot(Transaction *t);
30+
Snapshot *transaction_snapshot(Transaction *t, int snapno);
31+
int transaction_status(Transaction *t);
32+
void transaction_clear(Transaction *t);
33+
void transaction_push_listener(Transaction *t, char cmd, void *listener);
34+
void *transaction_pop_listener(Transaction *t, char cmd);
35+
bool transaction_participate(Transaction *t, int clientid);
3936

4037
#endif

contrib/pg_xtm/dtmd/include/util.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
char *join_path(const char *dir, const char *file);
1616
bool inrange(xid_t min, xid_t x, xid_t max);
1717
int falloc(int fd, off64_t size);
18+
char *destructive_concat(char *a, char *b);
1819

1920
#ifndef DEBUG
2021
#define shout(...)

0 commit comments

Comments
 (0)