Skip to content

Commit 3f778c1

Browse files
committed
Yet another not working version of XTM
2 parents 2485a59 + cf6ae89 commit 3f778c1

File tree

12 files changed

+157
-77
lines changed

12 files changed

+157
-77
lines changed

contrib/pg_xtm/README

Lines changed: 23 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
===
2-
dtm
2+
xtm
33
===
44

55
Distributed transaction management tools for PostgreSQL.
@@ -32,30 +32,37 @@ libdtm api
3232

3333
typedef unsigned long long xid_t;
3434

35+
typedef int NodeId;
36+
37+
typedef struct {
38+
TransactionId* xids;
39+
NodeId* nodes;
40+
int nNodes;
41+
} GlobalTransactionId;
42+
3543
// Connects to the specified DTM.
3644
DTMConn DtmConnect(char *host, int port);
3745

3846
// Disconnects from the DTM. Do not use the 'dtm' pointer after this call, or
3947
// bad things will happen.
4048
void DtmDisconnect(DTMConn dtm);
4149

42-
// Asks DTM for a fresh snapshot. Returns a snapshot on success, or NULL
43-
// otherwise. Please free the snapshot memory yourself after use.
44-
Snapshot DtmGlobalGetSnapshot(DTMConn dtm);
45-
46-
// Starts a transaction. Returns the 'gxid' on success, or INVALID_GXID otherwise.
47-
xid_t DtmGlobalBegin(DTMConn dtm);
48-
49-
// Marks a given transaction as 'committed'. Returns 'true' on success,
50+
// Creates an entry for a new global transaction. Returns 'true' on success, or
5051
// 'false' otherwise.
51-
bool DtmGlobalCommit(DTMConn dtm, xid_t gxid);
52+
bool DtmGlobalStartTransaction(DTMConn dtm, GlobalTransactionId* gtid);
53+
54+
// Asks DTM for a fresh snapshot. Returns 'true' on success, or 'false'
55+
// otherwise.
56+
bool DtmGlobalGetSnapshot(DTMConn dtm, NodeId nodeid, TransactionId xid, Snapshot snapshot);
5257

53-
// Marks a given transaction as 'aborted'.
54-
void DtmGlobalRollback(DTMConn dtm, xid_t gxid);
58+
// Commits transaction only once all participants have called this function,
59+
// does not change CLOG otherwise. Returns 'true' on success, 'false' if
60+
// something failed on the daemon side.
61+
bool DtmGlobalSetTransStatus(DTMConn dtm, NodeId nodeid, TransactionId xid, XidStatus status);
5562

56-
// Gets the status of the transaction identified by 'gxid'. Returns the status
63+
// Gets the status of the transaction identified by 'xid'. Returns the status
5764
// on success, or -1 otherwise.
58-
int DtmGlobalGetTransStatus(DTMConn dtm, xid_t gxid);
65+
XidStatus DtmGlobalGetTransStatus(DTMConn dtm, NodeId nodeid, TransactionId xid);
5966

6067
--------------------
6168
Backend-DTM Protocol
@@ -80,11 +87,8 @@ DTM <--> Backend:
8087
-> '-' - "something went wrong"
8188

8289
<- 's'<hex16 gxid> - "status"
83-
-> '+''c|a|?' - "here is the transaction status"
84-
(c)ommitted, (a)borted or (?)unknown
90+
-> '+''0|c|a|?' - "here is the transaction status"
91+
(0)unknown, (c)committed, (a)aborted, or (?)inprogress
8592
-> '-' - "something went wrong"
8693

8794
<snapshot> = <hex16 xmin><hex16 xmax><hex16 n><hex16 active[n]>
88-
89-
Backend disconnection is considered as an abort of all incomplete transactions
90-
started by that backend.

contrib/pg_xtm/dtmd/include/clog.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,9 +12,10 @@
1212
#define MIN_XID 42
1313
#define MAX_XID 0xdeadbeefcafebabe
1414

15-
#define NEUTRAL 0
15+
#define BLANK 0
1616
#define POSITIVE 1
1717
#define NEGATIVE 2
18+
#define DOUBT 3
1819

1920
typedef struct clog_data_t *clog_t;
2021

contrib/pg_xtm/dtmd/include/util.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ char *join_path(const char *dir, const char *file);
1616
bool inrange(xid_t min, xid_t x, xid_t max);
1717
int falloc(int fd, off64_t size);
1818

19-
#if 0
19+
#ifdef NDEBUG
2020
#define shout(...)
2121
#else
2222
#define shout(...) \

contrib/pg_xtm/dtmd/src/clog-test.c

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ bool test_clog(char *datadir) {
2222
printf("commit %d status %d\n", 42, clog_read(clog, 42));
2323
printf("commit %d status %d\n", 1000, clog_read(clog, 1000));
2424
if (!clog_write(clog, 1000, POSITIVE)) return false;
25-
if (!clog_write(clog, 1500, NEGATIVE)) return false;
25+
if (!clog_write(clog, 1500, DOUBT)) return false;
2626

2727
if (!clog_close(clog)) return false;
2828
if (!(clog = clog_open(datadir))) return false;
@@ -35,11 +35,11 @@ bool test_clog(char *datadir) {
3535
printf("commit %d status %d (should be 1)\n", 1000, status = clog_read(clog, 1000));
3636
if (status != POSITIVE) return false;
3737

38-
printf("commit %d status %d (should be 2)\n", 1500, status = clog_read(clog, 1500));
39-
if (status != NEGATIVE) return false;
38+
printf("commit %d status %d (should be 3)\n", 1500, status = clog_read(clog, 1500));
39+
if (status != DOUBT) return false;
4040

4141
printf("commit %d status %d (should be 0)\n", 2044, status = clog_read(clog, 2044));
42-
if (status != NEUTRAL) return false;
42+
if (status != BLANK) return false;
4343

4444
if (!clog_close(clog)) return false;
4545

contrib/pg_xtm/dtmd/src/clog.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,7 @@ int clog_read(clog_t clog, xid_t xid) {
124124
"you might be experiencing a bug in backend\n",
125125
xid
126126
);
127-
return NEUTRAL;
127+
return BLANK;
128128
}
129129
}
130130

contrib/pg_xtm/dtmd/src/main.c

Lines changed: 42 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -45,11 +45,11 @@ static void free_client_data(client_data_t *cd) {
4545
int next_client_id = 0;
4646
static void onconnect(void **client) {
4747
*client = create_client_data(next_client_id++);
48-
//shout("[%d] connected\n", CLIENT_ID(*client));
48+
shout("[%d] connected\n", CLIENT_ID(*client));
4949
}
5050

5151
static void ondisconnect(void *client) {
52-
//shout("[%d] disconnected\n", CLIENT_ID(client));
52+
shout("[%d] disconnected\n", CLIENT_ID(client));
5353
free_client_data(client);
5454
}
5555

@@ -60,6 +60,9 @@ static void clear_global_transaction(GlobalTransaction *t) {
6060
}
6161
}
6262

63+
#ifdef NDEBUG
64+
#define shout_cmd(...)
65+
#else
6366
static void shout_cmd(void *client, cmd_t *cmd) {
6467
char *cmdname;
6568
switch (cmd->cmd) {
@@ -77,6 +80,7 @@ static void shout_cmd(void *client, cmd_t *cmd) {
7780
}
7881
shout("\n");
7982
}
83+
#endif
8084

8185
static char *onbegin(void *client, cmd_t *cmd) {
8286
if (transactions_count >= MAX_TRANSACTIONS) {
@@ -117,7 +121,7 @@ static char *onbegin(void *client, cmd_t *cmd) {
117121
int node = cmd->argv[i * 2 + 1];
118122
xid_t xid = cmd->argv[i * 2 + 2];
119123

120-
if (node > MAX_NODES) {
124+
if (node >= MAX_NODES) {
121125
shout(
122126
"[%d] BEGIN: wrong 'node'\n",
123127
CLIENT_ID(client)
@@ -135,7 +139,7 @@ static char *onbegin(void *client, cmd_t *cmd) {
135139
}
136140
t->active = true;
137141
t->node = node;
138-
t->vote = NEUTRAL;
142+
t->vote = DOUBT;
139143
t->xid = xid;
140144
t->snapshot.seqno = 0;
141145
t->sent_seqno = 0;
@@ -144,21 +148,33 @@ static char *onbegin(void *client, cmd_t *cmd) {
144148
xmax[node] = xid;
145149
}
146150
}
151+
if (!global_transaction_mark(clg, transactions + i, DOUBT)) {
152+
shout(
153+
"[%d] VOTE: global transaction failed"
154+
" to initialize clog bits O_o\n",
155+
CLIENT_ID(client)
156+
);
157+
return strdup("-");
158+
}
159+
147160
transactions_count++;
148161
return strdup("+");
149162
}
150163

151164
static char *onvote(void *client, cmd_t *cmd, int vote) {
165+
assert((vote == POSITIVE) || (vote == NEGATIVE));
166+
152167
if (cmd->argc != 2) {
153168
shout(
154169
"[%d] VOTE: wrong number of arguments\n",
155170
CLIENT_ID(client)
156171
);
157172
return strdup("-");
158173
}
174+
159175
int node = cmd->argv[0];
160176
xid_t xid = cmd->argv[1];
161-
if (node > MAX_NODES) {
177+
if (node >= MAX_NODES) {
162178
shout(
163179
"[%d] VOTE: voted about a wrong 'node' (%d)\n",
164180
CLIENT_ID(client), node
@@ -182,7 +198,7 @@ static char *onvote(void *client, cmd_t *cmd, int vote) {
182198
return strdup("-");
183199
}
184200

185-
if (transactions[i].participants[node].vote != NEUTRAL) {
201+
if (transactions[i].participants[node].vote != DOUBT) {
186202
shout(
187203
"[%d] VOTE: node %d voting on xid %llu again\n",
188204
CLIENT_ID(client), node, xid
@@ -194,10 +210,10 @@ static char *onvote(void *client, cmd_t *cmd, int vote) {
194210
switch (global_transaction_status(transactions + i)) {
195211
case NEGATIVE:
196212
if (global_transaction_mark(clg, transactions + i, NEGATIVE)) {
197-
//shout(
198-
// "[%d] VOTE: global transaction aborted\n",
199-
// CLIENT_ID(client)
200-
//);
213+
shout(
214+
"[%d] VOTE: global transaction aborted\n",
215+
CLIENT_ID(client)
216+
);
201217
transactions[i] = transactions[transactions_count - 1];
202218
transactions_count--;
203219
return strdup("+");
@@ -209,15 +225,15 @@ static char *onvote(void *client, cmd_t *cmd, int vote) {
209225
);
210226
return strdup("-");
211227
}
212-
case NEUTRAL:
213-
//shout("[%d] VOTE: vote counted\n", CLIENT_ID(client));
228+
case DOUBT:
229+
shout("[%d] VOTE: vote counted\n", CLIENT_ID(client));
214230
return strdup("+");
215231
case POSITIVE:
216232
if (global_transaction_mark(clg, transactions + i, POSITIVE)) {
217-
//shout(
218-
// "[%d] VOTE: global transaction committed\n",
219-
// CLIENT_ID(client)
220-
//);
233+
shout(
234+
"[%d] VOTE: global transaction committed\n",
235+
CLIENT_ID(client)
236+
);
221237
transactions[i] = transactions[transactions_count - 1];
222238
transactions_count--;
223239
return strdup("+");
@@ -333,13 +349,16 @@ static char *onstatus(void *client, cmd_t *cmd) {
333349

334350
int status = clog_read(clg, MUX_XID(node, xid));
335351
switch (status) {
352+
case BLANK:
353+
return strdup("+0");
336354
case POSITIVE:
337355
return strdup("+c");
338356
case NEGATIVE:
339357
return strdup("+a");
340-
case NEUTRAL:
358+
case DOUBT:
341359
return strdup("+?");
342360
default:
361+
assert(false); // should not happen
343362
return strdup("-");
344363
}
345364
}
@@ -365,9 +384,8 @@ static char *onnoise(void *client, cmd_t *cmd) {
365384
// }
366385

367386
static char *oncmd(void *client, cmd_t *cmd) {
368-
//shout_cmd(client, cmd);
387+
shout_cmd(client, cmd);
369388

370-
// float started = now_s();
371389
char *result = NULL;
372390
switch (cmd->cmd) {
373391
case CMD_BEGIN:
@@ -388,8 +406,6 @@ static char *oncmd(void *client, cmd_t *cmd) {
388406
default:
389407
return onnoise(client, cmd);
390408
}
391-
// float elapsed = now_s() - started;
392-
// shout("cmd '%c' processed in %0.4f sec\n", cmd->cmd, elapsed);
393409
return result;
394410
}
395411

@@ -420,11 +436,11 @@ char *ondata(void *client, size_t len, char *data) {
420436
parser_t parser = CLIENT_PARSER(client);
421437
char *response = NULL;
422438

423-
//shout(
424-
// "[%d] got some data[%lu] %s\n",
425-
// CLIENT_ID(client),
426-
// len, data
427-
//);
439+
shout(
440+
"[%d] got some data[%lu] %s\n",
441+
CLIENT_ID(client),
442+
len, data
443+
);
428444

429445
// The idea is to feed each character through
430446
// the parser, which will return a cmd from

contrib/pg_xtm/dtmd/src/transaction.c

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,10 +11,13 @@ int global_transaction_status(GlobalTransaction *gt) {
1111
if (t->active) {
1212
assert(t->node == node);
1313
switch (t->vote) {
14+
case BLANK:
15+
shout("a blank vote, this should not happen");
16+
return BLANK;
1417
case NEGATIVE:
1518
againstcount++;
1619
break;
17-
case NEUTRAL:
20+
case DOUBT:
1821
inprogresscount++;
1922
break;
2023
case POSITIVE:
@@ -26,7 +29,7 @@ int global_transaction_status(GlobalTransaction *gt) {
2629
if (againstcount) {
2730
return NEGATIVE;
2831
} else if (inprogresscount) {
29-
return NEUTRAL;
32+
return DOUBT;
3033
} else {
3134
return POSITIVE;
3235
}

contrib/pg_xtm/libdtm.c

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -233,11 +233,11 @@ bool DtmGlobalGetSnapshot(DTMConn dtm, NodeId nodeid, TransactionId xid, Snapsho
233233
Assert(s->xip[i] == number); // the number should fit into xip[i] size
234234
}
235235

236-
fprintf(stdout, "snapshot: xmin = %#x, xmax = %#x, active =", s->xmin, s->xmax);
237-
for (i = 0; i < s->xcnt; i++) {
238-
fprintf(stdout, " %#x", s->xip[i]);
239-
}
240-
fprintf(stdout, "\n");
236+
//fprintf(stdout, "snapshot: xmin = %#x, xmax = %#x, active =", s->xmin, s->xmax);
237+
//for (i = 0; i < s->xcnt; i++) {
238+
// fprintf(stdout, " %#x", s->xip[i]);
239+
//}
240+
//fprintf(stdout, "\n");
241241

242242
return true;
243243
}
@@ -280,6 +280,8 @@ XidStatus DtmGlobalGetTransStatus(DTMConn dtm, NodeId nodeid, TransactionId xid)
280280
if (!dtm_read_char(dtm, &statuschar)) return -1;
281281

282282
switch (statuschar) {
283+
case '0':
284+
return TRANSACTION_STATUS_UNKNOWN;
283285
case 'c':
284286
return TRANSACTION_STATUS_COMMITTED;
285287
case 'a':

0 commit comments

Comments
 (0)