Skip to content

Commit c3aba8b

Browse files
committed
trying out an event based solution to handle connections. This ought to allow for a lot more websocket connections at once
1 parent 155a09d commit c3aba8b

File tree

3 files changed

+229
-77
lines changed

3 files changed

+229
-77
lines changed

ws4py/manager.py

Lines changed: 138 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,138 @@
1+
# -*- coding: utf-8 -*-
2+
import logging
3+
import select
4+
import threading
5+
import time
6+
7+
logger = logging.getLogger('ws4py')
8+
9+
def format_addresses(ws):
10+
me_ip, me_port = ws.local_address
11+
peer_ip, peer_port = ws.peer_address
12+
return "[Local => %s:%d | Remote => %s:%d]" % (me_ip, me_port, peer_ip, peer_port)
13+
14+
class SelectPoller(object):
15+
def __init__(self, timeout=0.1):
16+
self._fds = []
17+
self.timeout = timeout
18+
19+
def release(self):
20+
self._fds = []
21+
22+
def register(self, fd):
23+
if fd not in self._fds:
24+
self._fds.append(fd)
25+
26+
def unregister(self, fd):
27+
if fd in self._fds:
28+
self._fds.remove(fd)
29+
30+
def poll(self):
31+
r, w, x = select.select(self._fds, [], [], self.timeout)
32+
return r
33+
34+
class EPollPoller(object):
35+
def __init__(self, timeout=0.1):
36+
self.poller = select.epoll()
37+
self.timeout = timeout
38+
39+
def release(self):
40+
self.poller.close()
41+
42+
def release(self):
43+
self.poller.close()
44+
45+
def register(self, fd):
46+
try:
47+
self.poller.register(fd, select.EPOLLIN | select.EPOLLPRI)
48+
except IOError:
49+
pass
50+
51+
def unregister(self, fd):
52+
self.poller.unregister(fd)
53+
54+
def poll(self):
55+
events = self.poller.poll(timeout=self.timeout)
56+
for fd, event in events:
57+
if event | select.EPOLLIN | select.EPOLLPRI:
58+
yield fd
59+
60+
class WebSocketManager(threading.Thread):
61+
def __init__(self, poller=None):
62+
threading.Thread.__init__(self)
63+
self.lock = threading.Lock()
64+
self.websockets = {}
65+
66+
if hasattr(select, "epoll"):
67+
self.poller = EPollPoller()
68+
logger.info("Using epoll")
69+
else:
70+
self.poller = SelectPoller()
71+
logger.info("Using select as epoll is not available")
72+
73+
def __len__(self):
74+
return len(self.websockets)
75+
76+
def add(self, websocket):
77+
logger.info("Managing websocket %s" % format_addresses(websocket))
78+
websocket.opened()
79+
with self.lock:
80+
fd = websocket.sock.fileno()
81+
self.websockets[fd] = websocket
82+
self.poller.register(fd)
83+
84+
def remove(self, websocket):
85+
logger.info("Removing websocket %s" % format_addresses(websocket))
86+
with self.lock:
87+
fd = websocket.sock.fileno()
88+
self.websockets.pop(fd)
89+
self.poller.unregister(fd)
90+
91+
def stop(self):
92+
self.running = False
93+
with self.lock:
94+
self.websockets.clear()
95+
self.poller.release()
96+
97+
def run(self):
98+
self.running = True
99+
while self.running:
100+
with self.lock:
101+
polled = self.poller.poll()
102+
103+
if not self.running:
104+
break
105+
106+
for fd in polled:
107+
if not self.running:
108+
break
109+
110+
ws = self.websockets.get(fd)
111+
112+
if ws:
113+
if not ws.terminated and not ws.once():
114+
with self.lock:
115+
fd = ws.sock.fileno()
116+
self.websockets.pop(fd)
117+
self.poller.unregister(fd)
118+
119+
if not ws.terminated:
120+
logger.info("Terminating websocket %s" % format_addresses(ws))
121+
ws.terminate()
122+
123+
def close_all(self, code=1001, message='Server is shutting down'):
124+
with self.lock:
125+
logger.info("Closing all websockets with [%d] '%s'" % (code, message))
126+
for ws in self.websockets.itervalues():
127+
ws.close(code=1001, reason=message)
128+
129+
def broadcast(self, message, binary=False):
130+
with self.lock:
131+
websockets = self.websockets.copy()
132+
133+
for ws in websockets.itervalues():
134+
if not ws.terminated:
135+
try:
136+
ws.send(message, binary)
137+
except Exception, ex:
138+
pass

ws4py/server/cherrypyserver.py

Lines changed: 8 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,7 @@ def ws(self):
7777
from ws4py.exc import HandshakeError
7878
from ws4py.websocket import WebSocket
7979
from ws4py.compat import py3k, enc, dec, get_connection, detach_connection
80+
from ws4py.manager import WebSocketManager
8081

8182
__all__ = ['WebSocketTool', 'WebSocketPlugin']
8283

@@ -280,18 +281,17 @@ def _set_internal_flags(self):
280281
class WebSocketPlugin(plugins.SimplePlugin):
281282
def __init__(self, bus):
282283
plugins.SimplePlugin.__init__(self, bus)
283-
self.pool = {}
284+
self.manager = WebSocketManager()
284285

285286
def start(self):
286287
cherrypy.log("Starting WebSocket processing")
287-
self.bus.subscribe('main', self.monitor)
288288
self.bus.subscribe('stop', self.cleanup)
289289
self.bus.subscribe('handle-websocket', self.handle)
290290
self.bus.subscribe('websocket-broadcast', self.broadcast)
291+
self.manager.start()
291292

292293
def stop(self):
293294
cherrypy.log("Terminating WebSocket processing")
294-
self.bus.unsubscribe('main', self.monitor)
295295
self.bus.unsubscribe('stop', self.cleanup)
296296
self.bus.unsubscribe('handle-websocket', self.handle)
297297
self.bus.unsubscribe('websocket-broadcast', self.broadcast)
@@ -303,39 +303,15 @@ def handle(self, ws_handler, peer_addr):
303303
:param ws_handler: websocket handler instance
304304
:param peer_addr: remote peer address for tracing purpose
305305
"""
306-
cherrypy.log("Managing WebSocket connection from %s:%d" % (peer_addr[0], peer_addr[1]))
307-
th = threading.Thread(target=ws_handler.run, name="WebSocket client at %s:%d" % (peer_addr[0], peer_addr[1]))
308-
th.daemon = True
309-
self.pool[ws_handler] = (th, peer_addr)
310-
th.start()
311-
312-
def monitor(self):
313-
"""
314-
Called within the engine's mainloop to drop connections
315-
that have terminated since last iteration.
316-
"""
317-
if py3k:
318-
handlers = list(self.pool.keys())
319-
else:
320-
handlers = self.pool.keys()[:]
321-
322-
for handler in handlers:
323-
if handler.terminated:
324-
th, addr = self.pool[handler]
325-
cherrypy.log("Removing WebSocket connection %s:%d" % (addr[0], addr[1]))
326-
th.join()
327-
del self.pool[handler]
306+
self.manager.add(ws_handler)
328307

329308
def cleanup(self):
330309
"""
331310
Terminate all connections and clear the pool. Executed when the engine stops.
332311
"""
333-
cherrypy.log("Closing %d WebSocket connections" % len(self.pool))
334-
for handler in self.pool:
335-
handler.close(code=1001, reason='Server is shutting down')
336-
th, addr = self.pool[handler]
337-
th.join()
338-
self.pool.clear()
312+
self.manager.close_all()
313+
self.manager.stop()
314+
self.manager.join()
339315

340316
def broadcast(self, message, binary=False):
341317
"""
@@ -346,12 +322,7 @@ def broadcast(self, message, binary=False):
346322
of the connected handler.
347323
:param binary: whether or not the message is a binary one
348324
"""
349-
for ws_handler in self.pool:
350-
try:
351-
if not ws_handler.terminated:
352-
ws_handler.send(message, binary)
353-
except:
354-
cherrypy.log(traceback=True)
325+
self.manager.broadcast(message, binary)
355326

356327
if __name__ == '__main__':
357328
import random

0 commit comments

Comments
 (0)