diff --git a/kafka/coordinator/consumer.py b/kafka/coordinator/consumer.py index 14eee0fdc..b575664b2 100644 --- a/kafka/coordinator/consumer.py +++ b/kafka/coordinator/consumer.py @@ -2,6 +2,7 @@ import collections import copy +import functools import logging import time @@ -457,7 +458,7 @@ def commit_offsets_async(self, offsets, callback=None): # same order that they were added. Note also that BaseCoordinator # prevents multiple concurrent coordinator lookup requests. future = self.lookup_coordinator() - future.add_callback(self._do_commit_offsets_async, offsets, callback) + future.add_callback(lambda r: functools.partial(self._do_commit_offsets_async, offsets, callback)()) if callback: future.add_errback(lambda e: self.completed_offset_commits.appendleft((callback, offsets, e)))