Skip to content

Commit 21e563f

Browse files
committed
Merge branch 'xtm' of gitlab.postgrespro.ru:pgpro-dev/postgrespro into xtm
2 parents 7a19770 + 88a60f5 commit 21e563f

File tree

3 files changed

+282
-0
lines changed

3 files changed

+282
-0
lines changed

contrib/pg_dtm/docs/DTM.odp

962 KB
Binary file not shown.

contrib/pg_dtm/tests/benchmark.go

Lines changed: 273 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,273 @@
1+
package main
2+
3+
import (
4+
"fmt"
5+
"flag"
6+
"os"
7+
"sync"
8+
"math/rand"
9+
"time"
10+
"github.com/jackc/pgx"
11+
)
12+
13+
type ConnStrings []string
14+
15+
// The first method of flag.Value interface
16+
func (c *ConnStrings) String() string {
17+
if len(*c) > 0 {
18+
return (*c)[0]
19+
} else {
20+
return ""
21+
}
22+
}
23+
24+
// The second method of flag.Value interface
25+
func (c *ConnStrings) Set(value string) error {
26+
*c = append(*c, value)
27+
return nil
28+
}
29+
30+
var cfg struct {
31+
ConnStrs ConnStrings
32+
33+
Verbose bool
34+
UseDtm bool
35+
InitOnly bool
36+
SkipInit bool
37+
38+
Isolation string // "repeatable read" or "read committed"
39+
40+
Time int
41+
42+
Accounts struct {
43+
Num int
44+
Balance int
45+
}
46+
47+
Readers struct {
48+
Num int
49+
}
50+
51+
Writers struct {
52+
Num int
53+
}
54+
}
55+
func append_with_comma(s *string, x string) {
56+
if len(*s) > 0 {
57+
*s = *s + ", " + x
58+
} else {
59+
*s = x
60+
}
61+
}
62+
63+
func dump_cfg() {
64+
fmt.Printf("Connections: %d\n", len(cfg.ConnStrs))
65+
for _, cs := range cfg.ConnStrs {
66+
fmt.Printf(" %s\n", cs)
67+
}
68+
fmt.Printf("Isolation: %s\n", cfg.Isolation)
69+
fmt.Printf(
70+
"Accounts: %d × $%d\n",
71+
cfg.Accounts.Num, cfg.Accounts.Balance,
72+
)
73+
fmt.Printf("Readers: %d\n", cfg.Readers.Num)
74+
fmt.Printf("Writers: %d\n", cfg.Writers.Num)
75+
}
76+
77+
func init() {
78+
flag.Var(&cfg.ConnStrs, "d", "Connection string (repeat for multiple connections)")
79+
repread := flag.Bool("i", false, "Use 'repeatable read' isolation level instead of 'read committed'")
80+
flag.IntVar(&cfg.Accounts.Num, "a", 100000, "The number of bank accounts")
81+
flag.IntVar(&cfg.Accounts.Balance, "b", 0, "The initial balance of each bank account")
82+
flag.IntVar(&cfg.Readers.Num, "r", 4, "The number of readers")
83+
flag.IntVar(&cfg.Writers.Num, "w", 4, "The number of writers")
84+
flag.IntVar(&cfg.Time, "t", 10, "Time in seconds of running test")
85+
flag.BoolVar(&cfg.UseDtm, "m", false, "Use DTM to keep global consistency")
86+
flag.BoolVar(&cfg.InitOnly, "f", false, "Only feed databses with data")
87+
flag.BoolVar(&cfg.SkipInit, "s", false, "Skip init phase")
88+
flag.Parse()
89+
90+
if len(cfg.ConnStrs) == 0 {
91+
flag.PrintDefaults()
92+
os.Exit(1)
93+
}
94+
95+
if *repread {
96+
cfg.Isolation = "repeatable read"
97+
} else {
98+
cfg.Isolation = "read committed"
99+
}
100+
101+
dump_cfg()
102+
}
103+
104+
105+
func main() {
106+
start := time.Now()
107+
108+
if (!cfg.SkipInit){
109+
prepare(cfg.ConnStrs)
110+
fmt.Printf("database prepared in %0.2f seconds\n", time.Since(start).Seconds())
111+
}
112+
113+
if (cfg.InitOnly) {
114+
return
115+
}
116+
117+
var wg sync.WaitGroup
118+
119+
cUpdates := make(chan int)
120+
cFetches := make(chan int)
121+
122+
wg.Add(cfg.Writers.Num + cfg.Readers.Num)
123+
running = true
124+
125+
for i := 0; i < cfg.Writers.Num; i++ {
126+
go writer(&wg, cUpdates)
127+
}
128+
129+
for i := 0; i < cfg.Readers.Num; i++ {
130+
go reader(&wg, cFetches)
131+
}
132+
133+
time.Sleep(time.Duration(cfg.Time) * time.Second)
134+
running = false
135+
136+
totalUpdates := 0
137+
for i := 0; i < cfg.Writers.Num; i++ {
138+
totalUpdates += <- cUpdates
139+
}
140+
141+
totalFetches := 0
142+
for i := 0; i < cfg.Readers.Num; i++ {
143+
totalFetches += <- cFetches
144+
}
145+
146+
wg.Wait()
147+
fmt.Printf("Perform %d updates and %d fetches\n", totalUpdates, totalFetches)
148+
}
149+
150+
var running = false
151+
152+
func asyncCommit(conn *pgx.Conn, wg *sync.WaitGroup) {
153+
exec(conn, "commit")
154+
wg.Done()
155+
}
156+
157+
func commit(conns ...*pgx.Conn) {
158+
var wg sync.WaitGroup
159+
wg.Add(len(conns))
160+
for _, conn := range conns {
161+
go asyncCommit(conn, &wg)
162+
}
163+
wg.Wait()
164+
}
165+
166+
func prepare_one(connstr string, wg *sync.WaitGroup) {
167+
dbconf, err := pgx.ParseDSN(connstr)
168+
checkErr(err)
169+
170+
conn, err := pgx.Connect(dbconf)
171+
checkErr(err)
172+
173+
defer conn.Close()
174+
175+
if cfg.UseDtm {
176+
exec(conn, "drop extension if exists pg_dtm")
177+
exec(conn, "create extension pg_dtm")
178+
}
179+
exec(conn, "drop table if exists t")
180+
exec(conn, "create table t(u int primary key, v int)")
181+
exec(conn, "insert into t (select generate_series(0,$1-1), $2)", cfg.Accounts.Num, cfg.Accounts.Balance)
182+
exec(conn, "commit")
183+
wg.Done()
184+
}
185+
186+
func prepare(connstrs []string) {
187+
var wg sync.WaitGroup
188+
wg.Add(len(connstrs))
189+
for _, connstr := range connstrs {
190+
go prepare_one(connstr, &wg)
191+
}
192+
wg.Wait()
193+
}
194+
195+
func writer(wg *sync.WaitGroup, cUpdates chan int) {
196+
var updates = 0
197+
var conns []*pgx.Conn
198+
199+
for _, connstr := range cfg.ConnStrs {
200+
dbconf, err := pgx.ParseDSN(connstr)
201+
checkErr(err)
202+
203+
conn, err := pgx.Connect(dbconf)
204+
checkErr(err)
205+
206+
defer conn.Close()
207+
conns = append(conns, conn)
208+
}
209+
for running {
210+
acc := rand.Intn(cfg.Accounts.Num)
211+
xid := execQuery(conns[0], "select dtm_begin_transaction()")
212+
for i := 1; i < len(conns); i++ {
213+
exec(conns[i], "select dtm_join_transaction($1)", xid)
214+
}
215+
for _, conn := range conns {
216+
exec(conn, "begin transaction isolation level " + cfg.Isolation)
217+
exec(conn, "update t set v = v + 1 where u=$1", acc)
218+
}
219+
commit(conns...)
220+
updates++
221+
}
222+
cUpdates <- updates
223+
wg.Done()
224+
}
225+
226+
227+
func reader(wg *sync.WaitGroup, cFetches chan int) {
228+
var fetches = 0
229+
var conns []*pgx.Conn
230+
var sum int32 = 0
231+
232+
for _, connstr := range cfg.ConnStrs {
233+
dbconf, err := pgx.ParseDSN(connstr)
234+
checkErr(err)
235+
236+
conn, err := pgx.Connect(dbconf)
237+
checkErr(err)
238+
239+
defer conn.Close()
240+
conns = append(conns, conn)
241+
}
242+
for running {
243+
acc := rand.Intn(cfg.Accounts.Num)
244+
con := rand.Intn(len(conns))
245+
sum += execQuery(conns[con], "select v from t where u=$1", acc)
246+
fetches++
247+
}
248+
cFetches <- fetches
249+
wg.Done()
250+
}
251+
252+
func exec(conn *pgx.Conn, stmt string, arguments ...interface{}) {
253+
var err error
254+
// fmt.Println(stmt)
255+
_, err = conn.Exec(stmt, arguments... )
256+
checkErr(err)
257+
}
258+
259+
func execQuery(conn *pgx.Conn, stmt string, arguments ...interface{}) int32 {
260+
var err error
261+
var result int32
262+
err = conn.QueryRow(stmt, arguments...).Scan(&result)
263+
checkErr(err)
264+
return result
265+
}
266+
267+
func checkErr(err error) {
268+
if err != nil {
269+
panic(err)
270+
}
271+
}
272+
273+
// vim: expandtab ts=4 sts=4 sw=4

contrib/pg_dtm/tests/benchmark.sh

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
#!/bin/sh
2+
3+
go run benchmark.go \
4+
-d 'dbname=postgres port=5432' \
5+
-d 'dbname=postgres port=5433' \
6+
-m \
7+
-w 4 \
8+
-r 4 \
9+
-t 10

0 commit comments

Comments
 (0)