@@ -33,6 +33,24 @@ static client_data_t *create_client_data(int id) {
33
33
}
34
34
35
35
clog_t clg ;
36
+ static bool queue_for_transaction_finish (void * stream , void * clientdata , int node , xid_t xid , char cmd );
37
+ static void free_client_data (client_data_t * cd );
38
+ static void onconnect (void * stream , void * * clientdata );
39
+ static void ondisconnect (void * stream , void * clientdata );
40
+ static char * onbegin (void * stream , void * clientdata , cmd_t * cmd );
41
+ static char * onvote (void * stream , void * clientdata , cmd_t * cmd , int vote );
42
+ static char * oncommit (void * stream , void * clientdata , cmd_t * cmd );
43
+ static char * onabort (void * stream , void * clientdata , cmd_t * cmd );
44
+ static void gen_snapshot (Snapshot * s , int node );
45
+ static void gen_snapshots (GlobalTransaction * gt );
46
+ static char * onsnapshot (void * stream , void * clientdata , cmd_t * cmd );
47
+ static bool queue_for_transaction_finish (void * stream , void * clientdata , int node , xid_t xid , char cmd );
48
+ static char * onstatus (void * stream , void * clientdata , cmd_t * cmd );
49
+ static char * onnoise (void * stream , void * clientdata , cmd_t * cmd );
50
+ static char * oncmd (void * stream , void * clientdata , cmd_t * cmd );
51
+ static char * destructive_concat (char * a , char * b );
52
+ static char * ondata (void * stream , void * clientdata , size_t len , char * data );
53
+ static void usage (char * prog );
36
54
37
55
#define CLIENT_ID (X ) (((client_data_t*)(X))->id)
38
56
#define CLIENT_PARSER (X ) (((client_data_t*)(X))->parser)
@@ -157,16 +175,9 @@ static char *onbegin(void *stream, void *clientdata, cmd_t *cmd) {
157
175
static char * onvote (void * stream , void * clientdata , cmd_t * cmd , int vote ) {
158
176
assert ((vote == POSITIVE ) || (vote == NEGATIVE ));
159
177
160
- if (cmd -> argc != 2 ) {
161
- shout (
162
- "[%d] VOTE: wrong number of arguments\n" ,
163
- CLIENT_ID (clientdata )
164
- );
165
- return strdup ("-" );
166
- }
167
-
168
178
int node = cmd -> argv [0 ];
169
179
xid_t xid = cmd -> argv [1 ];
180
+ bool wait = (vote == POSITIVE ) ? cmd -> argv [2 ] : false;
170
181
if (node >= MAX_NODES ) {
171
182
shout (
172
183
"[%d] VOTE: voted about a wrong 'node' (%d)\n" ,
@@ -175,6 +186,13 @@ static char *onvote(void *stream, void *clientdata, cmd_t *cmd, int vote) {
175
186
return strdup ("-" );
176
187
}
177
188
189
+ if ((vote == NEGATIVE ) && wait ) {
190
+ shout (
191
+ "[%d] VOTE: 'wait' is ignored for NEGATIVE votes\n" ,
192
+ CLIENT_ID (clientdata )
193
+ );
194
+ }
195
+
178
196
int i ;
179
197
for (i = 0 ; i < transactions_count ; i ++ ) {
180
198
Transaction * t = transactions [i ].participants + node ;
@@ -203,19 +221,15 @@ static char *onvote(void *stream, void *clientdata, cmd_t *cmd, int vote) {
203
221
switch (global_transaction_status (transactions + i )) {
204
222
case NEGATIVE :
205
223
if (global_transaction_mark (clg , transactions + i , NEGATIVE )) {
206
- //shout(
207
- // "[%d] VOTE: global transaction aborted\n",
208
- // CLIENT_ID(clientdata)
209
- //);
210
-
211
224
void * listener ;
212
- while ((listener = global_transaction_pop_listener (transactions + i ))) {
213
- //shout(
214
- // "[%d] VOTE: notifying a listener\n",
215
- // CLIENT_ID(clientdata)
216
- //);
225
+ while ((listener = global_transaction_pop_listener (transactions + i , 's' ))) {
226
+ // notify 'status' listeners about the aborted status
217
227
write_to_stream (listener , strdup ("+a" ));
218
228
}
229
+ while ((listener = global_transaction_pop_listener (transactions + i , 'c' ))) {
230
+ // notify 'commit' listeners about the failure
231
+ write_to_stream (listener , strdup ("-" ));
232
+ }
219
233
220
234
transactions [i ] = transactions [transactions_count - 1 ];
221
235
transactions_count -- ;
@@ -230,7 +244,18 @@ static char *onvote(void *stream, void *clientdata, cmd_t *cmd, int vote) {
230
244
}
231
245
case DOUBT :
232
246
//shout("[%d] VOTE: vote counted\n", CLIENT_ID(clientdata));
233
- return strdup ("+" );
247
+ if (wait ) {
248
+ if (!queue_for_transaction_finish (stream , clientdata , node , xid , 'c' )) {
249
+ shout (
250
+ "[%d] VOTE: couldn't queue for transaction finish\n" ,
251
+ CLIENT_ID (clientdata )
252
+ );
253
+ return strdup ("-" );
254
+ }
255
+ return NULL ;
256
+ } else {
257
+ return strdup ("+" );
258
+ }
234
259
case POSITIVE :
235
260
if (global_transaction_mark (clg , transactions + i , POSITIVE )) {
236
261
//shout(
@@ -239,13 +264,14 @@ static char *onvote(void *stream, void *clientdata, cmd_t *cmd, int vote) {
239
264
//);
240
265
241
266
void * listener ;
242
- while ((listener = global_transaction_pop_listener (transactions + i ))) {
243
- //shout(
244
- // "[%d] VOTE: notifying a listener\n",
245
- // CLIENT_ID(clientdata)
246
- //);
267
+ while ((listener = global_transaction_pop_listener (transactions + i , 's' ))) {
268
+ // notify 'status' listeners about the committed status
247
269
write_to_stream (listener , strdup ("+c" ));
248
270
}
271
+ while ((listener = global_transaction_pop_listener (transactions + i , 'c' ))) {
272
+ // notify 'commit' listeners about the success
273
+ write_to_stream (listener , strdup ("+" ));
274
+ }
249
275
250
276
transactions [i ] = transactions [transactions_count - 1 ];
251
277
transactions_count -- ;
@@ -265,10 +291,26 @@ static char *onvote(void *stream, void *clientdata, cmd_t *cmd, int vote) {
265
291
}
266
292
267
293
static char * oncommit (void * stream , void * clientdata , cmd_t * cmd ) {
294
+ if (cmd -> argc != 3 ) {
295
+ shout (
296
+ "[%d] COMMIT: wrong number of arguments\n" ,
297
+ CLIENT_ID (clientdata )
298
+ );
299
+ return strdup ("-" );
300
+ }
301
+
268
302
return onvote (stream , clientdata , cmd , POSITIVE );
269
303
}
270
304
271
305
static char * onabort (void * stream , void * clientdata , cmd_t * cmd ) {
306
+ if (cmd -> argc != 2 ) {
307
+ shout (
308
+ "[%d] ABORT: wrong number of arguments\n" ,
309
+ CLIENT_ID (clientdata )
310
+ );
311
+ return strdup ("-" );
312
+ }
313
+
272
314
return onvote (stream , clientdata , cmd , NEGATIVE );
273
315
}
274
316
@@ -342,7 +384,8 @@ static char *onsnapshot(void *stream, void *clientdata, cmd_t *cmd) {
342
384
return snapshot_serialize (& t -> snapshot );
343
385
}
344
386
345
- static bool queue_for_transaction_finish (void * stream , void * clientdata , int node , xid_t xid ) {
387
+ static bool queue_for_transaction_finish (void * stream , void * clientdata , int node , xid_t xid , char cmd ) {
388
+ assert ((cmd >= 'a' ) && (cmd <= 'z' ));
346
389
int i ;
347
390
for (i = 0 ; i < transactions_count ; i ++ ) {
348
391
Transaction * t = transactions [i ].participants + node ;
@@ -359,7 +402,7 @@ static bool queue_for_transaction_finish(void *stream, void *clientdata, int nod
359
402
return false;
360
403
}
361
404
362
- global_transaction_push_listener (transactions + i , stream );
405
+ global_transaction_push_listener (transactions + i , cmd , stream );
363
406
return true;
364
407
}
365
408
@@ -392,7 +435,7 @@ static char *onstatus(void *stream, void *clientdata, cmd_t *cmd) {
392
435
return strdup ("+a" );
393
436
case DOUBT :
394
437
if (wait ) {
395
- if (!queue_for_transaction_finish (stream , clientdata , node , xid )) {
438
+ if (!queue_for_transaction_finish (stream , clientdata , node , xid , 's' )) {
396
439
shout (
397
440
"[%d] STATUS: couldn't queue for transaction finish\n" ,
398
441
CLIENT_ID (clientdata )
@@ -455,7 +498,7 @@ static char *oncmd(void *stream, void *clientdata, cmd_t *cmd) {
455
498
return result ;
456
499
}
457
500
458
- char * destructive_concat (char * a , char * b ) {
501
+ static char * destructive_concat (char * a , char * b ) {
459
502
if ((a == NULL ) && (b == NULL )) {
460
503
return NULL ;
461
504
}
@@ -477,7 +520,7 @@ char *destructive_concat(char *a, char *b) {
477
520
return c ;
478
521
}
479
522
480
- char * ondata (void * stream , void * clientdata , size_t len , char * data ) {
523
+ static char * ondata (void * stream , void * clientdata , size_t len , char * data ) {
481
524
int i ;
482
525
parser_t parser = CLIENT_PARSER (clientdata );
483
526
char * response = NULL ;
@@ -519,7 +562,7 @@ char *ondata(void *stream, void *clientdata, size_t len, char *data) {
519
562
return response ;
520
563
}
521
564
522
- void usage (char * prog ) {
565
+ static void usage (char * prog ) {
523
566
printf ("Usage: %s [-d DATADIR] [-a HOST] [-p PORT]\n" , prog );
524
567
}
525
568
0 commit comments