Skip to content

Commit c7644ba

Browse files
committed
Add local transaction to the transfers test.
1 parent 2513a48 commit c7644ba

File tree

1 file changed

+42
-20
lines changed

1 file changed

+42
-20
lines changed

contrib/pg_xtm/tests/transfers.go

Lines changed: 42 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -115,13 +115,15 @@ func transfer(id int, cCommits chan int, cAborts chan int, wg *sync.WaitGroup) {
115115
var nCommits = 0
116116
var myCommits = 0
117117

118-
conn1, err := pgx.Connect(cfg1)
118+
var conn [2]*pgx.Conn
119+
120+
conn[0], err = pgx.Connect(cfg1)
119121
checkErr(err)
120-
defer conn1.Close()
122+
defer conn[0].Close()
121123

122-
conn2, err := pgx.Connect(cfg2)
124+
conn[1], err = pgx.Connect(cfg2)
123125
checkErr(err)
124-
defer conn2.Close()
126+
defer conn[1].Close()
125127

126128
start := time.Now()
127129
for myCommits < N_ITERATIONS {
@@ -130,23 +132,43 @@ func transfer(id int, cCommits chan int, cAborts chan int, wg *sync.WaitGroup) {
130132
account1 := rand.Intn(N_ACCOUNTS)
131133
account2 := rand.Intn(N_ACCOUNTS)
132134

133-
xid = execQuery(conn1, "select dtm_begin_transaction(2)")
134-
exec(conn2, "select dtm_join_transaction($1)", xid)
135-
136-
// start transaction
137-
exec(conn1, "begin transaction isolation level " + ISOLATION_LEVEL)
138-
exec(conn2, "begin transaction isolation level " + ISOLATION_LEVEL)
139-
140-
ok1 := execUpdate(conn1, "update t set v = v + $1 where u=$2", amount, account1)
141-
ok2 := execUpdate(conn2, "update t set v = v - $1 where u=$2", amount, account2)
142-
if !ok1 || !ok2 {
143-
exec(conn1, "rollback")
144-
exec(conn2, "rollback")
145-
nAborts += 1
135+
src := conn[rand.Intn(2)]
136+
dst := conn[rand.Intn(2)]
137+
138+
if src == dst {
139+
// local transaction
140+
exec(src, "begin transaction isolation level " + ISOLATION_LEVEL)
141+
ok1 := execUpdate(src, "update t set v = v - $1 where u=$2", amount, account1)
142+
ok2 := execUpdate(src, "update t set v = v + $1 where u=$2", amount, account2)
143+
if !ok1 || !ok2 {
144+
exec(src, "rollback")
145+
nAborts += 1
146+
} else {
147+
exec(src, "commit")
148+
nCommits += 1
149+
myCommits += 1
150+
}
146151
} else {
147-
commit(conn1, conn2)
148-
nCommits += 1
149-
myCommits += 1
152+
// global transaction
153+
xid = execQuery(src, "select dtm_begin_transaction(2)")
154+
exec(dst, "select dtm_join_transaction($1)", xid)
155+
156+
// start transaction
157+
exec(src, "begin transaction isolation level " + ISOLATION_LEVEL)
158+
exec(dst, "begin transaction isolation level " + ISOLATION_LEVEL)
159+
160+
ok1 := execUpdate(src, "update t set v = v - $1 where u=$2", amount, account1)
161+
ok2 := execUpdate(dst, "update t set v = v + $1 where u=$2", amount, account2)
162+
163+
if !ok1 || !ok2 {
164+
exec(src, "rollback")
165+
exec(dst, "rollback")
166+
nAborts += 1
167+
} else {
168+
commit(src, dst)
169+
nCommits += 1
170+
myCommits += 1
171+
}
150172
}
151173

152174
if time.Since(start).Seconds() > 1 {

0 commit comments

Comments
 (0)