4
4
"fmt"
5
5
"sync"
6
6
"math/rand"
7
+ "time"
7
8
"github.com/jackc/pgx"
8
9
)
9
10
@@ -34,10 +35,10 @@ var running = false
34
35
var nodes []int32 = []int32 {0 ,1 }
35
36
36
37
func asyncCommit (conn * pgx.Conn , wg * sync.WaitGroup ) {
37
- exec (conn , "commit" )
38
+ exec (conn , "commit" )
38
39
wg .Done ()
39
40
}
40
-
41
+
41
42
func commit (conn1 , conn2 * pgx.Conn ) {
42
43
var wg sync.WaitGroup
43
44
wg .Add (2 )
@@ -66,33 +67,52 @@ func prepare_db() {
66
67
exec (conn2 , "create extension pg_dtm" )
67
68
exec (conn2 , "drop table if exists t" )
68
69
exec (conn2 , "create table t(u int primary key, v int)" )
69
-
70
+
70
71
// xid = execQuery(conn1, "select dtm_begin_transaction(2)")
71
72
// exec(conn2, "select dtm_join_transaction($1)", xid)
72
73
73
74
// strt transaction
74
75
exec (conn1 , "begin transaction isolation level " + ISOLATION_LEVEL )
75
76
exec (conn2 , "begin transaction isolation level " + ISOLATION_LEVEL )
76
-
77
+
77
78
for i := 0 ; i < N_ACCOUNTS ; i ++ {
78
79
exec (conn1 , "insert into t values($1, $2)" , i , INIT_AMOUNT )
79
80
exec (conn2 , "insert into t values($1, $2)" , i , INIT_AMOUNT )
80
81
}
81
-
82
+
82
83
commit (conn1 , conn2 )
83
84
}
84
85
85
86
func max (a , b int64 ) int64 {
86
87
if a >= b {
87
88
return a
88
- }
89
+ }
89
90
return b
90
91
}
91
92
92
- func transfer (id int , wg * sync.WaitGroup ) {
93
+ func progress (total int , cCommits chan int , cAborts chan int ) {
94
+ commits := 0
95
+ aborts := 0
96
+ start := time .Now ()
97
+ for newcommits := range cCommits {
98
+ newaborts := <- cAborts
99
+ commits += newcommits
100
+ aborts += newaborts
101
+ if time .Since (start ).Seconds () > 1 {
102
+ fmt .Printf (
103
+ "progress %0.2f%%: %d commits, %d aborts\n " ,
104
+ float32 (commits ) * 100.0 / float32 (total ), commits , aborts ,
105
+ )
106
+ start = time .Now ()
107
+ }
108
+ }
109
+ }
110
+
111
+ func transfer (id int , cCommits chan int , cAborts chan int , wg * sync.WaitGroup ) {
93
112
var err error
94
113
var xid int32
95
- var nConflicts = 0
114
+ var nAborts = 0
115
+ var nCommits = 0
96
116
97
117
conn1 , err := pgx .Connect (cfg1 )
98
118
checkErr (err )
@@ -102,10 +122,11 @@ func transfer(id int, wg *sync.WaitGroup) {
102
122
checkErr (err )
103
123
defer conn2 .Close ()
104
124
105
- for i := 0 ; i < N_ITERATIONS ; i ++ {
106
- //amount := 2*rand.Intn(2) - 1
107
- amount := 1
108
- account1 := rand .Intn (N_ACCOUNTS )
125
+ start := time .Now ()
126
+ for nCommits < N_ITERATIONS {
127
+ amount := 2 * rand .Intn (2000 ) - 1
128
+ //amount := 1
129
+ account1 := rand .Intn (N_ACCOUNTS )
109
130
account2 := rand .Intn (N_ACCOUNTS )
110
131
111
132
xid = execQuery (conn1 , "select dtm_begin_transaction(2)" )
@@ -114,25 +135,34 @@ func transfer(id int, wg *sync.WaitGroup) {
114
135
// start transaction
115
136
exec (conn1 , "begin transaction isolation level " + ISOLATION_LEVEL )
116
137
exec (conn2 , "begin transaction isolation level " + ISOLATION_LEVEL )
117
-
118
- ok1 := execUpdate (conn1 , "update t set v = v + $1 where u=$2" , amount , account1 )
119
- ok2 := execUpdate (conn2 , "update t set v = v - $1 where u=$2" , amount , account2 )
120
- if ! ok1 || ! ok2 {
138
+
139
+ ok1 := execUpdate (conn1 , "update t set v = v + $1 where u=$2" , amount , account1 )
140
+ ok2 := execUpdate (conn2 , "update t set v = v - $1 where u=$2" , amount , account2 )
141
+ if ! ok1 || ! ok2 {
121
142
exec (conn1 , "rollback" )
122
143
exec (conn2 , "rollback" )
123
- nConflicts += 1
124
- i -= 1
125
- } else {
144
+ nAborts += 1
145
+ } else {
126
146
commit (conn1 , conn2 )
127
- }
147
+ nCommits += 1
148
+ }
149
+
150
+ if time .Since (start ).Seconds () > 1 {
151
+ cCommits <- nCommits
152
+ cAborts <- nAborts
153
+ nCommits = 0
154
+ nAborts = 0
155
+ start = time .Now ()
156
+ }
128
157
}
129
- fmt .Println ("Test completed with " ,nConflicts ," conflicts" )
158
+ cCommits <- nCommits
159
+ cAborts <- nAborts
130
160
wg .Done ()
131
161
}
132
162
133
163
func inspect (wg * sync.WaitGroup ) {
134
164
var sum1 , sum2 , sum int64
135
- var prevSum int64 = 0
165
+ var prevSum int64 = 0
136
166
var xid int32
137
167
138
168
{
@@ -142,28 +172,33 @@ func inspect(wg *sync.WaitGroup) {
142
172
conn2 , err := pgx .Connect (cfg2 )
143
173
checkErr (err )
144
174
145
- for running {
175
+ for running {
176
+ xid = execQuery (conn1 , "select dtm_begin_transaction(2)" )
177
+ exec (conn2 , "select dtm_join_transaction($1)" , xid )
178
+
179
+ exec (conn1 , "begin transaction isolation level " + ISOLATION_LEVEL )
180
+ exec (conn2 , "begin transaction isolation level " + ISOLATION_LEVEL )
181
+
182
+ sum1 = execQuery64 (conn1 , "select sum(v) from t" )
183
+ sum2 = execQuery64 (conn2 , "select sum(v) from t" )
184
+
185
+ sum = sum1 + sum2
186
+ if (sum != prevSum ) {
187
+ xmin1 := execQuery (conn1 , "select dtm_get_current_snapshot_xmin()" )
188
+ xmax1 := execQuery (conn1 , "select dtm_get_current_snapshot_xmax()" )
189
+ xmin2 := execQuery (conn2 , "select dtm_get_current_snapshot_xmin()" )
190
+ xmax2 := execQuery (conn2 , "select dtm_get_current_snapshot_xmax()" )
191
+ fmt .Printf (
192
+ "Total=%d xid=%d snap1=[%d, %d) snap2=[%d, %d)\n " ,
193
+ sum , xid , xmin1 , xmax1 , xmin2 , xmax2 ,
194
+ )
195
+ prevSum = sum
196
+ }
146
197
147
-
148
- xid = execQuery (conn1 , "select dtm_begin_transaction(2)" )
149
- exec (conn2 , "select dtm_join_transaction($1)" , xid )
150
-
151
- exec (conn1 , "begin transaction isolation level " + ISOLATION_LEVEL )
152
- exec (conn2 , "begin transaction isolation level " + ISOLATION_LEVEL )
153
-
154
- sum1 = execQuery64 (conn1 , "select sum(v) from t" )
155
- sum2 = execQuery64 (conn2 , "select sum(v) from t" )
156
-
157
- sum = sum1 + sum2
158
- if (sum != prevSum ) {
159
- fmt .Println ("Total = " , sum , "xid=" , xid , "snap1={" , execQuery (conn1 , "select dtm_get_current_snapshot_xmin()" ), execQuery (conn1 , "select dtm_get_current_snapshot_xmax()" ), "}, snap2={" , execQuery (conn2 , "select dtm_get_current_snapshot_xmin()" ), execQuery (conn2 , "select dtm_get_current_snapshot_xmax()" ), "}" )
160
- prevSum = sum
161
- }
162
-
163
- commit (conn1 , conn2 )
164
- }
165
- conn1 .Close ()
166
- conn2 .Close ()
198
+ commit (conn1 , conn2 )
199
+ }
200
+ conn1 .Close ()
201
+ conn2 .Close ()
167
202
}
168
203
wg .Done ()
169
204
}
@@ -174,9 +209,13 @@ func main() {
174
209
175
210
prepare_db ()
176
211
212
+ cCommits := make (chan int )
213
+ cAborts := make (chan int )
214
+ go progress (TRANSFER_CONNECTIONS * N_ITERATIONS , cCommits , cAborts )
215
+
177
216
transferWg .Add (TRANSFER_CONNECTIONS )
178
217
for i := 0 ; i < TRANSFER_CONNECTIONS ; i ++ {
179
- go transfer (i , & transferWg )
218
+ go transfer (i , cCommits , cAborts , & transferWg )
180
219
}
181
220
running = true
182
221
inspectWg .Add (1 )
@@ -185,6 +224,8 @@ func main() {
185
224
transferWg .Wait ()
186
225
running = false
187
226
inspectWg .Wait ()
227
+
228
+ fmt .Printf ("done\n " )
188
229
}
189
230
190
231
func exec (conn * pgx.Conn , stmt string , arguments ... interface {}) {
@@ -216,10 +257,11 @@ func execQuery64(conn *pgx.Conn, stmt string, arguments ...interface{}) int64 {
216
257
checkErr (err )
217
258
return result
218
259
}
260
+
219
261
func checkErr (err error ) {
220
262
if err != nil {
221
263
panic (err )
222
264
}
223
265
}
224
266
225
-
267
+ // vim: expandtab ts=4 sts=4 sw=4
0 commit comments