Skip to content

Commit 14bf526

Browse files
committed
transfers-pgshard as a perf backend
1 parent c459061 commit 14bf526

File tree

2 files changed

+126
-0
lines changed

2 files changed

+126
-0
lines changed

contrib/pg_dtm/tests/perf/perf.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -137,6 +137,8 @@ func main() {
137137
backend = new(TransfersFDW)
138138
case "readers":
139139
backend = new(Readers)
140+
case "pgshard":
141+
backend = new(TransfersPgShard)
140142
default:
141143
fmt.Println("No backend named: '%s'\n", cfg.Backend)
142144
return
Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,124 @@
1+
package main
2+
3+
import (
4+
"fmt"
5+
"sync"
6+
_ "github.com/lib/pq"
7+
"database/sql"
8+
"strconv"
9+
"math/rand"
10+
)
11+
12+
type TransfersPgShard struct {}
13+
14+
func (t TransfersPgShard) prepare(connstrs []string) {
15+
var wg sync.WaitGroup
16+
wg.Add(len(connstrs)-1)
17+
for i, connstr := range connstrs[1:] {
18+
go t.prepare_slave(i, connstr, &wg)
19+
}
20+
wg.Wait()
21+
t.prepare_master()
22+
}
23+
24+
func (t TransfersPgShard) prepare_slave(id int, connstr string, wg *sync.WaitGroup) {
25+
conn, err := sql.Open("postgres", connstr)
26+
checkErr(err)
27+
defer conn.Close()
28+
29+
if cfg.UseDtm {
30+
_exec(conn, "drop extension if exists pg_dtm --")
31+
_exec(conn, "create extension pg_dtm")
32+
}
33+
34+
drop_sql := fmt.Sprintf("drop table if exists t_1000%d", id)
35+
_exec(conn, drop_sql)
36+
37+
conn.Close()
38+
wg.Done()
39+
}
40+
41+
func (t TransfersPgShard) prepare_master() {
42+
conn, err := sql.Open("postgres", cfg.ConnStrs[0])
43+
checkErr(err)
44+
45+
_exec(conn, "drop extension if exists pg_shard CASCADE")
46+
_exec(conn, "create extension pg_shard")
47+
_exec(conn, "drop table if exists t")
48+
_exec(conn, "create table t(u int primary key, v int)")
49+
_exec(conn, "select master_create_distributed_table(table_name := 't', partition_column := 'u')")
50+
51+
master_sql := fmt.Sprintf(
52+
"select master_create_worker_shards(table_name := 't', shard_count := %d, replication_factor := 1)",
53+
len(cfg.ConnStrs)-1)
54+
_exec(conn, master_sql)
55+
56+
fmt.Println("Database feed started")
57+
for i:=0; i<=cfg.AccountsNum; i++ {
58+
_exec(conn, "insert into t values(" + strconv.Itoa(i) + ", 0)")
59+
}
60+
fmt.Println("Database feed finished")
61+
62+
conn.Close()
63+
}
64+
65+
func (t TransfersPgShard) writer(id int, cCommits chan int, cAborts chan int, wg *sync.WaitGroup) {
66+
conn, err := sql.Open("postgres", cfg.ConnStrs[0])
67+
checkErr(err)
68+
69+
i:=0
70+
for i=0; i < cfg.IterNum; i++ {
71+
amount := 1
72+
account1 := rand.Intn(cfg.AccountsNum-1)+1
73+
account2 := rand.Intn(cfg.AccountsNum-1)+1
74+
75+
_exec(conn, "begin")
76+
_exec(conn, fmt.Sprintf("update t set v = v - %d where u=%d", amount, account1))
77+
_exec(conn, fmt.Sprintf("update t set v = v + %d where u=%d", amount, account2))
78+
_exec(conn, "commit")
79+
80+
if i%1000==0 {
81+
fmt.Printf("%d tx processed.\n", i)
82+
}
83+
}
84+
85+
cCommits <- i
86+
cAborts <- 0
87+
88+
conn.Close()
89+
wg.Done()
90+
}
91+
92+
func (t TransfersPgShard) reader(wg *sync.WaitGroup, cFetches chan int, inconsistency *bool) {
93+
var sum int64
94+
var prevSum int64 = 0
95+
96+
conn, err := sql.Open("postgres", cfg.ConnStrs[0])
97+
checkErr(err)
98+
99+
for running {
100+
sum = _execQuery(conn, "select sum(v) from t")
101+
if sum != prevSum {
102+
fmt.Println("Total = ", sum)
103+
*inconsistency = true
104+
prevSum = sum
105+
}
106+
}
107+
108+
conn.Close()
109+
wg.Done()
110+
}
111+
112+
func _exec(conn *sql.DB, stmt string) {
113+
var err error
114+
_, err = conn.Exec(stmt)
115+
checkErr(err)
116+
}
117+
118+
func _execQuery(conn *sql.DB, stmt string) int64 {
119+
var err error
120+
var result int64
121+
err = conn.QueryRow(stmt).Scan(&result)
122+
checkErr(err)
123+
return result
124+
}

0 commit comments

Comments
 (0)