From fae71d3892bcd18a7af9ee30dfd7368ef5ff6541 Mon Sep 17 00:00:00 2001 From: Joshua Sorenson Date: Wed, 24 Jun 2015 17:59:05 -0700 Subject: [PATCH 1/3] Allow KeyedProducer to send batches of messages with different keys. Given a list of (key, message) tuples or single-key dictionaries, use each key as the `key` and each value as the `message` when sending messages. --- kafka/producer/keyed.py | 21 +++++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/kafka/producer/keyed.py b/kafka/producer/keyed.py index a5a26c950..b5b51f442 100644 --- a/kafka/producer/keyed.py +++ b/kafka/producer/keyed.py @@ -7,6 +7,8 @@ from ..partitioner import HashedPartitioner from ..util import kafka_bytestring +from collections import defaultdict + log = logging.getLogger(__name__) @@ -37,6 +39,25 @@ def _next_partition(self, topic, key): partitioner = self.partitioners[topic] return partitioner.partition(key) + def send_messages_with_keys(self, topic, *msg): + """ Given a list of single key dictionaries, use the keys as the `key` + and value as the `message` for sending messages. 
""" + + msg_grouping = defaultdict(list) + for m in msg: + if isinstance(m, tuple): + k, v = m + elif isinstance(m, dict): + k, v = m.items()[0] + else: + raise TypeError("all msgs must be a tuple eg (key, msg) or dict eg {key: msg}") + + msg_grouping[k].append(v) + + for key, message in msg_grouping.items(): + partition = self._next_partition(topic, key) + return self._send_messages(topic, partition, *message, key=key) + def send_messages(self, topic, key, *msg): topic = kafka_bytestring(topic) partition = self._next_partition(topic, key) From 756ec4dc37e4c3ade7c7a9990918fe712a61b1ed Mon Sep 17 00:00:00 2001 From: Joshua Sorenson Date: Thu, 23 Jul 2015 12:51:39 -0700 Subject: [PATCH 2/3] Gevent friendly constructor and copy methods --- kafka/client.py | 20 +++++++++++++++----- kafka/conn.py | 16 +++++++++------- 2 files changed, 24 insertions(+), 12 deletions(-) diff --git a/kafka/client.py b/kafka/client.py index 817c62152..f1f9254ab 100644 --- a/kafka/client.py +++ b/kafka/client.py @@ -28,6 +28,7 @@ class KafkaClient(object): # socket timeout. def __init__(self, hosts, client_id=CLIENT_ID, timeout=DEFAULT_SOCKET_TIMEOUT_SECONDS, + activate=True, correlation_id=0): # We need one connection to bootstrap self.client_id = kafka_bytestring(client_id) @@ -41,8 +42,9 @@ def __init__(self, hosts, client_id=CLIENT_ID, self.topics_to_brokers = {} # TopicAndPartition -> BrokerMetadata self.topic_partitions = {} # topic -> partition -> PartitionMetadata - self.load_metadata_for_topics() # bootstrap with all metadata - + if activate: + self.load_metadata_for_topics() # bootstrap with all metadata + ################## # Private API # @@ -271,9 +273,17 @@ def copy(self): Note that the copied connections are not initialized, so reinit() must be called on the returned copy. 
""" - c = copy.deepcopy(self) - for key in c.conns: - c.conns[key] = self.conns[key].copy() + #c = copy.deepcopy(self) + #for key in c.conns: + # c.conns[key] = self.conns[key].copy() + #return c + c = KafkaClient(hosts=['{0}:{1}'.format(entry[0], entry[1]) for entry in self.hosts], + client_id=self.client_id, + timeout=self.timeout, + correlation_id=self.correlation_id, + activate=False) + for k, v in self.conns.iteritems(): + c.conns[k] = v.copy() return c def reinit(self): diff --git a/kafka/conn.py b/kafka/conn.py index 432e10b0c..473e825e6 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -54,14 +54,15 @@ class KafkaConnection(local): timeout: default 120. The socket timeout for sending and receiving data in seconds. None means no timeout, so a request can block forever. """ - def __init__(self, host, port, timeout=DEFAULT_SOCKET_TIMEOUT_SECONDS): + def __init__(self, host, port, timeout=DEFAULT_SOCKET_TIMEOUT_SECONDS, activate=True): super(KafkaConnection, self).__init__() self.host = host self.port = port self.timeout = timeout self._sock = None - self.reinit() + if activate: + self.reinit() def __getnewargs__(self): return (self.host, self.port, self.timeout) @@ -167,12 +168,13 @@ def copy(self): The returned copy is not connected; you must call reinit() before using. 
""" - c = copy.deepcopy(self) + #c = copy.deepcopy(self) # Python 3 doesn't copy custom attributes of the threadlocal subclass - c.host = copy.copy(self.host) - c.port = copy.copy(self.port) - c.timeout = copy.copy(self.timeout) - c._sock = None + #c.host = copy.copy(self.host) + #c.port = copy.copy(self.port) + #c.timeout = copy.copy(self.timeout) + #c._sock = None + c = KafkaConnection(host=self.host, port=self.port, timeout=self.timeout, activate=False) return c def close(self): From 1ca43e3764f0cca06a3972b25629011a350fde6d Mon Sep 17 00:00:00 2001 From: Ian Wilson Date: Mon, 3 Aug 2015 19:41:05 -0700 Subject: [PATCH 3/3] fixing problem with send_messages_with_keys() where only one request would be made --- kafka/producer/keyed.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/kafka/producer/keyed.py b/kafka/producer/keyed.py index b5b51f442..8922e20d1 100644 --- a/kafka/producer/keyed.py +++ b/kafka/producer/keyed.py @@ -54,9 +54,14 @@ def send_messages_with_keys(self, topic, *msg): msg_grouping[k].append(v) + # collect all responses for each request and send back as a list + responses = [] + for key, message in msg_grouping.items(): partition = self._next_partition(topic, key) - return self._send_messages(topic, partition, *message, key=key) + responses.append(self._send_messages(topic, partition, *message, key=key)) + + return responses def send_messages(self, topic, key, *msg): topic = kafka_bytestring(topic)