|
1 | 1 | import collections
|
2 | 2 | import logging
|
3 | 3 | import threading
|
4 |
| -import os |
5 | 4 | import time
|
6 | 5 |
|
7 | 6 | import pytest
|
8 | 7 | import six
|
9 | 8 |
|
10 |
| -from kafka import SimpleClient, SimpleProducer |
| 9 | +from kafka import SimpleClient |
11 | 10 | from kafka.common import TopicPartition
|
12 |
| -from kafka.conn import BrokerConnection, ConnectionStates |
| 11 | +from kafka.conn import ConnectionStates |
13 | 12 | from kafka.consumer.group import KafkaConsumer
|
| 13 | +from kafka.future import Future |
| 14 | +from kafka.protocol.metadata import MetadataResponse |
14 | 15 |
|
15 | 16 | from test.conftest import version
|
16 | 17 | from test.testutil import random_string
|
@@ -115,3 +116,23 @@ def consumer_thread(i):
|
115 | 116 | finally:
|
116 | 117 | for c in range(num_consumers):
|
117 | 118 | stop[c].set()
|
| 119 | + |
| 120 | + |
| 121 | +@pytest.fixture |
| 122 | +def conn(mocker): |
| 123 | + conn = mocker.patch('kafka.client_async.BrokerConnection') |
| 124 | + conn.return_value = conn |
| 125 | + conn.state = ConnectionStates.CONNECTED |
| 126 | + conn.send.return_value = Future().success( |
| 127 | + MetadataResponse( |
| 128 | + [(0, 'foo', 12), (1, 'bar', 34)], # brokers |
| 129 | + [])) # topics |
| 130 | + return conn |
| 131 | + |
| 132 | + |
| 133 | +def test_heartbeat_timeout(conn, mocker): |
| 134 | + mocker.patch('kafka.client_async.KafkaClient.check_version', return_value = '0.9') |
| 135 | + mocker.patch('time.time', return_value = 1234) |
| 136 | + consumer = KafkaConsumer('foobar') |
| 137 | + mocker.patch.object(consumer._coordinator.heartbeat, 'ttl', return_value = 0) |
| 138 | + assert consumer._next_timeout() == 1234 |
0 commit comments