Skip to content

Commit 4815181

Browse files
committed
Implement waits in 'commit' query for DTMD.
1 parent 262db5c commit 4815181

File tree

8 files changed

+162
-68
lines changed

8 files changed

+162
-68
lines changed

contrib/pg_xtm/README

Lines changed: 64 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,13 @@ Coordinator-Backend API
2424
This API includes a set of postgres procedures that
2525
the coordinator can call with "select" statement.
2626

27-
FIXME: document the API
27+
-- Informs the DTM about a global transaction
28+
-- identified by the corresponding pairs of node:xid values.
29+
dtm_begin_transaction(nodes integer[], xids integer[]) RETURNS void
30+
31+
-- Causes the backend to get a snapshot from the DTM
32+
-- and merge it with the local snapshot.
33+
dtm_get_snapshot() RETURNS void
2834

2935
----------
3036
libdtm api
@@ -69,27 +75,66 @@ XidStatus DtmGlobalGetTransStatus(DTMConn dtm, NodeId nodeid, TransactionId xid,
6975
Backend-DTM Protocol
7076
--------------------
7177

72-
DTM <--> Backend:
78+
The queries from backend to DTM should be formatted according to this syntax.
79+
80+
<char cmd><hex16 argc><hex16 argv[0]><hex16 argv[1]>...
81+
82+
<cmd> is a character representing a command.
83+
<argc> is the number of arguments.
84+
<argv[i]> are the arguments.
85+
86+
The commands:
87+
88+
'b': begin(size, node0, xid0, node1, xid1, ...)
89+
Starts a global transaction using 'xid0' on 'node0', 'xid1' on 'node1'
90+
and so on. The 'size' is the number of nodes, so for example if 'size'
91+
== 3 there are 6 values expected after it.
92+
93+
The DTM replies with '+' if the transaction started, or '-' if it failed.
94+
95+
'c': commit(node, xid, wait)
96+
Tells the DTM to vote for commit of the global transaction identified
97+
by the given 'node:xid' pair.
98+
99+
The DTM replies with '+' if committed, or '-' if aborted or failed.
100+
101+
If 'wait' is true, DTM will not reply until it considers the
102+
transaction finished (all nodes committed, or at least one aborted).
103+
104+
'a': abort(node, xid)
105+
Tells the DTM to vote against commit of the global transaction
106+
identified by the given 'node:xid' pair. This query does not have the 'wait'
107+
parameter, because the DTM will not wait for all votes if one is
108+
against the commit.
109+
110+
The DTM replies with '+' if aborted, or '-' if failed. The backend
111+
probably should ignore this reply anyway :)
112+
113+
'h': snapshot(node, xid)
114+
Tells the DTM to give a snapshot for the global transaction identified
115+
by the given 'node:xid' pair. The DTM will create a snapshot for every
116+
participant, so when they ask for the snapshot it will reply with the
117+
"same" snapshot. When a node asks for a snapshot once again, the DTM
118+
generates a fresh version for every participant. So be careful not to
119+
ask for a snapshot from the same node a second time until all other
120+
nodes also ask for that snapshot.
73121

74-
<- 'b'<hex16 self> - "begin"
75-
-> '+'<hex16 gxid> - "transaction started"
76-
-> '-' - "something went wrong"
122+
The DTM replies with '+' followed by a snapshot in the form:
77123

78-
<- 'c'<hex16 gxid> - "commit"
79-
-> '+' - "commit saved"
80-
-> '-' - "something went wrong"
124+
<hex16 xmin><hex16 xmax><hex16 xcnt><hex16 xip[0]>...
81125

82-
<- 'a'<hex16 gxid> - "abort"
83-
-> '+' - "abort saved"
84-
-> '-' - "something went wrong"
126+
In case of a failure, the DTM replies with '-'.
85127

86-
<- 'h' - "snapshot"
87-
-> '+'<snapshot> - "here is a fresh snapshot for you"
88-
-> '-' - "something went wrong"
128+
's': status(node, xid, wait)
129+
Asks the DTM about the status of the global transaction identified
130+
by the given 'node:xid' pair.
89131

90-
<- 's'<hex16 gxid> - "status"
91-
-> '+''0|c|a|?' - "here is the transaction status"
92-
(0)unknown, (c)committed, (a)aborted, or (?)inprogress
93-
-> '-' - "something went wrong"
132+
The DTM replies with:
133+
"+0" if not started;
134+
"+c" if committed;
135+
"+a" if aborted;
136+
"+?" if in progress;
137+
'-' if failed.
94138

95-
<snapshot> = <hex16 xmin><hex16 xmax><hex16 n><hex16 active[n]>
139+
If 'wait' is true, DTM will not reply until it considers the
140+
transaction finished (all nodes committed, or at least one aborted).

contrib/pg_xtm/dtmd/include/transaction.h

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,15 +21,16 @@ typedef struct Transaction {
2121
int sent_seqno;
2222
} Transaction;
2323

24+
#define CHAR_TO_INDEX(C) ((C) - 'a')
2425
typedef struct GlobalTransaction {
2526
Transaction participants[MAX_NODES];
26-
void *listener;
27+
void *listeners[CHAR_TO_INDEX('z')]; // we are going to use 'a' to 'z' for indexing
2728
} GlobalTransaction;
2829

2930
int global_transaction_status(GlobalTransaction *gt);
3031
bool global_transaction_mark(clog_t clg, GlobalTransaction *gt, int status);
3132
void global_transaction_clear(GlobalTransaction *gt);
32-
void global_transaction_push_listener(GlobalTransaction *gt, void *listener);
33-
void *global_transaction_pop_listener(GlobalTransaction *gt);
33+
void global_transaction_push_listener(GlobalTransaction *gt, char cmd, void *listener);
34+
void *global_transaction_pop_listener(GlobalTransaction *gt, char cmd);
3435

3536
#endif

contrib/pg_xtm/dtmd/src/main.c

Lines changed: 73 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,24 @@ static client_data_t *create_client_data(int id) {
3333
}
3434

3535
clog_t clg;
36+
static bool queue_for_transaction_finish(void *stream, void *clientdata, int node, xid_t xid, char cmd);
37+
static void free_client_data(client_data_t *cd);
38+
static void onconnect(void *stream, void **clientdata);
39+
static void ondisconnect(void *stream, void *clientdata);
40+
static char *onbegin(void *stream, void *clientdata, cmd_t *cmd);
41+
static char *onvote(void *stream, void *clientdata, cmd_t *cmd, int vote);
42+
static char *oncommit(void *stream, void *clientdata, cmd_t *cmd);
43+
static char *onabort(void *stream, void *clientdata, cmd_t *cmd);
44+
static void gen_snapshot(Snapshot *s, int node);
45+
static void gen_snapshots(GlobalTransaction *gt);
46+
static char *onsnapshot(void *stream, void *clientdata, cmd_t *cmd);
47+
static bool queue_for_transaction_finish(void *stream, void *clientdata, int node, xid_t xid, char cmd);
48+
static char *onstatus(void *stream, void *clientdata, cmd_t *cmd);
49+
static char *onnoise(void *stream, void *clientdata, cmd_t *cmd);
50+
static char *oncmd(void *stream, void *clientdata, cmd_t *cmd);
51+
static char *destructive_concat(char *a, char *b);
52+
static char *ondata(void *stream, void *clientdata, size_t len, char *data);
53+
static void usage(char *prog);
3654

3755
#define CLIENT_ID(X) (((client_data_t*)(X))->id)
3856
#define CLIENT_PARSER(X) (((client_data_t*)(X))->parser)
@@ -157,16 +175,9 @@ static char *onbegin(void *stream, void *clientdata, cmd_t *cmd) {
157175
static char *onvote(void *stream, void *clientdata, cmd_t *cmd, int vote) {
158176
assert((vote == POSITIVE) || (vote == NEGATIVE));
159177

160-
if (cmd->argc != 2) {
161-
shout(
162-
"[%d] VOTE: wrong number of arguments\n",
163-
CLIENT_ID(clientdata)
164-
);
165-
return strdup("-");
166-
}
167-
168178
int node = cmd->argv[0];
169179
xid_t xid = cmd->argv[1];
180+
bool wait = (vote == POSITIVE) ? cmd->argv[2] : false;
170181
if (node >= MAX_NODES) {
171182
shout(
172183
"[%d] VOTE: voted about a wrong 'node' (%d)\n",
@@ -175,6 +186,13 @@ static char *onvote(void *stream, void *clientdata, cmd_t *cmd, int vote) {
175186
return strdup("-");
176187
}
177188

189+
if ((vote == NEGATIVE) && wait) {
190+
shout(
191+
"[%d] VOTE: 'wait' is ignored for NEGATIVE votes\n",
192+
CLIENT_ID(clientdata)
193+
);
194+
}
195+
178196
int i;
179197
for (i = 0; i < transactions_count; i++) {
180198
Transaction *t = transactions[i].participants + node;
@@ -203,19 +221,15 @@ static char *onvote(void *stream, void *clientdata, cmd_t *cmd, int vote) {
203221
switch (global_transaction_status(transactions + i)) {
204222
case NEGATIVE:
205223
if (global_transaction_mark(clg, transactions + i, NEGATIVE)) {
206-
//shout(
207-
// "[%d] VOTE: global transaction aborted\n",
208-
// CLIENT_ID(clientdata)
209-
//);
210-
211224
void *listener;
212-
while ((listener = global_transaction_pop_listener(transactions + i))) {
213-
//shout(
214-
// "[%d] VOTE: notifying a listener\n",
215-
// CLIENT_ID(clientdata)
216-
//);
225+
while ((listener = global_transaction_pop_listener(transactions + i, 's'))) {
226+
// notify 'status' listeners about the aborted status
217227
write_to_stream(listener, strdup("+a"));
218228
}
229+
while ((listener = global_transaction_pop_listener(transactions + i, 'c'))) {
230+
// notify 'commit' listeners about the failure
231+
write_to_stream(listener, strdup("-"));
232+
}
219233

220234
transactions[i] = transactions[transactions_count - 1];
221235
transactions_count--;
@@ -230,7 +244,18 @@ static char *onvote(void *stream, void *clientdata, cmd_t *cmd, int vote) {
230244
}
231245
case DOUBT:
232246
//shout("[%d] VOTE: vote counted\n", CLIENT_ID(clientdata));
233-
return strdup("+");
247+
if (wait) {
248+
if (!queue_for_transaction_finish(stream, clientdata, node, xid, 'c')) {
249+
shout(
250+
"[%d] VOTE: couldn't queue for transaction finish\n",
251+
CLIENT_ID(clientdata)
252+
);
253+
return strdup("-");
254+
}
255+
return NULL;
256+
} else {
257+
return strdup("+");
258+
}
234259
case POSITIVE:
235260
if (global_transaction_mark(clg, transactions + i, POSITIVE)) {
236261
//shout(
@@ -239,13 +264,14 @@ static char *onvote(void *stream, void *clientdata, cmd_t *cmd, int vote) {
239264
//);
240265

241266
void *listener;
242-
while ((listener = global_transaction_pop_listener(transactions + i))) {
243-
//shout(
244-
// "[%d] VOTE: notifying a listener\n",
245-
// CLIENT_ID(clientdata)
246-
//);
267+
while ((listener = global_transaction_pop_listener(transactions + i, 's'))) {
268+
// notify 'status' listeners about the committed status
247269
write_to_stream(listener, strdup("+c"));
248270
}
271+
while ((listener = global_transaction_pop_listener(transactions + i, 'c'))) {
272+
// notify 'commit' listeners about the success
273+
write_to_stream(listener, strdup("+"));
274+
}
249275

250276
transactions[i] = transactions[transactions_count - 1];
251277
transactions_count--;
@@ -265,10 +291,26 @@ static char *onvote(void *stream, void *clientdata, cmd_t *cmd, int vote) {
265291
}
266292

267293
static char *oncommit(void *stream, void *clientdata, cmd_t *cmd) {
294+
if (cmd->argc != 3) {
295+
shout(
296+
"[%d] COMMIT: wrong number of arguments\n",
297+
CLIENT_ID(clientdata)
298+
);
299+
return strdup("-");
300+
}
301+
268302
return onvote(stream, clientdata, cmd, POSITIVE);
269303
}
270304

271305
static char *onabort(void *stream, void *clientdata, cmd_t *cmd) {
306+
if (cmd->argc != 2) {
307+
shout(
308+
"[%d] ABORT: wrong number of arguments\n",
309+
CLIENT_ID(clientdata)
310+
);
311+
return strdup("-");
312+
}
313+
272314
return onvote(stream, clientdata, cmd, NEGATIVE);
273315
}
274316

@@ -342,7 +384,8 @@ static char *onsnapshot(void *stream, void *clientdata, cmd_t *cmd) {
342384
return snapshot_serialize(&t->snapshot);
343385
}
344386

345-
static bool queue_for_transaction_finish(void *stream, void *clientdata, int node, xid_t xid) {
387+
static bool queue_for_transaction_finish(void *stream, void *clientdata, int node, xid_t xid, char cmd) {
388+
assert((cmd >= 'a') && (cmd <= 'z'));
346389
int i;
347390
for (i = 0; i < transactions_count; i++) {
348391
Transaction *t = transactions[i].participants + node;
@@ -359,7 +402,7 @@ static bool queue_for_transaction_finish(void *stream, void *clientdata, int nod
359402
return false;
360403
}
361404

362-
global_transaction_push_listener(transactions + i, stream);
405+
global_transaction_push_listener(transactions + i, cmd, stream);
363406
return true;
364407
}
365408

@@ -392,7 +435,7 @@ static char *onstatus(void *stream, void *clientdata, cmd_t *cmd) {
392435
return strdup("+a");
393436
case DOUBT:
394437
if (wait) {
395-
if (!queue_for_transaction_finish(stream, clientdata, node, xid)) {
438+
if (!queue_for_transaction_finish(stream, clientdata, node, xid, 's')) {
396439
shout(
397440
"[%d] STATUS: couldn't queue for transaction finish\n",
398441
CLIENT_ID(clientdata)
@@ -455,7 +498,7 @@ static char *oncmd(void *stream, void *clientdata, cmd_t *cmd) {
455498
return result;
456499
}
457500

458-
char *destructive_concat(char *a, char *b) {
501+
static char *destructive_concat(char *a, char *b) {
459502
if ((a == NULL) && (b == NULL)) {
460503
return NULL;
461504
}
@@ -477,7 +520,7 @@ char *destructive_concat(char *a, char *b) {
477520
return c;
478521
}
479522

480-
char *ondata(void *stream, void *clientdata, size_t len, char *data) {
523+
static char *ondata(void *stream, void *clientdata, size_t len, char *data) {
481524
int i;
482525
parser_t parser = CLIENT_PARSER(clientdata);
483526
char *response = NULL;
@@ -519,7 +562,7 @@ char *ondata(void *stream, void *clientdata, size_t len, char *data) {
519562
return response;
520563
}
521564

522-
void usage(char *prog) {
565+
static void usage(char *prog) {
523566
printf("Usage: %s [-d DATADIR] [-a HOST] [-p PORT]\n", prog);
524567
}
525568

contrib/pg_xtm/dtmd/src/transaction.c

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -61,22 +61,26 @@ void global_transaction_clear(GlobalTransaction *gt) {
6161
for (i = 0; i < MAX_NODES; i++) {
6262
gt->participants[i].active = false;
6363
}
64-
gt->listener = NULL;
64+
for (i = 'a'; i <= 'z'; i++) {
65+
gt->listeners[CHAR_TO_INDEX(i)] = NULL;
66+
}
6567
}
6668

67-
void global_transaction_push_listener(GlobalTransaction *gt, void *stream) {
69+
void global_transaction_push_listener(GlobalTransaction *gt, char cmd, void *stream) {
70+
assert((cmd >= 'a') && (cmd <= 'z'));
6871
list_node_t *n = malloc(sizeof(list_node_t));
6972
n->value = stream;
70-
n->next = gt->listener;
71-
gt->listener = n;
73+
n->next = gt->listeners[CHAR_TO_INDEX(cmd)];
74+
gt->listeners[CHAR_TO_INDEX(cmd)] = n;
7275
}
7376

74-
void *global_transaction_pop_listener(GlobalTransaction *gt) {
75-
if (!gt->listener) {
77+
void *global_transaction_pop_listener(GlobalTransaction *gt, char cmd) {
78+
assert((cmd >= 'a') && (cmd <= 'z'));
79+
if (!gt->listeners[CHAR_TO_INDEX(cmd)]) {
7680
return NULL;
7781
}
78-
list_node_t *n = gt->listener;
79-
gt->listener = n->next;
82+
list_node_t *n = gt->listeners[CHAR_TO_INDEX(cmd)];
83+
gt->listeners[CHAR_TO_INDEX(cmd)] = n->next;
8084
void *value = n->value;
8185
free(n);
8286
return value;

contrib/pg_xtm/libdtm.c

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -245,16 +245,19 @@ bool DtmGlobalGetSnapshot(DTMConn dtm, NodeId nodeid, TransactionId xid, Snapsho
245245
// Commits transaction only once all participants have called this function,
246246
// does not change CLOG otherwise. Returns 'true' on success, 'false' if
247247
// something failed on the daemon side.
248-
bool DtmGlobalSetTransStatus(DTMConn dtm, NodeId nodeid, TransactionId xid, XidStatus status)
248+
bool DtmGlobalSetTransStatus(DTMConn dtm, NodeId nodeid, TransactionId xid, XidStatus status, bool wait)
249249
{
250250
bool ok;
251251
switch (status) {
252252
case TRANSACTION_STATUS_COMMITTED:
253253
// query
254-
if (!dtm_query(dtm, 'c', 2, nodeid, xid)) return false;
254+
if (!dtm_query(dtm, 'c', 3, nodeid, xid, wait)) return false;
255255
break;
256256
case TRANSACTION_STATUS_ABORTED:
257257
// query
258+
if (wait) {
259+
fprintf(stderr, "'wait' is ignored for aborts\n");
260+
}
258261
if (!dtm_query(dtm, 'a', 2, nodeid, xid)) return false;
259262
break;
260263
default:

contrib/pg_xtm/libdtm.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ bool DtmGlobalGetSnapshot(DTMConn dtm, NodeId nodeid, TransactionId xid, Snapsho
3939
// Commits transaction only once all participants have called this function,
4040
// does not change CLOG otherwise. Returns 'true' on success, 'false' if
4141
// something failed on the daemon side.
42-
bool DtmGlobalSetTransStatus(DTMConn dtm, NodeId nodeid, TransactionId xid, XidStatus status);
42+
bool DtmGlobalSetTransStatus(DTMConn dtm, NodeId nodeid, TransactionId xid, XidStatus status, bool wait);
4343

4444
// Gets the status of the transaction identified by 'xid'. Returns the status
4545
// on success, or -1 otherwise. If 'wait' is true, then it does not return

0 commit comments

Comments
 (0)