Skip to content

Commit c54a2ed

Browse files
committed
Add more cleanup in consumer.stop()
1 parent c13ee1d commit c54a2ed

File tree

1 file changed

+7
-5
lines changed

1 file changed

+7
-5
lines changed

kafka/consumer.py

+7-5
Original file line numberDiff line numberDiff line change
@@ -159,6 +159,11 @@ def _auto_commit(self):
159159
if self.count_since_commit > self.auto_commit_every_n:
160160
self.commit()
161161

162+
def stop(self):
163+
if self.commit_timer is not None:
164+
self.commit_timer.stop()
165+
self.commit()
166+
162167
def pending(self, partitions=None):
163168
"""
164169
Gets the pending message count
@@ -226,11 +231,6 @@ def provide_partition_info(self):
226231
"""
227232
self.partition_info = True
228233

229-
def stop(self):
230-
if self.commit_timer is not None:
231-
self.commit_timer.stop()
232-
self.commit()
233-
234234
def seek(self, offset, whence):
235235
"""
236236
Alter the current offset in the consumer, similar to fseek
@@ -510,6 +510,8 @@ def stop(self):
510510
proc.join()
511511
proc.terminate()
512512

513+
super(MultiProcessConsumer, self).stop()
514+
513515
def __iter__(self):
514516
"""
515517
Iterator to consume the messages available on this consumer

0 commit comments

Comments
 (0)