Skip to content

Commit 774da5a

Browse files
committed
2 parents 6059981 + 7f3e06a commit 774da5a

File tree

3 files changed

+185
-9
lines changed

3 files changed

+185
-9
lines changed

.gitignore

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,3 +39,6 @@ lib*.pc
3939
/Debug/
4040
/Release/
4141
/tmp_install/
42+
/contrib/mmts/tests/node*
43+
/install/
44+
/.vscode/

contrib/mmts/tests2/client2.py

Lines changed: 175 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,175 @@
1+
#!/usr/bin/env python3
2+
import asyncio
3+
import uvloop
4+
import aiopg
5+
import random
6+
import psycopg2
7+
import time
8+
import datetime
9+
import copy
10+
import aioprocessing
11+
import multiprocessing
12+
13+
class MtmTxAggregate(object):
14+
15+
def __init__(self, name):
16+
self.name = name
17+
self.isolation = 0
18+
self.clear_values()
19+
20+
def clear_values(self):
21+
self.max_latency = 0.0
22+
self.finish = {}
23+
24+
def start_tx(self):
25+
self.start_time = datetime.datetime.now()
26+
27+
def finish_tx(self, name):
28+
latency = (datetime.datetime.now() - self.start_time).total_seconds()
29+
30+
if latency > self.max_latency:
31+
self.max_latency = latency
32+
33+
if name not in self.finish:
34+
self.finish[name] = 1
35+
else:
36+
self.finish[name] += 1
37+
38+
def as_dict(self):
39+
return {
40+
'running_latency': (datetime.datetime.now() - self.start_time).total_seconds(),
41+
'max_latency': self.max_latency,
42+
'isolation': self.isolation,
43+
'finish': copy.deepcopy(self.finish)
44+
}
45+
46+
class MtmClient(object):
47+
48+
def __init__(self, dsns, n_accounts=100000):
49+
self.n_accounts = n_accounts
50+
self.dsns = dsns
51+
self.aggregates = {}
52+
self.initdb()
53+
54+
def initdb(self):
55+
conn = psycopg2.connect(self.dsns[0])
56+
cur = conn.cursor()
57+
cur.execute('create extension if not exists multimaster')
58+
conn.commit()
59+
cur.execute('drop table if exists bank_test')
60+
cur.execute('create table bank_test(uid int primary key, amount int)')
61+
cur.execute('''
62+
insert into bank_test
63+
select *, 0 from generate_series(0, %s)''',
64+
(self.n_accounts,))
65+
conn.commit()
66+
cur.close()
67+
conn.close()
68+
69+
async def status(self):
70+
while True:
71+
msg = await self.child_pipe.coro_recv()
72+
if msg == 'status':
73+
serialized_aggs = {}
74+
for name, aggregate in self.aggregates.items():
75+
serialized_aggs[name] = aggregate.as_dict()
76+
aggregate.clear_values()
77+
self.child_pipe.send(serialized_aggs)
78+
79+
async def exec_tx(self, tx_block, aggname_prefix, conn_i):
80+
aggname = "%s_%i" % (aggname_prefix, conn_i)
81+
agg = self.aggregates[aggname] = MtmTxAggregate(aggname)
82+
83+
pool = await aiopg.create_pool(self.dsns[conn_i])
84+
async with pool.acquire() as conn:
85+
async with conn.cursor() as cur:
86+
while True:
87+
agg.start_tx()
88+
try:
89+
await tx_block(conn, cur)
90+
agg.finish_tx('commit')
91+
except psycopg2.Error as e:
92+
await cur.execute('rollback')
93+
agg.finish_tx(e.pgerror)
94+
95+
async def transfer_tx(self, conn, cur):
96+
amount = 1
97+
# to avoid deadlocks:
98+
from_uid = random.randint(1, self.n_accounts - 2)
99+
to_uid = from_uid + 1
100+
await cur.execute('begin')
101+
await cur.execute('''update bank_test
102+
set amount = amount - %s
103+
where uid = %s''',
104+
(amount, from_uid))
105+
await cur.execute('''update bank_test
106+
set amount = amount + %s
107+
where uid = %s''',
108+
(amount, to_uid))
109+
await cur.execute('commit')
110+
111+
async def total_tx(self, conn, cur):
112+
await cur.execute('select sum(amount) from bank_test')
113+
total = await cur.fetchone()
114+
if total[0] != 0:
115+
self.isolation_errors += 1
116+
117+
def run(self):
118+
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
119+
self.loop = asyncio.get_event_loop()
120+
121+
for i, _ in enumerate(self.dsns):
122+
asyncio.ensure_future(self.exec_tx(self.transfer_tx, 'transfer', i))
123+
asyncio.ensure_future(self.exec_tx(self.total_tx, 'sumtotal', i))
124+
125+
asyncio.ensure_future(self.status())
126+
127+
self.loop.run_forever()
128+
129+
def bgrun(self):
130+
print('Starting evloop in different process');
131+
self.parent_pipe, self.child_pipe = aioprocessing.AioPipe()
132+
self.evloop_process = multiprocessing.Process(target=self.run, args=())
133+
self.evloop_process.start()
134+
135+
def get_status(self):
136+
c.parent_pipe.send('status')
137+
return c.parent_pipe.recv()
138+
139+
def print_aggregates(serialized_agg):
140+
columns = ['running_latency', 'max_latency', 'isolation', 'finish']
141+
142+
# print table header
143+
print("\t\t", end="")
144+
for col in columns:
145+
print(col, end="\t")
146+
print("\n", end="")
147+
148+
serialized_agg
149+
150+
for aggname in sorted(serialized_agg.keys()):
151+
agg = serialized_agg[aggname]
152+
print("%s\t" % aggname, end="")
153+
for col in columns:
154+
if col in agg:
155+
if isinstance(agg[col], float):
156+
print("%.2f\t" % (agg[col],), end="\t")
157+
else:
158+
print(agg[col], end="\t")
159+
else:
160+
print("-\t", end="")
161+
print("")
162+
print("")
163+
164+
c = MtmClient(['dbname=postgres user=stas host=127.0.0.1',
165+
'dbname=postgres user=stas host=127.0.0.1 port=5433',
166+
'dbname=postgres user=stas host=127.0.0.1 port=5434'], n_accounts=10000)
167+
c.bgrun()
168+
169+
while True:
170+
time.sleep(1)
171+
aggs = c.get_status()
172+
print_aggregates(aggs)
173+
# for k, v in aggs.items():
174+
# print(k, v.finish)
175+

contrib/raftable/worker.c

Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,7 @@ static int create_listening_socket(const char *host, int port)
107107
setsockopt(s, IPPROTO_TCP, TCP_NODELAY, (char const*)&optval, sizeof(optval));
108108
setsockopt(s, SOL_SOCKET, SO_REUSEADDR, (char const*)&optval, sizeof(optval));
109109

110-
fprintf(stderr, "binding tcp %s:%d\n", host, port);
110+
elog(DEBUG1, "binding tcp %s:%d\n", host, port);
111111
if (bind(s, a->ai_addr, a->ai_addrlen) < 0)
112112
{
113113
elog(WARNING, "cannot bind the listening socket: %s", strerror(errno));
@@ -133,7 +133,7 @@ static bool add_client(int sock)
133133

134134
if (server.clientnum >= MAX_CLIENTS)
135135
{
136-
fprintf(stderr, "client limit hit\n");
136+
elog(WARNING, "client limit hit\n");
137137
return false;
138138
}
139139

@@ -193,17 +193,17 @@ static bool accept_client(void)
193193
{
194194
int fd;
195195

196-
fprintf(stderr, "a new connection is queued\n");
196+
elog(DEBUG1, "a new connection is queued\n");
197197

198198
fd = accept(server.listener, NULL, NULL);
199199
if (fd == -1) {
200-
fprintf(stderr, "failed to accept a connection: %s\n", strerror(errno));
200+
elog(WARNING, "failed to accept a connection: %s\n", strerror(errno));
201201
return false;
202202
}
203-
fprintf(stderr, "a new connection fd=%d accepted\n", fd);
203+
elog(DEBUG1, "a new connection fd=%d accepted\n", fd);
204204

205205
if (!raft_is_leader(raft)) {
206-
fprintf(stderr, "not a leader, disconnecting the accepted connection fd=%d\n", fd);
206+
elog(DEBUG1, "not a leader, disconnecting the accepted connection fd=%d\n", fd);
207207
close(fd);
208208
return false;
209209
}
@@ -256,7 +256,6 @@ static void on_message_recv(Client *c)
256256
key = f->data;
257257

258258
value = state_get(state, key, &vallen);
259-
fprintf(stderr, "query='%s' answer(%d)='%.*s'\n", key, (int)vallen, (int)vallen, value);
260259
answer = make_single_value_message(key, value, vallen, &answersize);
261260
answer->meaning = MEAN_OK;
262261
if (value) pfree(value);
@@ -270,7 +269,7 @@ static void on_message_recv(Client *c)
270269
}
271270
else
272271
{
273-
fprintf(stderr, "unknown meaning %d (%c) of the client's message\n", rm->meaning, rm->meaning);
272+
elog(WARNING, "unknown meaning %d (%c) of the client's message\n", rm->meaning, rm->meaning);
274273
c->state = CLIENT_SICK;
275274
}
276275
}
@@ -280,7 +279,6 @@ static void on_message_send(Client *c)
280279
Assert(c->state == CLIENT_RECVING);
281280
Assert(c->msg != NULL);
282281
Assert(c->cursor == c->msg->len + sizeof(c->msg->len));
283-
fprintf(stderr, "freeing msg = %p\n", c->msg);
284282
pfree(c->msg);
285283
c->msg = NULL;
286284
c->state = CLIENT_SENDING;

0 commit comments

Comments
 (0)