Skip to content

Commit fb0b498

Browse files
committed
Add heartbeat timeout test
1 parent 047a65f commit fb0b498

File tree

1 file changed

+24
-3
lines changed

1 file changed

+24
-3
lines changed

test/test_consumer_group.py

Lines changed: 24 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,17 @@
11
import collections
22
import logging
33
import threading
4-
import os
54
import time
65

76
import pytest
87
import six
98

10-
from kafka import SimpleClient, SimpleProducer
9+
from kafka import SimpleClient
1110
from kafka.common import TopicPartition
12-
from kafka.conn import BrokerConnection, ConnectionStates
11+
from kafka.conn import ConnectionStates
1312
from kafka.consumer.group import KafkaConsumer
13+
from kafka.future import Future
14+
from kafka.protocol.metadata import MetadataResponse
1415

1516
from test.conftest import version
1617
from test.testutil import random_string
@@ -115,3 +116,23 @@ def consumer_thread(i):
115116
finally:
116117
for c in range(num_consumers):
117118
stop[c].set()
119+
120+
121+
@pytest.fixture
122+
def conn(mocker):
123+
conn = mocker.patch('kafka.client_async.BrokerConnection')
124+
conn.return_value = conn
125+
conn.state = ConnectionStates.CONNECTED
126+
conn.send.return_value = Future().success(
127+
MetadataResponse(
128+
[(0, 'foo', 12), (1, 'bar', 34)], # brokers
129+
[])) # topics
130+
return conn
131+
132+
133+
def test_heartbeat_timeout(conn, mocker):
134+
mocker.patch('kafka.client_async.KafkaClient.check_version', return_value = '0.9')
135+
mocker.patch('time.time', return_value = 1234)
136+
consumer = KafkaConsumer('foobar')
137+
mocker.patch.object(consumer._coordinator.heartbeat, 'ttl', return_value = 0)
138+
assert consumer._next_timeout() == 1234

0 commit comments

Comments
 (0)