@@ -26,7 +26,6 @@ typedef struct client_data_t {
26
26
} client_data_t ;
27
27
28
28
clog_t clg ;
29
- static bool queue_for_transaction_finish (void * stream , void * clientdata , int node , xid_t xid , char cmd );
30
29
static client_data_t * create_client_data (int id );
31
30
static void free_client_data (client_data_t * cd );
32
31
static void onconnect (void * stream , void * * clientdata );
@@ -39,6 +38,7 @@ static void gen_snapshot(Snapshot *s, int node);
39
38
static void gen_snapshots (GlobalTransaction * gt );
40
39
static char * onsnapshot (void * stream , void * clientdata , cmd_t * cmd );
41
40
static bool queue_for_transaction_finish (void * stream , void * clientdata , int node , xid_t xid , char cmd );
41
+ static void notify_listeners (GlobalTransaction * gt , int status );
42
42
static char * onstatus (void * stream , void * clientdata , cmd_t * cmd );
43
43
static char * onnoise (void * stream , void * clientdata , cmd_t * cmd );
44
44
static char * oncmd (void * stream , void * clientdata , cmd_t * cmd );
@@ -68,7 +68,33 @@ static void onconnect(void *stream, void **clientdata) {
68
68
}
69
69
70
70
static void ondisconnect (void * stream , void * clientdata ) {
71
- shout ("[%d] disconnected\n" , CLIENT_ID (clientdata ));
71
+ int client_id = CLIENT_ID (clientdata );
72
+ shout ("[%d] disconnected\n" , client_id );
73
+
74
+ int i , n ;
75
+ for (i = transactions_count - 1 ; i >= 0 ; i -- ) {
76
+ GlobalTransaction * gt = transactions + i ;
77
+
78
+ for (n = 0 ; n < MAX_NODES ; n ++ ) {
79
+ Transaction * t = gt -> participants + n ;
80
+ if ((t -> active ) && (t -> client_id == client_id )) {
81
+ if (global_transaction_mark (clg , gt , NEGATIVE )) {
82
+ notify_listeners (gt , NEGATIVE );
83
+
84
+ transactions [i ] = transactions [transactions_count - 1 ];
85
+ transactions_count -- ;
86
+ } else {
87
+ shout (
88
+ "[%d] DISCONNECT: global transaction failed"
89
+ " to abort O_o\n" ,
90
+ client_id
91
+ );
92
+ }
93
+ break ;
94
+ }
95
+ }
96
+ }
97
+
72
98
free_client_data (clientdata );
73
99
}
74
100
@@ -149,6 +175,7 @@ static char *onbegin(void *stream, void *clientdata, cmd_t *cmd) {
149
175
);
150
176
return strdup ("-" );
151
177
}
178
+ t -> client_id = CLIENT_ID (clientdata );
152
179
t -> active = true;
153
180
t -> node = node ;
154
181
t -> vote = DOUBT ;
@@ -172,6 +199,32 @@ static char *onbegin(void *stream, void *clientdata, cmd_t *cmd) {
172
199
return strdup ("+" );
173
200
}
174
201
202
+ static void notify_listeners (GlobalTransaction * gt , int status ) {
203
+ void * listener ;
204
+ switch (status ) {
205
+ case NEGATIVE :
206
+ while ((listener = global_transaction_pop_listener (gt , 's' ))) {
207
+ // notify 'status' listeners about the aborted status
208
+ write_to_stream (listener , strdup ("+a" ));
209
+ }
210
+ while ((listener = global_transaction_pop_listener (gt , 'c' ))) {
211
+ // notify 'commit' listeners about the failure
212
+ write_to_stream (listener , strdup ("-" ));
213
+ }
214
+ break ;
215
+ case POSITIVE :
216
+ while ((listener = global_transaction_pop_listener (gt , 's' ))) {
217
+ // notify 'status' listeners about the committed status
218
+ write_to_stream (listener , strdup ("+c" ));
219
+ }
220
+ while ((listener = global_transaction_pop_listener (gt , 'c' ))) {
221
+ // notify 'commit' listeners about the success
222
+ write_to_stream (listener , strdup ("+" ));
223
+ }
224
+ break ;
225
+ }
226
+ }
227
+
175
228
static char * onvote (void * stream , void * clientdata , cmd_t * cmd , int vote ) {
176
229
assert ((vote == POSITIVE ) || (vote == NEGATIVE ));
177
230
@@ -218,18 +271,11 @@ static char *onvote(void *stream, void *clientdata, cmd_t *cmd, int vote) {
218
271
}
219
272
transactions [i ].participants [node ].vote = vote ;
220
273
221
- switch (global_transaction_status (transactions + i )) {
274
+ GlobalTransaction * gt = transactions + i ;
275
+ switch (global_transaction_status (gt )) {
222
276
case NEGATIVE :
223
- if (global_transaction_mark (clg , transactions + i , NEGATIVE )) {
224
- void * listener ;
225
- while ((listener = global_transaction_pop_listener (transactions + i , 's' ))) {
226
- // notify 'status' listeners about the aborted status
227
- write_to_stream (listener , strdup ("+a" ));
228
- }
229
- while ((listener = global_transaction_pop_listener (transactions + i , 'c' ))) {
230
- // notify 'commit' listeners about the failure
231
- write_to_stream (listener , strdup ("-" ));
232
- }
277
+ if (global_transaction_mark (clg , gt , NEGATIVE )) {
278
+ notify_listeners (gt , NEGATIVE );
233
279
234
280
transactions [i ] = transactions [transactions_count - 1 ];
235
281
transactions_count -- ;
@@ -258,20 +304,7 @@ static char *onvote(void *stream, void *clientdata, cmd_t *cmd, int vote) {
258
304
}
259
305
case POSITIVE :
260
306
if (global_transaction_mark (clg , transactions + i , POSITIVE )) {
261
- //shout(
262
- // "[%d] VOTE: global transaction committed\n",
263
- // CLIENT_ID(clientdata)
264
- //);
265
-
266
- void * listener ;
267
- while ((listener = global_transaction_pop_listener (transactions + i , 's' ))) {
268
- // notify 'status' listeners about the committed status
269
- write_to_stream (listener , strdup ("+c" ));
270
- }
271
- while ((listener = global_transaction_pop_listener (transactions + i , 'c' ))) {
272
- // notify 'commit' listeners about the success
273
- write_to_stream (listener , strdup ("+" ));
274
- }
307
+ notify_listeners (gt , POSITIVE );
275
308
276
309
transactions [i ] = transactions [transactions_count - 1 ];
277
310
transactions_count -- ;
0 commit comments