@@ -15,6 +15,8 @@ const (
15
15
N_ACCOUNTS = TRANSFER_CONNECTIONS //100000
16
16
//ISOLATION_LEVEL = "repeatable read"
17
17
ISOLATION_LEVEL = "read committed"
18
+ GLOBAL_UPDATES = true
19
+ LOCAL_UPDATES = false
18
20
)
19
21
20
22
@@ -115,13 +117,15 @@ func transfer(id int, cCommits chan int, cAborts chan int, wg *sync.WaitGroup) {
115
117
var nCommits = 0
116
118
var myCommits = 0
117
119
118
- conn1 , err := pgx .Connect (cfg1 )
120
+ var conn [2 ]* pgx.Conn
121
+
122
+ conn [0 ], err = pgx .Connect (cfg1 )
119
123
checkErr (err )
120
- defer conn1 .Close ()
124
+ defer conn [ 0 ] .Close ()
121
125
122
- conn2 , err : = pgx .Connect (cfg2 )
126
+ conn [ 1 ] , err = pgx .Connect (cfg2 )
123
127
checkErr (err )
124
- defer conn2 .Close ()
128
+ defer conn [ 1 ] .Close ()
125
129
126
130
start := time .Now ()
127
131
for myCommits < N_ITERATIONS {
@@ -130,23 +134,57 @@ func transfer(id int, cCommits chan int, cAborts chan int, wg *sync.WaitGroup) {
130
134
account1 := rand .Intn (N_ACCOUNTS )
131
135
account2 := rand .Intn (N_ACCOUNTS )
132
136
133
- xid = execQuery (conn1 , "select dtm_begin_transaction(2)" )
134
- exec (conn2 , "select dtm_join_transaction($1)" , xid )
137
+ if (account1 >= account2 ) {
138
+ continue
139
+ }
140
+
141
+ src := conn [rand .Intn (2 )]
142
+ dst := conn [rand .Intn (2 )]
135
143
136
- // start transaction
137
- exec (conn1 , "begin transaction isolation level " + ISOLATION_LEVEL )
138
- exec (conn2 , "begin transaction isolation level " + ISOLATION_LEVEL )
144
+ if src == dst {
145
+ // local update
146
+ if ! LOCAL_UPDATES {
147
+ // which we do not want
148
+ continue
149
+ }
139
150
140
- ok1 := execUpdate (conn1 , "update t set v = v + $1 where u=$2" , amount , account1 )
141
- ok2 := execUpdate (conn2 , "update t set v = v - $1 where u=$2" , amount , account2 )
142
- if ! ok1 || ! ok2 {
143
- exec (conn1 , "rollback" )
144
- exec (conn2 , "rollback" )
145
- nAborts += 1
151
+ exec (src , "begin transaction isolation level " + ISOLATION_LEVEL )
152
+ ok1 := execUpdate (src , "update t set v = v - $1 where u=$2" , amount , account1 )
153
+ ok2 := execUpdate (src , "update t set v = v + $1 where u=$2" , amount , account2 )
154
+ if ! ok1 || ! ok2 {
155
+ exec (src , "rollback" )
156
+ nAborts += 1
157
+ } else {
158
+ exec (src , "commit" )
159
+ nCommits += 1
160
+ myCommits += 1
161
+ }
146
162
} else {
147
- commit (conn1 , conn2 )
148
- nCommits += 1
149
- myCommits += 1
163
+ // global update
164
+ if ! GLOBAL_UPDATES {
165
+ // which we do not want
166
+ continue
167
+ }
168
+
169
+ xid = execQuery (src , "select dtm_begin_transaction(2)" )
170
+ exec (dst , "select dtm_join_transaction($1)" , xid )
171
+
172
+ // start transaction
173
+ exec (src , "begin transaction isolation level " + ISOLATION_LEVEL )
174
+ exec (dst , "begin transaction isolation level " + ISOLATION_LEVEL )
175
+
176
+ ok1 := execUpdate (src , "update t set v = v - $1 where u=$2" , amount , account1 )
177
+ ok2 := execUpdate (dst , "update t set v = v + $1 where u=$2" , amount , account2 )
178
+
179
+ if ! ok1 || ! ok2 {
180
+ exec (src , "rollback" )
181
+ exec (dst , "rollback" )
182
+ nAborts += 1
183
+ } else {
184
+ commit (src , dst )
185
+ nCommits += 1
186
+ myCommits += 1
187
+ }
150
188
}
151
189
152
190
if time .Since (start ).Seconds () > 1 {
@@ -241,6 +279,9 @@ func execUpdate(conn *pgx.Conn, stmt string, arguments ...interface{}) bool {
241
279
var err error
242
280
// fmt.Println(stmt)
243
281
_ , err = conn .Exec (stmt , arguments ... )
282
+ if err != nil {
283
+ fmt .Println (err )
284
+ }
244
285
return err == nil
245
286
}
246
287
0 commit comments