2
2
from __future__ import absolute_import
3
3
4
4
from errno import EALREADY , EINPROGRESS , EISCONN , ECONNRESET
5
+ import socket
5
6
import time
6
7
7
8
import pytest
14
15
15
16
16
17
@pytest .fixture
17
- def socket (mocker ):
18
+ def _socket (mocker ):
18
19
socket = mocker .MagicMock ()
19
20
socket .connect_ex .return_value = 0
20
21
mocker .patch ('socket.socket' , return_value = socket )
21
22
return socket
22
23
23
24
24
25
@pytest .fixture
25
- def conn (socket ):
26
- from socket import AF_INET
27
- conn = BrokerConnection ('localhost' , 9092 , AF_INET )
26
+ def conn (_socket ):
27
+ conn = BrokerConnection ('localhost' , 9092 , socket .AF_INET )
28
28
return conn
29
29
30
30
@@ -38,23 +38,23 @@ def conn(socket):
38
38
([EALREADY ], ConnectionStates .CONNECTING ),
39
39
([EISCONN ], ConnectionStates .CONNECTED )),
40
40
])
41
- def test_connect (socket , conn , states ):
41
+ def test_connect (_socket , conn , states ):
42
42
assert conn .state is ConnectionStates .DISCONNECTED
43
43
44
44
for errno , state in states :
45
- socket .connect_ex .side_effect = errno
45
+ _socket .connect_ex .side_effect = errno
46
46
conn .connect ()
47
47
assert conn .state is state
48
48
49
49
50
- def test_connect_timeout (socket , conn ):
50
+ def test_connect_timeout (_socket , conn ):
51
51
assert conn .state is ConnectionStates .DISCONNECTED
52
52
53
53
# Initial connect returns EINPROGRESS
54
54
# immediate inline connect returns EALREADY
55
55
# second explicit connect returns EALREADY
56
56
# third explicit connect returns EALREADY and times out via last_attempt
57
- socket .connect_ex .side_effect = [EINPROGRESS , EALREADY , EALREADY , EALREADY ]
57
+ _socket .connect_ex .side_effect = [EINPROGRESS , EALREADY , EALREADY , EALREADY ]
58
58
conn .connect ()
59
59
assert conn .state is ConnectionStates .CONNECTING
60
60
conn .connect ()
@@ -108,15 +108,15 @@ def test_send_max_ifr(conn):
108
108
assert isinstance (f .exception , Errors .TooManyInFlightRequests )
109
109
110
110
111
- def test_send_no_response (socket , conn ):
111
+ def test_send_no_response (_socket , conn ):
112
112
conn .connect ()
113
113
assert conn .state is ConnectionStates .CONNECTED
114
114
req = MetadataRequest [0 ]([])
115
115
header = RequestHeader (req , client_id = conn .config ['client_id' ])
116
116
payload_bytes = len (header .encode ()) + len (req .encode ())
117
117
third = payload_bytes // 3
118
118
remainder = payload_bytes % 3
119
- socket .send .side_effect = [4 , third , third , third , remainder ]
119
+ _socket .send .side_effect = [4 , third , third , third , remainder ]
120
120
121
121
assert len (conn .in_flight_requests ) == 0
122
122
f = conn .send (req , expect_response = False )
@@ -125,36 +125,34 @@ def test_send_no_response(socket, conn):
125
125
assert len (conn .in_flight_requests ) == 0
126
126
127
127
128
- def test_send_response (socket , conn ):
128
+ def test_send_response (_socket , conn ):
129
129
conn .connect ()
130
130
assert conn .state is ConnectionStates .CONNECTED
131
131
req = MetadataRequest [0 ]([])
132
132
header = RequestHeader (req , client_id = conn .config ['client_id' ])
133
133
payload_bytes = len (header .encode ()) + len (req .encode ())
134
134
third = payload_bytes // 3
135
135
remainder = payload_bytes % 3
136
- socket .send .side_effect = [4 , third , third , third , remainder ]
136
+ _socket .send .side_effect = [4 , third , third , third , remainder ]
137
137
138
138
assert len (conn .in_flight_requests ) == 0
139
139
f = conn .send (req )
140
140
assert f .is_done is False
141
141
assert len (conn .in_flight_requests ) == 1
142
142
143
143
144
- def test_send_error (socket , conn ):
144
+ def test_send_error (_socket , conn ):
145
145
conn .connect ()
146
146
assert conn .state is ConnectionStates .CONNECTED
147
147
req = MetadataRequest [0 ]([])
148
- header = RequestHeader (req , client_id = conn .config ['client_id' ])
149
148
try :
150
- error = ConnectionError
149
+ _socket . send . side_effect = ConnectionError
151
150
except NameError :
152
- from socket import error
153
- socket .send .side_effect = error
151
+ _socket .send .side_effect = socket .error
154
152
f = conn .send (req )
155
153
assert f .failed () is True
156
154
assert isinstance (f .exception , Errors .ConnectionError )
157
- assert socket .close .call_count == 1
155
+ assert _socket .close .call_count == 1
158
156
assert conn .state is ConnectionStates .DISCONNECTED
159
157
160
158
@@ -167,7 +165,52 @@ def test_can_send_more(conn):
167
165
assert conn .can_send_more () is False
168
166
169
167
170
- def test_recv (socket , conn ):
168
+ def test_recv_disconnected ():
169
+ sock = socket .socket (socket .AF_INET , socket .SOCK_STREAM )
170
+ sock .bind (('127.0.0.1' , 0 ))
171
+ port = sock .getsockname ()[1 ]
172
+ sock .listen (5 )
173
+
174
+ conn = BrokerConnection ('127.0.0.1' , port , socket .AF_INET )
175
+ timeout = time .time () + 1
176
+ while time .time () < timeout :
177
+ conn .connect ()
178
+ if conn .connected ():
179
+ break
180
+ else :
181
+ assert False , 'Connection attempt to local socket timed-out ?'
182
+
183
+ conn .send (MetadataRequest [0 ]([]))
184
+
185
+ # Disconnect server socket
186
+ sock .close ()
187
+
188
+ # Attempt to receive should mark connection as disconnected
189
+ assert conn .connected ()
190
+ conn .recv ()
191
+ assert conn .disconnected ()
192
+
193
+
194
+ def test_recv_disconnected_too (_socket , conn ):
195
+ conn .connect ()
196
+ assert conn .connected ()
197
+
198
+ req = MetadataRequest [0 ]([])
199
+ header = RequestHeader (req , client_id = conn .config ['client_id' ])
200
+ payload_bytes = len (header .encode ()) + len (req .encode ())
201
+ _socket .send .side_effect = [4 , payload_bytes ]
202
+ conn .send (req )
203
+
204
+ # Empty data on recv means the socket is disconnected
205
+ _socket .recv .return_value = b''
206
+
207
+ # Attempt to receive should mark connection as disconnected
208
+ assert conn .connected ()
209
+ conn .recv ()
210
+ assert conn .disconnected ()
211
+
212
+
213
+ def test_recv (_socket , conn ):
171
214
pass # TODO
172
215
173
216
0 commit comments