Skip to content

Commit b0e05d0

Browse files
committed
manual merge
2 parents 754e495 + ed4f3e3 commit b0e05d0

File tree

8 files changed

+335
-24
lines changed

8 files changed

+335
-24
lines changed

pg_dtm.c

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -405,7 +405,6 @@ static TransactionId DtmAdjustOldestXid(TransactionId xid)
405405
}
406406
}
407407
if (prev != NULL) {
408-
*(int*)0 = 0;
409408
local->trans_list_head = prev;
410409
xid = prev->xid;
411410
} else {

tests/dtmbench.cpp

Lines changed: 276 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,276 @@
1+
#include <time.h>
2+
#include <stdio.h>
3+
#include <stdarg.h>
4+
#include <stdlib.h>
5+
#include <inttypes.h>
6+
#include <sys/time.h>
7+
#include <pthread.h>
8+
9+
#include <string>
10+
#include <vector>
11+
12+
#include <pqxx/connection>
13+
#include <pqxx/transaction>
14+
#include <pqxx/nontransaction>
15+
16+
using namespace std;
17+
using namespace pqxx;
18+
19+
template<class T>
20+
class unique_ptr
21+
{
22+
T* ptr;
23+
24+
public:
25+
unique_ptr(T* p = NULL) : ptr(p) {}
26+
~unique_ptr() { delete ptr; }
27+
T& operator*() { return *ptr; }
28+
T* operator->() { return ptr; }
29+
void operator=(T* p) { ptr = p; }
30+
void operator=(unique_ptr& other) {
31+
ptr = other.ptr;
32+
other.ptr = NULL;
33+
}
34+
};
35+
36+
typedef void* (*thread_proc_t)(void*);
37+
typedef int64_t csn_t;
38+
39+
struct thread
40+
{
41+
pthread_t t;
42+
size_t proceeded;
43+
int id;
44+
45+
void start(int tid, thread_proc_t proc) {
46+
id = tid;
47+
proceeded = 0;
48+
pthread_create(&t, NULL, proc, this);
49+
}
50+
51+
void wait() {
52+
pthread_join(t, NULL);
53+
}
54+
};
55+
56+
struct config
57+
{
58+
int nReaders;
59+
int nWriters;
60+
int nIterations;
61+
int nAccounts;
62+
vector<string> connections;
63+
64+
config() {
65+
nReaders = 1;
66+
nWriters = 10;
67+
nIterations = 1000;
68+
nAccounts = 1000;
69+
}
70+
};
71+
72+
config cfg;
73+
bool running;
74+
75+
#define USEC 1000000
76+
77+
static time_t getCurrentTime()
78+
{
79+
struct timeval tv;
80+
gettimeofday(&tv, NULL);
81+
return (time_t)tv.tv_sec*USEC + tv.tv_usec;
82+
}
83+
84+
85+
void exec(transaction_base& txn, char const* sql, ...)
86+
{
87+
va_list args;
88+
va_start(args, sql);
89+
char buf[1024];
90+
vsprintf(buf, sql, args);
91+
va_end(args);
92+
txn.exec(buf);
93+
}
94+
95+
int64_t execQuery( transaction_base& txn, char const* sql, ...)
96+
{
97+
va_list args;
98+
va_start(args, sql);
99+
char buf[1024];
100+
vsprintf(buf, sql, args);
101+
va_end(args);
102+
result r = txn.exec(buf);
103+
return r[0][0].as(int64_t());
104+
}
105+
106+
void* reader(void* arg)
107+
{
108+
thread& t = *(thread*)arg;
109+
vector< unique_ptr<connection> > conns(cfg.connections.size());
110+
for (size_t i = 0; i < conns.size(); i++) {
111+
conns[i] = new connection(cfg.connections[i]);
112+
}
113+
int64_t prevSum = 0;
114+
115+
while (running) {
116+
csn_t snapshot;
117+
vector< unique_ptr<work> > txns(conns.size());
118+
for (size_t i = 0; i < conns.size(); i++) {
119+
txns[i] = new work(*conns[i]);
120+
}
121+
for (size_t i = 0; i < txns.size(); i++) {
122+
if (i == 0) {
123+
snapshot = execQuery(*txns[i], "select dtm_extend()");
124+
} else {
125+
snapshot = execQuery(*txns[i], "select dtm_access(%ld)", snapshot);
126+
}
127+
}
128+
int64_t sum = 0;
129+
for (size_t i = 0; i < txns.size(); i++) {
130+
sum += execQuery(*txns[i], "select sum(v) from t");
131+
}
132+
if (sum != prevSum) {
133+
printf("Total=%ld snapshot=%ld\n", sum, snapshot);
134+
prevSum = sum;
135+
}
136+
t.proceeded += 1;
137+
}
138+
return NULL;
139+
}
140+
141+
void* writer(void* arg)
142+
{
143+
thread& t = *(thread*)arg;
144+
vector< unique_ptr<connection> > conns(cfg.connections.size());
145+
for (size_t i = 0; i < conns.size(); i++) {
146+
conns[i] = new connection(cfg.connections[i]);
147+
}
148+
for (int i = 0; i < cfg.nIterations; i++)
149+
{
150+
char gtid[32];
151+
int srcCon, dstCon;
152+
int srcAcc = (random() % ((cfg.nAccounts-cfg.nWriters)/cfg.nWriters))*cfg.nWriters + t.id;
153+
int dstAcc = (random() % ((cfg.nAccounts-cfg.nWriters)/cfg.nWriters))*cfg.nWriters + t.id;
154+
155+
sprintf(gtid, "%d.%d", t.id, i);
156+
157+
do {
158+
srcCon = random() % cfg.connections.size();
159+
dstCon = random() % cfg.connections.size();
160+
} while (srcCon == dstCon);
161+
162+
nontransaction srcTx(*conns[srcCon]);
163+
nontransaction dstTx(*conns[dstCon]);
164+
165+
exec(srcTx, "begin transaction");
166+
exec(dstTx, "begin transaction");
167+
168+
csn_t snapshot = execQuery(srcTx, "select dtm_extend('%s')", gtid);
169+
snapshot = execQuery(dstTx, "select dtm_access(%ld, '%s')", snapshot, gtid);
170+
171+
exec(srcTx, "update t set v = v - 1 where u=%d", srcAcc);
172+
exec(dstTx, "update t set v = v + 1 where u=%d", dstAcc);
173+
174+
exec(srcTx, "prepare transaction '%s'", gtid);
175+
exec(dstTx, "prepare transaction '%s'", gtid);
176+
exec(srcTx, "select dtm_begin_prepare('%s')", gtid);
177+
exec(dstTx, "select dtm_begin_prepare('%s')", gtid);
178+
csn_t csn = execQuery(srcTx, "select dtm_prepare('%s', 0)", gtid);
179+
csn = execQuery(dstTx, "select dtm_prepare('%s', %ld)", gtid, csn);
180+
exec(srcTx, "select dtm_end_prepare('%s', %ld)", gtid, csn);
181+
exec(dstTx, "select dtm_end_prepare('%s', %ld)", gtid, csn);
182+
exec(srcTx, "commit prepared '%s'", gtid);
183+
exec(dstTx, "commit prepared '%s'", gtid);
184+
185+
t.proceeded += 1;
186+
}
187+
return NULL;
188+
}
189+
190+
void initializeDatabase()
191+
{
192+
for (size_t i = 0; i < cfg.connections.size(); i++) {
193+
connection conn(cfg.connections[i]);
194+
work txn(conn);
195+
exec(txn, "drop extension if exists pg_dtm");
196+
exec(txn, "create extension pg_dtm");
197+
exec(txn, "drop table if exists t");
198+
exec(txn, "create table t(u int primary key, v int)");
199+
exec(txn, "insert into t (select generate_series(0,%d), %d)", cfg.nAccounts-1, 0);
200+
txn.commit();
201+
202+
// nontransaction vacTx(conn);
203+
// exec(vacTx, "vacuum full");
204+
}
205+
}
206+
207+
int main (int argc, char* argv[])
208+
{
209+
bool initialize = false;
210+
for (int i = 1; i < argc; i++) {
211+
if (argv[i][0] == '-') {
212+
switch (argv[i][1]) {
213+
case 'r':
214+
cfg.nReaders = atoi(argv[++i]);
215+
continue;
216+
case 'w':
217+
cfg.nWriters = atoi(argv[++i]);
218+
continue;
219+
case 'a':
220+
cfg.nAccounts = atoi(argv[++i]);
221+
continue;
222+
case 'n':
223+
cfg.nIterations = atoi(argv[++i]);
224+
continue;
225+
case 'C':
226+
cfg.connections.push_back(string(argv[++i]));
227+
continue;
228+
case 'i':
229+
initialize = true;
230+
continue;
231+
}
232+
}
233+
printf("Options:\n"
234+
"\t-r N\tnumber of readers (1)\n"
235+
"\t-w N\tnumber of writers (10)\n"
236+
"\t-a N\tnumber of accounts (1000)\n"
237+
"\t-n N\tnumber of iterations (1000)\n"
238+
"\t-c STR\tdatabase connection string\n"
239+
"\t-i\tinitialize datanase\n");
240+
return 1;
241+
}
242+
if (initialize) {
243+
initializeDatabase();
244+
}
245+
246+
time_t start = getCurrentTime();
247+
running = true;
248+
249+
vector<thread> readers(cfg.nReaders);
250+
vector<thread> writers(cfg.nWriters);
251+
size_t nReads = 0;
252+
size_t nWrites = 0;
253+
254+
for (int i = 0; i < cfg.nReaders; i++) {
255+
readers[i].start(i, reader);
256+
}
257+
for (int i = 0; i < cfg.nWriters; i++) {
258+
writers[i].start(i, writer);
259+
}
260+
261+
for (int i = 0; i < cfg.nWriters; i++) {
262+
writers[i].wait();
263+
nWrites += writers[i].proceeded;
264+
}
265+
266+
running = false;
267+
268+
for (int i = 0; i < cfg.nReaders; i++) {
269+
readers[i].wait();
270+
nReads += readers[i].proceeded;
271+
}
272+
273+
time_t elapsed = getCurrentTime() - start;
274+
printf("TPS(updates)=%f, TPS(selects)=%f\n", (double)(nWrites*USEC)/elapsed, (double)(nReads*USEC)/elapsed);
275+
return 0;
276+
}

tests/makefile

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
CXX=g++
2+
CXXFLAGS=-g -Wall -O2 -pthread
3+
4+
all: dtmbench
5+
6+
dtmbench: dtmbench.cpp
7+
$(CXX) $(CXXFLAGS) -o dtmbench dtmbench.cpp -lpqxx
8+
9+
clean:
10+
rm -f dtmbench

tests/perf.yml

Lines changed: 8 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -16,24 +16,20 @@
1616
connections: "{{ connstrs.results | map(attribute='ansible_facts.connstr') | join }}"
1717

1818
- name: copy transfers binary
19-
copy: src=./perf/{{item}} dest=~/{{item}} mode=0755
19+
copy: src=./{{item}} dest=~/{{item}} mode=0755
2020
with_items:
21-
- "perf.go"
22-
- "transfers.go"
21+
- "dtmbench.cpp"
2322

24-
- hosts: clients[0]
25-
gather_facts: no
26-
tasks:
27-
- name: fill the databases
28-
shell: "go run ~/perf.go ~/transfers.go {{connections}} -g -i"
29-
register: transfers_result
30-
- debug: "var=transfers_result"
23+
- name: compile dtmbench
24+
shell: "g++ -g -Wall -O2 -o dtmbench dtmbench.cpp -lpqxx -pthread "
3125

3226
- hosts: clients[0]
3327
gather_facts: no
3428
tasks:
3529
- name: run transfers
36-
shell: "go run ~/perf.go ~/transfers.go {{connections}} {{runkeys | d('-g -w 200 -r 1 -n 1000 -a 100000')}}"
30+
shell: "./dtmbench {{connections}} -w {{item}} -r 1 -n 1000 -a 100000 -i "
3731
register: transfers_result
38-
- debug: var=transfers_result
32+
with_sequence: start=20 end=400 stride=20
33+
34+
- debug: msg={{transfers_result.results | map(attribute='stdout_lines') | join }}
3935

tests/perf/perf.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,9 @@ import (
66
"os"
77
"sync"
88
"time"
9+
// "runtime"
910
"github.com/jackc/pgx"
11+
// "runtime/pprof"
1012
)
1113

1214
type ConnStrings []string
@@ -28,6 +30,7 @@ var cfg struct {
2830
AccountsNum int
2931
ReadersNum int
3032
IterNum int
33+
Profile string
3134

3235
Writers struct {
3336
Num int
@@ -99,6 +102,7 @@ func init() {
99102
"Show progress and other stuff for mortals")
100103
flag.BoolVar(&cfg.Parallel, "p", false,
101104
"Use parallel execs")
105+
flag.StringVar(&cfg.Profile, "cpuprofile", "", "write cpu profile to file")
102106
repread := flag.Bool("l", false,
103107
"Use 'repeatable read' isolation level instead of 'read committed'")
104108
flag.Parse()
@@ -129,6 +133,17 @@ func main() {
129133
fmt.Println("ERROR: This test needs at leas two connections")
130134
os.Exit(1)
131135
}
136+
// runtime.GOMAXPROCS(100)
137+
138+
// if cfg.Profile != "" {
139+
// f, err := os.Create(cfg.Profile)
140+
// if err != nil {
141+
// fmt.Println("Failed to create profile file")
142+
// os.Exit(1)
143+
// }
144+
// // pprof.StartCPUProfile(f)
145+
// // defer pprof.StopCPUProfile()
146+
// }
132147

133148
// switch cfg.Backend {
134149
// case "transfers":

tests/perf/run.sh

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
go run *.go \
2+
-C "dbname=postgres user=knizhnik port=5432 sslmode=disable" \
3+
-C "dbname=postgres user=knizhnik port=5433 sslmode=disable" \
4+
-C "dbname=postgres user=knizhnik port=5434 sslmode=disable" \
5+
-g -n 1000 -a 1000 -w 10 -r 1 $*

0 commit comments

Comments
 (0)