Skip to content

Commit 1e3909c

Browse files
committed
new pg_shard driller
1 parent ae9398a commit 1e3909c

File tree

1 file changed

+78
-48
lines changed

1 file changed

+78
-48
lines changed

contrib/pg_xtm/tests/pg_shard_transfers.go

Lines changed: 78 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -3,38 +3,72 @@ package main
33
import (
44
"fmt"
55
"sync"
6-
// "github.com/jackc/pgx"
7-
"pgx"
6+
_ "github.com/jgallagher/go-libpq"
7+
"database/sql"
8+
"strconv"
89
)
910

1011
const (
11-
TRANSFER_CONNECTIONS = 1
12+
TRANSFER_CONNECTIONS = 8
1213
INIT_AMOUNT = 10000
13-
N_ITERATIONS = 5000
14-
N_ACCOUNTS = TRANSFER_CONNECTIONS
14+
N_ITERATIONS = 10000
15+
N_ACCOUNTS = 100 //2*TRANSFER_CONNECTIONS
1516
)
1617

17-
var cfg = pgx.ConnConfig{
18-
Host: "127.0.0.1",
19-
Port: 5433,
20-
// Database: "postgres",
21-
}
18+
var cfg = "host=127.0.0.1 port=5432 sslmode=disable"
19+
var cfg1 = "host=127.0.0.1 port=5433 sslmode=disable"
20+
var cfg2 = "host=127.0.0.1 port=5434 sslmode=disable"
2221

2322
var running = false
2423

25-
func transfer(id int, wg *sync.WaitGroup) {
26-
var err error
27-
var conn *pgx.Conn
24+
func prepare_db() {
25+
conn1, err := sql.Open("libpq", cfg1)
26+
checkErr(err)
27+
exec(conn1, "drop table if exists t_10000")
28+
conn1.Close()
29+
30+
conn2, err := sql.Open("libpq", cfg2)
31+
checkErr(err)
32+
exec(conn2, "drop table if exists t_10001")
33+
conn2.Close()
34+
2835

29-
conn, err = pgx.Connect(cfg)
36+
conn, err := sql.Open("libpq", cfg)
37+
checkErr(err)
38+
39+
exec(conn, "drop extension if exists pg_shard CASCADE")
40+
exec(conn, "create extension pg_shard")
41+
exec(conn, "drop table if exists t")
42+
exec(conn, "create table t(u int, v int)")
43+
exec(conn, "select master_create_distributed_table(table_name := 't', partition_column := 'u')")
44+
exec(conn, "select master_create_worker_shards(table_name := 't', shard_count := 2, replication_factor := 1)")
45+
46+
for i:=1; i<=N_ACCOUNTS; i++ {
47+
exec(conn, "insert into t values(" + strconv.Itoa(i) + ",10000)")
48+
}
49+
50+
conn.Close()
51+
}
52+
53+
func transfer(id int, wg *sync.WaitGroup) {
54+
conn, err := sql.Open("libpq", cfg)
3055
checkErr(err)
3156
defer conn.Close()
3257

58+
uids1 := []int{1,3,4, 5, 7, 8,10,14}
59+
uids2 := []int{2,6,9,11,12,13,18,21}
60+
3361
for i:=0; i < N_ITERATIONS; i++ {
3462
exec(conn, "begin")
35-
exec(conn, "update t_10000 set v = v + 1 where u=3")
36-
exec(conn, "update t_10000 set v = v - 1 where u=4")
63+
exec(conn, "update t set v = v + 1 where u="+strconv.Itoa(uids1[id]))
64+
exec(conn, "update t set v = v - 1 where u="+strconv.Itoa(uids2[id]))
65+
// exec(conn, "update t set v = v + 1 where u=1")
66+
// exec(conn, "update t set v = v - 1 where u=2")
3767
exec(conn, "commit")
68+
69+
if i%1000==0 {
70+
fmt.Printf("%u tx processed.\n", i)
71+
}
3872
}
3973

4074
wg.Done()
@@ -44,14 +78,14 @@ func inspect(wg *sync.WaitGroup) {
4478
var sum int64
4579
var prevSum int64 = 0
4680

47-
conn, err := pgx.Connect(cfg)
81+
conn, err := sql.Open("libpq", cfg)
4882
checkErr(err)
4983

5084
for running {
51-
sum = execQuery(conn, "select sum(v) from t_10000")
85+
sum = execQuery(conn, "select sum(v) from t")
5286
if sum != prevSum {
53-
fmt.Println("Total = ", sum);
54-
prevSum = sum
87+
fmt.Println("Total = ", sum);
88+
prevSum = sum
5589
}
5690
}
5791

@@ -60,52 +94,47 @@ func inspect(wg *sync.WaitGroup) {
6094
}
6195

6296
func main() {
63-
// var transferWg sync.WaitGroup
64-
// var inspectWg sync.WaitGroup
65-
var err error
66-
var conn *pgx.Conn
67-
var s int64
97+
var transferWg sync.WaitGroup
98+
var inspectWg sync.WaitGroup
6899

69-
conn, err = pgx.Connect(cfg)
70-
checkErr(err)
71-
defer conn.Close()
72-
73-
// err = conn.QueryRow("select sum(v) from t_10000").Scan(&s)
74-
// checkErr(err)
100+
prepare_db()
75101

76-
s = execQuery(conn, "select sum(v) from t_10000")
77-
fmt.Println(s)
102+
transferWg.Add(TRANSFER_CONNECTIONS)
103+
for i:=0; i<TRANSFER_CONNECTIONS; i++ {
104+
go transfer(i, &transferWg)
105+
}
78106

107+
running = true
108+
inspectWg.Add(1)
109+
go inspect(&inspectWg)
79110

80-
// transferWg.Add(TRANSFER_CONNECTIONS)
81-
// for i:=0; i<TRANSFER_CONNECTIONS; i++ {
82-
// go transfer(i, &transferWg)
83-
// }
111+
transferWg.Wait()
112+
running = false
113+
114+
inspectWg.Wait()
84115

85-
// running = true
86-
// inspectWg.Add(1)
87-
// go inspect(&inspectWg)
116+
// conn, err := sql.Open("libpq", cfg)
117+
// checkErr(err)
88118

89-
// transferWg.Wait()
119+
// exec(conn, "begin")
120+
// sum := execQuery(conn, "select sum(v) from t")
121+
// exec(conn, "commit")
90122

91-
// running = false
92-
// inspectWg.Wait()
123+
// fmt.Println(sum)
93124

94125
fmt.Printf("done\n")
95126
}
96127

97-
func exec(conn *pgx.Conn, stmt string) {
128+
func exec(conn *sql.DB, stmt string) {
98129
var err error
99130
_, err = conn.Exec(stmt)
100131
checkErr(err)
101132
}
102133

103-
func execQuery(conn *pgx.Conn, stmt string) int64 {
134+
func execQuery(conn *sql.DB, stmt string) int64 {
104135
var err error
105136
var result int64
106-
// result, err = conn.SimpleQuery(stmt)
107-
// err = conn.QueryRow(stmt).Scan(&result)
108-
err = conn.SimpleQuery(stmt).Scan(&result)
137+
err = conn.QueryRow(stmt).Scan(&result)
109138
checkErr(err)
110139
return result
111140
}
@@ -116,3 +145,4 @@ func checkErr(err error) {
116145
}
117146
}
118147

148+

0 commit comments

Comments
 (0)