41
41
42
42
typedef uint64 timestamp_t ;
43
43
44
- typedef struct
45
- {
46
- nodeid_t node_id ;
47
- cid_t cid ;
48
- volatile slock_t lock ;
49
- } DtmNodeState ;
50
-
51
- typedef struct
44
+ typedef struct DtmTransStatus
52
45
{
53
46
TransactionId xid ;
54
47
XidStatus status ;
55
48
bool is_coordinator ;
56
49
cid_t cid ;
50
+ struct DtmTransStatus * next ;
57
51
} DtmTransStatus ;
58
52
53
+ typedef struct
54
+ {
55
+ nodeid_t node_id ;
56
+ cid_t cid ;
57
+ volatile slock_t lock ;
58
+ DtmTransStatus * trans_list_head ;
59
+ DtmTransStatus * * trans_list_tail ;
60
+ } DtmNodeState ;
61
+
59
62
typedef struct
60
63
{
61
64
char gtid [MAX_GTID_SIZE ];
@@ -72,8 +75,8 @@ static HTAB* gtid2xid;
72
75
static DtmNodeState * local ;
73
76
static DtmTransState dtm_tx ;
74
77
75
- static TransactionId DtmOldestXid = FirstNormalTransactionId ;
76
- static cid_t DtmOldestCid = INVALID_CID ;
78
+ // static TransactionId DatmOldestXid = FirstNormalTransactionId;
79
+ // static cid_t DtmOldestCid = INVALID_CID;
77
80
static int DtmVacuumDelay ;
78
81
79
82
static Snapshot DtmGetSnapshot (Snapshot snapshot );
@@ -369,11 +372,32 @@ static int dtm_gtid_match_fn(const void *key1, const void *key2, Size keysize)
369
372
return strcmp ((GlobalTransactionId )key1 , (GlobalTransactionId )key2 );
370
373
}
371
374
375
+ static void IncludeInTransactionList (DtmTransStatus * ts )
376
+ {
377
+ ts -> next = NULL ;
378
+ * local -> trans_list_tail = ts ;
379
+ local -> trans_list_tail = & ts -> next ;
380
+ }
381
+
372
382
static TransactionId DtmAdjustOldestXid (TransactionId xid )
373
383
{
374
384
if (TransactionIdIsValid (xid )) {
375
- DtmTransStatus * ts ;
385
+ DtmTransStatus * ts , * prev = NULL ;
386
+ timestamp_t cutoff_time = dtm_get_current_time () - DtmVacuumDelay * USEC ;
376
387
SpinLockAcquire (& local -> lock );
388
+ #if 1
389
+ for (ts = local -> trans_list_head ; ts != NULL && ts -> cid < cutoff_time ; prev = ts , ts = ts -> next ) {
390
+ if (prev != NULL ) {
391
+ hash_search (xid2status , & prev -> xid , HASH_REMOVE , NULL );
392
+ }
393
+ }
394
+ if (prev != NULL ) {
395
+ local -> trans_list_head = prev ;
396
+ xid = prev -> xid ;
397
+ } else {
398
+ xid = FirstNormalTransactionId ;
399
+ }
400
+ #else
377
401
ts = (DtmTransStatus * )hash_search (xid2status , & xid , HASH_FIND , NULL );
378
402
if (ts == NULL || ts -> cid + DtmVacuumDelay * USEC > dtm_get_current_time ()) {
379
403
xid = DtmOldestXid ;
@@ -382,6 +406,7 @@ static TransactionId DtmAdjustOldestXid(TransactionId xid)
382
406
DtmOldestXid = xid ;
383
407
DtmOldestCid = ts -> cid ;
384
408
}
409
+ #endif
385
410
SpinLockRelease (& local -> lock );
386
411
}
387
412
return xid ;
@@ -481,6 +506,8 @@ void DtmInitialize()
481
506
{
482
507
local -> cid = dtm_get_current_time ();
483
508
local -> node_id = -1 ;
509
+ local -> trans_list_head = NULL ;
510
+ local -> trans_list_tail = & local -> trans_list_head ;
484
511
SpinLockInit (& local -> lock );
485
512
RegisterXactCallback (dtm_xact_callback , NULL );
486
513
}
@@ -548,8 +575,9 @@ void DtmLocalBeginPrepare(GlobalTransactionId gtid, nodeid_t coordinator)
548
575
549
576
ts = (DtmTransStatus * )hash_search (xid2status , & id -> xid , HASH_ENTER , NULL );
550
577
ts -> status = TRANSACTION_STATUS_IN_PROGRESS ;
551
- ts -> cid = INVALID_CID ;
578
+ ts -> cid = dtm_get_cid () ;
552
579
ts -> is_coordinator = coordinator == local -> node_id ;
580
+ IncludeInTransactionList (ts );
553
581
}
554
582
SpinLockRelease (& local -> lock );
555
583
}
@@ -593,7 +621,7 @@ void DtmLocalCommitPrepared(DtmTransState* x, GlobalTransactionId gtid)
593
621
594
622
SpinLockAcquire (& local -> lock );
595
623
{
596
- DtmTransId * id = (DtmTransId * )hash_search (gtid2xid , gtid , HASH_FIND , NULL );
624
+ DtmTransId * id = (DtmTransId * )hash_search (gtid2xid , gtid , HASH_REMOVE , NULL );
597
625
Assert (id != NULL );
598
626
599
627
x -> is_global = true;
@@ -616,6 +644,7 @@ void DtmLocalCommit(DtmTransState* x)
616
644
} else {
617
645
Assert (!found );
618
646
ts -> cid = dtm_get_cid ();
647
+ IncludeInTransactionList (ts );
619
648
}
620
649
x -> cid = ts -> cid ;
621
650
ts -> status = TRANSACTION_STATUS_COMMITTED ;
@@ -630,7 +659,7 @@ void DtmLocalAbortPrepared(DtmTransState* x, GlobalTransactionId gtid)
630
659
631
660
SpinLockAcquire (& local -> lock );
632
661
{
633
- DtmTransId * id = (DtmTransId * )hash_search (gtid2xid , gtid , HASH_FIND , NULL );
662
+ DtmTransId * id = (DtmTransId * )hash_search (gtid2xid , gtid , HASH_REMOVE , NULL );
634
663
Assert (id != NULL );
635
664
636
665
x -> is_global = true;
@@ -654,6 +683,7 @@ void DtmLocalAbort(DtmTransState* x)
654
683
} else {
655
684
Assert (!found );
656
685
ts -> cid = dtm_get_cid ();
686
+ IncludeInTransactionList (ts );
657
687
}
658
688
x -> cid = ts -> cid ;
659
689
ts -> status = TRANSACTION_STATUS_ABORTED ;
0 commit comments