Skip to content

Commit e1d6dd3

Browse files
committed
Merge branch 'master' of github.com:postgrespro/pg_dtm
2 parents dd2a58e + 19d012c commit e1d6dd3

File tree

3 files changed

+307
-0
lines changed

3 files changed

+307
-0
lines changed

tests/dtmbench.cpp

Lines changed: 292 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,292 @@
1+
#include <time.h>
2+
#include <stdio.h>
3+
#include <stdarg.h>
4+
#include <stdlib.h>
5+
#include <inttypes.h>
6+
#include <sys/time.h>
7+
#include <pthread.h>
8+
9+
#include <string>
10+
#include <vector>
11+
12+
#include <pqxx/connection>
13+
#include <pqxx/transaction>
14+
#include <pqxx/nontransaction>
15+
#include <pqxx/pipeline>
16+
17+
using namespace std;
18+
using namespace pqxx;
19+
20+
template<class T>
21+
class unique_ptr
22+
{
23+
T* ptr;
24+
25+
public:
26+
unique_ptr(T* p = NULL) : ptr(p) {}
27+
~unique_ptr() { delete ptr; }
28+
T& operator*() { return *ptr; }
29+
T* operator->() { return ptr; }
30+
void operator=(T* p) { ptr = p; }
31+
void operator=(unique_ptr& other) {
32+
ptr = other.ptr;
33+
other.ptr = NULL;
34+
}
35+
};
36+
37+
typedef void* (*thread_proc_t)(void*);
38+
typedef uint32_t xid_t;
39+
40+
struct thread
41+
{
42+
pthread_t t;
43+
size_t proceeded;
44+
size_t aborts;
45+
int id;
46+
47+
void start(int tid, thread_proc_t proc) {
48+
id = tid;
49+
proceeded = 0;
50+
aborts = 0;
51+
pthread_create(&t, NULL, proc, this);
52+
}
53+
54+
void wait() {
55+
pthread_join(t, NULL);
56+
}
57+
};
58+
59+
struct config
60+
{
61+
int nReaders;
62+
int nWriters;
63+
int nIterations;
64+
int nAccounts;
65+
char const* isolationLevel;
66+
vector<string> connections;
67+
68+
config() {
69+
nReaders = 1;
70+
nWriters = 10;
71+
nIterations = 1000;
72+
nAccounts = 1000;
73+
isolationLevel = "read committed";
74+
}
75+
};
76+
77+
config cfg;
78+
bool running;
79+
80+
#define USEC 1000000
81+
82+
static time_t getCurrentTime()
83+
{
84+
struct timeval tv;
85+
gettimeofday(&tv, NULL);
86+
return (time_t)tv.tv_sec*USEC + tv.tv_usec;
87+
}
88+
89+
90+
void exec(transaction_base& txn, char const* sql, ...)
91+
{
92+
va_list args;
93+
va_start(args, sql);
94+
char buf[1024];
95+
vsprintf(buf, sql, args);
96+
va_end(args);
97+
txn.exec(buf);
98+
}
99+
100+
xid_t execQuery( transaction_base& txn, char const* sql, ...)
101+
{
102+
va_list args;
103+
va_start(args, sql);
104+
char buf[1024];
105+
vsprintf(buf, sql, args);
106+
va_end(args);
107+
result r = txn.exec(buf);
108+
return r[0][0].as(xid_t());
109+
}
110+
111+
void* reader(void* arg)
112+
{
113+
thread& t = *(thread*)arg;
114+
vector< unique_ptr<connection> > conns(cfg.connections.size());
115+
for (size_t i = 0; i < conns.size(); i++) {
116+
conns[i] = new connection(cfg.connections[i]);
117+
}
118+
int64_t prevSum = 0;
119+
120+
while (running) {
121+
xid_t xid = 0;
122+
for (size_t i = 0; i < conns.size(); i++) {
123+
work txn(*conns[i]);
124+
if (i == 0) {
125+
xid = execQuery(txn, "select dtm_begin_transaction()");
126+
} else {
127+
exec(txn, "select dtm_join_transaction(%u)", xid);
128+
}
129+
txn.commit();
130+
}
131+
vector< unique_ptr<nontransaction> > txns(conns.size());
132+
vector< unique_ptr<pipeline> > pipes(conns.size());
133+
vector<pipeline::query_id> results(conns.size());
134+
for (size_t i = 0; i < conns.size(); i++) {
135+
txns[i] = new nontransaction(*conns[i]);
136+
pipes[i] = new pipeline(*txns[i]);
137+
results[i] = pipes[i]->insert("select sum(v) from t");
138+
}
139+
int64_t sum = 0;
140+
for (size_t i = 0; i < conns.size(); i++) {
141+
pipes[i]->complete();
142+
result r = pipes[i]->retrieve(results[i]);
143+
sum += r[0][0].as(int64_t());
144+
}
145+
if (sum != prevSum) {
146+
printf("Total=%ld xid=%u\n", sum, xid);
147+
prevSum = sum;
148+
}
149+
t.proceeded += 1;
150+
}
151+
return NULL;
152+
}
153+
154+
void* writer(void* arg)
155+
{
156+
thread& t = *(thread*)arg;
157+
vector< unique_ptr<connection> > conns(cfg.connections.size());
158+
for (size_t i = 0; i < conns.size(); i++) {
159+
conns[i] = new connection(cfg.connections[i]);
160+
}
161+
for (int i = 0; i < cfg.nIterations; i++)
162+
{
163+
int srcCon, dstCon;
164+
int srcAcc = (random() % ((cfg.nAccounts-cfg.nWriters)/cfg.nWriters))*cfg.nWriters + t.id;
165+
int dstAcc = (random() % ((cfg.nAccounts-cfg.nWriters)/cfg.nWriters))*cfg.nWriters + t.id;
166+
167+
do {
168+
srcCon = random() % cfg.connections.size();
169+
dstCon = random() % cfg.connections.size();
170+
} while (srcCon == dstCon);
171+
172+
nontransaction srcTx(*conns[srcCon]);
173+
nontransaction dstTx(*conns[dstCon]);
174+
175+
xid_t xid = execQuery(srcTx, "select dtm_begin_transaction()");
176+
exec(dstTx, "select dtm_join_transaction(%u)", xid);
177+
178+
exec(srcTx, "begin transaction isolation level %s", cfg.isolationLevel);
179+
exec(dstTx, "begin transaction isolation level %s", cfg.isolationLevel);
180+
181+
try {
182+
exec(srcTx, "update t set v = v - 1 where u=%d", srcAcc);
183+
exec(dstTx, "update t set v = v + 1 where u=%d", dstAcc);
184+
} catch (pqxx_exception const& x) {
185+
exec(srcTx, "rollback");
186+
exec(srcTx, "rollback");
187+
t.aborts += 1;
188+
i -= 1;
189+
continue;
190+
}
191+
pipeline srcPipe(srcTx);
192+
pipeline dstPipe(dstTx);
193+
srcPipe.insert("commit transaction");
194+
dstPipe.insert("commit transaction");
195+
srcPipe.complete();
196+
dstPipe.complete();
197+
198+
t.proceeded += 1;
199+
}
200+
return NULL;
201+
}
202+
203+
void initializeDatabase()
204+
{
205+
for (size_t i = 0; i < cfg.connections.size(); i++) {
206+
connection conn(cfg.connections[i]);
207+
work txn(conn);
208+
exec(txn, "drop extension if exists pg_dtm");
209+
exec(txn, "create extension pg_dtm");
210+
exec(txn, "drop table if exists t");
211+
exec(txn, "create table t(u int primary key, v int)");
212+
exec(txn, "insert into t (select generate_series(0,%d), %d)", cfg.nAccounts-1, 0);
213+
txn.commit();
214+
}
215+
}
216+
217+
int main (int argc, char* argv[])
218+
{
219+
bool initialize = false;
220+
for (int i = 1; i < argc; i++) {
221+
if (argv[i][0] == '-') {
222+
switch (argv[i][1]) {
223+
case 'r':
224+
cfg.nReaders = atoi(argv[++i]);
225+
continue;
226+
case 'w':
227+
cfg.nWriters = atoi(argv[++i]);
228+
continue;
229+
case 'a':
230+
cfg.nAccounts = atoi(argv[++i]);
231+
continue;
232+
case 'n':
233+
cfg.nIterations = atoi(argv[++i]);
234+
continue;
235+
case 'c':
236+
cfg.connections.push_back(string(argv[++i]));
237+
continue;
238+
case 'l':
239+
cfg.isolationLevel = argv[++i];
240+
continue;
241+
case 'i':
242+
initialize = true;
243+
continue;
244+
}
245+
}
246+
printf("Options:\n"
247+
"\t-r N\tnumber of readers (1)\n"
248+
"\t-w N\tnumber of writers (10)\n"
249+
"\t-a N\tnumber of accounts (1000)\n"
250+
"\t-n N\tnumber of iterations (1000)\n"
251+
"\t-l STR\tisolation level (read committed)\n"
252+
"\t-c STR\tdatabase connection string\n"
253+
"\t-i\tinitialize datanase\n");
254+
return 1;
255+
}
256+
if (initialize) {
257+
initializeDatabase();
258+
}
259+
260+
time_t start = getCurrentTime();
261+
running = true;
262+
263+
vector<thread> readers(cfg.nReaders);
264+
vector<thread> writers(cfg.nWriters);
265+
size_t nReads = 0;
266+
size_t nWrites = 0;
267+
size_t nAborts = 0;
268+
269+
for (int i = 0; i < cfg.nReaders; i++) {
270+
readers[i].start(i, reader);
271+
}
272+
for (int i = 0; i < cfg.nWriters; i++) {
273+
writers[i].start(i, writer);
274+
}
275+
276+
for (int i = 0; i < cfg.nWriters; i++) {
277+
writers[i].wait();
278+
nWrites += writers[i].proceeded;
279+
nAborts += writers[i].aborts;
280+
}
281+
282+
running = false;
283+
284+
for (int i = 0; i < cfg.nReaders; i++) {
285+
readers[i].wait();
286+
nReads += readers[i].proceeded;
287+
}
288+
289+
time_t elapsed = getCurrentTime() - start;
290+
printf("TPS(updates)=%f, TPS(selects)=%f, aborts=%ld\n", (double)(nWrites*USEC)/elapsed, (double)(nReads*USEC)/elapsed, nAborts);
291+
return 0;
292+
}

tests/makefile

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
CXX=g++
2+
CXXFLAGS=-g -Wall -O2 -pthread
3+
4+
all: dtmbench
5+
6+
dtmbench: dtmbench.cpp
7+
$(CXX) $(CXXFLAGS) -o dtmbench dtmbench.cpp -lpqxx
8+
9+
clean:
10+
rm -f dtmbench

tests/run.sh

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
./dtmbench \
2+
-c "dbname=postgres host=localhost user=knizhnik port=5432 sslmode=disable" \
3+
-c "dbname=postgres host=localhost user=knizhnik port=5433 sslmode=disable" \
4+
-c "dbname=postgres host=localhost user=knizhnik port=5434 sslmode=disable" \
5+
-n 1000 -a 1000 -w 10 -r 1 $*

0 commit comments

Comments
 (0)