Skip to content

Commit 74ff316

Browse files
committed
Abort transaction if the initiator disconnects.
1 parent a6c72e1 commit 74ff316

File tree

3 files changed

+63
-29
lines changed

3 files changed

+63
-29
lines changed

contrib/pg_xtm/dtmd/include/transaction.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ typedef struct Transaction {
1313
// true if the transaction was started on the node
1414
bool active;
1515

16+
int client_id;
1617
int node;
1718
int vote;
1819

@@ -25,7 +26,7 @@ typedef struct Transaction {
2526

2627
#define CHAR_TO_INDEX(C) ((C) - 'a')
2728
typedef struct GlobalTransaction {
28-
int n_snapshots;
29+
int n_snapshots;
2930
Transaction participants[MAX_NODES];
3031
void *listeners[CHAR_TO_INDEX('z')]; // we are going to use 'a' to 'z' for indexing
3132
} GlobalTransaction;

contrib/pg_xtm/dtmd/src/main.c

Lines changed: 60 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@ typedef struct client_data_t {
2626
} client_data_t;
2727

2828
clog_t clg;
29-
static bool queue_for_transaction_finish(void *stream, void *clientdata, int node, xid_t xid, char cmd);
3029
static client_data_t *create_client_data(int id);
3130
static void free_client_data(client_data_t *cd);
3231
static void onconnect(void *stream, void **clientdata);
@@ -39,6 +38,7 @@ static void gen_snapshot(Snapshot *s, int node);
3938
static void gen_snapshots(GlobalTransaction *gt);
4039
static char *onsnapshot(void *stream, void *clientdata, cmd_t *cmd);
4140
static bool queue_for_transaction_finish(void *stream, void *clientdata, int node, xid_t xid, char cmd);
41+
static void notify_listeners(GlobalTransaction *gt, int status);
4242
static char *onstatus(void *stream, void *clientdata, cmd_t *cmd);
4343
static char *onnoise(void *stream, void *clientdata, cmd_t *cmd);
4444
static char *oncmd(void *stream, void *clientdata, cmd_t *cmd);
@@ -68,7 +68,33 @@ static void onconnect(void *stream, void **clientdata) {
6868
}
6969

7070
static void ondisconnect(void *stream, void *clientdata) {
71-
shout("[%d] disconnected\n", CLIENT_ID(clientdata));
71+
int client_id = CLIENT_ID(clientdata);
72+
shout("[%d] disconnected\n", client_id);
73+
74+
int i, n;
75+
for (i = transactions_count - 1; i >= 0; i--) {
76+
GlobalTransaction *gt = transactions + i;
77+
78+
for (n = 0; n < MAX_NODES; n++) {
79+
Transaction *t = gt->participants + n;
80+
if ((t->active) && (t->client_id == client_id)) {
81+
if (global_transaction_mark(clg, gt, NEGATIVE)) {
82+
notify_listeners(gt, NEGATIVE);
83+
84+
transactions[i] = transactions[transactions_count - 1];
85+
transactions_count--;
86+
} else {
87+
shout(
88+
"[%d] DISCONNECT: global transaction failed"
89+
" to abort O_o\n",
90+
client_id
91+
);
92+
}
93+
break;
94+
}
95+
}
96+
}
97+
7298
free_client_data(clientdata);
7399
}
74100

@@ -149,6 +175,7 @@ static char *onbegin(void *stream, void *clientdata, cmd_t *cmd) {
149175
);
150176
return strdup("-");
151177
}
178+
t->client_id = CLIENT_ID(clientdata);
152179
t->active = true;
153180
t->node = node;
154181
t->vote = DOUBT;
@@ -172,6 +199,32 @@ static char *onbegin(void *stream, void *clientdata, cmd_t *cmd) {
172199
return strdup("+");
173200
}
174201

202+
static void notify_listeners(GlobalTransaction *gt, int status) {
203+
void *listener;
204+
switch (status) {
205+
case NEGATIVE:
206+
while ((listener = global_transaction_pop_listener(gt, 's'))) {
207+
// notify 'status' listeners about the aborted status
208+
write_to_stream(listener, strdup("+a"));
209+
}
210+
while ((listener = global_transaction_pop_listener(gt, 'c'))) {
211+
// notify 'commit' listeners about the failure
212+
write_to_stream(listener, strdup("-"));
213+
}
214+
break;
215+
case POSITIVE:
216+
while ((listener = global_transaction_pop_listener(gt, 's'))) {
217+
// notify 'status' listeners about the committed status
218+
write_to_stream(listener, strdup("+c"));
219+
}
220+
while ((listener = global_transaction_pop_listener(gt, 'c'))) {
221+
// notify 'commit' listeners about the success
222+
write_to_stream(listener, strdup("+"));
223+
}
224+
break;
225+
}
226+
}
227+
175228
static char *onvote(void *stream, void *clientdata, cmd_t *cmd, int vote) {
176229
assert((vote == POSITIVE) || (vote == NEGATIVE));
177230

@@ -218,18 +271,11 @@ static char *onvote(void *stream, void *clientdata, cmd_t *cmd, int vote) {
218271
}
219272
transactions[i].participants[node].vote = vote;
220273

221-
switch (global_transaction_status(transactions + i)) {
274+
GlobalTransaction *gt = transactions + i;
275+
switch (global_transaction_status(gt)) {
222276
case NEGATIVE:
223-
if (global_transaction_mark(clg, transactions + i, NEGATIVE)) {
224-
void *listener;
225-
while ((listener = global_transaction_pop_listener(transactions + i, 's'))) {
226-
// notify 'status' listeners about the aborted status
227-
write_to_stream(listener, strdup("+a"));
228-
}
229-
while ((listener = global_transaction_pop_listener(transactions + i, 'c'))) {
230-
// notify 'commit' listeners about the failure
231-
write_to_stream(listener, strdup("-"));
232-
}
277+
if (global_transaction_mark(clg, gt, NEGATIVE)) {
278+
notify_listeners(gt, NEGATIVE);
233279

234280
transactions[i] = transactions[transactions_count - 1];
235281
transactions_count--;
@@ -258,20 +304,7 @@ static char *onvote(void *stream, void *clientdata, cmd_t *cmd, int vote) {
258304
}
259305
case POSITIVE:
260306
if (global_transaction_mark(clg, transactions + i, POSITIVE)) {
261-
//shout(
262-
// "[%d] VOTE: global transaction committed\n",
263-
// CLIENT_ID(clientdata)
264-
//);
265-
266-
void *listener;
267-
while ((listener = global_transaction_pop_listener(transactions + i, 's'))) {
268-
// notify 'status' listeners about the committed status
269-
write_to_stream(listener, strdup("+c"));
270-
}
271-
while ((listener = global_transaction_pop_listener(transactions + i, 'c'))) {
272-
// notify 'commit' listeners about the success
273-
write_to_stream(listener, strdup("+"));
274-
}
307+
notify_listeners(gt, POSITIVE);
275308

276309
transactions[i] = transactions[transactions_count - 1];
277310
transactions_count--;

contrib/pg_xtm/dtmd/src/transaction.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ bool global_transaction_mark(clog_t clg, GlobalTransaction *gt, int status) {
5858

5959
void global_transaction_clear(GlobalTransaction *gt) {
6060
int i;
61-
gt->n_snapshots = 0;
61+
gt->n_snapshots = 0;
6262
for (i = 0; i < MAX_NODES; i++) {
6363
gt->participants[i].active = false;
6464
}

0 commit comments

Comments
 (0)