4
4
import time
5
5
import sys
6
6
from event_history import *
7
+ import select
8
+ import signal
7
9
8
10
class ClientCollection (object ):
9
11
def __init__ (self , connstrs ):
@@ -30,17 +32,29 @@ def stop(self):
30
32
for client in self ._clients :
31
33
client .stop ()
32
34
35
+ def set_acc_to_tx (self , max_acc ):
36
+ for client in self ._clients :
37
+ client .set_acc_to_tx (max_acc )
38
+
33
39
34
40
class BankClient (object ):
35
41
36
- def __init__ (self , connstr , node_id ):
42
+ def __init__ (self , connstr , node_id , accounts = 10000 ):
37
43
self .connstr = connstr
38
44
self .node_id = node_id
39
45
self .run = Value ('b' , True )
40
46
self ._history = EventHistory ()
41
- self .accounts = 10000
47
+ self .accounts = accounts
48
+ self .accounts_to_tx = accounts
42
49
self .show_errors = True
43
50
51
+ #x = self
52
+ #def on_sigint(sig, frame):
53
+ # x.stop()
54
+ #
55
+ #signal.signal(signal.SIGINT, on_sigint)
56
+
57
+
44
58
def initialize (self ):
45
59
conn = psycopg2 .connect (self .connstr )
46
60
cur = conn .cursor ()
@@ -57,6 +71,22 @@ def initialize(self):
57
71
cur .close ()
58
72
conn .close ()
59
73
74
+ def aconn (self ):
75
+ return psycopg2 .connect (self .connstr , async = 1 )
76
+
77
+ @classmethod
78
+ def wait (cls , conn ):
79
+ while 1 :
80
+ state = conn .poll ()
81
+ if state == psycopg2 .extensions .POLL_OK :
82
+ break
83
+ elif state == psycopg2 .extensions .POLL_WRITE :
84
+ select .select ([], [conn .fileno ()], [])
85
+ elif state == psycopg2 .extensions .POLL_READ :
86
+ select .select ([conn .fileno ()], [], [])
87
+ else :
88
+ raise psycopg2 .OperationalError ("poll() returned %s" % state )
89
+
60
90
@property
61
91
def history (self ):
62
92
return self ._history
@@ -74,25 +104,16 @@ def exec_tx(self, name, tx_block):
74
104
75
105
if conn .closed :
76
106
self .history .register_finish (event_id , 'ReConnect' )
77
- try :
78
- conn = psycopg2 .connect (self .connstr )
79
- cur = conn .cursor ()
80
- except :
81
- continue
82
- else :
83
- continue
107
+ conn = psycopg2 .connect (self .connstr )
108
+ cur = conn .cursor ()
84
109
85
110
try :
86
- tx_block (conn , cur )
111
+ tx_block (conn , cur )
112
+ self .history .register_finish (event_id , 'Commit' )
87
113
except psycopg2 .InterfaceError :
88
114
self .history .register_finish (event_id , 'InterfaceError' )
89
115
except psycopg2 .Error :
90
116
self .history .register_finish (event_id , 'PsycopgError' )
91
- except :
92
- print (sys .exc_info ())
93
- self .history .register_finish (event_id , 'OtherError' )
94
- else :
95
- self .history .register_finish (event_id , 'Commit' )
96
117
97
118
cur .close ()
98
119
conn .close ()
@@ -103,17 +124,20 @@ def tx(conn, cur):
103
124
cur .execute ('select sum(amount) from bank_test' )
104
125
res = cur .fetchone ()
105
126
if res [0 ] != 0 :
106
- print ("Isolation error, total = %d" % (res [0 ],))
127
+ print ("Isolation error, total = %d, node = %d " % (res [0 ],self . node_id ))
107
128
raise BaseException
108
129
109
130
self .exec_tx ('total' , tx )
110
131
132
+ def set_acc_to_tx (self , max_acc ):
133
+ self .accounts_to_tx = max_acc
134
+
111
135
def transfer_money (self ):
112
136
113
137
def tx (conn , cur ):
114
138
amount = 1
115
- from_uid = random .randrange (1 , self .accounts - 10 )
116
- to_uid = from_uid + 1 # random.randrange(1, self.accounts + 1)
139
+ from_uid = random .randrange (1 , self .accounts_to_tx - 1 )
140
+ to_uid = random .randrange (1 , self .accounts_to_tx - 1 )
117
141
118
142
conn .commit ()
119
143
cur .execute ('''update bank_test
@@ -129,7 +153,10 @@ def tx(conn, cur):
129
153
self .exec_tx ('transfer' , tx )
130
154
131
155
def start (self ):
132
- self .transfer_process = Process (target = self .transfer_money , args = ())
156
+ print ('Starting client' );
157
+ self .run .value = True
158
+
159
+ self .transfer_process = Process (target = self .transfer_money , name = "txor" , args = ())
133
160
self .transfer_process .start ()
134
161
135
162
self .total_process = Process (target = self .check_total , args = ())
@@ -138,7 +165,7 @@ def start(self):
138
165
return
139
166
140
167
def stop (self ):
141
- print ('Stopping! ' );
168
+ print ('Stopping client ' );
142
169
self .run .value = False
143
170
self .total_process .terminate ()
144
171
self .transfer_process .terminate ()
0 commit comments