35
35
from shadowsocks import encrypt , eventloop , utils , common
36
36
from shadowsocks .common import parse_header
37
37
38
-
38
+ # we clear at most TIMEOUTS_CLEAN_SIZE timeouts each time
39
39
TIMEOUTS_CLEAN_SIZE = 512
40
+
41
+ # we check timeouts every TIMEOUT_PRECISION seconds
40
42
TIMEOUT_PRECISION = 4
41
43
42
44
MSG_FASTOPEN = 0x20000000
43
45
46
+ # SOCKS CMD defination
44
47
CMD_CONNECT = 1
45
48
CMD_BIND = 2
46
49
CMD_UDP_ASSOCIATE = 3
47
50
51
+ # TCP Relay can be either sslocal or ssserver
52
+ # for sslocal it is called is_local=True
53
+
54
+ # for each opening port, we have a TCP Relay
55
+ # for each connection, we have a TCP Relay Handler to handle the connection
56
+
57
+ # for each handler, we have 2 sockets:
58
+ # local: connected to the client
59
+ # remote: connected to remote server
60
+
61
+ # for each handler, we have 2 streams:
62
+ # upstream: from client to server direction
63
+ # read local and write to remote
64
+ # downstream: from server to client direction
65
+ # read remote and write to local
66
+
67
+ # for each handler, it could be at one of several stages:
68
+
48
69
# local:
49
- # stage 0 init
50
- # stage 1 hello received, hello sent
70
+ # stage 0 SOCKS hello received from local, send hello to local
71
+ # stage 1 addr received from local, query DNS for remote
51
72
# stage 2 UDP assoc
52
- # stage 3 DNS
53
- # stage 4 addr received, reply sent
54
- # stage 5 remote connected
73
+ # stage 3 DNS resolved, connect to remote
74
+ # stage 4 still connecting, more data from local received
75
+ # stage 5 remote connected, piping local and remote
55
76
56
77
# remote:
57
- # stage 0 init
58
- # stage 3 DNS
59
- # stage 4 addr received, reply sent
60
- # stage 5 remote connected
78
+ # stage 0 just jump to stage 1
79
+ # stage 1 addr received from local, query DNS for remote
80
+ # stage 3 DNS resolved, connect to remote
81
+ # stage 4 still connecting, more data from local received
82
+ # stage 5 remote connected, piping local and remote
61
83
62
84
STAGE_INIT = 0
63
- STAGE_HELLO = 1
85
+ STAGE_ADDR = 1
64
86
STAGE_UDP_ASSOC = 2
65
87
STAGE_DNS = 3
66
- STAGE_REPLY = 4
88
+ STAGE_CONNECTING = 4
67
89
STAGE_STREAM = 5
68
90
STAGE_DESTROYED = - 1
69
91
70
92
# stream direction
71
93
STREAM_UP = 0
72
94
STREAM_DOWN = 1
73
95
74
- # stream wait status
96
+ # stream wait status, indicating it's waiting for reading, etc
75
97
WAIT_STATUS_INIT = 0
76
98
WAIT_STATUS_READING = 1
77
99
WAIT_STATUS_WRITING = 2
@@ -128,9 +150,15 @@ def _get_a_server(self):
128
150
return server , server_port
129
151
130
152
def _update_activity (self ):
153
+ # tell the TCP Relay we have activities recently
154
+ # else it will think we are inactive and timed out
131
155
self ._server .update_activity (self )
132
156
133
157
def _update_stream (self , stream , status ):
158
+ # update a stream to a new waiting status
159
+
160
+ # check if status is changed
161
+ # only update if dirty
134
162
dirty = False
135
163
if stream == STREAM_DOWN :
136
164
if self ._downstream_status != status :
@@ -157,6 +185,9 @@ def _update_stream(self, stream, status):
157
185
self ._loop .modify (self ._remote_sock , event )
158
186
159
187
def _write_to_sock (self , data , sock ):
188
+ # write data to sock
189
+ # if only some of the data are written, put remaining in the buffer
190
+ # and update the stream to wait for writing
160
191
if not data or not sock :
161
192
return False
162
193
uncomplete = False
@@ -195,13 +226,16 @@ def _write_to_sock(self, data, sock):
195
226
logging .error ('write_all_to_sock:unknown socket' )
196
227
return True
197
228
198
- def _handle_stage_reply (self , data ):
229
+ def _handle_stage_connecting (self , data ):
199
230
if self ._is_local :
200
231
data = self ._encryptor .encrypt (data )
201
232
self ._data_to_write_to_remote .append (data )
202
233
if self ._is_local and not self ._fastopen_connected and \
203
234
self ._config ['fast_open' ]:
235
+ # for sslocal and fastopen, we basically wait for data and use
236
+ # sendto to connect
204
237
try :
238
+ # only connect once
205
239
self ._fastopen_connected = True
206
240
remote_sock = \
207
241
self ._create_remote_socket (self ._chosen_server [0 ],
@@ -231,7 +265,7 @@ def _handle_stage_reply(self, data):
231
265
traceback .print_exc ()
232
266
self .destroy ()
233
267
234
- def _handle_stage_hello (self , data ):
268
+ def _handle_stage_addr (self , data ):
235
269
try :
236
270
if self ._is_local :
237
271
cmd = common .ord (data [1 ])
@@ -312,19 +346,23 @@ def _handle_dns_resolved(self, result, error):
312
346
ip = result [1 ]
313
347
if ip :
314
348
try :
315
- self ._stage = STAGE_REPLY
349
+ self ._stage = STAGE_CONNECTING
316
350
remote_addr = ip
317
351
if self ._is_local :
318
352
remote_port = self ._chosen_server [1 ]
319
353
else :
320
354
remote_port = self ._remote_address [1 ]
321
355
322
356
if self ._is_local and self ._config ['fast_open' ]:
357
+ # for fastopen:
323
358
# wait for more data to arrive and send them in one SYN
324
- self ._stage = STAGE_REPLY
359
+ self ._stage = STAGE_CONNECTING
360
+ # we don't have to wait for remote since it's not
361
+ # created
325
362
self ._update_stream (STREAM_UP , WAIT_STATUS_READING )
326
363
# TODO when there is already data in this packet
327
364
else :
365
+ # else do connect
328
366
remote_sock = self ._create_remote_socket (remote_addr ,
329
367
remote_port )
330
368
try :
@@ -335,7 +373,7 @@ def _handle_dns_resolved(self, result, error):
335
373
pass
336
374
self ._loop .add (remote_sock ,
337
375
eventloop .POLL_ERR | eventloop .POLL_OUT )
338
- self ._stage = STAGE_REPLY
376
+ self ._stage = STAGE_CONNECTING
339
377
self ._update_stream (STREAM_UP , WAIT_STATUS_READWRITING )
340
378
self ._update_stream (STREAM_DOWN , WAIT_STATUS_READING )
341
379
return
@@ -346,6 +384,8 @@ def _handle_dns_resolved(self, result, error):
346
384
self .destroy ()
347
385
348
386
def _on_local_read (self ):
387
+ # handle all local read events and dispatch them to methods for
388
+ # each stage
349
389
self ._update_activity ()
350
390
if not self ._local_sock :
351
391
return
@@ -372,15 +412,16 @@ def _on_local_read(self):
372
412
elif is_local and self ._stage == STAGE_INIT :
373
413
# TODO check auth method
374
414
self ._write_to_sock (b'\x05 \00 ' , self ._local_sock )
375
- self ._stage = STAGE_HELLO
415
+ self ._stage = STAGE_ADDR
376
416
return
377
- elif self ._stage == STAGE_REPLY :
378
- self ._handle_stage_reply (data )
379
- elif (is_local and self ._stage == STAGE_HELLO ) or \
417
+ elif self ._stage == STAGE_CONNECTING :
418
+ self ._handle_stage_connecting (data )
419
+ elif (is_local and self ._stage == STAGE_ADDR ) or \
380
420
(not is_local and self ._stage == STAGE_INIT ):
381
- self ._handle_stage_hello (data )
421
+ self ._handle_stage_addr (data )
382
422
383
423
def _on_remote_read (self ):
424
+ # handle all remote read events
384
425
self ._update_activity ()
385
426
data = None
386
427
try :
@@ -406,6 +447,7 @@ def _on_remote_read(self):
406
447
self .destroy ()
407
448
408
449
def _on_local_write (self ):
450
+ # handle local writable event
409
451
if self ._data_to_write_to_local :
410
452
data = b'' .join (self ._data_to_write_to_local )
411
453
self ._data_to_write_to_local = []
@@ -414,6 +456,7 @@ def _on_local_write(self):
414
456
self ._update_stream (STREAM_DOWN , WAIT_STATUS_READING )
415
457
416
458
def _on_remote_write (self ):
459
+ # handle remote writable event
417
460
self ._stage = STAGE_STREAM
418
461
if self ._data_to_write_to_remote :
419
462
data = b'' .join (self ._data_to_write_to_remote )
@@ -435,6 +478,7 @@ def _on_remote_error(self):
435
478
self .destroy ()
436
479
437
480
def handle_event (self , sock , event ):
481
+ # handle all events in this handler and dispatch them to methods
438
482
if self ._stage == STAGE_DESTROYED :
439
483
logging .debug ('ignore handle_event: destroyed' )
440
484
return
@@ -465,7 +509,15 @@ def handle_event(self, sock, event):
465
509
logging .warn ('unknown socket' )
466
510
467
511
def destroy (self ):
512
+ # destroy the handler and release any resources
513
+ # promises:
514
+ # 1. destroy won't make another destroy() call inside
515
+ # 2. destroy releases resources so it prevents future call to destroy
516
+ # 3. destroy won't raise any exceptions
517
+ # if any of the promises are broken, it indicates a bug have been
518
+ # introduced! mostly likely memory leaks, etc
468
519
if self ._stage == STAGE_DESTROYED :
520
+ # this couldn't happen
469
521
logging .debug ('already destroyed' )
470
522
return
471
523
self ._stage = STAGE_DESTROYED
@@ -552,7 +604,7 @@ def remove_handler(self, handler):
552
604
del self ._handler_to_timeouts [hash (handler )]
553
605
554
606
def update_activity (self , handler ):
555
- """ set handler to active """
607
+ # set handler to active
556
608
now = int (time .time ())
557
609
if now - handler .last_activity < TIMEOUT_PRECISION :
558
610
# thus we can lower timeout modification frequency
@@ -601,6 +653,7 @@ def _sweep_timeout(self):
601
653
self ._timeout_offset = pos
602
654
603
655
def _handle_events (self , events ):
656
+ # handle events and dispatch to handlers
604
657
for sock , fd , event in events :
605
658
if sock :
606
659
logging .log (utils .VERBOSE_LEVEL , 'fd %d %s' , fd ,
0 commit comments