@@ -79,8 +79,10 @@ def kafka_run_class_env(self):
79
79
80
80
@classmethod
81
81
def render_template (cls , source_file , target_file , binding ):
82
+ log .info ('Rendering %s from template %s' , target_file , source_file )
82
83
with open (source_file , "r" ) as handle :
83
84
template = handle .read ()
85
+ assert len (template ) > 0 , 'Empty template %s' % source_file
84
86
with open (target_file , "w" ) as handle :
85
87
handle .write (template .format (** binding ))
86
88
handle .flush ()
@@ -139,22 +141,22 @@ def open(self):
139
141
env = self .kafka_run_class_env ()
140
142
141
143
# Party!
142
- self .out ("Starting..." )
143
144
timeout = 5
144
145
max_timeout = 30
145
146
backoff = 1
146
147
end_at = time .time () + max_timeout
148
+ tries = 1
147
149
while time .time () < end_at :
148
- log . critical ( 'Starting Zookeeper instance' )
150
+ self . out ( 'Attempting to start (try #%d)' % tries )
149
151
self .child = SpawnedService (args , env )
150
152
self .child .start ()
151
153
timeout = min (timeout , max (end_at - time .time (), 0 ))
152
154
if self .child .wait_for (r"binding to port" , timeout = timeout ):
153
155
break
154
- log .critical ('Zookeeper did not start within timeout %s secs' , timeout )
155
156
self .child .stop ()
156
157
timeout *= 2
157
158
time .sleep (backoff )
159
+ tries += 1
158
160
else :
159
161
raise Exception ('Failed to start Zookeeper before max_timeout' )
160
162
self .out ("Done!" )
@@ -260,8 +262,6 @@ def open(self):
260
262
raise RuntimeError ("Failed to create Zookeeper chroot node" )
261
263
self .out ("Done!" )
262
264
263
- self .out ("Starting..." )
264
-
265
265
# Configure Kafka child process
266
266
args = self .kafka_run_class_args ("kafka.Kafka" , properties )
267
267
env = self .kafka_run_class_env ()
@@ -270,18 +270,19 @@ def open(self):
270
270
max_timeout = 30
271
271
backoff = 1
272
272
end_at = time .time () + max_timeout
273
+ tries = 1
273
274
while time .time () < end_at :
274
- log . critical ( 'Starting Kafka instance' )
275
+ self . out ( 'Attempting to start (try #%d)' % tries )
275
276
self .child = SpawnedService (args , env )
276
277
self .child .start ()
277
278
timeout = min (timeout , max (end_at - time .time (), 0 ))
278
279
if self .child .wait_for (r"\[Kafka Server %d\], Started" %
279
280
self .broker_id , timeout = timeout ):
280
281
break
281
- log .critical ('Kafka did not start within timeout %s secs' , timeout )
282
282
self .child .stop ()
283
283
timeout *= 2
284
284
time .sleep (backoff )
285
+ tries += 1
285
286
else :
286
287
raise Exception ('Failed to start KafkaInstance before max_timeout' )
287
288
self .out ("Done!" )
0 commit comments