Skip to content

Commit 7a43e43

Browse files
committed
Add progress indication to the transfers test.
1 parent 2b560c2 commit 7a43e43

File tree

2 files changed

+90
-45
lines changed

2 files changed

+90
-45
lines changed

contrib/pg_xtm/tests/daemons.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,9 @@ func postgres(bin string, datadir string, port int, nodeid int, wg *sync.WaitGro
9191
"-D", datadir,
9292
"-p", strconv.Itoa(port),
9393
"-c", "dtm.node_id=" + strconv.Itoa(nodeid),
94+
"-c", "autovacuum=off",
95+
"-c", "fsync=off",
96+
"-c", "synchronous_commit=off",
9497
}
9598
name := "postgres " + datadir
9699
c := make(chan string)

contrib/pg_xtm/tests/transfers.go

Lines changed: 87 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"fmt"
55
"sync"
66
"math/rand"
7+
"time"
78
"github.com/jackc/pgx"
89
)
910

@@ -34,10 +35,10 @@ var running = false
3435
var nodes []int32 = []int32{0,1}
3536

3637
func asyncCommit(conn *pgx.Conn, wg *sync.WaitGroup) {
37-
exec(conn, "commit")
38+
exec(conn, "commit")
3839
wg.Done()
3940
}
40-
41+
4142
func commit(conn1, conn2 *pgx.Conn) {
4243
var wg sync.WaitGroup
4344
wg.Add(2)
@@ -66,33 +67,52 @@ func prepare_db() {
6667
exec(conn2, "create extension pg_dtm")
6768
exec(conn2, "drop table if exists t")
6869
exec(conn2, "create table t(u int primary key, v int)")
69-
70+
7071
// xid = execQuery(conn1, "select dtm_begin_transaction(2)")
7172
// exec(conn2, "select dtm_join_transaction($1)", xid)
7273

7374
// strt transaction
7475
exec(conn1, "begin transaction isolation level " + ISOLATION_LEVEL)
7576
exec(conn2, "begin transaction isolation level " + ISOLATION_LEVEL)
76-
77+
7778
for i := 0; i < N_ACCOUNTS; i++ {
7879
exec(conn1, "insert into t values($1, $2)", i, INIT_AMOUNT)
7980
exec(conn2, "insert into t values($1, $2)", i, INIT_AMOUNT)
8081
}
81-
82+
8283
commit(conn1, conn2)
8384
}
8485

8586
func max(a, b int64) int64 {
8687
if a >= b {
8788
return a
88-
}
89+
}
8990
return b
9091
}
9192

92-
func transfer(id int, wg *sync.WaitGroup) {
93+
func progress(total int, cCommits chan int, cAborts chan int) {
94+
commits := 0
95+
aborts := 0
96+
start := time.Now()
97+
for newcommits := range cCommits {
98+
newaborts := <-cAborts
99+
commits += newcommits
100+
aborts += newaborts
101+
if time.Since(start).Seconds() > 1 {
102+
fmt.Printf(
103+
"progress %0.2f%%: %d commits, %d aborts\n",
104+
float32(commits) * 100.0 / float32(total), commits, aborts,
105+
)
106+
start = time.Now()
107+
}
108+
}
109+
}
110+
111+
func transfer(id int, cCommits chan int, cAborts chan int, wg *sync.WaitGroup) {
93112
var err error
94113
var xid int32
95-
var nConflicts = 0
114+
var nAborts = 0
115+
var nCommits = 0
96116

97117
conn1, err := pgx.Connect(cfg1)
98118
checkErr(err)
@@ -102,10 +122,11 @@ func transfer(id int, wg *sync.WaitGroup) {
102122
checkErr(err)
103123
defer conn2.Close()
104124

105-
for i := 0; i < N_ITERATIONS; i++ {
106-
//amount := 2*rand.Intn(2) - 1
107-
amount := 1
108-
account1 := rand.Intn(N_ACCOUNTS)
125+
start := time.Now()
126+
for nCommits < N_ITERATIONS {
127+
amount := 2*rand.Intn(2000) - 1
128+
//amount := 1
129+
account1 := rand.Intn(N_ACCOUNTS)
109130
account2 := rand.Intn(N_ACCOUNTS)
110131

111132
xid = execQuery(conn1, "select dtm_begin_transaction(2)")
@@ -114,25 +135,34 @@ func transfer(id int, wg *sync.WaitGroup) {
114135
// start transaction
115136
exec(conn1, "begin transaction isolation level " + ISOLATION_LEVEL)
116137
exec(conn2, "begin transaction isolation level " + ISOLATION_LEVEL)
117-
118-
ok1 := execUpdate(conn1, "update t set v = v + $1 where u=$2", amount, account1)
119-
ok2 := execUpdate(conn2, "update t set v = v - $1 where u=$2", amount, account2)
120-
if !ok1 || !ok2 {
138+
139+
ok1 := execUpdate(conn1, "update t set v = v + $1 where u=$2", amount, account1)
140+
ok2 := execUpdate(conn2, "update t set v = v - $1 where u=$2", amount, account2)
141+
if !ok1 || !ok2 {
121142
exec(conn1, "rollback")
122143
exec(conn2, "rollback")
123-
nConflicts += 1
124-
i -= 1
125-
} else {
144+
nAborts += 1
145+
} else {
126146
commit(conn1, conn2)
127-
}
147+
nCommits += 1
148+
}
149+
150+
if time.Since(start).Seconds() > 1 {
151+
cCommits <- nCommits
152+
cAborts <- nAborts
153+
nCommits = 0
154+
nAborts = 0
155+
start = time.Now()
156+
}
128157
}
129-
fmt.Println("Test completed with ",nConflicts," conflicts")
158+
cCommits <- nCommits
159+
cAborts <- nAborts
130160
wg.Done()
131161
}
132162

133163
func inspect(wg *sync.WaitGroup) {
134164
var sum1, sum2, sum int64
135-
var prevSum int64 = 0
165+
var prevSum int64 = 0
136166
var xid int32
137167

138168
{
@@ -142,28 +172,33 @@ func inspect(wg *sync.WaitGroup) {
142172
conn2, err := pgx.Connect(cfg2)
143173
checkErr(err)
144174

145-
for running {
175+
for running {
176+
xid = execQuery(conn1, "select dtm_begin_transaction(2)")
177+
exec(conn2, "select dtm_join_transaction($1)", xid)
178+
179+
exec(conn1, "begin transaction isolation level " + ISOLATION_LEVEL)
180+
exec(conn2, "begin transaction isolation level " + ISOLATION_LEVEL)
181+
182+
sum1 = execQuery64(conn1, "select sum(v) from t")
183+
sum2 = execQuery64(conn2, "select sum(v) from t")
184+
185+
sum = sum1 + sum2
186+
if (sum != prevSum) {
187+
xmin1 := execQuery(conn1, "select dtm_get_current_snapshot_xmin()")
188+
xmax1 := execQuery(conn1, "select dtm_get_current_snapshot_xmax()")
189+
xmin2 := execQuery(conn2, "select dtm_get_current_snapshot_xmin()")
190+
xmax2 := execQuery(conn2, "select dtm_get_current_snapshot_xmax()")
191+
fmt.Printf(
192+
"Total=%d xid=%d snap1=[%d, %d) snap2=[%d, %d)\n",
193+
sum, xid, xmin1, xmax1, xmin2, xmax2,
194+
)
195+
prevSum = sum
196+
}
146197

147-
148-
xid = execQuery(conn1, "select dtm_begin_transaction(2)")
149-
exec(conn2, "select dtm_join_transaction($1)", xid)
150-
151-
exec(conn1, "begin transaction isolation level " + ISOLATION_LEVEL)
152-
exec(conn2, "begin transaction isolation level " + ISOLATION_LEVEL)
153-
154-
sum1 = execQuery64(conn1, "select sum(v) from t")
155-
sum2 = execQuery64(conn2, "select sum(v) from t")
156-
157-
sum = sum1 + sum2
158-
if (sum != prevSum) {
159-
fmt.Println("Total = ", sum, "xid=", xid, "snap1={", execQuery(conn1, "select dtm_get_current_snapshot_xmin()"), execQuery(conn1, "select dtm_get_current_snapshot_xmax()"), "}, snap2={", execQuery(conn2, "select dtm_get_current_snapshot_xmin()"), execQuery(conn2, "select dtm_get_current_snapshot_xmax()"), "}")
160-
prevSum = sum
161-
}
162-
163-
commit(conn1, conn2)
164-
}
165-
conn1.Close()
166-
conn2.Close()
198+
commit(conn1, conn2)
199+
}
200+
conn1.Close()
201+
conn2.Close()
167202
}
168203
wg.Done()
169204
}
@@ -174,9 +209,13 @@ func main() {
174209

175210
prepare_db()
176211

212+
cCommits := make(chan int)
213+
cAborts := make(chan int)
214+
go progress(TRANSFER_CONNECTIONS * N_ITERATIONS, cCommits, cAborts)
215+
177216
transferWg.Add(TRANSFER_CONNECTIONS)
178217
for i:=0; i<TRANSFER_CONNECTIONS; i++ {
179-
go transfer(i, &transferWg)
218+
go transfer(i, cCommits, cAborts, &transferWg)
180219
}
181220
running = true
182221
inspectWg.Add(1)
@@ -185,6 +224,8 @@ func main() {
185224
transferWg.Wait()
186225
running = false
187226
inspectWg.Wait()
227+
228+
fmt.Printf("done\n")
188229
}
189230

190231
func exec(conn *pgx.Conn, stmt string, arguments ...interface{}) {
@@ -216,10 +257,11 @@ func execQuery64(conn *pgx.Conn, stmt string, arguments ...interface{}) int64 {
216257
checkErr(err)
217258
return result
218259
}
260+
219261
func checkErr(err error) {
220262
if err != nil {
221263
panic(err)
222264
}
223265
}
224266

225-
267+
// vim: expandtab ts=4 sts=4 sw=4

0 commit comments

Comments
 (0)