Skip to content

Commit 7ba7388

Browse files
committed
2 parents 8873855 + a1fdefa commit 7ba7388

File tree

4 files changed

+50
-39
lines changed

4 files changed

+50
-39
lines changed

contrib/pg_dtm/tests/transfers.sh

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,5 +6,5 @@ go run transfers.go \
66
-v \
77
-m \
88
-u 100000 \
9-
-w 10 \
9+
-w 8 \
1010
-g

contrib/pg_tsdtm/pg_dtm.c

Lines changed: 1 addition & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -73,8 +73,6 @@ static HTAB* gtid2xid;
7373
static DtmNodeState* local;
7474
static DtmTransState dtm_tx;
7575

76-
//static TransactionId DatmOldestXid = FirstNormalTransactionId;
77-
//static cid_t DtmOldestCid = INVALID_CID;
7876
static int DtmVacuumDelay;
7977

8078
static Snapshot DtmGetSnapshot(Snapshot snapshot);
@@ -301,8 +299,7 @@ Datum
301299
dtm_begin_prepare(PG_FUNCTION_ARGS)
302300
{
303301
GlobalTransactionId gtid = PG_GETARG_CSTRING(0);
304-
nodeid_t coordinator = PG_GETARG_INT32(1);
305-
DtmLocalBeginPrepare(gtid, coordinator);
302+
DtmLocalBeginPrepare(gtid);
306303
DTM_TRACE((stderr, "Backend %d begins prepare of transaction %s\n", getpid(), gtid));
307304
PG_RETURN_VOID();
308305
}
@@ -375,7 +372,6 @@ static TransactionId DtmAdjustOldestXid(TransactionId xid)
375372
DtmTransStatus *ts, *prev = NULL;
376373
timestamp_t cutoff_time = dtm_get_current_time() - DtmVacuumDelay*USEC;
377374
SpinLockAcquire(&local->lock);
378-
#if 1
379375
for (ts = local->trans_list_head; ts != NULL && ts->cid < cutoff_time; prev = ts, ts = ts->next) {
380376
if (prev != NULL) {
381377
hash_search(xid2status, &prev->xid, HASH_REMOVE, NULL);
@@ -387,16 +383,6 @@ static TransactionId DtmAdjustOldestXid(TransactionId xid)
387383
} else {
388384
xid = FirstNormalTransactionId;
389385
}
390-
#else
391-
ts = (DtmTransStatus*)hash_search(xid2status, &xid, HASH_FIND, NULL);
392-
if (ts == NULL || ts->cid + DtmVacuumDelay*USEC > dtm_get_current_time()) {
393-
xid = DtmOldestXid;
394-
} else /*if (ts->cid > DtmOldestCid)*/ {
395-
DTM_TRACE(("Set new oldest xid=%u csn=%lu now=%lu\n", xid, ts->cid, dtm_get_current_time()));
396-
DtmOldestXid = xid;
397-
DtmOldestCid = ts->cid;
398-
}
399-
#endif
400386
SpinLockRelease(&local->lock);
401387
}
402388
return xid;
@@ -419,7 +405,6 @@ TransactionId DtmGetOldestXmin(Relation rel, bool ignoreVacuum)
419405
bool DtmXidInMVCCSnapshot(TransactionId xid, Snapshot snapshot)
420406
{
421407
timestamp_t delay = MIN_WAIT_TIMEOUT;
422-
423408
Assert(xid != InvalidTransactionId);
424409

425410
SpinLockAcquire(&local->lock);

contrib/pg_tsdtm/pg_dtm.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ cid_t DtmLocalExtend(DtmTransState* x, GlobalTransactionId gtid);
2323
/* Function called at first access to any datanode except first one involved in distributed transaction */
2424
cid_t DtmLocalAccess(DtmTransState* x, GlobalTransactionId gtid, cid_t snapshot);
2525
/* Mark transaction as in-doubt */
26-
void DtmLocalBeginPrepare(GlobalTransactionId gtid, nodeid_t coordinator);
26+
void DtmLocalBeginPrepare(GlobalTransactionId gtid);
2727
/* Choose CSN for global transaction */
2828
cid_t DtmLocalPrepare(GlobalTransactionId gtid, cid_t cid);
2929
/* Assign CSN to global transaction */

contrib/pg_tsdtm/tests/transfers.go

Lines changed: 47 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ import (
1111
const (
1212
TRANSFER_CONNECTIONS = 8
1313
INIT_AMOUNT = 10000
14-
N_ITERATIONS = 100000
14+
N_ITERATIONS = 10000//0
1515
N_ACCOUNTS = 2//100000
1616
)
1717

@@ -92,6 +92,8 @@ func transfer(id int, wg *sync.WaitGroup) {
9292
var snapshot int64
9393
var csn int64
9494

95+
nGlobalTrans := 0
96+
9597
conn1, err := pgx.Connect(cfg1)
9698
checkErr(err)
9799
defer conn1.Close()
@@ -101,38 +103,62 @@ func transfer(id int, wg *sync.WaitGroup) {
101103
defer conn2.Close()
102104

103105
for i := 0; i < N_ITERATIONS; i++ {
106+
var dst *pgx.Conn
107+
var src *pgx.Conn
108+
choice := rand.Intn(3)-1
104109
gtid := strconv.Itoa(id) + "." + strconv.Itoa(i)
110+
if choice < 0 {
111+
src = conn1
112+
dst = conn1
113+
exec(conn1, "begin transaction")
114+
} else if choice > 0 {
115+
src = conn2
116+
dst = conn2
117+
exec(conn2, "begin transaction")
118+
} else {
119+
src = conn1
120+
dst = conn2
121+
exec(conn1, "begin transaction")
122+
exec(conn2, "begin transaction")
123+
snapshot = execQuery(conn1, "select dtm_extend($1)", gtid)
124+
snapshot = execQuery(conn2, "select dtm_access($1, $2)", snapshot, gtid)
125+
}
105126
//amount := 2*rand.Intn(2) - 1
106127
amount := 1
107128
account1 := rand.Intn(N_ACCOUNTS)
108129
account2 := rand.Intn(N_ACCOUNTS)
130+
131+
if account1 > account2 {
132+
tmp := account1
133+
account1 = account2
134+
account2 = tmp
135+
}
109136

110-
exec(conn1, "begin transaction")
111-
exec(conn2, "begin transaction")
112-
113-
snapshot = execQuery(conn1, "select dtm_extend($1)", gtid)
114-
snapshot = execQuery(conn2, "select dtm_access($1, $2)", snapshot, gtid)
115-
116-
exec(conn1, "update t set v = v + $1 where u=$2", amount, account1)
117-
exec(conn2, "update t set v = v - $1 where u=$2", amount, account2)
137+
exec(src, "update t set v = v - $1 where u=$2", amount, account1)
138+
exec(dst, "update t set v = v + $1 where u=$2", amount, account2)
118139

119-
exec(conn1, "prepare transaction '" + gtid + "'")
120-
exec(conn2, "prepare transaction '" + gtid + "'")
140+
if (src != dst) {
141+
exec(conn1, "prepare transaction '" + gtid + "'")
142+
exec(conn2, "prepare transaction '" + gtid + "'")
121143

122-
exec(conn1, "select dtm_begin_prepare($1)", gtid)
123-
exec(conn2, "select dtm_begin_prepare($1)", gtid)
144+
exec(conn1, "select dtm_begin_prepare($1)", gtid)
145+
exec(conn2, "select dtm_begin_prepare($1)", gtid)
124146

125-
csn = execQuery(conn1, "select dtm_prepare($1, 0)", gtid)
126-
csn = execQuery(conn2, "select dtm_prepare($1, $2)", gtid, csn)
147+
csn = execQuery(conn1, "select dtm_prepare($1, 0)", gtid)
148+
csn = execQuery(conn2, "select dtm_prepare($1, $2)", gtid, csn)
127149

128-
exec(conn1, "select dtm_end_prepare($1, $2)", gtid, csn)
129-
exec(conn2, "select dtm_end_prepare($1, $2)", gtid, csn)
150+
exec(conn1, "select dtm_end_prepare($1, $2)", gtid, csn)
151+
exec(conn2, "select dtm_end_prepare($1, $2)", gtid, csn)
130152

131-
exec(conn1, "commit prepared '" + gtid + "'")
132-
exec(conn2, "commit prepared '" + gtid + "'")
133-
}
153+
exec(conn1, "commit prepared '" + gtid + "'")
154+
exec(conn2, "commit prepared '" + gtid + "'")
155+
nGlobalTrans++
156+
} else {
157+
exec(dst, "commit")
158+
}
159+
}
134160

135-
fmt.Println("Test completed")
161+
fmt.Printf("Test completed, performed %d global transactions\n", nGlobalTrans)
136162
wg.Done()
137163
}
138164

0 commit comments

Comments
 (0)