From d2fe82d3e0317a5349142f1160a14ab36059ea5d Mon Sep 17 00:00:00 2001 From: Asaf Flescher Date: Mon, 11 Feb 2019 13:29:16 +0200 Subject: [PATCH 1/2] When doing commit_offset_async after lookup_coordinator, need to wrap the actual call in a partial application to ignore the result from lookup_coordinator --- kafka/coordinator/consumer.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/kafka/coordinator/consumer.py b/kafka/coordinator/consumer.py index 14eee0fdc..e0f92ad95 100644 --- a/kafka/coordinator/consumer.py +++ b/kafka/coordinator/consumer.py @@ -2,6 +2,7 @@ import collections import copy +import functools import logging import time @@ -457,7 +458,7 @@ def commit_offsets_async(self, offsets, callback=None): # same order that they were added. Note also that BaseCoordinator # prevents multiple concurrent coordinator lookup requests. future = self.lookup_coordinator() - future.add_callback(self._do_commit_offsets_async, offsets, callback) + future.add_callback(lambda r: functools.partial(self._do_commit_offsets_async, offsets, callback)) if callback: future.add_errback(lambda e: self.completed_offset_commits.appendleft((callback, offsets, e))) From e848252fa22d7074c7b5d5b9b6a2830b4718ac39 Mon Sep 17 00:00:00 2001 From: Asaf Flescher Date: Mon, 11 Feb 2019 13:34:52 +0200 Subject: [PATCH 2/2] Actually called the partial application --- kafka/coordinator/consumer.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kafka/coordinator/consumer.py b/kafka/coordinator/consumer.py index e0f92ad95..b575664b2 100644 --- a/kafka/coordinator/consumer.py +++ b/kafka/coordinator/consumer.py @@ -458,7 +458,7 @@ def commit_offsets_async(self, offsets, callback=None): # same order that they were added. Note also that BaseCoordinator # prevents multiple concurrent coordinator lookup requests. future = self.lookup_coordinator() - future.add_callback(lambda r: functools.partial(self._do_commit_offsets_async, offsets, callback)) + future.add_callback(lambda r: functools.partial(self._do_commit_offsets_async, offsets, callback)()) if callback: future.add_errback(lambda e: self.completed_offset_commits.appendleft((callback, offsets, e)))