2
2
import os
3
3
import time
4
4
5
- from . import unittest
6
-
7
5
from kafka import KafkaClient , SimpleConsumer , KeyedProducer
8
6
from kafka .common import TopicAndPartition , FailedPayloadsError , ConnectionError
9
7
from kafka .producer .base import Producer
15
13
)
16
14
17
15
16
+ log = logging .getLogger (__name__ )
17
+
18
+
18
19
class TestFailover (KafkaIntegrationTestCase ):
19
20
create_client = False
20
21
@@ -73,12 +74,12 @@ def test_switch_leader(self):
73
74
timeout = 60
74
75
while not recovered and (time .time () - started ) < timeout :
75
76
try :
76
- logging .debug ("attempting to send 'success' message after leader killed" )
77
+ log .debug ("attempting to send 'success' message after leader killed" )
77
78
producer .send_messages (topic , partition , b'success' )
78
- logging .debug ("success!" )
79
+ log .debug ("success!" )
79
80
recovered = True
80
81
except (FailedPayloadsError , ConnectionError ):
81
- logging .debug ("caught exception sending message -- will retry" )
82
+ log .debug ("caught exception sending message -- will retry" )
82
83
continue
83
84
84
85
# Verify we successfully sent the message
@@ -110,7 +111,7 @@ def test_switch_leader_async(self):
110
111
# kill leader for partition
111
112
self ._kill_leader (topic , partition )
112
113
113
- logging .debug ("attempting to send 'success' message after leader killed" )
114
+ log .debug ("attempting to send 'success' message after leader killed" )
114
115
115
116
# in async mode, this should return immediately
116
117
producer .send_messages (topic , partition , b'success' )
@@ -164,7 +165,7 @@ def test_switch_leader_keyed_producer(self):
164
165
if producer .partitioners [kafka_bytestring (topic )].partition (key ) == 0 :
165
166
recovered = True
166
167
except (FailedPayloadsError , ConnectionError ):
167
- logging .debug ("caught exception sending message -- will retry" )
168
+ log .debug ("caught exception sending message -- will retry" )
168
169
continue
169
170
170
171
# Verify we successfully sent the message
@@ -187,12 +188,16 @@ def test_switch_leader_simple_consumer(self):
187
188
188
189
def _send_random_messages (self , producer , topic , partition , n ):
189
190
for j in range (n ):
190
- logging .debug ('_send_random_message to %s:%d -- try %d' , topic , partition , j )
191
191
msg = 'msg {0}: {1}' .format (j , random_string (10 ))
192
- resp = producer .send_messages (topic , partition , msg .encode ('utf-8' ))
193
- if len (resp ) > 0 :
194
- self .assertEqual (resp [0 ].error , 0 )
195
- logging .debug ('_send_random_message to %s:%d -- try %d success' , topic , partition , j )
192
+ log .debug ('_send_random_message %s to %s:%d' , msg , topic , partition )
193
+ while True :
194
+ try :
195
+ producer .send_messages (topic , partition , msg .encode ('utf-8' ))
196
+ except :
197
+ log .exception ('failure in _send_random_messages - retrying' )
198
+ continue
199
+ else :
200
+ break
196
201
197
202
def _kill_leader (self , topic , partition ):
198
203
leader = self .client .topics_to_brokers [TopicAndPartition (kafka_bytestring (topic ), partition )]
0 commit comments