@@ -30,6 +30,10 @@ Transaction* transaction_hash[MAX_TRANSACTIONS];
30
30
// We reserve the local xids if they fit between (prev, next) range, and
31
31
// reserve something in (next, x) range otherwise, moving 'next' after 'x'.
32
32
xid_t prev_gxid , next_gxid ;
33
+
34
+ xid_t threshold_gxid ; // when to start worrying about starting a new term
35
+ xid_t last_gxid ; // the greatest gxid we can provide on BEGIN or RESERVE
36
+
33
37
xid_t global_xmin = INVALID_XID ;
34
38
35
39
static Transaction * find_transaction (xid_t xid ) {
@@ -41,7 +45,11 @@ static Transaction *find_transaction(xid_t xid) {
41
45
typedef struct client_userdata_t {
42
46
int id ;
43
47
int snapshots_sent ;
44
- xid_t xid ;
48
+
49
+ // FIXME: use some meaningful words for these. E.g. "expectee" instead
50
+ // of "xwait".
51
+ Transaction * xpart ; // the transaction this client is participating in
52
+ Transaction * xwait ; // the transaction this client is waiting for
45
53
} client_userdata_t ;
46
54
47
55
clog_t clg ;
@@ -51,13 +59,15 @@ bool use_raft;
51
59
#define CLIENT_USERDATA (CLIENT ) ((client_userdata_t*)client_get_userdata(CLIENT))
52
60
#define CLIENT_ID (CLIENT ) (CLIENT_USERDATA(CLIENT)->id)
53
61
#define CLIENT_SNAPSENT (CLIENT ) (CLIENT_USERDATA(CLIENT)->snapshots_sent)
54
- #define CLIENT_XID (CLIENT ) (CLIENT_USERDATA(CLIENT)->xid)
62
+ #define CLIENT_XPART (CLIENT ) (CLIENT_USERDATA(CLIENT)->xpart)
63
+ #define CLIENT_XWAIT (CLIENT ) (CLIENT_USERDATA(CLIENT)->xwait)
55
64
56
65
static client_userdata_t * create_client_userdata (int id ) {
57
66
client_userdata_t * cd = malloc (sizeof (client_userdata_t ));
58
67
cd -> id = id ;
59
68
cd -> snapshots_sent = 0 ;
60
- cd -> xid = INVALID_XID ;
69
+ cd -> xpart = NULL ;
70
+ cd -> xwait = NULL ;
61
71
return cd ;
62
72
}
63
73
@@ -66,9 +76,10 @@ static void free_client_userdata(client_userdata_t *cd) {
66
76
}
67
77
68
78
inline static void free_transaction (Transaction * t ) {
69
- Transaction * * tpp ;
70
- for (tpp = & transaction_hash [t -> xid % MAX_TRANSACTIONS ]; * tpp != t ; tpp = & (* tpp )-> collision );
71
- * tpp = t -> collision ;
79
+ assert (transaction_pop_listener (t , 's' ) == NULL );
80
+ Transaction * * tpp ;
81
+ for (tpp = & transaction_hash [t -> xid % MAX_TRANSACTIONS ]; * tpp != t ; tpp = & (* tpp )-> collision );
82
+ * tpp = t -> collision ;
72
83
l2_list_unlink (& t -> elem );
73
84
t -> elem .next = free_transactions ;
74
85
free_transactions = & t -> elem ;
@@ -84,7 +95,7 @@ static void notify_listeners(Transaction *t, int status) {
84
95
case BLANK :
85
96
while ((listener = transaction_pop_listener (t , 's' ))) {
86
97
debug ("[%d] notifying the client about xid=%u (unknown)\n" , CLIENT_ID (listener ), t -> xid );
87
- shout ( "%p DEREF(notify): %d\n" , listener , client_deref ( listener )) ;
98
+ CLIENT_XWAIT ( listener ) = NULL ;
88
99
client_message_shortcut (
89
100
(client_t )listener ,
90
101
RES_TRANSACTION_UNKNOWN
@@ -94,7 +105,7 @@ static void notify_listeners(Transaction *t, int status) {
94
105
case NEGATIVE :
95
106
while ((listener = transaction_pop_listener (t , 's' ))) {
96
107
debug ("[%d] notifying the client about xid=%u (aborted)\n" , CLIENT_ID (listener ), t -> xid );
97
- shout ( "%p DEREF(notify): %d\n" , listener , client_deref ( listener )) ;
108
+ CLIENT_XWAIT ( listener ) = NULL ;
98
109
client_message_shortcut (
99
110
(client_t )listener ,
100
111
RES_TRANSACTION_ABORTED
@@ -104,7 +115,7 @@ static void notify_listeners(Transaction *t, int status) {
104
115
case POSITIVE :
105
116
while ((listener = transaction_pop_listener (t , 's' ))) {
106
117
debug ("[%d] notifying the client about xid=%u (committed)\n" , CLIENT_ID (listener ), t -> xid );
107
- shout ( "%p DEREF(notify): %d\n" , listener , client_deref ( listener )) ;
118
+ CLIENT_XWAIT ( listener ) = NULL ;
108
119
client_message_shortcut (
109
120
(client_t )listener ,
110
121
RES_TRANSACTION_COMMITTED
@@ -114,7 +125,7 @@ static void notify_listeners(Transaction *t, int status) {
114
125
case DOUBT :
115
126
while ((listener = transaction_pop_listener (t , 's' ))) {
116
127
debug ("[%d] notifying the client about xid=%u (inprogress)\n" , CLIENT_ID (listener ), t -> xid );
117
- shout ( "%p DEREF(notify): %d\n" , listener , client_deref ( listener )) ;
128
+ CLIENT_XWAIT ( listener ) = NULL ;
118
129
client_message_shortcut (
119
130
(client_t )listener ,
120
131
RES_TRANSACTION_INPROGRESS
@@ -124,6 +135,21 @@ static void notify_listeners(Transaction *t, int status) {
124
135
}
125
136
}
126
137
138
+ static void set_next_gxid (xid_t value ) {
139
+ assert (next_gxid < value );
140
+ if (use_raft && raft .role == ROLE_LEADER ) {
141
+ assert (value <= last_gxid );
142
+ if (inrange (next_gxid + 1 , threshold_gxid , value )) {
143
+ // Time to worry has come.
144
+ raft .term ++ ;
145
+ } else {
146
+ // It is either too early to worry,
147
+ // or we have already increased the term.
148
+ }
149
+ }
150
+ next_gxid = value ;
151
+ }
152
+
127
153
static void apply_clog_update (int action , int argument ) {
128
154
int status = action ;
129
155
xid_t xid = argument ;
@@ -154,28 +180,20 @@ static void onconnect(client_t client) {
154
180
}
155
181
156
182
static void ondisconnect (client_t client ) {
157
- debug ("[%d] disconnected\n" , CLIENT_ID (client ));
158
-
159
- if (CLIENT_XID (client ) != INVALID_XID ) {
160
- Transaction * t = find_transaction (CLIENT_XID (client ));
161
- if (t != NULL ) {
162
- if (transaction_remove_listener (t , 's' , client )) {
163
- shout ("%p DEREF(disconn): %d\n" , client , client_deref (client ));
164
- } else {
165
- shout ("%p DEREF(disconn): not found\n" , client );
166
- }
167
-
168
- if (use_raft && (raft .role == ROLE_LEADER )) {
169
- raft_emit (& raft , NEGATIVE , t -> xid );
170
- }
171
- } else {
172
- shout (
173
- "[%d] DISCONNECT: transaction xid=%u not found O_o\n" ,
174
- CLIENT_ID (client ), CLIENT_XID (client )
175
- );
183
+ Transaction * t ;
184
+ debug ("[%d, %p] disconnected\n" , CLIENT_ID (client ), client );
185
+
186
+ if ((t = CLIENT_XPART (client ))) {
187
+ transaction_remove_listener (t , 's' , client );
188
+ if (use_raft && (raft .role == ROLE_LEADER )) {
189
+ raft_emit (& raft , NEGATIVE , t -> xid );
176
190
}
177
191
}
178
192
193
+ if ((t = CLIENT_XWAIT (client ))) {
194
+ transaction_remove_listener (t , 's' , client );
195
+ }
196
+
179
197
free_client_userdata (CLIENT_USERDATA (client ));
180
198
client_set_userdata (client , NULL );
181
199
}
@@ -274,15 +292,20 @@ static void onreserve(client_t client, int argc, xid_t *argv) {
274
292
275
293
if ((prev_gxid >= minxid ) || (maxxid >= next_gxid )) {
276
294
debug (
277
- "[%d] RESERVE: local range %u-%u is not between global range %u-%u\n" ,
295
+ "[%d] RESERVE: local range %u-%u is not inside global range %u-%u\n" ,
278
296
CLIENT_ID (client ),
279
297
minxid , maxxid ,
280
298
prev_gxid , next_gxid
281
299
);
282
300
283
301
minxid = max_of_xids (minxid , next_gxid );
284
302
maxxid = max_of_xids (maxxid , minxid + minsize - 1 );
285
- next_gxid = maxxid + 1 ;
303
+ CHECK (
304
+ maxxid <= last_gxid ,
305
+ client ,
306
+ "not enough xids left in this term"
307
+ );
308
+ set_next_gxid (maxxid + 1 );
286
309
}
287
310
debug (
288
311
"[%d] RESERVE: allocating range %u-%u\n" ,
@@ -318,7 +341,7 @@ static void onbegin(client_t client, int argc, xid_t *argv) {
318
341
);
319
342
320
343
CHECK (
321
- CLIENT_XID (client ) == INVALID_XID ,
344
+ CLIENT_XPART (client ) == NULL ,
322
345
client ,
323
346
"BEGIN: already participating in another transaction"
324
347
);
@@ -332,15 +355,21 @@ static void onbegin(client_t client, int argc, xid_t *argv) {
332
355
transaction_clear (t );
333
356
l2_list_link (& active_transactions , & t -> elem );
334
357
335
- prev_gxid = t -> xid = next_gxid ++ ;
358
+ CHECK (
359
+ next_gxid <= last_gxid ,
360
+ client ,
361
+ "not enought xids left in this term"
362
+ );
363
+ set_next_gxid (next_gxid + 1 );
364
+ prev_gxid = t -> xid = next_gxid ;
336
365
t -> snapshots_count = 0 ;
337
366
t -> size = 1 ;
338
367
339
- t -> collision = transaction_hash [t -> xid % MAX_TRANSACTIONS ];
340
- transaction_hash [t -> xid % MAX_TRANSACTIONS ] = t ;
368
+ t -> collision = transaction_hash [t -> xid % MAX_TRANSACTIONS ];
369
+ transaction_hash [t -> xid % MAX_TRANSACTIONS ] = t ;
341
370
342
371
CLIENT_SNAPSENT (client ) = 0 ;
343
- CLIENT_XID (client ) = t -> xid ;
372
+ CLIENT_XPART (client ) = t ;
344
373
345
374
if (!clog_write (clg , t -> xid , DOUBT )) {
346
375
shout (
@@ -390,8 +419,8 @@ static bool queue_for_transaction_finish(client_t client, xid_t xid, char cmd) {
390
419
// CLIENT_XID(client) and 'xid', i.e. we are able to tell which
391
420
// transaction waits which transaction.
392
421
422
+ CLIENT_XWAIT (client ) = t ;
393
423
transaction_push_listener (t , cmd , client );
394
- shout ("%p REF: %d\n" , client , client_ref (client ));
395
424
return true;
396
425
}
397
426
@@ -403,7 +432,7 @@ static void onvote(client_t client, int argc, xid_t *argv, int vote) {
403
432
bool wait = argv [2 ];
404
433
405
434
CHECK (
406
- CLIENT_XID (client ) == xid ,
435
+ CLIENT_XPART (client ) && ( CLIENT_XPART ( client ) -> xid == xid ) ,
407
436
client ,
408
437
"VOTE: voting for a transaction not participated in"
409
438
);
@@ -427,7 +456,7 @@ static void onvote(client_t client, int argc, xid_t *argv, int vote) {
427
456
}
428
457
assert (t -> votes_for + t -> votes_against <= t -> size );
429
458
430
- CLIENT_XID (client ) = INVALID_XID ; // not participating any more
459
+ CLIENT_XPART (client ) = NULL ; // not participating any more
431
460
432
461
int s = transaction_status (t );
433
462
switch (s ) {
@@ -485,14 +514,14 @@ static void onsnapshot(client_t client, int argc, xid_t *argv) {
485
514
return ;
486
515
}
487
516
488
- if (CLIENT_XID (client ) == INVALID_XID ) {
517
+ if (CLIENT_XPART (client ) == NULL ) {
489
518
CLIENT_SNAPSENT (client ) = 0 ;
490
- CLIENT_XID (client ) = t -> xid ;
519
+ CLIENT_XPART (client ) = t ;
491
520
t -> size += 1 ;
492
521
}
493
522
494
523
CHECK (
495
- CLIENT_XID (client ) == t -> xid ,
524
+ CLIENT_XPART (client ) && ( CLIENT_XPART ( client ) -> xid == xid ) ,
496
525
client ,
497
526
"SNAPSHOT: getting snapshot for a transaction not participated in"
498
527
);
@@ -839,6 +868,7 @@ int main(int argc, char **argv) {
839
868
840
869
prev_gxid = MIN_XID ;
841
870
next_gxid = MIN_XID ;
871
+ last_gxid = INVALID_XID ;
842
872
843
873
int raftsock = raft_create_udp_socket (& raft );
844
874
if (raftsock == -1 ) {
@@ -856,8 +886,6 @@ int main(int argc, char **argv) {
856
886
return EXIT_FAILURE ;
857
887
}
858
888
859
- srand (getpid ());
860
-
861
889
mstimer_t t ;
862
890
mstimer_reset (& t );
863
891
while (true) {
@@ -874,11 +902,6 @@ int main(int argc, char **argv) {
874
902
assert (m ); // m should not be NULL, because the message should be ready to recv
875
903
}
876
904
877
- if (rand () % 10000 == 0 ) {
878
- shout ("sleeping to test raft features\n" );
879
- sleep (1 );
880
- }
881
-
882
905
if (use_raft ) {
883
906
int applied = raft_apply (& raft , apply_clog_update );
884
907
if (applied ) {
@@ -890,6 +913,22 @@ int main(int argc, char **argv) {
890
913
}
891
914
892
915
server_set_enabled (server , raft .role == ROLE_LEADER );
916
+
917
+ // Update the gxid limits based on current term and leadership.
918
+ xid_t recent_last_gxid = raft .term * XIDS_PER_TERM ;
919
+ if (last_gxid < recent_last_gxid ) {
920
+ shout ("updating last_gxid from %u to %u\n" , last_gxid , recent_last_gxid );
921
+ last_gxid = recent_last_gxid ;
922
+ threshold_gxid = last_gxid - NEW_TERM_THRESHOLD ;
923
+ if (raft .role == ROLE_FOLLOWER ) {
924
+ // If we become a leader, we will use
925
+ // the range of xids after the current
926
+ // last_gxid.
927
+ prev_gxid = last_gxid ;
928
+ next_gxid = prev_gxid + 1 ;
929
+ shout ("updated range to %u-%u\n" , prev_gxid , next_gxid );
930
+ }
931
+ }
893
932
} else {
894
933
server_set_enabled (server , true);
895
934
}
0 commit comments