Skip to content

Commit 0582c5d

Browse files
committed
Implement an example application for libdtm.
1 parent 864cf87 commit 0582c5d

File tree

6 files changed

+121
-9
lines changed

6 files changed

+121
-9
lines changed

contrib/pg_xtm/dtmd/include/clogfile.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
#define BITS_PER_COMMIT 2
1212
#define COMMIT_MASK ((1 << BITS_PER_COMMIT) - 1)
1313
#define COMMITS_PER_BYTE 4
14-
#define COMMITS_PER_FILE 1024 // 0x100000000
14+
#define COMMITS_PER_FILE 0x100000000
1515
#define BYTES_PER_FILE ((COMMITS_PER_FILE) / (COMMITS_PER_BYTE))
1616
#define XID_TO_FILEID(XID) ((XID) / (COMMITS_PER_FILE))
1717
#define XID_TO_OFFSET(XID) (((XID) % (COMMITS_PER_FILE)) / (COMMITS_PER_BYTE))

contrib/pg_xtm/dtmd/src/main.c

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -308,6 +308,7 @@ static char *onsnapshot(void *client, cmd_t *cmd) {
308308
}
309309
assert(t->sent_seqno < t->snapshot.seqno);
310310

311+
t->sent_seqno++;
311312
return snapshot_serialize(&t->snapshot);
312313
}
313314

contrib/pg_xtm/dtmd/src/transaction.c

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,6 @@ bool global_transaction_mark(clog_t clg, GlobalTransaction *gt, int status) {
3838
Transaction *t = gt->participants + node;
3939
if (t->active) {
4040
assert(t->node == node);
41-
assert(t->vote == POSITIVE);
4241
if (!clog_write(clg, MUX_XID(node, t->xid), status)) {
4342
shout("clog write failed");
4443
return false;

contrib/pg_xtm/libdtm/src/example.c

Lines changed: 117 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -3,20 +3,130 @@
33

44
#include "libdtm.h"
55

6+
#define NODES 5
7+
8+
void start_transaction(DTMConn conn, TransactionId base) {
9+
GlobalTransactionId gtid;
10+
gtid.nNodes = NODES;
11+
gtid.xids = malloc(sizeof(TransactionId) * gtid.nNodes);
12+
gtid.nodes = malloc(sizeof(NodeId) * gtid.nNodes);
13+
14+
int n;
15+
for (n = 0; n < gtid.nNodes; n++) {
16+
gtid.xids[n] = base + n;
17+
gtid.nodes[n] = n;
18+
}
19+
20+
if (!DtmGlobalStartTransaction(conn, &gtid)) {
21+
fprintf(stdout, "global transaction not started\n");
22+
exit(EXIT_FAILURE);
23+
}
24+
fprintf(stdout, "global transaction started\n");
25+
26+
free(gtid.xids);
27+
free(gtid.nodes);
28+
}
29+
30+
void commit_transaction(DTMConn conn, TransactionId base) {
31+
int n;
32+
for (n = 0; n < NODES; n++) {
33+
if (!DtmGlobalSetTransStatus(conn, n, base + n, TRANSACTION_STATUS_COMMITTED)) {
34+
fprintf(stdout, "global transaction not committed\n");
35+
exit(EXIT_FAILURE);
36+
}
37+
}
38+
fprintf(stdout, "global transaction committed\n");
39+
}
40+
41+
void abort_transaction(DTMConn conn, TransactionId base) {
42+
if (!DtmGlobalSetTransStatus(conn, 0, base + 0, TRANSACTION_STATUS_ABORTED)) {
43+
fprintf(stdout, "global transaction not aborted\n");
44+
exit(EXIT_FAILURE);
45+
}
46+
fprintf(stdout, "global transaction aborted\n");
47+
}
48+
49+
void show_snapshots(DTMConn conn, TransactionId base) {
50+
int i, n;
51+
for (n = 0; n < NODES; n++) {
52+
Snapshot s = malloc(sizeof(SnapshotData));
53+
s->xip = NULL;
54+
55+
if (!DtmGlobalGetSnapshot(conn, n, base + n, s)) {
56+
fprintf(stdout, "failed to get a snapshot[%d]\n", n);
57+
exit(EXIT_FAILURE);
58+
}
59+
fprintf(stdout, "snapshot[%d, %#x]: xmin = %#x, xmax = %#x, active =", n, base + n, s->xmin, s->xmax);
60+
for (i = 0; i < s->xcnt; i++) {
61+
fprintf(stdout, " %#x", s->xip[i]);
62+
}
63+
fprintf(stdout, "\n");
64+
65+
free(s->xip);
66+
free(s);
67+
}
68+
}
69+
70+
void show_status(DTMConn conn, TransactionId base) {
71+
int n;
72+
for (n = 0; n < NODES; n++) {
73+
XidStatus s = DtmGlobalGetTransStatus(conn, n, base + n);
74+
if (s == -1) {
75+
fprintf(stdout, "failed to get transaction status [%d, %#x]\n", n, base + n);
76+
exit(EXIT_FAILURE);
77+
}
78+
fprintf(stdout, "status[%d, %#x]: ", n, base + n);
79+
switch (s) {
80+
case TRANSACTION_STATUS_COMMITTED:
81+
fprintf(stdout, "committed\n");
82+
break;
83+
case TRANSACTION_STATUS_ABORTED:
84+
fprintf(stdout, "aborted\n");
85+
break;
86+
case TRANSACTION_STATUS_IN_PROGRESS:
87+
fprintf(stdout, "in progress\n");
88+
break;
89+
default:
90+
fprintf(stdout, "(error)\n");
91+
break;
92+
}
93+
}
94+
}
95+
696
int main() {
797
DTMConn conn = DtmConnect("localhost", 5431);
898
if (!conn) {
999
fprintf(stderr, "failed to connect to dtmd\n");
10100
exit(1);
11101
}
12102

13-
xid_t xid = 0xdeadbeefcafebabe;
14-
Snapshot s = malloc(sizeof(SnapshotData));
15-
16-
DtmGlobalGetSnapshot(conn, 0, xid, s);
17-
fprintf(stdout, "snapshot is %d xids wide\n", s->xcnt);
103+
TransactionId base0 = 0xdeadbeef;
104+
TransactionId base1 = base0 + NODES;
105+
TransactionId base2 = base1 + NODES;
18106

19-
DtmDisconnect(conn);
107+
start_transaction(conn, base0);
108+
show_snapshots(conn, base0);
109+
110+
start_transaction(conn, base1);
111+
show_snapshots(conn, base0);
112+
show_snapshots(conn, base1);
113+
114+
start_transaction(conn, base2);
115+
show_snapshots(conn, base0);
116+
show_snapshots(conn, base1);
117+
show_snapshots(conn, base2);
118+
119+
commit_transaction(conn, base0);
120+
show_snapshots(conn, base1);
121+
show_snapshots(conn, base2);
20122

21-
return 0;
123+
abort_transaction(conn, base1);
124+
show_snapshots(conn, base2);
125+
126+
show_status(conn, base0);
127+
show_status(conn, base1);
128+
show_status(conn, base2);
129+
130+
DtmDisconnect(conn);
131+
return EXIT_SUCCESS;
22132
}

contrib/pg_xtm/libdtm/src/libdtm.c

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
../../libdtm.c

contrib/pg_xtm/libdtm/src/libdtm.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
../../libdtm.h

0 commit comments

Comments
 (0)