Skip to content

Commit 3b26839

Browse files
committed
Add BrokerConnection.disconnected() method; update tests
1 parent 8b592cd commit 3b26839

File tree

3 files changed

+16
-8
lines changed

3 files changed

+16
-8
lines changed

kafka/conn.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,10 @@ def connecting(self):
139139
"""Return True iff socket is in intermediate connecting state."""
140140
return self.state is ConnectionStates.CONNECTING
141141

142+
def disconnected(self):
143+
"""Return True iff socket is closed"""
144+
return self.state is ConnectionStates.DISCONNECTED
145+
142146
def close(self, error=None):
143147
"""Close socket and fail all in-flight-requests.
144148

test/conftest.py

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,12 @@ def conn(mocker):
4646
[(0, 'foo', 12), (1, 'bar', 34)], # brokers
4747
[])) # topics
4848
conn.blacked_out.return_value = False
49+
def _set_conn_state(state):
50+
conn.state = state
51+
return state
52+
conn._set_conn_state = _set_conn_state
4953
conn.connect.side_effect = lambda: conn.state
50-
conn.connecting = lambda: conn.connect() is ConnectionStates.CONNECTING
51-
conn.connected = lambda: conn.connect() is ConnectionStates.CONNECTED
54+
conn.connecting = lambda: conn.state is ConnectionStates.CONNECTING
55+
conn.connected = lambda: conn.state is ConnectionStates.CONNECTED
56+
conn.disconnected = lambda: conn.state is ConnectionStates.DISCONNECTED
5257
return conn

test/test_client_async.py

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -85,21 +85,20 @@ def test_maybe_connect(conn):
8585

8686
assert 0 not in cli._conns
8787
conn.state = ConnectionStates.DISCONNECTED
88-
conn.connect.side_effect = lambda: ConnectionStates.CONNECTING
88+
conn.connect.side_effect = lambda: conn._set_conn_state(ConnectionStates.CONNECTING)
8989
assert cli._maybe_connect(0) is False
9090
assert cli._conns[0] is conn
9191
assert 0 in cli._connecting
9292

93-
conn.state = ConnectionStates.CONNECTING
94-
conn.connect.side_effect = lambda: ConnectionStates.CONNECTED
93+
conn.connect.side_effect = lambda: conn._set_conn_state(ConnectionStates.CONNECTED)
9594
assert cli._maybe_connect(0) is True
9695
assert 0 not in cli._connecting
9796

9897
# Failure to connect should trigger metadata update
9998
assert cli.cluster._need_update is False
100-
cli._connecting.add(0)
10199
conn.state = ConnectionStates.CONNECTING
102-
conn.connect.side_effect = lambda: ConnectionStates.DISCONNECTED
100+
cli._connecting.add(0)
101+
conn.connect.side_effect = lambda: conn._set_conn_state(ConnectionStates.DISCONNECTED)
103102
assert cli._maybe_connect(0) is False
104103
assert 0 not in cli._connecting
105104
assert cli.cluster._need_update is True
@@ -155,7 +154,7 @@ def test_ready(conn):
155154
# connecting node connects
156155
cli._connecting.add(0)
157156
conn.state = ConnectionStates.CONNECTING
158-
conn.connect.side_effect = lambda: ConnectionStates.CONNECTED
157+
conn.connect.side_effect = lambda: conn._set_conn_state(ConnectionStates.CONNECTED)
159158
cli.ready(0)
160159
assert 0 not in cli._connecting
161160
assert cli._conns[0].connect.called_with()

0 commit comments

Comments
 (0)