@@ -3,38 +3,72 @@ package main
3
3
import (
4
4
"fmt"
5
5
"sync"
6
- // "github.com/jackc/pgx"
7
- "pgx"
6
+ _ "github.com/jgallagher/go-libpq"
7
+ "database/sql"
8
+ "strconv"
8
9
)
9
10
10
11
const (
11
- TRANSFER_CONNECTIONS = 1
12
+ TRANSFER_CONNECTIONS = 8
12
13
INIT_AMOUNT = 10000
13
- N_ITERATIONS = 5000
14
- N_ACCOUNTS = TRANSFER_CONNECTIONS
14
+ N_ITERATIONS = 10000
15
+ N_ACCOUNTS = 100 //2* TRANSFER_CONNECTIONS
15
16
)
16
17
17
- var cfg = pgx.ConnConfig {
18
- Host : "127.0.0.1" ,
19
- Port : 5433 ,
20
- // Database: "postgres",
21
- }
18
+ var cfg = "host=127.0.0.1 port=5432 sslmode=disable"
19
+ var cfg1 = "host=127.0.0.1 port=5433 sslmode=disable"
20
+ var cfg2 = "host=127.0.0.1 port=5434 sslmode=disable"
22
21
23
22
var running = false
24
23
25
- func transfer (id int , wg * sync.WaitGroup ) {
26
- var err error
27
- var conn * pgx.Conn
24
+ func prepare_db () {
25
+ conn1 , err := sql .Open ("libpq" , cfg1 )
26
+ checkErr (err )
27
+ exec (conn1 , "drop table if exists t_10000" )
28
+ conn1 .Close ()
29
+
30
+ conn2 , err := sql .Open ("libpq" , cfg2 )
31
+ checkErr (err )
32
+ exec (conn2 , "drop table if exists t_10001" )
33
+ conn2 .Close ()
34
+
28
35
29
- conn , err = pgx .Connect (cfg )
36
+ conn , err := sql .Open ("libpq" , cfg )
37
+ checkErr (err )
38
+
39
+ exec (conn , "drop extension if exists pg_shard CASCADE" )
40
+ exec (conn , "create extension pg_shard" )
41
+ exec (conn , "drop table if exists t" )
42
+ exec (conn , "create table t(u int, v int)" )
43
+ exec (conn , "select master_create_distributed_table(table_name := 't', partition_column := 'u')" )
44
+ exec (conn , "select master_create_worker_shards(table_name := 't', shard_count := 2, replication_factor := 1)" )
45
+
46
+ for i := 1 ; i <= N_ACCOUNTS ; i ++ {
47
+ exec (conn , "insert into t values(" + strconv .Itoa (i ) + ",10000)" )
48
+ }
49
+
50
+ conn .Close ()
51
+ }
52
+
53
+ func transfer (id int , wg * sync.WaitGroup ) {
54
+ conn , err := sql .Open ("libpq" , cfg )
30
55
checkErr (err )
31
56
defer conn .Close ()
32
57
58
+ uids1 := []int {1 ,3 ,4 , 5 , 7 , 8 ,10 ,14 }
59
+ uids2 := []int {2 ,6 ,9 ,11 ,12 ,13 ,18 ,21 }
60
+
33
61
for i := 0 ; i < N_ITERATIONS ; i ++ {
34
62
exec (conn , "begin" )
35
- exec (conn , "update t_10000 set v = v + 1 where u=3" )
36
- exec (conn , "update t_10000 set v = v - 1 where u=4" )
63
+ exec (conn , "update t set v = v + 1 where u=" + strconv .Itoa (uids1 [id ]))
64
+ exec (conn , "update t set v = v - 1 where u=" + strconv .Itoa (uids2 [id ]))
65
+ // exec(conn, "update t set v = v + 1 where u=1")
66
+ // exec(conn, "update t set v = v - 1 where u=2")
37
67
exec (conn , "commit" )
68
+
69
+ if i % 1000 == 0 {
70
+ fmt .Printf ("%u tx processed.\n " , i )
71
+ }
38
72
}
39
73
40
74
wg .Done ()
@@ -44,14 +78,14 @@ func inspect(wg *sync.WaitGroup) {
44
78
var sum int64
45
79
var prevSum int64 = 0
46
80
47
- conn , err := pgx . Connect ( cfg )
81
+ conn , err := sql . Open ( "libpq" , cfg )
48
82
checkErr (err )
49
83
50
84
for running {
51
- sum = execQuery (conn , "select sum(v) from t_10000 " )
85
+ sum = execQuery (conn , "select sum(v) from t " )
52
86
if sum != prevSum {
53
- fmt .Println ("Total = " , sum );
54
- prevSum = sum
87
+ fmt .Println ("Total = " , sum );
88
+ prevSum = sum
55
89
}
56
90
}
57
91
@@ -60,52 +94,47 @@ func inspect(wg *sync.WaitGroup) {
60
94
}
61
95
62
96
func main () {
63
- // var transferWg sync.WaitGroup
64
- // var inspectWg sync.WaitGroup
65
- var err error
66
- var conn * pgx.Conn
67
- var s int64
97
+ var transferWg sync.WaitGroup
98
+ var inspectWg sync.WaitGroup
68
99
69
- conn , err = pgx .Connect (cfg )
70
- checkErr (err )
71
- defer conn .Close ()
72
-
73
- // err = conn.QueryRow("select sum(v) from t_10000").Scan(&s)
74
- // checkErr(err)
100
+ prepare_db ()
75
101
76
- s = execQuery (conn , "select sum(v) from t_10000" )
77
- fmt .Println (s )
102
+ transferWg .Add (TRANSFER_CONNECTIONS )
103
+ for i := 0 ; i < TRANSFER_CONNECTIONS ; i ++ {
104
+ go transfer (i , & transferWg )
105
+ }
78
106
107
+ running = true
108
+ inspectWg .Add (1 )
109
+ go inspect (& inspectWg )
79
110
80
- // transferWg.Add(TRANSFER_CONNECTIONS )
81
- // for i:=0; i<TRANSFER_CONNECTIONS; i++ {
82
- // go transfer(i, &transferWg)
83
- // }
111
+ transferWg .Wait ( )
112
+ running = false
113
+
114
+ inspectWg . Wait ()
84
115
85
- // running = true
86
- // inspectWg.Add(1)
87
- // go inspect(&inspectWg)
116
+ // conn, err := sql.Open("libpq", cfg)
117
+ // checkErr(err)
88
118
89
- // transferWg.Wait()
119
+ // exec(conn, "begin")
120
+ // sum := execQuery(conn, "select sum(v) from t")
121
+ // exec(conn, "commit")
90
122
91
- // running = false
92
- // inspectWg.Wait()
123
+ // fmt.Println(sum)
93
124
94
125
fmt .Printf ("done\n " )
95
126
}
96
127
97
- func exec (conn * pgx. Conn , stmt string ) {
128
+ func exec (conn * sql. DB , stmt string ) {
98
129
var err error
99
130
_ , err = conn .Exec (stmt )
100
131
checkErr (err )
101
132
}
102
133
103
- func execQuery (conn * pgx. Conn , stmt string ) int64 {
134
+ func execQuery (conn * sql. DB , stmt string ) int64 {
104
135
var err error
105
136
var result int64
106
- // result, err = conn.SimpleQuery(stmt)
107
- // err = conn.QueryRow(stmt).Scan(&result)
108
- err = conn .SimpleQuery (stmt ).Scan (& result )
137
+ err = conn .QueryRow (stmt ).Scan (& result )
109
138
checkErr (err )
110
139
return result
111
140
}
@@ -116,3 +145,4 @@ func checkErr(err error) {
116
145
}
117
146
}
118
147
148
+
0 commit comments