|
| 1 | +import socket |
| 2 | +from time import sleep |
1 | 3 | import unittest2
|
2 | 4 |
|
3 | 5 | from mock import MagicMock, patch
|
|
6 | 8 | from kafka.common import (
|
7 | 9 | ProduceRequest, BrokerMetadata, PartitionMetadata,
|
8 | 10 | TopicAndPartition, KafkaUnavailableError,
|
9 |
| - LeaderUnavailableError, PartitionUnavailableError |
| 11 | + LeaderUnavailableError, PartitionUnavailableError, |
| 12 | + ConnectionError |
10 | 13 | )
|
| 14 | +from kafka.conn import KafkaConnection |
11 | 15 | from kafka.protocol import create_message
|
12 | 16 |
|
| 17 | +from test.testutil import Timer |
| 18 | + |
13 | 19 | class TestKafkaClient(unittest2.TestCase):
|
14 | 20 | def test_init_with_list(self):
|
15 | 21 | with patch.object(KafkaClient, 'load_metadata_for_topics'):
|
@@ -242,3 +248,15 @@ def test_send_produce_request_raises_when_noleader(self, protocol, conn):
|
242 | 248 | with self.assertRaises(LeaderUnavailableError):
|
243 | 249 | client.send_produce_request(requests)
|
244 | 250 |
|
| 251 | + def test_timeout(self): |
| 252 | + def _timeout(*args, **kwargs): |
| 253 | + timeout = args[1] |
| 254 | + sleep(timeout) |
| 255 | + raise socket.timeout |
| 256 | + |
| 257 | + with patch.object(socket, "create_connection", side_effect=_timeout): |
| 258 | + |
| 259 | + with Timer() as t: |
| 260 | + with self.assertRaises(ConnectionError): |
| 261 | + KafkaConnection("nowhere", 1234, 1.0) |
| 262 | + self.assertGreaterEqual(t.interval, 1.0) |
0 commit comments