@@ -3,7 +3,7 @@ package main
3
3
import (
4
4
"fmt"
5
5
"sync"
6
- "math/rand"
6
+ // "math/rand"
7
7
"github.com/jackc/pgx"
8
8
)
9
9
@@ -94,6 +94,7 @@ func max(a, b int64) int64 {
94
94
func transfer (id int , wg * sync.WaitGroup ) {
95
95
var err error
96
96
var xids []int32 = make ([]int32 , 2 )
97
+ var nConflicts = 0
97
98
98
99
conn1 , err := pgx .Connect (cfg1 )
99
100
checkErr (err )
@@ -106,12 +107,12 @@ func transfer(id int, wg *sync.WaitGroup) {
106
107
for i := 0 ; i < N_ITERATIONS ; i ++ {
107
108
//amount := 2*rand.Intn(2) - 1
108
109
amount := 1
109
- account1 := rand .Intn (N_ACCOUNTS )
110
- account2 := rand .Intn (N_ACCOUNTS )
110
+ account1 := id // rand.Intn(N_ACCOUNTS)
111
+ account2 := id // rand.Intn(N_ACCOUNTS)
111
112
112
113
// strt transaction
113
- exec (conn1 , "begin" )
114
- exec (conn2 , "begin" )
114
+ exec (conn1 , "begin transaction isolation level repeatable read " )
115
+ exec (conn2 , "begin transaction isolation level repeatable read " )
115
116
116
117
// obtain XIDs of paticipants
117
118
xids [0 ] = execQuery (conn1 , "select txid_current()" )
@@ -121,13 +122,17 @@ func transfer(id int, wg *sync.WaitGroup) {
121
122
exec (conn1 , "select dtm_begin_transaction($1, $2)" , nodes , xids )
122
123
exec (conn2 , "select dtm_begin_transaction($1, $2)" , nodes , xids )
123
124
124
- exec (conn1 , "update t set v = v + $1 where u=$2" , amount , account1 )
125
- exec (conn2 , "update t set v = v - $1 where u=$2" , amount , account2 )
126
-
127
- commit (conn1 , conn2 )
125
+ if ! execUpdate (conn1 , "update t set v = v + $1 where u=$2" , amount , account1 ) ||
126
+ ! execUpdate (conn2 , "update t set v = v - $1 where u=$2" , amount , account2 ) {
127
+ exec (conn1 , "rollback" )
128
+ exec (conn2 , "rollback" )
129
+ nConflicts += 1
130
+ i -= 1
131
+ } else {
132
+ commit (conn1 , conn2 )
133
+ }
128
134
}
129
-
130
- fmt .Println ("Test completed" )
135
+ fmt .Println ("Test completed with " ,nConflicts ," conflicts" )
131
136
wg .Done ()
132
137
}
133
138
@@ -143,8 +148,8 @@ func inspect(wg *sync.WaitGroup) {
143
148
conn2 , err := pgx .Connect (cfg2 )
144
149
checkErr (err )
145
150
146
- exec (conn1 , "begin" )
147
- exec (conn2 , "begin" )
151
+ exec (conn1 , "begin transaction isolation level repeatable read " )
152
+ exec (conn2 , "begin transaction isolation level repeatable read " )
148
153
149
154
// obtain XIDs of paticipants
150
155
xids [0 ] = execQuery (conn1 , "select txid_current()" )
@@ -192,10 +197,17 @@ func main() {
192
197
func exec (conn * pgx.Conn , stmt string , arguments ... interface {}) {
193
198
var err error
194
199
// fmt.Println(stmt)
195
- _ , _ = conn .Exec (stmt , arguments ... )
200
+ _ , err = conn .Exec (stmt , arguments ... )
196
201
checkErr (err )
197
202
}
198
203
204
+ func execUpdate (conn * pgx.Conn , stmt string , arguments ... interface {}) bool {
205
+ var err error
206
+ // fmt.Println(stmt)
207
+ _ , err = conn .Exec (stmt , arguments ... )
208
+ return err == nil
209
+ }
210
+
199
211
func execQuery (conn * pgx.Conn , stmt string , arguments ... interface {}) int32 {
200
212
var err error
201
213
var result int64
0 commit comments