Skip to content

Commit bf4a71e

Browse files
committed
readers backend. (partial support)
1 parent df4606c commit bf4a71e

File tree

4 files changed

+119
-24
lines changed

4 files changed

+119
-24
lines changed

contrib/pg_dtm/tests/perf/perf.go

Lines changed: 18 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ type ConnStrings []string
1414
var backend interface{
1515
prepare(connstrs []string)
1616
writer(id int, cCommits chan int, cAborts chan int, wg *sync.WaitGroup)
17-
reader(wg *sync.WaitGroup, inconsistency *bool)
17+
reader(wg *sync.WaitGroup, cFetches chan int, inconsistency *bool)
1818
}
1919

2020
var cfg struct {
@@ -27,10 +27,10 @@ var cfg struct {
2727
Isolation string
2828
AccountsNum int
2929
ReadersNum int
30+
IterNum int
3031

3132
Writers struct {
3233
Num int
33-
Updates int
3434
StartId int
3535
}
3636
}
@@ -72,7 +72,7 @@ func dump_cfg() {
7272

7373
fmt.Printf(
7474
"Writers: %d × %d updates\n",
75-
cfg.Writers.Num, cfg.Writers.Updates,
75+
cfg.Writers.Num, cfg.IterNum,
7676
)
7777
}
7878

@@ -89,8 +89,8 @@ func init() {
8989
"The number of bank accounts")
9090
flag.IntVar(&cfg.Writers.StartId, "s", 0,
9191
"StartID. Script will update rows starting from this value")
92-
flag.IntVar(&cfg.Writers.Updates, "u", 10000,
93-
"The number updates each writer performs")
92+
flag.IntVar(&cfg.IterNum, "n", 10000,
93+
"The number updates each writer (reader in case of Reades backend) performs")
9494
flag.IntVar(&cfg.ReadersNum, "r", 1,
9595
"The number of readers")
9696
flag.IntVar(&cfg.Writers.Num, "w", 8,
@@ -125,12 +125,18 @@ func init() {
125125
}
126126

127127
func main() {
128+
if len(cfg.ConnStrs) < 2 {
129+
fmt.Println("ERROR: This test needs at leas two connections")
130+
os.Exit(1)
131+
}
128132

129133
switch cfg.Backend {
130134
case "transfers":
131135
backend = new(Transfers)
132136
case "fdw":
133137
backend = new(TransfersFDW)
138+
case "readers":
139+
backend = new(Readers)
134140
default:
135141
fmt.Println("No backend named: '%s'\n", cfg.Backend)
136142
return
@@ -148,9 +154,10 @@ func main() {
148154
var readerWg sync.WaitGroup
149155

150156
cCommits := make(chan int)
157+
cFetches:= make(chan int)
151158
cAborts := make(chan int)
152159

153-
go progress(cfg.Writers.Num * cfg.Writers.Updates, cCommits, cAborts)
160+
go progress(cfg.Writers.Num * cfg.IterNum, cCommits, cAborts)
154161

155162
start = time.Now()
156163
writerWg.Add(cfg.Writers.Num)
@@ -162,17 +169,17 @@ func main() {
162169
inconsistency := false
163170
readerWg.Add(cfg.ReadersNum)
164171
for i := 0; i < cfg.ReadersNum; i++ {
165-
go backend.reader(&readerWg, &inconsistency)
172+
go backend.reader(&readerWg, cFetches, &inconsistency)
166173
}
167174

168175
writerWg.Wait()
176+
running = false
177+
readerWg.Wait()
178+
169179
fmt.Printf("writers finished in %0.2f seconds\n",
170180
time.Since(start).Seconds())
171181
fmt.Printf("TPS = %0.2f\n",
172-
float64(cfg.Writers.Num*cfg.Writers.Updates)/time.Since(start).Seconds())
173-
174-
running = false
175-
readerWg.Wait()
182+
float64(cfg.Writers.Num*cfg.IterNum)/time.Since(start).Seconds())
176183

177184
if inconsistency {
178185
fmt.Printf("INCONSISTENCY DETECTED\n")

contrib/pg_dtm/tests/perf/readers.go

Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
package main
2+
3+
import (
4+
"sync"
5+
"math/rand"
6+
"github.com/jackc/pgx"
7+
)
8+
9+
type Readers struct {}
10+
11+
func (t Readers) prepare(connstrs []string) {
12+
var wg sync.WaitGroup
13+
wg.Add(len(connstrs))
14+
for _, connstr := range connstrs {
15+
go t.prepare_one(connstr, &wg)
16+
}
17+
wg.Wait()
18+
}
19+
20+
func (t Readers) prepare_one(connstr string, wg *sync.WaitGroup) {
21+
dbconf, err := pgx.ParseDSN(connstr)
22+
checkErr(err)
23+
conn, err := pgx.Connect(dbconf)
24+
checkErr(err)
25+
defer conn.Close()
26+
27+
if cfg.UseDtm {
28+
exec(conn, "drop extension if exists pg_dtm")
29+
exec(conn, "create extension pg_dtm")
30+
}
31+
exec(conn, "drop table if exists t cascade")
32+
exec(conn, "create table t(u int primary key, v int)")
33+
exec(conn, "insert into t (select generate_series(0,$1-1), $2)", cfg.AccountsNum, 0)
34+
wg.Done()
35+
}
36+
37+
func (t Readers) writer(id int, cCommits chan int, cAborts chan int, wg *sync.WaitGroup) {
38+
var updates = 0
39+
var conns []*pgx.Conn
40+
41+
for _, connstr := range cfg.ConnStrs {
42+
dbconf, err := pgx.ParseDSN(connstr)
43+
checkErr(err)
44+
conn, err := pgx.Connect(dbconf)
45+
checkErr(err)
46+
defer conn.Close()
47+
conns = append(conns, conn)
48+
}
49+
for updates < cfg.IterNum {
50+
acc := rand.Intn(cfg.AccountsNum)
51+
52+
if cfg.UseDtm {
53+
xid := execQuery(conns[0], "select dtm_begin_transaction()")
54+
for i := 1; i < len(conns); i++ {
55+
exec(conns[i], "select dtm_join_transaction($1)", xid)
56+
}
57+
}
58+
for _, conn := range conns {
59+
exec(conn, "begin transaction isolation level " + cfg.Isolation)
60+
exec(conn, "update t set v = v + 1 where u=$1", acc)
61+
}
62+
commit(conns...)
63+
updates++
64+
}
65+
// cCommits <- updates
66+
wg.Done()
67+
}
68+
69+
func (t Readers) reader(wg *sync.WaitGroup, cFetches chan int, inconsistency *bool) {
70+
var fetches = 0
71+
var conns []*pgx.Conn
72+
var sum int32 = 0
73+
74+
for _, connstr := range cfg.ConnStrs {
75+
dbconf, err := pgx.ParseDSN(connstr)
76+
checkErr(err)
77+
conn, err := pgx.Connect(dbconf)
78+
checkErr(err)
79+
defer conn.Close()
80+
conns = append(conns, conn)
81+
}
82+
for running {
83+
acc := rand.Intn(cfg.AccountsNum)
84+
con := rand.Intn(len(conns))
85+
sum += execQuery(conns[con], "select v from t where u=$1", acc)
86+
fetches++
87+
}
88+
// cFetches <- fetches
89+
wg.Done()
90+
}
91+
92+
// vim: expandtab ts=4 sts=4 sw=4

contrib/pg_dtm/tests/perf/transfers-fdw.go

Lines changed: 7 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -14,15 +14,6 @@ func (t TransfersFDW) prepare(connstrs []string) {
1414
var wg sync.WaitGroup
1515
wg.Add(len(connstrs))
1616

17-
if len(connstrs) < 2 {
18-
fmt.Println("ERROR: FDW test needs at leas two connections")
19-
os.Exit(1)
20-
}
21-
if len(connstrs) < 2 {
22-
fmt.Println("ERROR: FDW test need explicit usernames in connection strings")
23-
os.Exit(2)
24-
}
25-
2617
for i, connstr := range connstrs {
2718
go t.prepare_slave(i, connstr, &wg)
2819
}
@@ -37,6 +28,11 @@ func (t TransfersFDW) prepare_slave(id int, connstr string, wg *sync.WaitGroup)
3728
checkErr(err)
3829
defer conn.Close()
3930

31+
if len(dbconf.User) == 0 {
32+
fmt.Println("ERROR: FDW test need explicit usernames in connection strings")
33+
os.Exit(2)
34+
}
35+
4036
if cfg.UseDtm {
4137
exec(conn, "drop extension if exists pg_dtm")
4238
exec(conn, "create extension pg_dtm")
@@ -94,7 +90,7 @@ func (t TransfersFDW) writer(id int, cCommits chan int, cAborts chan int, wg *sy
9490
defer conn.Close()
9591

9692
start := time.Now()
97-
for myCommits < cfg.Writers.Updates {
93+
for myCommits < cfg.IterNum {
9894
amount := 1
9995
from_acc := cfg.Writers.StartId + 2*id + 1
10096
to_acc := cfg.Writers.StartId + 2*id + 2
@@ -128,7 +124,7 @@ func (t TransfersFDW) writer(id int, cCommits chan int, cAborts chan int, wg *sy
128124
wg.Done()
129125
}
130126

131-
func (t TransfersFDW) reader(wg *sync.WaitGroup, inconsistency *bool) {
127+
func (t TransfersFDW) reader(wg *sync.WaitGroup, cFetches chan int, inconsistency *bool) {
132128
var sum int64
133129
var prevSum int64 = 0
134130

contrib/pg_dtm/tests/perf/transfers.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ func (t Transfers) writer(id int, cCommits chan int, cAborts chan int, wg *sync.
6363
}
6464

6565
start := time.Now()
66-
for myCommits < cfg.Writers.Updates {
66+
for myCommits < cfg.IterNum {
6767
amount := 1
6868

6969
from_acc := cfg.Writers.StartId + 2*id + 1
@@ -119,7 +119,7 @@ func (t Transfers) writer(id int, cCommits chan int, cAborts chan int, wg *sync.
119119
wg.Done()
120120
}
121121

122-
func (t Transfers) reader(wg *sync.WaitGroup, inconsistency *bool) {
122+
func (t Transfers) reader(wg *sync.WaitGroup, cFetches chan int, inconsistency *bool) {
123123
var prevSum int64 = 0
124124

125125
var conns []*pgx.Conn

0 commit comments

Comments
 (0)