6
6
"os"
7
7
"sync"
8
8
"math/rand"
9
+ "strconv"
9
10
"time"
10
11
"github.com/jackc/pgx"
11
12
)
@@ -306,7 +307,7 @@ func writer(id int, cCommits chan int, cAborts chan int, wg *sync.WaitGroup) {
306
307
conns = append (conns , conn )
307
308
}
308
309
309
- start := time .Now ()
310
+ // start := time.Now()
310
311
for myCommits < cfg .Writers .Updates {
311
312
amount := 1
312
313
@@ -320,43 +321,20 @@ func writer(id int, cCommits chan int, cAborts chan int, wg *sync.WaitGroup) {
320
321
}
321
322
322
323
if cfg .UseDtm {
323
- xid := execQuery (src , "select dtm_begin_transaction()" )
324
- exec (dst , "select dtm_join_transaction($1)" , xid )
324
+ xid := execQuery (src , "select dtm_begin_transaction(); begin transaction isolation level " + cfg . Isolation )
325
+ exec (dst , "select dtm_join_transaction(" + strconv . Itoa ( xid ) + "); begin transaction isolation level " + cfg . Isolation )
325
326
}
326
327
327
- parallel_exec ([]* pgx.Conn {src ,dst }, []string {"begin transaction isolation level " + cfg .Isolation , "begin transaction isolation level " + cfg .Isolation })
328
+ // parallel_exec([]*pgx.Conn{src,dst}, []string{"begin transaction isolation level " + cfg.Isolation, "begin transaction isolation level " + cfg.Isolation})
328
329
329
330
ok := true
330
- if (cfg .Writers .UseCursors ) {
331
- exec (
332
- src ,
333
- "declare cur0 cursor for select * from t where u=$1 for update" ,
334
- from_acc ,
335
- )
336
- exec (
337
- dst ,
338
- "declare cur0 cursor for select * from t where u=$1 for update" ,
339
- to_acc ,
340
- )
341
-
342
- ok = execUpdate (src , "fetch from cur0" ) && ok
343
- ok = execUpdate (dst , "fetch from cur0" ) && ok
344
-
345
- ok = execUpdate (
346
- src , "update t set v = v - $1 where current of cur0" ,
347
- amount ,
348
- ) && ok
349
- ok = execUpdate (
350
- dst , "update t set v = v + $1 where current of cur0" ,
351
- amount ,
352
- ) && ok
353
- } else {
354
331
355
- sql1 := fmt .Sprintf ("update t set v = v - %d where u=%d" , amount , from_acc )
356
- sql2 := fmt .Sprintf ("update t set v = v + %d where u=%d" , amount , to_acc )
357
332
358
- ok = parallel_exec ([]* pgx.Conn {src ,dst }, []string {sql1 ,sql2 })
359
- }
333
+ sql1 := "update t set v = v - " + strconv .Itoa (amount ) + " where u=" + strconv .Itoa (from_acc )
334
+ sql2 := "update t set v = v + " + strconv .Itoa (amount ) + " where u=" + strconv .Itoa (to_acc )
335
+
336
+ ok = parallel_exec ([]* pgx.Conn {src ,dst }, []string {sql1 ,sql2 })
337
+
360
338
361
339
if ok {
362
340
commit (src , dst )
@@ -368,13 +346,13 @@ func writer(id int, cCommits chan int, cAborts chan int, wg *sync.WaitGroup) {
368
346
nAborts += 1
369
347
}
370
348
371
- if time .Since (start ).Seconds () > 1 {
372
- cCommits <- nCommits
373
- cAborts <- nAborts
374
- nCommits = 0
375
- nAborts = 0
376
- start = time .Now ()
377
- }
349
+ // if time.Since(start).Seconds() > 1 {
350
+ // cCommits <- nCommits
351
+ // cAborts <- nAborts
352
+ // nCommits = 0
353
+ // nAborts = 0
354
+ // start = time.Now()
355
+ // }
378
356
}
379
357
cCommits <- nCommits
380
358
cAborts <- nAborts
0 commit comments