Skip to content

Commit 9c3af61

Browse files
author
clowwindy
committed
add statistics
1 parent e08845d commit 9c3af61

File tree

3 files changed

+45
-14
lines changed

3 files changed

+45
-14
lines changed

shadowsocks/manager.py

Lines changed: 28 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,8 @@
2828
from shadowsocks import common, eventloop, tcprelay, udprelay, asyncdns, shell
2929

3030

31-
BUF_SIZE = 2048
31+
BUF_SIZE = 1506
32+
STAT_SEND_LIMIT = 100
3233

3334

3435
class Manager(object):
@@ -44,6 +45,7 @@ def __init__(self, config):
4445
self._statistics = collections.defaultdict(int)
4546
self._control_client_addr = None
4647
try:
48+
# TODO use address instead of port
4749
self._control_socket.bind(('127.0.0.1',
4850
int(config['manager_port'])))
4951
self._control_socket.setblocking(False)
@@ -53,6 +55,7 @@ def __init__(self, config):
5355
exit(1)
5456
self._loop.add(self._control_socket,
5557
eventloop.POLL_IN, self)
58+
self._loop.add_periodic(self.handle_periodic)
5659

5760
port_password = config['port_password']
5861
del config['port_password']
@@ -70,8 +73,10 @@ def add_port(self, config):
7073
port))
7174
return
7275
logging.info("adding server at %s:%d" % (config['server'], port))
73-
t = tcprelay.TCPRelay(config, self._dns_resolver, False)
74-
u = udprelay.UDPRelay(config, self._dns_resolver, False)
76+
t = tcprelay.TCPRelay(config, self._dns_resolver, False,
77+
self.stat_callback)
78+
u = udprelay.UDPRelay(config, self._dns_resolver, False,
79+
self.stat_callback)
7580
t.add_to_loop(self._loop)
7681
u.add_to_loop(self._loop)
7782
self._relays[port] = (t, u)
@@ -126,9 +131,27 @@ def _parse_command(self, data):
126131
logging.error(e)
127132
return None
128133

134+
def stat_callback(self, port, data_len):
135+
self._statistics[port] += data_len
136+
129137
def handle_periodic(self):
130-
# TODO send statistics
131-
pass
138+
r = {}
139+
i = 0
140+
141+
def send_data(data_dict):
142+
if data_dict:
143+
data = common.to_bytes(json.dumps(data_dict,
144+
separators=(',', ':')))
145+
self._send_control_data(b'stat: ' + data)
146+
147+
for k, v in self._statistics.items():
148+
r[k] = v
149+
i += 1
150+
if i >= STAT_SEND_LIMIT:
151+
send_data(r)
152+
r.clear()
153+
send_data(r)
154+
self._statistics.clear()
132155

133156
def _send_control_data(self, data):
134157
if self._control_client_addr:

shadowsocks/tcprelay.py

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -147,10 +147,10 @@ def _get_a_server(self):
147147
logging.debug('chosen server: %s:%d', server, server_port)
148148
return server, server_port
149149

150-
def _update_activity(self):
150+
def _update_activity(self, data_len=0):
151151
# tell the TCP Relay we have activities recently
152152
# else it will think we are inactive and timed out
153-
self._server.update_activity(self)
153+
self._server.update_activity(self, data_len)
154154

155155
def _update_stream(self, stream, status):
156156
# update a stream to a new waiting status
@@ -317,7 +317,6 @@ def _handle_stage_addr(self, data):
317317
self._log_error(e)
318318
if self._config['verbose']:
319319
traceback.print_exc()
320-
# TODO use logging when debug completed
321320
self.destroy()
322321

323322
def _create_remote_socket(self, ip, port):
@@ -388,7 +387,6 @@ def _handle_dns_resolved(self, result, error):
388387
def _on_local_read(self):
389388
# handle all local read events and dispatch them to methods for
390389
# each stage
391-
self._update_activity()
392390
if not self._local_sock:
393391
return
394392
is_local = self._is_local
@@ -402,6 +400,7 @@ def _on_local_read(self):
402400
if not data:
403401
self.destroy()
404402
return
403+
self._update_activity(len(data))
405404
if not is_local:
406405
data = self._encryptor.decrypt(data)
407406
if not data:
@@ -424,17 +423,18 @@ def _on_local_read(self):
424423

425424
def _on_remote_read(self):
426425
# handle all remote read events
427-
self._update_activity()
428426
data = None
429427
try:
430428
data = self._remote_sock.recv(BUF_SIZE)
429+
431430
except (OSError, IOError) as e:
432431
if eventloop.errno_from_exception(e) in \
433432
(errno.ETIMEDOUT, errno.EAGAIN, errno.EWOULDBLOCK):
434433
return
435434
if not data:
436435
self.destroy()
437436
return
437+
self._update_activity(len(data))
438438
if self._is_local:
439439
data = self._encryptor.decrypt(data)
440440
else:
@@ -549,7 +549,7 @@ def destroy(self):
549549

550550

551551
class TCPRelay(object):
552-
def __init__(self, config, dns_resolver, is_local):
552+
def __init__(self, config, dns_resolver, is_local, stat_callback=None):
553553
self._config = config
554554
self._is_local = is_local
555555
self._dns_resolver = dns_resolver
@@ -589,6 +589,7 @@ def __init__(self, config, dns_resolver, is_local):
589589
self._config['fast_open'] = False
590590
server_socket.listen(1024)
591591
self._server_socket = server_socket
592+
self._stat_callback = stat_callback
592593

593594
def add_to_loop(self, loop):
594595
if self._eventloop:
@@ -607,7 +608,10 @@ def remove_handler(self, handler):
607608
self._timeouts[index] = None
608609
del self._handler_to_timeouts[hash(handler)]
609610

610-
def update_activity(self, handler):
611+
def update_activity(self, handler, data_len):
612+
if data_len and self._stat_callback:
613+
self._stat_callback(self._listen_port, data_len)
614+
611615
# set handler to active
612616
now = int(time.time())
613617
if now - handler.last_activity < eventloop.TIMEOUT_PRECISION:

shadowsocks/udprelay.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ def client_key(source_addr, server_af):
8181

8282

8383
class UDPRelay(object):
84-
def __init__(self, config, dns_resolver, is_local):
84+
def __init__(self, config, dns_resolver, is_local, stat_callback=None):
8585
self._config = config
8686
if is_local:
8787
self._listen_addr = config['local_address']
@@ -121,6 +121,7 @@ def __init__(self, config, dns_resolver, is_local):
121121
server_socket.bind((self._listen_addr, self._listen_port))
122122
server_socket.setblocking(False)
123123
self._server_socket = server_socket
124+
self._stat_callback = stat_callback
124125

125126
def _get_a_server(self):
126127
server = self._config['server']
@@ -146,6 +147,8 @@ def _handle_server(self):
146147
data, r_addr = server.recvfrom(BUF_SIZE)
147148
if not data:
148149
logging.debug('UDP handle_server: data is empty')
150+
if self._stat_callback:
151+
self._stat_callback(self._listen_port, len(data))
149152
if self._is_local:
150153
frag = common.ord(data[2])
151154
if frag != 0:
@@ -181,7 +184,6 @@ def _handle_server(self):
181184

182185
af, socktype, proto, canonname, sa = addrs[0]
183186
key = client_key(r_addr, af)
184-
logging.debug(key)
185187
client = self._cache.get(key, None)
186188
if not client:
187189
# TODO async getaddrinfo
@@ -221,6 +223,8 @@ def _handle_client(self, sock):
221223
if not data:
222224
logging.debug('UDP handle_client: data is empty')
223225
return
226+
if self._stat_callback:
227+
self._stat_callback(self._listen_port, len(data))
224228
if not self._is_local:
225229
addrlen = len(r_addr[0])
226230
if addrlen > 255:

0 commit comments

Comments
 (0)