@@ -43,7 +43,6 @@ func (t TransfersTS) prepare_one(connstr string, wg *sync.WaitGroup) {
43
43
}
44
44
45
45
func (t TransfersTS ) writer (id int , cCommits chan int , cAborts chan int , wg * sync.WaitGroup ) {
46
- var conns []* pgx.Conn
47
46
var nGlobalTrans = 0
48
47
var snapshot int64
49
48
var csn int64
@@ -52,45 +51,64 @@ func (t TransfersTS) writer(id int, cCommits chan int, cAborts chan int, wg *syn
52
51
cfg .ConnStrs .Set (cfg .ConnStrs [0 ])
53
52
}
54
53
55
- for _ , connstr := range cfg .ConnStrs {
56
- dbconf , err := pgx .ParseDSN (connstr )
57
- checkErr (err )
58
- conn , err := pgx .Connect (dbconf )
59
- checkErr (err )
60
- defer conn .Close ()
61
- conns = append (conns , conn )
62
- }
54
+ // for _, connstr := range cfg.ConnStrs {
55
+ // dbconf, err := pgx.ParseDSN(connstr)
56
+ // checkErr(err)
57
+ // conn, err := pgx.Connect(dbconf)
58
+ // checkErr(err)
59
+ // defer conn.Close()
60
+ // conns = append(conns, conn)
61
+ // }
62
+
63
+ dbconf1 , err := pgx .ParseDSN (cfg .ConnStrs [ id % len (cfg .ConnStrs ) ])
64
+ checkErr (err )
65
+ conn1 , err := pgx .Connect (dbconf1 )
66
+ checkErr (err )
67
+ defer conn1 .Close ()
68
+
69
+ dbconf2 , err := pgx .ParseDSN (cfg .ConnStrs [ (id + 1 ) % len (cfg .ConnStrs ) ])
70
+ checkErr (err )
71
+ conn2 , err := pgx .Connect (dbconf2 )
72
+ checkErr (err )
73
+ defer conn2 .Close ()
63
74
64
75
65
76
for i := 0 ; i < cfg .IterNum ; i ++ {
66
77
67
-
68
78
gtid := strconv .Itoa (id ) + "." + strconv .Itoa (i )
69
79
amount := 2 * rand .Intn (2 ) - 1
70
80
from_acc := rand .Intn (cfg .AccountsNum )//cfg.Writers.StartId + 2*id + 1
71
81
to_acc := rand .Intn (cfg .AccountsNum )//cfg.Writers.StartId + 2*id + 2
72
82
73
- conn1 := conns [rand .Intn (len (conns ))]
74
- conn2 := conns [rand .Intn (len (conns ))]
75
- for conn1 == conn2 {
76
- conn1 = conns [rand .Intn (len (conns ))]
77
- conn2 = conns [rand .Intn (len (conns ))]
78
- }
83
+ // conn1 := conns[rand.Intn(len(conns))]
84
+ // conn2 := conns[rand.Intn(len(conns))]
85
+ // for conn1 == conn2 {
86
+ // conn1 = conns[rand.Intn(len(conns))]
87
+ // conn2 = conns[rand.Intn(len(conns))]
88
+ // }
79
89
80
90
exec (conn1 , "begin transaction" )
81
91
exec (conn2 , "begin transaction" )
82
- snapshot = _execQuery (conn1 , "select dtm_extend($1)" , gtid )
83
- snapshot = _execQuery (conn2 , "select dtm_access($1, $2)" , snapshot , gtid )
92
+
93
+ if cfg .UseDtm {
94
+ snapshot = _execQuery (conn1 , "select dtm_extend($1)" , gtid )
95
+ snapshot = _execQuery (conn2 , "select dtm_access($1, $2)" , snapshot , gtid )
96
+ }
97
+
84
98
exec (conn1 , "update t set v = v - $1 where u=$2" , amount , from_acc )
85
99
exec (conn2 , "update t set v = v + $1 where u=$2" , amount , to_acc )
86
100
exec (conn1 , "prepare transaction '" + gtid + "'" )
87
101
exec (conn2 , "prepare transaction '" + gtid + "'" )
88
- exec (conn1 , "select dtm_begin_prepare($1)" , gtid )
89
- exec (conn2 , "select dtm_begin_prepare($1)" , gtid )
90
- csn = _execQuery (conn1 , "select dtm_prepare($1, 0)" , gtid )
91
- csn = _execQuery (conn2 , "select dtm_prepare($1, $2)" , gtid , csn )
92
- exec (conn1 , "select dtm_end_prepare($1, $2)" , gtid , csn )
93
- exec (conn2 , "select dtm_end_prepare($1, $2)" , gtid , csn )
102
+
103
+ if cfg .UseDtm {
104
+ exec (conn1 , "select dtm_begin_prepare($1)" , gtid )
105
+ exec (conn2 , "select dtm_begin_prepare($1)" , gtid )
106
+ csn = _execQuery (conn1 , "select dtm_prepare($1, 0)" , gtid )
107
+ csn = _execQuery (conn2 , "select dtm_prepare($1, $2)" , gtid , csn )
108
+ exec (conn1 , "select dtm_end_prepare($1, $2)" , gtid , csn )
109
+ exec (conn2 , "select dtm_end_prepare($1, $2)" , gtid , csn )
110
+ }
111
+
94
112
exec (conn1 , "commit prepared '" + gtid + "'" )
95
113
exec (conn2 , "commit prepared '" + gtid + "'" )
96
114
nGlobalTrans ++
0 commit comments