@@ -31,9 +31,6 @@ Transaction* transaction_hash[MAX_TRANSACTIONS];
31
31
// reserve something in (next, x) range otherwise, moving 'next' after 'x'.
32
32
xid_t prev_gxid , next_gxid ;
33
33
34
- xid_t threshold_gxid ; // when to start worrying about starting a new term
35
- xid_t last_gxid ; // the greatest gxid we can provide on BEGIN or RESERVE
36
-
37
34
xid_t global_xmin = INVALID_XID ;
38
35
39
36
static Transaction * find_transaction (xid_t xid ) {
@@ -262,14 +259,41 @@ static void onhello(client_t client, int argc, xid_t *argv) {
262
259
}
263
260
}
264
261
262
+ // the greatest gxid we can provide on BEGIN or RESERVE
263
+ static xid_t last_xid_in_term () {
264
+ return raft .term * XIDS_PER_TERM - 1 ;
265
+ }
266
+
267
+ static xid_t first_xid_in_term () {
268
+ return (raft .term - 1 ) * XIDS_PER_TERM ;
269
+ }
270
+
271
+ static int xid2term (xid_t xid ) {
272
+ int term = xid / XIDS_PER_TERM + 1 ;
273
+ return term ;
274
+ }
275
+
276
+ // when to start worrying about starting a new term
277
+ static xid_t get_threshold_xid () {
278
+ return last_xid_in_term () - NEW_TERM_THRESHOLD ;
279
+ }
280
+
281
+ static bool xid_is_safe (xid_t xid ) {
282
+ return xid <= last_xid_in_term ();
283
+ }
284
+
285
+ static bool xid_is_disturbing (xid_t xid ) {
286
+ return inrange (next_gxid + 1 , get_threshold_xid (), xid );
287
+ }
288
+
265
289
static void set_next_gxid (xid_t value ) {
266
290
assert (next_gxid < value ); // The value should only grow.
267
291
268
292
if (use_raft && raft .role == ROLE_LEADER ) {
269
- assert (value <= last_gxid );
270
- if (inrange ( next_gxid + 1 , threshold_gxid , value )) {
293
+ assert (xid_is_safe ( value ) );
294
+ if (xid_is_disturbing ( value )) {
271
295
// Time to worry has come.
272
- raft_start_next_term (& raft );
296
+ raft_ensure_term (& raft , xid2term ( value ) );
273
297
} else {
274
298
// It is either too early to worry,
275
299
// or we have already increased the term.
@@ -293,6 +317,15 @@ static void set_next_gxid(xid_t value) {
293
317
next_gxid = value ;
294
318
}
295
319
320
+ static bool use_xid (xid_t xid ) {
321
+ if (!xid_is_safe (xid )) {
322
+ return false;
323
+ }
324
+ shout ("setting next_gxid to %u\n" , xid + 1 );
325
+ set_next_gxid (xid + 1 );
326
+ return true;
327
+ }
328
+
296
329
static void onreserve (client_t client , int argc , xid_t * argv ) {
297
330
CHECK (argc == 3 , client , "RESERVE: wrong number of arguments" );
298
331
@@ -317,11 +350,10 @@ static void onreserve(client_t client, int argc, xid_t *argv) {
317
350
minxid = max_of_xids (minxid , next_gxid );
318
351
maxxid = max_of_xids (maxxid , minxid + minsize - 1 );
319
352
CHECK (
320
- maxxid <= last_gxid ,
353
+ use_xid ( maxxid ) ,
321
354
client ,
322
355
"not enough xids left in this term"
323
356
);
324
- set_next_gxid (maxxid + 1 );
325
357
}
326
358
debug (
327
359
"[%d] RESERVE: allocating range %u-%u\n" ,
@@ -371,13 +403,13 @@ static void onbegin(client_t client, int argc, xid_t *argv) {
371
403
transaction_clear (t );
372
404
l2_list_link (& active_transactions , & t -> elem );
373
405
406
+ t -> xid = next_gxid ;
374
407
CHECK (
375
- next_gxid <= last_gxid ,
408
+ use_xid ( next_gxid ) ,
376
409
client ,
377
410
"not enought xids left in this term"
378
411
);
379
- prev_gxid = t -> xid = next_gxid ;
380
- set_next_gxid (next_gxid + 1 );
412
+ prev_gxid = t -> xid ;
381
413
t -> snapshots_count = 0 ;
382
414
t -> size = 1 ;
383
415
@@ -865,9 +897,16 @@ int main(int argc, char **argv) {
865
897
866
898
next_gxid = MIN_XID ;
867
899
clg = clog_open (datadir );
868
- set_next_gxid (clog_find_last_used (clg ) + 1 );
900
+
901
+ xid_t last_used_xid = clog_find_last_used (clg );
902
+ shout ("will use %u\n" , last_used_xid );
903
+ if (!use_xid (last_used_xid )) {
904
+ shout ("could not set last used xid to %u\n" , last_used_xid );
905
+ return EXIT_FAILURE ;
906
+ }
907
+ raft .term = xid2term (next_gxid );
908
+
869
909
prev_gxid = next_gxid - 1 ;
870
- last_gxid = INVALID_XID ;
871
910
debug ("initial next_gxid = %u\n" , next_gxid );
872
911
if (!clg ) {
873
912
shout ("could not open clog at '%s'\n" , datadir );
@@ -906,6 +945,7 @@ int main(int argc, char **argv) {
906
945
907
946
mstimer_t t ;
908
947
mstimer_reset (& t );
948
+ int old_term = 0 ;
909
949
while (true) {
910
950
int ms = mstimer_reset (& t );
911
951
raft_msg_t * m = NULL ;
@@ -933,19 +973,16 @@ int main(int argc, char **argv) {
933
973
server_set_enabled (server , raft .role == ROLE_LEADER );
934
974
935
975
// Update the gxid limits based on current term and leadership.
936
- xid_t recent_last_gxid = raft .term * XIDS_PER_TERM ;
937
- if (last_gxid < recent_last_gxid ) {
938
- shout ("updating last_gxid from %u to %u\n" , last_gxid , recent_last_gxid );
939
- last_gxid = recent_last_gxid ;
940
- threshold_gxid = last_gxid - NEW_TERM_THRESHOLD ;
976
+ if (old_term < raft .term ) {
941
977
if (raft .role == ROLE_FOLLOWER ) {
942
978
// If we become a leader, we will use
943
979
// the range of xids after the current
944
980
// last_gxid.
945
- prev_gxid = last_gxid ;
946
- next_gxid = prev_gxid + 1 ;
981
+ prev_gxid = last_xid_in_term () ;
982
+ set_next_gxid ( prev_gxid + 1 ) ;
947
983
shout ("updated range to %u-%u\n" , prev_gxid , next_gxid );
948
984
}
985
+ old_term = raft .term ;
949
986
}
950
987
} else {
951
988
server_set_enabled (server , true);
0 commit comments