@@ -299,22 +299,67 @@ func writer(id int, cCommits chan int, cAborts chan int, wg *sync.WaitGroup) {
299
299
dst := conns [rand .Intn (len (conns ))]
300
300
301
301
if src == dst {
302
- // local update
303
- if ! cfg .Writers .AllowLocal {
304
- // which we do not want
305
- continue
306
- }
307
-
308
- exec (src , "begin transaction isolation level " + cfg .Isolation )
309
- ok1 := execUpdate (src , "update t set v = v - $1 where u=$2" , amount , from_acc )
310
- ok2 := execUpdate (src , "update t set v = v + $1 where u=$2" , amount , to_acc )
311
- if ! ok1 || ! ok2 {
312
- exec (src , "rollback" )
313
- nAborts += 1
302
+ if cfg .Writers .AllowLocal {
303
+ // local update
304
+ exec (src , "begin transaction isolation level " + cfg .Isolation )
305
+ ok1 := execUpdate (src , "update t set v = v - $1 where u=$2" , amount , from_acc )
306
+ ok2 := execUpdate (src , "update t set v = v + $1 where u=$2" , amount , to_acc )
307
+ if ! ok1 || ! ok2 {
308
+ exec (src , "rollback" )
309
+ nAborts += 1
310
+ } else {
311
+ exec (src , "commit" )
312
+ nCommits += 1
313
+ myCommits += 1
314
+ }
314
315
} else {
315
- exec (src , "commit" )
316
- nCommits += 1
317
- myCommits += 1
316
+ if len (conns ) > 1 {
317
+ continue
318
+ }
319
+
320
+ // global single-node update
321
+ execQuery (src , "select dtm_begin_transaction()" )
322
+
323
+ // start transaction
324
+ exec (src , "begin transaction isolation level " + cfg .Isolation )
325
+
326
+ ok := true
327
+ if (cfg .Writers .UseCursors ) {
328
+ exec (
329
+ src ,
330
+ "declare cur0 cursor for select * from t where u=$1 for update" ,
331
+ from_acc ,
332
+ )
333
+
334
+ ok = execUpdate (src , "fetch from cur0" ) && ok
335
+
336
+ ok = execUpdate (
337
+ src , "update t set v = v - $1 where current of cur0" ,
338
+ amount ,
339
+ ) && ok
340
+ ok = execUpdate (
341
+ src , "update t set v = v + $1 where current of cur0" ,
342
+ amount ,
343
+ ) && ok
344
+ } else {
345
+ ok = execUpdate (
346
+ src , "update t set v = v - $1 where u=$2" ,
347
+ amount , from_acc ,
348
+ ) && ok
349
+ ok = execUpdate (
350
+ src , "update t set v = v + $1 where u=$2" ,
351
+ amount , to_acc ,
352
+ ) && ok
353
+ }
354
+
355
+ if ok {
356
+ commit (src )
357
+ nCommits += 1
358
+ myCommits += 1
359
+ } else {
360
+ exec (src , "rollback" )
361
+ nAborts += 1
362
+ }
318
363
}
319
364
} else {
320
365
// global update
0 commit comments