Skip to content

Commit 5682ff6

Browse files
author
Dana Powers
committed
Producer.stop() now blocks until async thread completes (drop confusing timeout arg)
1 parent 2916bb8 commit 5682ff6

File tree

1 file changed

+12
-7
lines changed

1 file changed

+12
-7
lines changed

kafka/producer/base.py

+12-7
Original file line numberDiff line numberDiff line change
@@ -415,17 +415,22 @@ def _send_messages(self, topic, partition, *msg, **kwargs):
415415
raise
416416
return resp
417417

418-
def stop(self, timeout=1):
418+
def stop(self):
419419
"""
420-
Stop the producer. Optionally wait for the specified timeout before
421-
forcefully cleaning up.
420+
Stop the producer (async mode). Blocks until async thread completes.
422421
"""
422+
if not self.async:
423+
log.warning("producer.stop() called, but producer is not async")
424+
return
425+
426+
if self.stopped:
427+
log.warning("producer.stop() called, but producer is already stopped")
428+
return
429+
423430
if self.async:
424431
self.queue.put((STOP_ASYNC_PRODUCER, None, None))
425-
self.thread.join(timeout)
426-
427-
if self.thread.is_alive():
428-
self.thread_stop_event.set()
432+
self.thread_stop_event.set()
433+
self.thread.join()
429434

430435
if hasattr(self, '_cleanup_func'):
431436
# Remove cleanup handler now that we've stopped

0 commit comments

Comments
 (0)