Skip to content

Commit 4915942

Browse files
committed
catch all errors thrown by _get_leader_for_partition in SimpleClient
1 parent ab03296 commit 4915942

File tree

2 files changed

+5
-4
lines changed

2 files changed

+5
-4
lines changed

kafka/client.py

+2-1
Original file line numberDiff line numberDiff line change
@@ -169,7 +169,8 @@ def _payloads_by_broker(self, payloads):
169169
for payload in payloads:
170170
try:
171171
leader = self._get_leader_for_partition(payload.topic, payload.partition)
172-
except KafkaUnavailableError:
172+
except (KafkaUnavailableError, LeaderNotAvailableError,
173+
UnknownTopicOrPartitionError):
173174
leader = None
174175
payloads_by_broker[leader].append(payload)
175176
return dict(payloads_by_broker)

test/test_client.py

+3-3
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
BrokerMetadata,
1212
TopicPartition, KafkaUnavailableError,
1313
LeaderNotAvailableError, UnknownTopicOrPartitionError,
14-
KafkaTimeoutError, ConnectionError
14+
KafkaTimeoutError, ConnectionError, FailedPayloadsError
1515
)
1616
from kafka.conn import KafkaConnection
1717
from kafka.future import Future
@@ -361,7 +361,7 @@ def test_send_produce_request_raises_when_noleader(self, protocol, conn):
361361
"topic_noleader", 0,
362362
[create_message("a"), create_message("b")])]
363363

364-
with self.assertRaises(LeaderNotAvailableError):
364+
with self.assertRaises(FailedPayloadsError):
365365
client.send_produce_request(requests)
366366

367367
@patch('kafka.SimpleClient._get_conn')
@@ -386,7 +386,7 @@ def test_send_produce_request_raises_when_topic_unknown(self, protocol, conn):
386386
"topic_doesnt_exist", 0,
387387
[create_message("a"), create_message("b")])]
388388

389-
with self.assertRaises(UnknownTopicOrPartitionError):
389+
with self.assertRaises(FailedPayloadsError):
390390
client.send_produce_request(requests)
391391

392392
def test_timeout(self):

0 commit comments

Comments
 (0)