Skip to content

Commit 2d626c2

Browse files
committed
2 parents 9226662 + eeeb868 commit 2d626c2

File tree

2 files changed

+93
-96
lines changed

2 files changed

+93
-96
lines changed

contrib/pg_xtm/tests/pg_shard_transfers.go

Lines changed: 78 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -3,38 +3,72 @@ package main
33
import (
44
"fmt"
55
"sync"
6-
// "github.com/jackc/pgx"
7-
"pgx"
6+
_ "github.com/jgallagher/go-libpq"
7+
"database/sql"
8+
"strconv"
89
)
910

1011
const (
11-
TRANSFER_CONNECTIONS = 1
12+
TRANSFER_CONNECTIONS = 8
1213
INIT_AMOUNT = 10000
13-
N_ITERATIONS = 5000
14-
N_ACCOUNTS = TRANSFER_CONNECTIONS
14+
N_ITERATIONS = 10000
15+
N_ACCOUNTS = 100 //2*TRANSFER_CONNECTIONS
1516
)
1617

17-
var cfg = pgx.ConnConfig{
18-
Host: "127.0.0.1",
19-
Port: 5433,
20-
// Database: "postgres",
21-
}
18+
var cfg = "host=127.0.0.1 port=5432 sslmode=disable"
19+
var cfg1 = "host=127.0.0.1 port=5433 sslmode=disable"
20+
var cfg2 = "host=127.0.0.1 port=5434 sslmode=disable"
2221

2322
var running = false
2423

25-
func transfer(id int, wg *sync.WaitGroup) {
26-
var err error
27-
var conn *pgx.Conn
24+
func prepare_db() {
25+
conn1, err := sql.Open("libpq", cfg1)
26+
checkErr(err)
27+
exec(conn1, "drop table if exists t_10000")
28+
conn1.Close()
29+
30+
conn2, err := sql.Open("libpq", cfg2)
31+
checkErr(err)
32+
exec(conn2, "drop table if exists t_10001")
33+
conn2.Close()
34+
2835

29-
conn, err = pgx.Connect(cfg)
36+
conn, err := sql.Open("libpq", cfg)
37+
checkErr(err)
38+
39+
exec(conn, "drop extension if exists pg_shard CASCADE")
40+
exec(conn, "create extension pg_shard")
41+
exec(conn, "drop table if exists t")
42+
exec(conn, "create table t(u int, v int)")
43+
exec(conn, "select master_create_distributed_table(table_name := 't', partition_column := 'u')")
44+
exec(conn, "select master_create_worker_shards(table_name := 't', shard_count := 2, replication_factor := 1)")
45+
46+
for i:=1; i<=N_ACCOUNTS; i++ {
47+
exec(conn, "insert into t values(" + strconv.Itoa(i) + ",10000)")
48+
}
49+
50+
conn.Close()
51+
}
52+
53+
func transfer(id int, wg *sync.WaitGroup) {
54+
conn, err := sql.Open("libpq", cfg)
3055
checkErr(err)
3156
defer conn.Close()
3257

58+
uids1 := []int{1,3,4, 5, 7, 8,10,14}
59+
uids2 := []int{2,6,9,11,12,13,18,21}
60+
3361
for i:=0; i < N_ITERATIONS; i++ {
3462
exec(conn, "begin")
35-
exec(conn, "update t_10000 set v = v + 1 where u=3")
36-
exec(conn, "update t_10000 set v = v - 1 where u=4")
63+
exec(conn, "update t set v = v + 1 where u="+strconv.Itoa(uids1[id]))
64+
exec(conn, "update t set v = v - 1 where u="+strconv.Itoa(uids2[id]))
65+
// exec(conn, "update t set v = v + 1 where u=1")
66+
// exec(conn, "update t set v = v - 1 where u=2")
3767
exec(conn, "commit")
68+
69+
if i%1000==0 {
70+
fmt.Printf("%u tx processed.\n", i)
71+
}
3872
}
3973

4074
wg.Done()
@@ -44,14 +78,14 @@ func inspect(wg *sync.WaitGroup) {
4478
var sum int64
4579
var prevSum int64 = 0
4680

47-
conn, err := pgx.Connect(cfg)
81+
conn, err := sql.Open("libpq", cfg)
4882
checkErr(err)
4983

5084
for running {
51-
sum = execQuery(conn, "select sum(v) from t_10000")
85+
sum = execQuery(conn, "select sum(v) from t")
5286
if sum != prevSum {
53-
fmt.Println("Total = ", sum);
54-
prevSum = sum
87+
fmt.Println("Total = ", sum);
88+
prevSum = sum
5589
}
5690
}
5791

@@ -60,52 +94,47 @@ func inspect(wg *sync.WaitGroup) {
6094
}
6195

6296
func main() {
63-
// var transferWg sync.WaitGroup
64-
// var inspectWg sync.WaitGroup
65-
var err error
66-
var conn *pgx.Conn
67-
var s int64
97+
var transferWg sync.WaitGroup
98+
var inspectWg sync.WaitGroup
6899

69-
conn, err = pgx.Connect(cfg)
70-
checkErr(err)
71-
defer conn.Close()
72-
73-
// err = conn.QueryRow("select sum(v) from t_10000").Scan(&s)
74-
// checkErr(err)
100+
prepare_db()
75101

76-
s = execQuery(conn, "select sum(v) from t_10000")
77-
fmt.Println(s)
102+
transferWg.Add(TRANSFER_CONNECTIONS)
103+
for i:=0; i<TRANSFER_CONNECTIONS; i++ {
104+
go transfer(i, &transferWg)
105+
}
78106

107+
running = true
108+
inspectWg.Add(1)
109+
go inspect(&inspectWg)
79110

80-
// transferWg.Add(TRANSFER_CONNECTIONS)
81-
// for i:=0; i<TRANSFER_CONNECTIONS; i++ {
82-
// go transfer(i, &transferWg)
83-
// }
111+
transferWg.Wait()
112+
running = false
113+
114+
inspectWg.Wait()
84115

85-
// running = true
86-
// inspectWg.Add(1)
87-
// go inspect(&inspectWg)
116+
// conn, err := sql.Open("libpq", cfg)
117+
// checkErr(err)
88118

89-
// transferWg.Wait()
119+
// exec(conn, "begin")
120+
// sum := execQuery(conn, "select sum(v) from t")
121+
// exec(conn, "commit")
90122

91-
// running = false
92-
// inspectWg.Wait()
123+
// fmt.Println(sum)
93124

94125
fmt.Printf("done\n")
95126
}
96127

97-
func exec(conn *pgx.Conn, stmt string) {
128+
func exec(conn *sql.DB, stmt string) {
98129
var err error
99130
_, err = conn.Exec(stmt)
100131
checkErr(err)
101132
}
102133

103-
func execQuery(conn *pgx.Conn, stmt string) int64 {
134+
func execQuery(conn *sql.DB, stmt string) int64 {
104135
var err error
105136
var result int64
106-
// result, err = conn.SimpleQuery(stmt)
107-
// err = conn.QueryRow(stmt).Scan(&result)
108-
err = conn.SimpleQuery(stmt).Scan(&result)
137+
err = conn.QueryRow(stmt).Scan(&result)
109138
checkErr(err)
110139
return result
111140
}
@@ -116,3 +145,4 @@ func checkErr(err error) {
116145
}
117146
}
118147

148+

install_pg_shard_xtm.sh

Lines changed: 15 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
#!/bin/sh
22

3-
PG_SHARD_DIR=~/code/pg_shard
3+
PG_SHARD_DIR=~/code/pg_shard_master
44
PG_DIR=~/code/postgresql
55
PG_XTM_DIR=$PG_DIR/contrib/pg_xtm
66

@@ -42,24 +42,23 @@ cd $PG_DIR
4242
./install/bin/initdb -D ./install/data2
4343
./install/bin/initdb -D ./install/data3
4444

45-
46-
sed -i '' 's/#port =.*/port = 5433/' ./install/data2/postgresql.conf
47-
sed -i '' 's/#port =.*/port = 5434/' ./install/data3/postgresql.conf
48-
4945
sed -i '' "s/shared_preload_libraries =.*/shared_preload_libraries = 'pg_dtm,pg_shard'/" ./install/data1/postgresql.conf
50-
sed -i '' "s/shared_preload_libraries =.*/shared_preload_libraries = 'pg_dtm,pg_shard'/" ./install/data2/postgresql.conf
51-
sed -i '' "s/shared_preload_libraries =.*/shared_preload_libraries = 'pg_dtm,pg_shard'/" ./install/data3/postgresql.conf
46+
sed -i '' "s/shared_preload_libraries =.*/shared_preload_libraries = 'pg_dtm'/" ./install/data2/postgresql.conf
47+
sed -i '' "s/shared_preload_libraries =.*/shared_preload_libraries = 'pg_dtm'/" ./install/data3/postgresql.conf
5248

53-
sed -i '' 's/#fsync =.*/fsync = off/' ./install/data1/postgresql.conf
54-
sed -i '' 's/#fsync =.*/fsync = off/' ./install/data2/postgresql.conf
55-
sed -i '' 's/#fsync =.*/fsync = off/' ./install/data3/postgresql.conf
49+
echo "port = 5433" >> ./install/data2/postgresql.conf
50+
echo "port = 5434" >> ./install/data3/postgresql.conf
5651

52+
echo 'fsync = off' >> ./install/data1/postgresql.conf
53+
echo 'fsync = off' >> ./install/data2/postgresql.conf
54+
echo 'fsync = off' >> ./install/data3/postgresql.conf
5755

5856
echo 'pg_shard.use_dtm_transactions=1' >> ./install/data1/postgresql.conf
59-
echo 'pg_shard.use_dtm_transactions=1' >> ./install/data2/postgresql.conf
60-
echo 'pg_shard.use_dtm_transactions=1' >> ./install/data3/postgresql.conf
61-
57+
echo 'pg_shard.all_modifications_commutative=1' >> ./install/data1/postgresql.conf
6258

59+
echo "log_statement = 'all'" >> ./install/data1/postgresql.conf
60+
echo "log_statement = 'all'" >> ./install/data2/postgresql.conf
61+
echo "log_statement = 'all'" >> ./install/data3/postgresql.conf
6362

6463
./install/bin/pg_ctl -D ./install/data1 -l ./install/data1/log start
6564
./install/bin/pg_ctl -D ./install/data2 -l ./install/data2/log start
@@ -85,46 +84,14 @@ echo "127.0.0.1 5434" >> ./install/data3/pg_worker_list.conf
8584
./install/bin/createdb `whoami` -p5434
8685

8786

88-
./install/bin/psql -p 5433 << SQL
87+
./install/bin/psql -p 5432 << SQL
8988
CREATE EXTENSION pg_dtm;
9089
SQL
9190

92-
./install/bin/psql -p 5434 << SQL
91+
./install/bin/psql -p 5433 << SQL
9392
CREATE EXTENSION pg_dtm;
9493
SQL
9594

96-
./install/bin/psql << SQL
97-
95+
./install/bin/psql -p 5434 << SQL
9896
CREATE EXTENSION pg_dtm;
99-
CREATE EXTENSION pg_shard;
100-
CREATE TABLE t(u int, v int);
101-
SELECT master_create_distributed_table(table_name := 't', partition_column := 'u');
102-
SELECT master_create_worker_shards(table_name := 't', shard_count := 8, replication_factor := 1);
103-
insert into t values(1,10000);
104-
insert into t values(2,10000);
105-
insert into t values(3,10000);
106-
insert into t values(4,10000);
107-
insert into t values(5,10000);
108-
insert into t values(6,10000);
109-
insert into t values(7,10000);
110-
insert into t values(8,10000);
111-
11297
SQL
113-
114-
115-
116-
117-
# insert into t (select generate_series(0,10), random()::integer);
118-
119-
# cd contrib/pg_xtm/dtmd
120-
# make clean
121-
# make
122-
# rm -rf /tmp/clog/*
123-
# ./bin/dtmd
124-
125-
126-
127-
128-
129-
130-

0 commit comments

Comments
 (0)