Skip to content

Commit 975087b

Browse files
authored
Follow up to PR 1782 -- fix tests (dpkp#1914)
1 parent 87fb1bb commit 975087b

File tree

2 files changed

+4
-4
lines changed

2 files changed

+4
-4
lines changed

kafka/consumer/fetcher.py

+2-1
Original file line numberDiff line numberDiff line change
@@ -268,6 +268,7 @@ def _retrieve_offsets(self, timestamps, timeout_ms=float("inf")):
268268

269269
start_time = time.time()
270270
remaining_ms = timeout_ms
271+
timestamps = copy.copy(timestamps)
271272
while remaining_ms > 0:
272273
if not timestamps:
273274
return {}
@@ -294,7 +295,7 @@ def _retrieve_offsets(self, timestamps, timeout_ms=float("inf")):
294295
if refresh_future.succeeded() and isinstance(future.exception, Errors.StaleMetadata):
295296
log.debug("Stale metadata was raised, and we now have an updated metadata. Rechecking partition existance")
296297
unknown_partition = future.exception.args[0] # TopicPartition from StaleMetadata
297-
if not self._client.cluster.leader_for_partition(unknown_partition):
298+
if self._client.cluster.leader_for_partition(unknown_partition) is None:
298299
log.debug("Removed partition %s from offsets retrieval" % (unknown_partition, ))
299300
timestamps.pop(unknown_partition)
300301
else:

test/test_consumer_integration.py

+2-3
Original file line numberDiff line numberDiff line change
@@ -766,12 +766,11 @@ def test_kafka_consumer_offsets_for_time_old(kafka_consumer, topic):
766766
@pytest.mark.skipif(env_kafka_version() < (0, 10, 1), reason="Requires KAFKA_VERSION >= 0.10.1")
767767
def test_kafka_consumer_offsets_for_times_errors(kafka_consumer_factory, topic):
768768
consumer = kafka_consumer_factory(fetch_max_wait_ms=200,
769-
request_timeout_ms=500)
769+
request_timeout_ms=500)
770770
tp = TopicPartition(topic, 0)
771771
bad_tp = TopicPartition(topic, 100)
772772

773773
with pytest.raises(ValueError):
774774
consumer.offsets_for_times({tp: -1})
775775

776-
with pytest.raises(KafkaTimeoutError):
777-
consumer.offsets_for_times({bad_tp: 0})
776+
assert consumer.offsets_for_times({bad_tp: 0}) == {bad_tp: None}

0 commit comments

Comments
 (0)