Skip to content

Commit 043e9bb

Browse files
author
Dana Powers
committed
Retry failed messages in failover integration tests; use module logger
1 parent ded2ac7 commit 043e9bb

File tree

1 file changed

+17
-12
lines changed

1 file changed

+17
-12
lines changed

test/test_failover_integration.py

Lines changed: 17 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,6 @@
22
import os
33
import time
44

5-
from . import unittest
6-
75
from kafka import KafkaClient, SimpleConsumer, KeyedProducer
86
from kafka.common import TopicAndPartition, FailedPayloadsError, ConnectionError
97
from kafka.producer.base import Producer
@@ -15,6 +13,9 @@
1513
)
1614

1715

16+
log = logging.getLogger(__name__)
17+
18+
1819
class TestFailover(KafkaIntegrationTestCase):
1920
create_client = False
2021

@@ -73,12 +74,12 @@ def test_switch_leader(self):
7374
timeout = 60
7475
while not recovered and (time.time() - started) < timeout:
7576
try:
76-
logging.debug("attempting to send 'success' message after leader killed")
77+
log.debug("attempting to send 'success' message after leader killed")
7778
producer.send_messages(topic, partition, b'success')
78-
logging.debug("success!")
79+
log.debug("success!")
7980
recovered = True
8081
except (FailedPayloadsError, ConnectionError):
81-
logging.debug("caught exception sending message -- will retry")
82+
log.debug("caught exception sending message -- will retry")
8283
continue
8384

8485
# Verify we successfully sent the message
@@ -110,7 +111,7 @@ def test_switch_leader_async(self):
110111
# kill leader for partition
111112
self._kill_leader(topic, partition)
112113

113-
logging.debug("attempting to send 'success' message after leader killed")
114+
log.debug("attempting to send 'success' message after leader killed")
114115

115116
# in async mode, this should return immediately
116117
producer.send_messages(topic, partition, b'success')
@@ -164,7 +165,7 @@ def test_switch_leader_keyed_producer(self):
164165
if producer.partitioners[kafka_bytestring(topic)].partition(key) == 0:
165166
recovered = True
166167
except (FailedPayloadsError, ConnectionError):
167-
logging.debug("caught exception sending message -- will retry")
168+
log.debug("caught exception sending message -- will retry")
168169
continue
169170

170171
# Verify we successfully sent the message
@@ -187,12 +188,16 @@ def test_switch_leader_simple_consumer(self):
187188

188189
def _send_random_messages(self, producer, topic, partition, n):
189190
for j in range(n):
190-
logging.debug('_send_random_message to %s:%d -- try %d', topic, partition, j)
191191
msg = 'msg {0}: {1}'.format(j, random_string(10))
192-
resp = producer.send_messages(topic, partition, msg.encode('utf-8'))
193-
if len(resp) > 0:
194-
self.assertEqual(resp[0].error, 0)
195-
logging.debug('_send_random_message to %s:%d -- try %d success', topic, partition, j)
192+
log.debug('_send_random_message %s to %s:%d', msg, topic, partition)
193+
while True:
194+
try:
195+
producer.send_messages(topic, partition, msg.encode('utf-8'))
196+
except:
197+
log.exception('failure in _send_random_messages - retrying')
198+
continue
199+
else:
200+
break
196201

197202
def _kill_leader(self, topic, partition):
198203
leader = self.client.topics_to_brokers[TopicAndPartition(kafka_bytestring(topic), partition)]

0 commit comments

Comments
 (0)