4
4
"fmt"
5
5
"sync"
6
6
"math/rand"
7
+ "time"
7
8
"github.com/jackc/pgx"
8
9
)
9
10
@@ -34,10 +35,10 @@ var running = false
34
35
var nodes []int32 = []int32 {0 ,1 }
35
36
36
37
func asyncCommit (conn * pgx.Conn , wg * sync.WaitGroup ) {
37
- exec (conn , "commit" )
38
+ exec (conn , "commit" )
38
39
wg .Done ()
39
40
}
40
-
41
+
41
42
func commit (conn1 , conn2 * pgx.Conn ) {
42
43
var wg sync.WaitGroup
43
44
wg .Add (2 )
@@ -66,33 +67,53 @@ func prepare_db() {
66
67
exec (conn2 , "create extension pg_dtm" )
67
68
exec (conn2 , "drop table if exists t" )
68
69
exec (conn2 , "create table t(u int primary key, v int)" )
69
-
70
+
70
71
// xid = execQuery(conn1, "select dtm_begin_transaction(2)")
71
72
// exec(conn2, "select dtm_join_transaction($1)", xid)
72
73
73
74
// strt transaction
74
75
exec (conn1 , "begin transaction isolation level " + ISOLATION_LEVEL )
75
76
exec (conn2 , "begin transaction isolation level " + ISOLATION_LEVEL )
76
-
77
+
77
78
for i := 0 ; i < N_ACCOUNTS ; i ++ {
78
79
exec (conn1 , "insert into t values($1, $2)" , i , INIT_AMOUNT )
79
80
exec (conn2 , "insert into t values($1, $2)" , i , INIT_AMOUNT )
80
81
}
81
-
82
+
82
83
commit (conn1 , conn2 )
83
84
}
84
85
85
86
func max (a , b int64 ) int64 {
86
87
if a >= b {
87
88
return a
88
- }
89
+ }
89
90
return b
90
91
}
91
92
92
- func transfer (id int , wg * sync.WaitGroup ) {
93
+ func progress (total int , cCommits chan int , cAborts chan int ) {
94
+ commits := 0
95
+ aborts := 0
96
+ start := time .Now ()
97
+ for newcommits := range cCommits {
98
+ newaborts := <- cAborts
99
+ commits += newcommits
100
+ aborts += newaborts
101
+ if time .Since (start ).Seconds () > 1 {
102
+ fmt .Printf (
103
+ "progress %0.2f%%: %d commits, %d aborts\n " ,
104
+ float32 (commits ) * 100.0 / float32 (total ), commits , aborts ,
105
+ )
106
+ start = time .Now ()
107
+ }
108
+ }
109
+ }
110
+
111
+ func transfer (id int , cCommits chan int , cAborts chan int , wg * sync.WaitGroup ) {
93
112
var err error
94
113
var xid int32
95
- var nConflicts = 0
114
+ var nAborts = 0
115
+ var nCommits = 0
116
+ var myCommits = 0
96
117
97
118
conn1 , err := pgx .Connect (cfg1 )
98
119
checkErr (err )
@@ -102,10 +123,11 @@ func transfer(id int, wg *sync.WaitGroup) {
102
123
checkErr (err )
103
124
defer conn2 .Close ()
104
125
105
- for i := 0 ; i < N_ITERATIONS ; i ++ {
106
- //amount := 2*rand.Intn(2) - 1
107
- amount := 1
108
- account1 := rand .Intn (N_ACCOUNTS )
126
+ start := time .Now ()
127
+ for myCommits < N_ITERATIONS {
128
+ amount := 2 * rand .Intn (2000 ) - 1
129
+ //amount := 1
130
+ account1 := rand .Intn (N_ACCOUNTS )
109
131
account2 := rand .Intn (N_ACCOUNTS )
110
132
111
133
xid = execQuery (conn1 , "select dtm_begin_transaction(2)" )
@@ -114,25 +136,35 @@ func transfer(id int, wg *sync.WaitGroup) {
114
136
// start transaction
115
137
exec (conn1 , "begin transaction isolation level " + ISOLATION_LEVEL )
116
138
exec (conn2 , "begin transaction isolation level " + ISOLATION_LEVEL )
117
-
118
- ok1 := execUpdate (conn1 , "update t set v = v + $1 where u=$2" , amount , account1 )
119
- ok2 := execUpdate (conn2 , "update t set v = v - $1 where u=$2" , amount , account2 )
120
- if ! ok1 || ! ok2 {
139
+
140
+ ok1 := execUpdate (conn1 , "update t set v = v + $1 where u=$2" , amount , account1 )
141
+ ok2 := execUpdate (conn2 , "update t set v = v - $1 where u=$2" , amount , account2 )
142
+ if ! ok1 || ! ok2 {
121
143
exec (conn1 , "rollback" )
122
144
exec (conn2 , "rollback" )
123
- nConflicts += 1
124
- i -= 1
125
- } else {
145
+ nAborts += 1
146
+ } else {
126
147
commit (conn1 , conn2 )
127
- }
148
+ nCommits += 1
149
+ myCommits += 1
150
+ }
151
+
152
+ if time .Since (start ).Seconds () > 1 {
153
+ cCommits <- nCommits
154
+ cAborts <- nAborts
155
+ nCommits = 0
156
+ nAborts = 0
157
+ start = time .Now ()
158
+ }
128
159
}
129
- fmt .Println ("Test completed with " ,nConflicts ," conflicts" )
160
+ cCommits <- nCommits
161
+ cAborts <- nAborts
130
162
wg .Done ()
131
163
}
132
164
133
165
func inspect (wg * sync.WaitGroup ) {
134
166
var sum1 , sum2 , sum int64
135
- var prevSum int64 = 0
167
+ var prevSum int64 = 0
136
168
var xid int32
137
169
138
170
{
@@ -142,28 +174,33 @@ func inspect(wg *sync.WaitGroup) {
142
174
conn2 , err := pgx .Connect (cfg2 )
143
175
checkErr (err )
144
176
145
- for running {
177
+ for running {
178
+ xid = execQuery (conn1 , "select dtm_begin_transaction(2)" )
179
+ exec (conn2 , "select dtm_join_transaction($1)" , xid )
180
+
181
+ exec (conn1 , "begin transaction isolation level " + ISOLATION_LEVEL )
182
+ exec (conn2 , "begin transaction isolation level " + ISOLATION_LEVEL )
183
+
184
+ sum1 = execQuery64 (conn1 , "select sum(v) from t" )
185
+ sum2 = execQuery64 (conn2 , "select sum(v) from t" )
186
+
187
+ sum = sum1 + sum2
188
+ if (sum != prevSum ) {
189
+ xmin1 := execQuery (conn1 , "select dtm_get_current_snapshot_xmin()" )
190
+ xmax1 := execQuery (conn1 , "select dtm_get_current_snapshot_xmax()" )
191
+ xmin2 := execQuery (conn2 , "select dtm_get_current_snapshot_xmin()" )
192
+ xmax2 := execQuery (conn2 , "select dtm_get_current_snapshot_xmax()" )
193
+ fmt .Printf (
194
+ "Total=%d xid=%d snap1=[%d, %d) snap2=[%d, %d)\n " ,
195
+ sum , xid , xmin1 , xmax1 , xmin2 , xmax2 ,
196
+ )
197
+ prevSum = sum
198
+ }
146
199
147
-
148
- xid = execQuery (conn1 , "select dtm_begin_transaction(2)" )
149
- exec (conn2 , "select dtm_join_transaction($1)" , xid )
150
-
151
- exec (conn1 , "begin transaction isolation level " + ISOLATION_LEVEL )
152
- exec (conn2 , "begin transaction isolation level " + ISOLATION_LEVEL )
153
-
154
- sum1 = execQuery64 (conn1 , "select sum(v) from t" )
155
- sum2 = execQuery64 (conn2 , "select sum(v) from t" )
156
-
157
- sum = sum1 + sum2
158
- if (sum != prevSum ) {
159
- fmt .Println ("Total = " , sum , "xid=" , xid , "snap1={" , execQuery (conn1 , "select dtm_get_current_snapshot_xmin()" ), execQuery (conn1 , "select dtm_get_current_snapshot_xmax()" ), "}, snap2={" , execQuery (conn2 , "select dtm_get_current_snapshot_xmin()" ), execQuery (conn2 , "select dtm_get_current_snapshot_xmax()" ), "}" )
160
- prevSum = sum
161
- }
162
-
163
- commit (conn1 , conn2 )
164
- }
165
- conn1 .Close ()
166
- conn2 .Close ()
200
+ commit (conn1 , conn2 )
201
+ }
202
+ conn1 .Close ()
203
+ conn2 .Close ()
167
204
}
168
205
wg .Done ()
169
206
}
@@ -174,9 +211,13 @@ func main() {
174
211
175
212
prepare_db ()
176
213
214
+ cCommits := make (chan int )
215
+ cAborts := make (chan int )
216
+ go progress (TRANSFER_CONNECTIONS * N_ITERATIONS , cCommits , cAborts )
217
+
177
218
transferWg .Add (TRANSFER_CONNECTIONS )
178
219
for i := 0 ; i < TRANSFER_CONNECTIONS ; i ++ {
179
- go transfer (i , & transferWg )
220
+ go transfer (i , cCommits , cAborts , & transferWg )
180
221
}
181
222
running = true
182
223
inspectWg .Add (1 )
@@ -185,6 +226,8 @@ func main() {
185
226
transferWg .Wait ()
186
227
running = false
187
228
inspectWg .Wait ()
229
+
230
+ fmt .Printf ("done\n " )
188
231
}
189
232
190
233
func exec (conn * pgx.Conn , stmt string , arguments ... interface {}) {
@@ -216,10 +259,11 @@ func execQuery64(conn *pgx.Conn, stmt string, arguments ...interface{}) int64 {
216
259
checkErr (err )
217
260
return result
218
261
}
262
+
219
263
func checkErr (err error ) {
220
264
if err != nil {
221
265
panic (err )
222
266
}
223
267
}
224
268
225
-
269
+ // vim: expandtab ts=4 sts=4 sw=4
0 commit comments