Skip to content

Commit 3440a20

Browse files
author
sunny
committed
Add dbpool.py
1 parent 7c7b1d9 commit 3440a20

File tree

1 file changed

+324
-0
lines changed

1 file changed

+324
-0
lines changed

lib/mysql/connector/dbpool.py

Lines changed: 324 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,324 @@
1+
#!/usr/bin/env python
2+
# -*- coding: utf-8 -*-
3+
4+
import time
5+
import queue
6+
from typing import (
7+
NamedTuple,
8+
Optional,
9+
)
10+
import threading
11+
import logging
12+
import math
13+
14+
from mysql.connector import MySQLConnection
15+
16+
logger = logging.getLogger(__name__)
17+
18+
19+
class PoolError(Exception):
20+
"""Pool Error"""
21+
22+
23+
class NoAvailableConnectionError(PoolError):
24+
"""no available conneciton"""
25+
26+
27+
class CreateConnectionError(PoolError):
28+
"""When create new conneciton """
29+
30+
31+
class TestConnectionError(PoolError):
32+
"""test connection error"""
33+
34+
35+
class PoolOption(NamedTuple):
36+
"""
37+
38+
Arguments:
39+
min_idle (int): Hold min idle connection count.
40+
max_idle (int): Hold max idle connection count.
41+
max_age_in_sec (float): When a connection expired, reconnect.
42+
Default: 300.0 seconds
43+
check_idle_interval (float): Check idle thread run interval time.
44+
Default: 60.0 seconds
45+
46+
"""
47+
min_idle: int
48+
max_idle: int
49+
max_age_in_sec: float = 300.0
50+
check_idle_interval: float = 60.0
51+
52+
def check(self):
53+
if self.min_idle < 0:
54+
raise PoolError('min_idle should be greater than 0')
55+
if self.max_idle < self.min_idle:
56+
raise PoolError('max_idle shouble be granter than min_idle')
57+
58+
59+
class ConnectionQueue(queue.Queue):
60+
def __init__(self, maxsize):
61+
super().__init__(maxsize)
62+
63+
def __contains__(self, item) -> bool:
64+
with self.mutex:
65+
return item in self.queue
66+
67+
def remove(self, item) -> Optional:
68+
with self.mutex:
69+
try:
70+
self.queue.remove(item)
71+
return item
72+
except ValueError:
73+
return None
74+
75+
def free_all(self) -> int:
76+
with self.mutex:
77+
free_cnt = 0
78+
for conn in self.queue:
79+
conn.discard_and_close()
80+
free_cnt += 1
81+
self.queue.clear()
82+
return free_cnt
83+
84+
85+
class PooledConnection(MySQLConnection):
86+
"""
87+
Inherit from ``mysql.connector.MySQLConnection``.
88+
89+
Client should not create PooledConnection.
90+
91+
Just call :py:meth:`dbpool.ConnectionPool.borrow_connection`.
92+
93+
"""
94+
95+
def __init__(self, *args, **kwargs):
96+
self._pool = None
97+
super().__init__(*args, **kwargs)
98+
self._last_connected = time.time()
99+
100+
def set_pool(self, pool: 'ConnectionPool') -> None:
101+
self._pool = pool
102+
103+
def close(self) -> None:
104+
"""
105+
Return this connection to the pool.
106+
"""
107+
if not self._pool:
108+
super().close()
109+
else:
110+
self._pool.return_connection(self)
111+
112+
def is_max_age_expired(self) -> bool:
113+
age = time.time() - self._last_connected
114+
return age > self._pool.option.max_age_in_sec
115+
116+
def force_reconnect(self) -> None:
117+
try:
118+
self.reconnect(attempts=3, delay=0.1)
119+
self._last_connected = time.time()
120+
except Exception as e:
121+
super().close() # 如何重连失败,则放弃这个连接
122+
raise e
123+
124+
def discard_and_close(self) -> None:
125+
self._pool = None
126+
super().close()
127+
128+
def test(self) -> bool:
129+
try:
130+
self.ping(reconnect=True, attempts=3, delay=0.1)
131+
self._last_connected = time.time()
132+
return True
133+
except Exception:
134+
return False
135+
136+
137+
class ConnectionPool:
138+
#pylint: disable=too-many-instance-attributes
139+
def __init__(self, option: PoolOption, **kwargs):
140+
option.check()
141+
self._option = option
142+
self._lock = threading.RLock()
143+
self._closed = False
144+
self._idle_cnt = 0
145+
self._busy_cnt = 0
146+
self._idle = ConnectionQueue(option.max_idle)
147+
self._busy = ConnectionQueue(option.max_idle)
148+
for pname in ('pool_name', 'pool_size'):
149+
if pname in kwargs:
150+
del kwargs[pname]
151+
self._mysql_settings = kwargs
152+
# 建立最小核心连接
153+
with self._lock:
154+
count = self.option.min_idle
155+
for _ in range(count):
156+
self._idle.put(self._create_connection())
157+
self._idle_cnt += 1
158+
159+
self._check_idle_event = threading.Event()
160+
thd = threading.Thread(
161+
target=self,
162+
daemon=True,
163+
name=f'Check idle thread, with option={option}',
164+
)
165+
thd.start()
166+
self._check_idle_thread = thd
167+
168+
@property
169+
def option(self) -> PoolOption:
170+
return self._option
171+
172+
@property
173+
def idle_cnt(self) -> int:
174+
with self._lock:
175+
return self._idle_cnt
176+
177+
@property
178+
def busy_cnt(self) -> int:
179+
with self._lock:
180+
return self._busy_cnt
181+
182+
def __call__(self, *args, **kwargs):
183+
while not self._check_idle_event.is_set():
184+
try:
185+
to = self.option.check_idle_interval
186+
if not self._check_idle_event.wait(timeout=to):
187+
self.check_idle()
188+
except Exception as e:
189+
logger.error(e)
190+
191+
def is_closed(self) -> bool:
192+
with self._lock:
193+
return self._closed
194+
195+
def close(self):
196+
"""
197+
Close the :py:class:`ConnectionPool`.
198+
Free idle connections.
199+
"""
200+
with self._lock:
201+
self._closed = True
202+
self._check_idle_event.set()
203+
# 已经释放锁
204+
time.sleep(0.5)
205+
with self._lock:
206+
self._idle_cnt -= self._idle.free_all()
207+
208+
def borrow_connection(self) -> PooledConnection:
209+
"""
210+
Borrow one connection from pool.
211+
212+
Returns:
213+
PooledConnection: The available connection.
214+
215+
Raises:
216+
TestConnectionError: Test ping got error.
217+
CreateConnectionError: Create new connection failed.
218+
PoolError: When pool is closed.
219+
NoAvailableConnectionError: The pool is busy fully.
220+
221+
"""
222+
if self.is_closed():
223+
raise PoolError('ConnectionPool is closed!')
224+
try:
225+
idle_conn = self._idle.get_nowait()
226+
except queue.Empty:
227+
idle_conn = None
228+
if idle_conn:
229+
with self._lock:
230+
self._idle_cnt -= 1
231+
if idle_conn.is_max_age_expired():
232+
idle_conn.force_reconnect()
233+
if not idle_conn.test():
234+
idle_conn.discard_and_close()
235+
raise TestConnectionError()
236+
try:
237+
self._busy.put(idle_conn)
238+
with self._lock:
239+
self._busy_cnt += 1
240+
return idle_conn
241+
except queue.Full as e:
242+
logger.error(e)
243+
raise e
244+
with self._lock:
245+
total_cnt = self._idle_cnt + self._busy_cnt
246+
if total_cnt >= self.option.max_idle:
247+
raise NoAvailableConnectionError('ConnectionPool is full!')
248+
try:
249+
new_conn = self._create_connection()
250+
except Exception as e:
251+
raise CreateConnectionError() from e
252+
if not new_conn.test():
253+
new_conn.discard_and_close()
254+
raise TestConnectionError()
255+
try:
256+
self._busy.put(new_conn)
257+
self._busy_cnt += 1
258+
except queue.Full as e:
259+
logger.error(e)
260+
return new_conn
261+
262+
def _create_connection(self) -> PooledConnection:
263+
config = self._mysql_settings
264+
conn = PooledConnection(**config)
265+
conn.set_pool(self)
266+
return conn
267+
268+
def return_connection(self, conn: PooledConnection) -> None:
269+
removed_conn = self._busy.remove(conn)
270+
if not removed_conn:
271+
logger.warning('Connection has already been returned?')
272+
else: # 成功删除
273+
with self._lock:
274+
self._busy_cnt -= 1
275+
if self.is_closed():
276+
conn.discard_and_close()
277+
return
278+
if conn.is_max_age_expired():
279+
conn.force_reconnect()
280+
try:
281+
self._idle.put(conn)
282+
self._idle_cnt += 1
283+
except queue.Full as e:
284+
logger.error(e)
285+
conn.discard_and_close()
286+
287+
def check_idle(self) -> None:
288+
if self._idle.empty():
289+
return
290+
min_idle = self.option.min_idle
291+
with self._lock:
292+
free_cnt = self._idle_cnt - min_idle
293+
# if _idle queue needs to reduce ,at least free one idle conneciton
294+
remove_cnt = math.ceil(free_cnt * 0.1)
295+
if remove_cnt <= 0:
296+
return
297+
logger.info(f'Check idle on {self}, remove_cnt={remove_cnt}')
298+
while remove_cnt > 0:
299+
try:
300+
idle_conn = self._idle.get(timeout=1.0)
301+
except queue.Empty:
302+
# now _idle queue is empty
303+
# just quit check_idle
304+
break
305+
else:
306+
with self._lock:
307+
idle_conn.discard_and_close()
308+
self._idle_cnt -= 1
309+
if self._idle_cnt - min_idle <= 0:
310+
break
311+
remove_cnt -= 1
312+
313+
def __repr__(self) -> str:
314+
mysql_settings = self._mysql_settings.copy()
315+
mysql_settings['password'] = 'xxxxxxxxxxxx'
316+
with self._lock:
317+
return (f'<ConnectionPool(option={self.option},'
318+
f'mysql_settings={mysql_settings},'
319+
f'idle_cnt={self._idle_cnt},'
320+
f'idle_qsize={self._idle.qsize()},'
321+
f'busy_cnt={self._busy_cnt},'
322+
f'busy_qsize={self._busy.qsize()},'
323+
f'closed={self._closed}'
324+
f' at {hex(id(self))}>')

0 commit comments

Comments
 (0)