|
9 | 9 | from . import unittest
|
10 | 10 |
|
11 | 11 | from kafka import SimpleClient, create_message
|
12 |
| -from kafka.errors import LeaderNotAvailableError, KafkaTimeoutError |
13 |
| -from kafka.structs import OffsetRequestPayload, ProduceRequestPayload |
| 12 | +from kafka.errors import LeaderNotAvailableError, KafkaTimeoutError, InvalidTopicError |
| 13 | +from kafka.structs import OffsetRequestPayload, ProduceRequestPayload, \ |
| 14 | + NotLeaderForPartitionError, UnknownTopicOrPartitionError, \ |
| 15 | + FailedPayloadsError |
14 | 16 | from test.fixtures import random_string, version_str_to_list, version as kafka_version #pylint: disable=wrong-import-order
|
15 | 17 |
|
16 | 18 | def kafka_versions(*versions):
|
@@ -123,11 +125,25 @@ def setUp(self):
|
123 | 125 | self.client.load_metadata_for_topics(self.topic, ignore_leadernotavailable=False)
|
124 | 126 | if self.client.has_metadata_for_topic(topic):
|
125 | 127 | break
|
126 |
| - except LeaderNotAvailableError: |
| 128 | + except (LeaderNotAvailableError, InvalidTopicError): |
127 | 129 | time.sleep(1)
|
128 | 130 | else:
|
129 | 131 | raise KafkaTimeoutError('Timeout loading topic metadata!')
|
130 | 132 |
|
| 133 | + |
| 134 | + # Ensure topic partitions have been created on all brokers to avoid UnknownPartitionErrors |
| 135 | + # TODO: It might be a good idea to move this to self.client.ensure_topic_exists |
| 136 | + for partition in self.client.get_partition_ids_for_topic(self.topic): |
| 137 | + while True: |
| 138 | + try: |
| 139 | + req = OffsetRequestPayload(self.topic, partition, -1, 100) |
| 140 | + self.client.send_offset_request([req]) |
| 141 | + break |
| 142 | + except (NotLeaderForPartitionError, UnknownTopicOrPartitionError, FailedPayloadsError) as e: |
| 143 | + if time.time() > timeout: |
| 144 | + raise KafkaTimeoutError('Timeout loading topic metadata!') |
| 145 | + time.sleep(.1) |
| 146 | + |
131 | 147 | self._messages = {}
|
132 | 148 |
|
133 | 149 | def tearDown(self):
|
|
0 commit comments