1
1
from __future__ import absolute_import , division , with_statement
2
2
from tornado import netutil
3
3
from tornado .ioloop import IOLoop
4
- from tornado .iostream import IOStream
5
- from tornado .testing import AsyncHTTPTestCase , LogTrapTestCase , get_unused_port
4
+ from tornado .iostream import IOStream , SSLIOStream
5
+ from tornado .testing import AsyncHTTPTestCase , AsyncHTTPSTestCase , AsyncTestCase , LogTrapTestCase , get_unused_port
6
6
from tornado .util import b
7
7
from tornado .web import RequestHandler , Application
8
8
import errno
9
+ import os
10
+ import platform
9
11
import socket
10
12
import sys
11
13
import time
12
14
15
+ try :
16
+ import ssl
17
+ except ImportError :
18
+ ssl = None
19
+
13
20
14
21
class HelloHandler (RequestHandler ):
15
22
def get (self ):
16
23
self .write ("Hello" )
17
24
18
25
19
- class TestIOStream (AsyncHTTPTestCase , LogTrapTestCase ):
26
+ class TestIOStreamWebMixin (object ):
27
+ def _make_client_iostream (self ):
28
+ raise NotImplementedError ()
29
+
20
30
def get_app (self ):
21
31
return Application ([('/' , HelloHandler )])
22
32
23
- def make_iostream_pair (self , ** kwargs ):
24
- port = get_unused_port ()
25
- [listener ] = netutil .bind_sockets (port , '127.0.0.1' ,
26
- family = socket .AF_INET )
27
- streams = [None , None ]
33
+ def test_connection_closed (self ):
34
+ # When a server sends a response and then closes the connection,
35
+ # the client must be allowed to read the data before the IOStream
36
+ # closes itself. Epoll reports closed connections with a separate
37
+ # EPOLLRDHUP event delivered at the same time as the read event,
38
+ # while kqueue reports them as a second read/write event with an EOF
39
+ # flag.
40
+ response = self .fetch ("/" , headers = {"Connection" : "close" })
41
+ response .rethrow ()
28
42
29
- def accept_callback (connection , address ):
30
- streams [0 ] = IOStream (connection , io_loop = self .io_loop , ** kwargs )
31
- self .stop ()
43
+ def test_read_until_close (self ):
44
+ stream = self ._make_client_iostream ()
45
+ stream .connect (('localhost' , self .get_http_port ()), callback = self .stop )
46
+ self .wait ()
47
+ stream .write (b ("GET / HTTP/1.0\r \n \r \n " ))
32
48
33
- def connect_callback ():
34
- streams [1 ] = client_stream
35
- self .stop ()
36
- netutil .add_accept_handler (listener , accept_callback ,
37
- io_loop = self .io_loop )
38
- client_stream = IOStream (socket .socket (), io_loop = self .io_loop ,
39
- ** kwargs )
40
- client_stream .connect (('127.0.0.1' , port ),
41
- callback = connect_callback )
42
- self .wait (condition = lambda : all (streams ))
43
- self .io_loop .remove_handler (listener .fileno ())
44
- listener .close ()
45
- return streams
49
+ stream .read_until_close (self .stop )
50
+ data = self .wait ()
51
+ self .assertTrue (data .startswith (b ("HTTP/1.0 200" )))
52
+ self .assertTrue (data .endswith (b ("Hello" )))
46
53
47
54
def test_read_zero_bytes (self ):
48
- s = socket .socket (socket .AF_INET , socket .SOCK_STREAM , 0 )
49
- s .connect (("localhost" , self .get_http_port ()))
50
- self .stream = IOStream (s , io_loop = self .io_loop )
55
+ self .stream = self ._make_client_iostream ()
56
+ self .stream .connect (("localhost" , self .get_http_port ()),
57
+ callback = self .stop )
58
+ self .wait ()
51
59
self .stream .write (b ("GET / HTTP/1.0\r \n \r \n " ))
52
60
53
61
# normal read
@@ -65,7 +73,48 @@ def test_read_zero_bytes(self):
65
73
data = self .wait ()
66
74
self .assertEqual (data , b ("200" ))
67
75
68
- s .close ()
76
+ self .stream .close ()
77
+
78
+
79
+ class TestIOStreamMixin (object ):
80
+ def _make_server_iostream (self , connection , ** kwargs ):
81
+ raise NotImplementedError ()
82
+
83
+ def _make_client_iostream (self , connection ,** kwargs ):
84
+ raise NotImplementedError ()
85
+
86
+ def make_iostream_pair (self , ** kwargs ):
87
+ port = get_unused_port ()
88
+ [listener ] = netutil .bind_sockets (port , '127.0.0.1' ,
89
+ family = socket .AF_INET )
90
+ streams = [None , None ]
91
+
92
+ def accept_callback (connection , address ):
93
+ streams [0 ] = self ._make_server_iostream (connection , ** kwargs )
94
+ if isinstance (streams [0 ], SSLIOStream ):
95
+ # HACK: The SSL handshake won't complete (and
96
+ # therefore the client connect callback won't be
97
+ # run)until the server side has tried to do something
98
+ # with the connection. For these tests we want both
99
+ # sides to connect before we do anything else with the
100
+ # connection, so we must cause some dummy activity on the
101
+ # server. If this turns out to be useful for real apps
102
+ # it should have a cleaner interface.
103
+ streams [0 ]._add_io_state (IOLoop .READ )
104
+ self .stop ()
105
+
106
+ def connect_callback ():
107
+ streams [1 ] = client_stream
108
+ self .stop ()
109
+ netutil .add_accept_handler (listener , accept_callback ,
110
+ io_loop = self .io_loop )
111
+ client_stream = self ._make_client_iostream (socket .socket (), ** kwargs )
112
+ client_stream .connect (('127.0.0.1' , port ),
113
+ callback = connect_callback )
114
+ self .wait (condition = lambda : all (streams ))
115
+ self .io_loop .remove_handler (listener .fileno ())
116
+ listener .close ()
117
+ return streams
69
118
70
119
def test_write_zero_bytes (self ):
71
120
# Attempting to write zero bytes should run the callback without
@@ -110,27 +159,6 @@ def test_gaierror(self):
110
159
stream .connect (('an invalid domain' , 54321 ))
111
160
self .assertTrue (isinstance (stream .error , socket .gaierror ), stream .error )
112
161
113
- def test_connection_closed (self ):
114
- # When a server sends a response and then closes the connection,
115
- # the client must be allowed to read the data before the IOStream
116
- # closes itself. Epoll reports closed connections with a separate
117
- # EPOLLRDHUP event delivered at the same time as the read event,
118
- # while kqueue reports them as a second read/write event with an EOF
119
- # flag.
120
- response = self .fetch ("/" , headers = {"Connection" : "close" })
121
- response .rethrow ()
122
-
123
- def test_read_until_close (self ):
124
- s = socket .socket (socket .AF_INET , socket .SOCK_STREAM , 0 )
125
- s .connect (("localhost" , self .get_http_port ()))
126
- stream = IOStream (s , io_loop = self .io_loop )
127
- stream .write (b ("GET / HTTP/1.0\r \n \r \n " ))
128
-
129
- stream .read_until_close (self .stop )
130
- data = self .wait ()
131
- self .assertTrue (data .startswith (b ("HTTP/1.0 200" )))
132
- self .assertTrue (data .endswith (b ("Hello" )))
133
-
134
162
def test_streaming_callback (self ):
135
163
server , client = self .make_iostream_pair ()
136
164
try :
@@ -241,6 +269,17 @@ def test_large_read_until(self):
241
269
# seconds.
242
270
server , client = self .make_iostream_pair ()
243
271
try :
272
+ try :
273
+ # This test fails on pypy with ssl. I think it's because
274
+ # pypy's gc defeats moves objects, breaking the
275
+ # "frozen write buffer" assumption.
276
+ if (isinstance (server , SSLIOStream ) and
277
+ platform .python_implementation () == 'PyPy' ):
278
+ return
279
+ except AttributeError :
280
+ # python 2.5 didn't have platform.python_implementation,
281
+ # but there was no pypy for 2.5
282
+ pass
244
283
NUM_KB = 4096
245
284
for i in xrange (NUM_KB ):
246
285
client .write (b ("A" ) * 1024 )
@@ -275,3 +314,39 @@ def test_close_callback_with_pending_read(self):
275
314
finally :
276
315
server .close ()
277
316
client .close ()
317
+
318
+ class TestIOStreamWebHTTP (TestIOStreamWebMixin , AsyncHTTPTestCase ,
319
+ LogTrapTestCase ):
320
+ def _make_client_iostream (self ):
321
+ return IOStream (socket .socket (), io_loop = self .io_loop )
322
+
323
+ class TestIOStreamWebHTTPS (TestIOStreamWebMixin , AsyncHTTPSTestCase ,
324
+ LogTrapTestCase ):
325
+ def _make_client_iostream (self ):
326
+ return SSLIOStream (socket .socket (), io_loop = self .io_loop )
327
+
328
+ class TestIOStream (TestIOStreamMixin , AsyncTestCase , LogTrapTestCase ):
329
+ def _make_server_iostream (self , connection , ** kwargs ):
330
+ return IOStream (connection , io_loop = self .io_loop , ** kwargs )
331
+
332
+ def _make_client_iostream (self , connection , ** kwargs ):
333
+ return IOStream (connection , io_loop = self .io_loop , ** kwargs )
334
+
335
+ class TestIOStreamSSL (TestIOStreamMixin , AsyncTestCase , LogTrapTestCase ):
336
+ def _make_server_iostream (self , connection , ** kwargs ):
337
+ ssl_options = dict (
338
+ certfile = os .path .join (os .path .dirname (__file__ ), 'test.crt' ),
339
+ keyfile = os .path .join (os .path .dirname (__file__ ), 'test.key' ),
340
+ )
341
+ connection = ssl .wrap_socket (connection ,
342
+ server_side = True ,
343
+ do_handshake_on_connect = False ,
344
+ ** ssl_options )
345
+ return SSLIOStream (connection , io_loop = self .io_loop , ** kwargs )
346
+
347
+ def _make_client_iostream (self , connection , ** kwargs ):
348
+ return SSLIOStream (connection , io_loop = self .io_loop , ** kwargs )
349
+
350
+ if ssl is None :
351
+ del TestIOStreamWebHTTPS
352
+ del TestIOStreamSSL
0 commit comments