Skip to content

Commit fb279d7

Browse files
asdaraujojeffwidman
authored andcommitted
Fixes racing condition when message is sent to broker before topic logs are created
1 parent a1869c4 commit fb279d7

File tree

1 file changed

+19
-3
lines changed

1 file changed

+19
-3
lines changed

test/testutil.py

+19-3
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,10 @@
99
from . import unittest
1010

1111
from kafka import SimpleClient, create_message
12-
from kafka.errors import LeaderNotAvailableError, KafkaTimeoutError
13-
from kafka.structs import OffsetRequestPayload, ProduceRequestPayload
12+
from kafka.errors import LeaderNotAvailableError, KafkaTimeoutError, InvalidTopicError
13+
from kafka.structs import OffsetRequestPayload, ProduceRequestPayload, \
14+
NotLeaderForPartitionError, UnknownTopicOrPartitionError, \
15+
FailedPayloadsError
1416
from test.fixtures import random_string, version_str_to_list, version as kafka_version #pylint: disable=wrong-import-order
1517

1618
def kafka_versions(*versions):
@@ -123,11 +125,25 @@ def setUp(self):
123125
self.client.load_metadata_for_topics(self.topic, ignore_leadernotavailable=False)
124126
if self.client.has_metadata_for_topic(topic):
125127
break
126-
except LeaderNotAvailableError:
128+
except (LeaderNotAvailableError, InvalidTopicError):
127129
time.sleep(1)
128130
else:
129131
raise KafkaTimeoutError('Timeout loading topic metadata!')
130132

133+
134+
# Ensure topic partitions have been created on all brokers to avoid UnknownPartitionErrors
135+
# TODO: It might be a good idea to move this to self.client.ensure_topic_exists
136+
for partition in self.client.get_partition_ids_for_topic(self.topic):
137+
while True:
138+
try:
139+
req = OffsetRequestPayload(self.topic, partition, -1, 100)
140+
self.client.send_offset_request([req])
141+
break
142+
except (NotLeaderForPartitionError, UnknownTopicOrPartitionError, FailedPayloadsError) as e:
143+
if time.time() > timeout:
144+
raise KafkaTimeoutError('Timeout loading topic metadata!')
145+
time.sleep(.1)
146+
131147
self._messages = {}
132148

133149
def tearDown(self):

0 commit comments

Comments
 (0)