Skip to content

Commit 3c5a287

Browse files
committed
Cleanup transaction map in tsdtm
1 parent 9984931 commit 3c5a287

File tree

2 files changed

+45
-15
lines changed

2 files changed

+45
-15
lines changed

contrib/pg_tsdtm/pg_dtm.c

Lines changed: 44 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -41,21 +41,24 @@
4141

4242
typedef uint64 timestamp_t;
4343

44-
typedef struct
45-
{
46-
nodeid_t node_id;
47-
cid_t cid;
48-
volatile slock_t lock;
49-
} DtmNodeState;
50-
51-
typedef struct
44+
typedef struct DtmTransStatus
5245
{
5346
TransactionId xid;
5447
XidStatus status;
5548
bool is_coordinator;
5649
cid_t cid;
50+
struct DtmTransStatus* next;
5751
} DtmTransStatus;
5852

53+
typedef struct
54+
{
55+
nodeid_t node_id;
56+
cid_t cid;
57+
volatile slock_t lock;
58+
DtmTransStatus* trans_list_head;
59+
DtmTransStatus** trans_list_tail;
60+
} DtmNodeState;
61+
5962
typedef struct
6063
{
6164
char gtid[MAX_GTID_SIZE];
@@ -72,8 +75,8 @@ static HTAB* gtid2xid;
7275
static DtmNodeState* local;
7376
static DtmTransState dtm_tx;
7477

75-
static TransactionId DtmOldestXid = FirstNormalTransactionId;
76-
static cid_t DtmOldestCid = INVALID_CID;
78+
//static TransactionId DatmOldestXid = FirstNormalTransactionId;
79+
//static cid_t DtmOldestCid = INVALID_CID;
7780
static int DtmVacuumDelay;
7881

7982
static Snapshot DtmGetSnapshot(Snapshot snapshot);
@@ -369,11 +372,32 @@ static int dtm_gtid_match_fn(const void *key1, const void *key2, Size keysize)
369372
return strcmp((GlobalTransactionId)key1, (GlobalTransactionId)key2);
370373
}
371374

375+
static void IncludeInTransactionList(DtmTransStatus* ts)
376+
{
377+
ts->next = NULL;
378+
*local->trans_list_tail = ts;
379+
local->trans_list_tail = &ts->next;
380+
}
381+
372382
static TransactionId DtmAdjustOldestXid(TransactionId xid)
373383
{
374384
if (TransactionIdIsValid(xid)) {
375-
DtmTransStatus* ts;
385+
DtmTransStatus *ts, *prev = NULL;
386+
timestamp_t cutoff_time = dtm_get_current_time() - DtmVacuumDelay*USEC;
376387
SpinLockAcquire(&local->lock);
388+
#if 1
389+
for (ts = local->trans_list_head; ts != NULL && ts->cid < cutoff_time; prev = ts, ts = ts->next) {
390+
if (prev != NULL) {
391+
hash_search(xid2status, &prev->xid, HASH_REMOVE, NULL);
392+
}
393+
}
394+
if (prev != NULL) {
395+
local->trans_list_head = prev;
396+
xid = prev->xid;
397+
} else {
398+
xid = FirstNormalTransactionId;
399+
}
400+
#else
377401
ts = (DtmTransStatus*)hash_search(xid2status, &xid, HASH_FIND, NULL);
378402
if (ts == NULL || ts->cid + DtmVacuumDelay*USEC > dtm_get_current_time()) {
379403
xid = DtmOldestXid;
@@ -382,6 +406,7 @@ static TransactionId DtmAdjustOldestXid(TransactionId xid)
382406
DtmOldestXid = xid;
383407
DtmOldestCid = ts->cid;
384408
}
409+
#endif
385410
SpinLockRelease(&local->lock);
386411
}
387412
return xid;
@@ -481,6 +506,8 @@ void DtmInitialize()
481506
{
482507
local->cid = dtm_get_current_time();
483508
local->node_id = -1;
509+
local->trans_list_head = NULL;
510+
local->trans_list_tail = &local->trans_list_head;
484511
SpinLockInit(&local->lock);
485512
RegisterXactCallback(dtm_xact_callback, NULL);
486513
}
@@ -548,8 +575,9 @@ void DtmLocalBeginPrepare(GlobalTransactionId gtid, nodeid_t coordinator)
548575

549576
ts = (DtmTransStatus*)hash_search(xid2status, &id->xid, HASH_ENTER, NULL);
550577
ts->status = TRANSACTION_STATUS_IN_PROGRESS;
551-
ts->cid = INVALID_CID;
578+
ts->cid = dtm_get_cid();
552579
ts->is_coordinator = coordinator == local->node_id;
580+
IncludeInTransactionList(ts);
553581
}
554582
SpinLockRelease(&local->lock);
555583
}
@@ -593,7 +621,7 @@ void DtmLocalCommitPrepared(DtmTransState* x, GlobalTransactionId gtid)
593621

594622
SpinLockAcquire(&local->lock);
595623
{
596-
DtmTransId* id = (DtmTransId*)hash_search(gtid2xid, gtid, HASH_FIND, NULL);
624+
DtmTransId* id = (DtmTransId*)hash_search(gtid2xid, gtid, HASH_REMOVE, NULL);
597625
Assert(id != NULL);
598626

599627
x->is_global = true;
@@ -616,6 +644,7 @@ void DtmLocalCommit(DtmTransState* x)
616644
} else {
617645
Assert(!found);
618646
ts->cid = dtm_get_cid();
647+
IncludeInTransactionList(ts);
619648
}
620649
x->cid = ts->cid;
621650
ts->status = TRANSACTION_STATUS_COMMITTED;
@@ -630,7 +659,7 @@ void DtmLocalAbortPrepared(DtmTransState* x, GlobalTransactionId gtid)
630659

631660
SpinLockAcquire(&local->lock);
632661
{
633-
DtmTransId* id = (DtmTransId*)hash_search(gtid2xid, gtid, HASH_FIND, NULL);
662+
DtmTransId* id = (DtmTransId*)hash_search(gtid2xid, gtid, HASH_REMOVE, NULL);
634663
Assert(id != NULL);
635664

636665
x->is_global = true;
@@ -654,6 +683,7 @@ void DtmLocalAbort(DtmTransState* x)
654683
} else {
655684
Assert(!found);
656685
ts->cid = dtm_get_cid();
686+
IncludeInTransactionList(ts);
657687
}
658688
x->cid = ts->cid;
659689
ts->status = TRANSACTION_STATUS_ABORTED;

contrib/pg_tsdtm/tests/transfers.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ import (
1111
const (
1212
TRANSFER_CONNECTIONS = 8
1313
INIT_AMOUNT = 10000
14-
N_ITERATIONS = 10000
14+
N_ITERATIONS = 100000
1515
N_ACCOUNTS = 2//100000
1616
)
1717

0 commit comments

Comments
 (0)