Skip to content

Commit b54929c

Browse files
committed
Test for local transactions in tsDTM
1 parent 5bd148a commit b54929c

File tree

3 files changed

+42
-36
lines changed

3 files changed

+42
-36
lines changed

contrib/pg_dtm/tests/transfers.sh

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,5 +6,5 @@ go run transfers.go \
66
-v \
77
-m \
88
-u 100000 \
9-
-w 10 \
9+
-w 8 \
1010
-g

contrib/pg_tsdtm/pg_dtm.c

Lines changed: 0 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -73,8 +73,6 @@ static HTAB* gtid2xid;
7373
static DtmNodeState* local;
7474
static DtmTransState dtm_tx;
7575

76-
//static TransactionId DatmOldestXid = FirstNormalTransactionId;
77-
//static cid_t DtmOldestCid = INVALID_CID;
7876
static int DtmVacuumDelay;
7977

8078
static Snapshot DtmGetSnapshot(Snapshot snapshot);
@@ -374,7 +372,6 @@ static TransactionId DtmAdjustOldestXid(TransactionId xid)
374372
DtmTransStatus *ts, *prev = NULL;
375373
timestamp_t cutoff_time = dtm_get_current_time() - DtmVacuumDelay*USEC;
376374
SpinLockAcquire(&local->lock);
377-
#if 1
378375
for (ts = local->trans_list_head; ts != NULL && ts->cid < cutoff_time; prev = ts, ts = ts->next) {
379376
if (prev != NULL) {
380377
hash_search(xid2status, &prev->xid, HASH_REMOVE, NULL);
@@ -386,16 +383,6 @@ static TransactionId DtmAdjustOldestXid(TransactionId xid)
386383
} else {
387384
xid = FirstNormalTransactionId;
388385
}
389-
#else
390-
ts = (DtmTransStatus*)hash_search(xid2status, &xid, HASH_FIND, NULL);
391-
if (ts == NULL || ts->cid + DtmVacuumDelay*USEC > dtm_get_current_time()) {
392-
xid = DtmOldestXid;
393-
} else /*if (ts->cid > DtmOldestCid)*/ {
394-
DTM_TRACE(("Set new oldest xid=%u csn=%lu now=%lu\n", xid, ts->cid, dtm_get_current_time()));
395-
DtmOldestXid = xid;
396-
DtmOldestCid = ts->cid;
397-
}
398-
#endif
399386
SpinLockRelease(&local->lock);
400387
}
401388
return xid;
@@ -418,7 +405,6 @@ TransactionId DtmGetOldestXmin(Relation rel, bool ignoreVacuum)
418405
bool DtmXidInMVCCSnapshot(TransactionId xid, Snapshot snapshot)
419406
{
420407
timestamp_t delay = MIN_WAIT_TIMEOUT;
421-
422408
Assert(xid != InvalidTransactionId);
423409

424410
SpinLockAcquire(&local->lock);

contrib/pg_tsdtm/tests/transfers.go

Lines changed: 41 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ import (
1111
const (
1212
TRANSFER_CONNECTIONS = 8
1313
INIT_AMOUNT = 10000
14-
N_ITERATIONS = 100000
14+
N_ITERATIONS = 10000//0
1515
N_ACCOUNTS = 2//100000
1616
)
1717

@@ -92,6 +92,8 @@ func transfer(id int, wg *sync.WaitGroup) {
9292
var snapshot int64
9393
var csn int64
9494

95+
nGlobalTrans := 0
96+
9597
conn1, err := pgx.Connect(cfg1)
9698
checkErr(err)
9799
defer conn1.Close()
@@ -101,38 +103,56 @@ func transfer(id int, wg *sync.WaitGroup) {
101103
defer conn2.Close()
102104

103105
for i := 0; i < N_ITERATIONS; i++ {
106+
var dst *pgx.Conn
107+
var src *pgx.Conn
108+
choice := rand.Intn(3)-1
104109
gtid := strconv.Itoa(id) + "." + strconv.Itoa(i)
110+
if choice < 0 {
111+
src = conn1
112+
dst = conn1
113+
exec(conn1, "begin transaction")
114+
} else if choice > 0 {
115+
src = conn2
116+
dst = conn2
117+
exec(conn2, "begin transaction")
118+
} else {
119+
src = conn1
120+
dst = conn2
121+
exec(conn1, "begin transaction")
122+
exec(conn2, "begin transaction")
123+
snapshot = execQuery(conn1, "select dtm_extend($1)", gtid)
124+
snapshot = execQuery(conn2, "select dtm_access($1, $2)", snapshot, gtid)
125+
}
105126
//amount := 2*rand.Intn(2) - 1
106127
amount := 1
107128
account1 := rand.Intn(N_ACCOUNTS)
108129
account2 := rand.Intn(N_ACCOUNTS)
109130

110-
exec(conn1, "begin transaction")
111-
exec(conn2, "begin transaction")
112-
113-
snapshot = execQuery(conn1, "select dtm_extend($1)", gtid)
114-
snapshot = execQuery(conn2, "select dtm_access($1, $2)", snapshot, gtid)
131+
exec(src, "update t set v = v - $1 where u=$2", amount, account1)
132+
exec(dst, "update t set v = v + $1 where u=$2", amount, account2)
115133

116-
exec(conn1, "update t set v = v + $1 where u=$2", amount, account1)
117-
exec(conn2, "update t set v = v - $1 where u=$2", amount, account2)
134+
if (src != dst) {
135+
exec(conn1, "prepare transaction '" + gtid + "'")
136+
exec(conn2, "prepare transaction '" + gtid + "'")
118137

119-
exec(conn1, "prepare transaction '" + gtid + "'")
120-
exec(conn2, "prepare transaction '" + gtid + "'")
138+
exec(conn1, "select dtm_begin_prepare($1)", gtid)
139+
exec(conn2, "select dtm_begin_prepare($1)", gtid)
121140

122-
exec(conn1, "select dtm_begin_prepare($1)", gtid)
123-
exec(conn2, "select dtm_begin_prepare($1)", gtid)
141+
csn = execQuery(conn1, "select dtm_prepare($1, 0)", gtid)
142+
csn = execQuery(conn2, "select dtm_prepare($1, $2)", gtid, csn)
124143

125-
csn = execQuery(conn1, "select dtm_prepare($1, 0)", gtid)
126-
csn = execQuery(conn2, "select dtm_prepare($1, $2)", gtid, csn)
144+
exec(conn1, "select dtm_end_prepare($1, $2)", gtid, csn)
145+
exec(conn2, "select dtm_end_prepare($1, $2)", gtid, csn)
127146

128-
exec(conn1, "select dtm_end_prepare($1, $2)", gtid, csn)
129-
exec(conn2, "select dtm_end_prepare($1, $2)", gtid, csn)
130-
131-
exec(conn1, "commit prepared '" + gtid + "'")
132-
exec(conn2, "commit prepared '" + gtid + "'")
133-
}
147+
exec(conn1, "commit prepared '" + gtid + "'")
148+
exec(conn2, "commit prepared '" + gtid + "'")
149+
nGlobalTrans++
150+
} else {
151+
exec(dst, "commit")
152+
}
153+
}
134154

135-
fmt.Println("Test completed")
155+
fmt.Println("Test completed, performed %d global transactions", nGlobalTrans)
136156
wg.Done()
137157
}
138158

0 commit comments

Comments
 (0)