Skip to content

Commit 9856cc3

Browse files
committed
Merge pull request dpkp#222 from dpkp/fix_socket_timeout_test
Fix socket timeout test -- mock the side_effect
2 parents eddd143 + 1954f38 commit 9856cc3

File tree

2 files changed

+19
-12
lines changed

2 files changed

+19
-12
lines changed

test/test_client.py

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
import socket
2+
from time import sleep
13
import unittest2
24

35
from mock import MagicMock, patch
@@ -6,10 +8,14 @@
68
from kafka.common import (
79
ProduceRequest, BrokerMetadata, PartitionMetadata,
810
TopicAndPartition, KafkaUnavailableError,
9-
LeaderUnavailableError, PartitionUnavailableError
11+
LeaderUnavailableError, PartitionUnavailableError,
12+
ConnectionError
1013
)
14+
from kafka.conn import KafkaConnection
1115
from kafka.protocol import create_message
1216

17+
from test.testutil import Timer
18+
1319
class TestKafkaClient(unittest2.TestCase):
1420
def test_init_with_list(self):
1521
with patch.object(KafkaClient, 'load_metadata_for_topics'):
@@ -242,3 +248,15 @@ def test_send_produce_request_raises_when_noleader(self, protocol, conn):
242248
with self.assertRaises(LeaderUnavailableError):
243249
client.send_produce_request(requests)
244250

251+
def test_timeout(self):
252+
def _timeout(*args, **kwargs):
253+
timeout = args[1]
254+
sleep(timeout)
255+
raise socket.timeout
256+
257+
with patch.object(socket, "create_connection", side_effect=_timeout):
258+
259+
with Timer() as t:
260+
with self.assertRaises(ConnectionError):
261+
KafkaConnection("nowhere", 1234, 1.0)
262+
self.assertGreaterEqual(t.interval, 1.0)

test/test_client_integration.py

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -24,17 +24,6 @@ def tearDownClass(cls): # noqa
2424
cls.server.close()
2525
cls.zk.close()
2626

27-
@unittest2.skip("This doesn't appear to work on Linux?")
28-
def test_timeout(self):
29-
server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
30-
server_port = get_open_port()
31-
server_socket.bind(('localhost', server_port))
32-
33-
with Timer() as t:
34-
with self.assertRaises((socket.timeout, socket.error)):
35-
kafka.conn.KafkaConnection("localhost", server_port, 1.0)
36-
self.assertGreaterEqual(t.interval, 1.0)
37-
3827
@kafka_versions("all")
3928
def test_consume_none(self):
4029
fetch = FetchRequest(self.topic, 0, 0, 1024)

0 commit comments

Comments
 (0)