From 1698d149058c5ce932abb8d07732e5b40b77436d Mon Sep 17 00:00:00 2001 From: "yongkun.wang" Date: Sun, 20 Apr 2014 23:20:27 +0900 Subject: [PATCH 1/3] add offset enhancement for consumer --- kafka/common.py | 14 ++++++++ kafka/consumer.py | 90 +++++++++++++++++++++++++++++++++++++---------- 2 files changed, 86 insertions(+), 18 deletions(-) diff --git a/kafka/common.py b/kafka/common.py index 005e6dd06..aba073139 100644 --- a/kafka/common.py +++ b/kafka/common.py @@ -118,3 +118,17 @@ class ConsumerFetchSizeTooSmall(KafkaError): class ConsumerNoMoreData(KafkaError): pass + +class ClientOffset: + #Zero = 0 # start from 0, may not be available due + # to ttl + + Previous = -1 # position stores in zookeeper last time + + CurrentBeginning = -2 # current beginning offset (may not be 0) + + PreviousOrCurrentBeginning = -3 # Get previous offset firstly, if not + # available, use current beginning + + Latest = -4 # Start from latest, like tail + diff --git a/kafka/consumer.py b/kafka/consumer.py index 8ac28daf4..bbe73dd69 100644 --- a/kafka/consumer.py +++ b/kafka/consumer.py @@ -9,8 +9,9 @@ from kafka.common import ( ErrorMapping, FetchRequest, - OffsetRequest, OffsetCommitRequest, - ConsumerFetchSizeTooSmall, ConsumerNoMoreData + OffsetRequest, OffsetCommitRequest, OffsetFetchRequest, + ConsumerFetchSizeTooSmall, ConsumerNoMoreData, + ClientOffset ) from kafka.util import ReentrantTimer @@ -67,8 +68,17 @@ class Consumer(object): * initialization and fetching metadata of partitions * Auto-commit logic * APIs for fetching pending message count + Offset: + #ClientOffset.Zero or 0; + ClientOffset.Previous or -1; + ClientOffset.CurrentBeginning or -2; + ClientOffset.PreviousOrCurrentBeginning or -3; Default. 
+ ClientOffset.Latest or -4; + Other value >= 0; """ - def __init__(self, client, group, topic, partitions=None, auto_commit=True, + def __init__(self, client, group, topic, partitions=None, + offset = ClientOffset.PreviousOrCurrentBeginning, + auto_commit=True, auto_commit_every_n=AUTO_COMMIT_MSG_COUNT, auto_commit_every_t=AUTO_COMMIT_INTERVAL): @@ -95,7 +105,19 @@ def __init__(self, client, group, topic, partitions=None, auto_commit=True, self.commit) self.commit_timer.start() - def get_or_init_offset_callback(resp): + def get_current_offsets_callback(resp): + if resp.error == ErrorMapping.NO_ERROR: + return resp.offsets + elif resp.error == ErrorMapping.UNKNOWN_TOPIC_OR_PARTITON: + return 0 + else: + raise Exception("OffsetRequest for topic=%s, " + "partition=%d failed with errorcode=%s" % ( + resp.topic, resp.partition, resp.error)) + + + # callback for fetching on zookeeper + def get_or_init_previous_offset_callback(resp): if resp.error == ErrorMapping.NO_ERROR: return resp.offset elif resp.error == ErrorMapping.UNKNOWN_TOPIC_OR_PARTITON: @@ -105,17 +127,42 @@ def get_or_init_offset_callback(resp): "partition=%d failed with errorcode=%s" % ( resp.topic, resp.partition, resp.error)) - # Uncomment for 0.8.1 - # - #for partition in partitions: - # req = OffsetFetchRequest(topic, partition) - # (offset,) = self.client.send_offset_fetch_request(group, [req], - # callback=get_or_init_offset_callback, - # fail_on_error=False) - # self.offsets[partition] = offset - + currTimeMs = int(time.time()*1000) + PAYLOAD_MAX_OFFSET = 2147483647 for partition in partitions: - self.offsets[partition] = 0 + # current stream + req = OffsetRequest(topic, partition,currTimeMs,PAYLOAD_MAX_OFFSET) + (raw_offsets,) = self.client.send_offset_request([req], + fail_on_error=False, + callback=get_current_offsets_callback) + offset_start = raw_offsets[-1] + offset_end = raw_offsets[0] + + # zookeeper + req = OffsetFetchRequest(topic, partition) + (last_offset,) = 
self.client.send_offset_fetch_request(group, [req], + callback=get_or_init_previous_offset_callback, + fail_on_error=False) + + if offset == ClientOffset.PreviousOrCurrentBeginning: + if offset_start <= last_offset <= offset_end: + self.offsets[partition] = last_offset + else: + self.offsets[partition] = offset_start + elif offset == ClientOffset.Previous: + self.offsets[partition] = last_offset + elif offset == ClientOffset.CurrentBeginning: + self.offsets[partition] = offset_start + elif offset == ClientOffset.Latest: + self.offsets[partition] = offset_end + elif offset >=0: + if offset_start <= offset <= offset_end: + for partition in partitions: + self.offsets[partition] = offset + else: + raise Exception("Invalid parameter value offset=%d," + "allowed range %d to %d" + % (offset,offset_start,offset_end)) def commit(self, partitions=None): """ @@ -205,6 +252,7 @@ class SimpleConsumer(Consumer): client: a connected KafkaClient group: a name for this consumer, used for offset storage and must be unique topic: the topic to consume + offset: default to previous position if available, or the current beginning partitions: An optional list of partitions to consume the data from auto_commit: default True. Whether or not to auto commit the offsets @@ -227,7 +275,9 @@ class SimpleConsumer(Consumer): commit method on this class. 
A manual call to commit will also reset these triggers """ - def __init__(self, client, group, topic, auto_commit=True, partitions=None, + def __init__(self, client, group, topic, + offset = ClientOffset.PreviousOrCurrentBeginning, + auto_commit=True, partitions=None, auto_commit_every_n=AUTO_COMMIT_MSG_COUNT, auto_commit_every_t=AUTO_COMMIT_INTERVAL, fetch_size_bytes=FETCH_MIN_BYTES, @@ -450,7 +500,7 @@ def _fetch(self): log.debug("Done iterating over partition %s" % partition) partitions = retry_partitions -def _mp_consume(client, group, topic, chunk, queue, start, exit, pause, size): +def _mp_consume(client, group, topic, offset, chunk, queue, start, exit, pause, size): """ A child process worker which consumes messages based on the notifications given by the controller process @@ -467,6 +517,7 @@ def _mp_consume(client, group, topic, chunk, queue, start, exit, pause, size): # done by the master controller process. consumer = SimpleConsumer(client, group, topic, partitions=chunk, + offset = offset, auto_commit=False, auto_commit_every_n=None, auto_commit_every_t=None) @@ -532,7 +583,9 @@ class MultiProcessConsumer(Consumer): commit method on this class. 
A manual call to commit will also reset these triggers """ - def __init__(self, client, group, topic, auto_commit=True, + def __init__(self, client, group, topic, + offset = ClientOffset.PreviousOrCurrentBeginning, + auto_commit=True, auto_commit_every_n=AUTO_COMMIT_MSG_COUNT, auto_commit_every_t=AUTO_COMMIT_INTERVAL, num_procs=1, partitions_per_proc=0): @@ -540,6 +593,7 @@ def __init__(self, client, group, topic, auto_commit=True, # Initiate the base consumer class super(MultiProcessConsumer, self).__init__( client, group, topic, + offset=offset, partitions=None, auto_commit=auto_commit, auto_commit_every_n=auto_commit_every_n, @@ -572,7 +626,7 @@ def __init__(self, client, group, topic, auto_commit=True, for chunk in chunks: chunk = filter(lambda x: x is not None, chunk) args = (client.copy(), - group, topic, chunk, + group, topic, offset, chunk, self.queue, self.start, self.exit, self.pause, self.size) From e9fef0b42531df627ffddc52c40df354a17f1848 Mon Sep 17 00:00:00 2001 From: "yongkun.wang" Date: Wed, 7 May 2014 09:34:43 +0900 Subject: [PATCH 2/3] refactor on constant naming and exceptions --- kafka/common.py | 16 +++++----------- kafka/consumer.py | 23 ++++++++++++----------- 2 files changed, 17 insertions(+), 22 deletions(-) diff --git a/kafka/common.py b/kafka/common.py index aba073139..0c706594c 100644 --- a/kafka/common.py +++ b/kafka/common.py @@ -120,15 +120,9 @@ class ConsumerNoMoreData(KafkaError): pass class ClientOffset: - #Zero = 0 # start from 0, may not be available due - # to ttl - - Previous = -1 # position stores in zookeeper last time - - CurrentBeginning = -2 # current beginning offset (may not be 0) - - PreviousOrCurrentBeginning = -3 # Get previous offset firstly, if not - # available, use current beginning - - Latest = -4 # Start from latest, like tail + PREVIOUS = -1 # position stores in zookeeper last time + CURRENT_BEGINNING = -2 # current beginning offset (may not be 0) + PREVIOUS_OR_CURRENT_BEGINNING = -3 # Get previous offset 
firstly, if not + # available, use current beginning + LATEST = -4 # Start from latest, like tail diff --git a/kafka/consumer.py b/kafka/consumer.py index bbe73dd69..6a1c695ce 100644 --- a/kafka/consumer.py +++ b/kafka/consumer.py @@ -11,7 +11,7 @@ ErrorMapping, FetchRequest, OffsetRequest, OffsetCommitRequest, OffsetFetchRequest, ConsumerFetchSizeTooSmall, ConsumerNoMoreData, - ClientOffset + ClientOffset, BrokerResponseError ) from kafka.util import ReentrantTimer @@ -77,7 +77,7 @@ class Consumer(object): Other value >= 0; """ def __init__(self, client, group, topic, partitions=None, - offset = ClientOffset.PreviousOrCurrentBeginning, + offset = ClientOffset.PREVIOUS_OR_CURRENT_BEGINNING, auto_commit=True, auto_commit_every_n=AUTO_COMMIT_MSG_COUNT, auto_commit_every_t=AUTO_COMMIT_INTERVAL): @@ -111,7 +111,7 @@ def get_current_offsets_callback(resp): elif resp.error == ErrorMapping.UNKNOWN_TOPIC_OR_PARTITON: return 0 else: - raise Exception("OffsetRequest for topic=%s, " + raise BrokerResponseError("OffsetRequest for topic=%s, " "partition=%d failed with errorcode=%s" % ( resp.topic, resp.partition, resp.error)) @@ -123,7 +123,7 @@ def get_or_init_previous_offset_callback(resp): elif resp.error == ErrorMapping.UNKNOWN_TOPIC_OR_PARTITON: return 0 else: - raise Exception("OffsetFetchRequest for topic=%s, " + raise BrokerResponseError("OffsetFetchRequest for topic=%s, " "partition=%d failed with errorcode=%s" % ( resp.topic, resp.partition, resp.error)) @@ -144,23 +144,23 @@ def get_or_init_previous_offset_callback(resp): callback=get_or_init_previous_offset_callback, fail_on_error=False) - if offset == ClientOffset.PreviousOrCurrentBeginning: + if offset == ClientOffset.PREVIOUS_OR_CURRENT_BEGINNING: if offset_start <= last_offset <= offset_end: self.offsets[partition] = last_offset else: self.offsets[partition] = offset_start - elif offset == ClientOffset.Previous: + elif offset == ClientOffset.PREVIOUS: self.offsets[partition] = last_offset - elif offset == 
ClientOffset.CurrentBeginning: + elif offset == ClientOffset.CURRENT_BEGINNING: self.offsets[partition] = offset_start - elif offset == ClientOffset.Latest: + elif offset == ClientOffset.LATEST: self.offsets[partition] = offset_end elif offset >=0: if offset_start <= offset <= offset_end: for partition in partitions: self.offsets[partition] = offset else: - raise Exception("Invalid parameter value offset=%d," + raise ValueError("Invalid parameter value offset=%d," "allowed range %d to %d" % (offset,offset_start,offset_end)) @@ -276,7 +276,7 @@ class SimpleConsumer(Consumer): these triggers """ def __init__(self, client, group, topic, - offset = ClientOffset.PreviousOrCurrentBeginning, + offset = ClientOffset.PREVIOUS_OR_CURRENT_BEGINNING, auto_commit=True, partitions=None, auto_commit_every_n=AUTO_COMMIT_MSG_COUNT, auto_commit_every_t=AUTO_COMMIT_INTERVAL, @@ -286,6 +286,7 @@ def __init__(self, client, group, topic, iter_timeout=None): super(SimpleConsumer, self).__init__( client, group, topic, + offset=offset, partitions=partitions, auto_commit=auto_commit, auto_commit_every_n=auto_commit_every_n, @@ -584,7 +585,7 @@ class MultiProcessConsumer(Consumer): these triggers """ def __init__(self, client, group, topic, - offset = ClientOffset.PreviousOrCurrentBeginning, + offset = ClientOffset.PREVIOUS_OR_CURRENT_BEGINNING, auto_commit=True, auto_commit_every_n=AUTO_COMMIT_MSG_COUNT, auto_commit_every_t=AUTO_COMMIT_INTERVAL, From 22f429bb0e700114df078f8eb42e06186ee6237f Mon Sep 17 00:00:00 2001 From: "yongkun.wang" Date: Mon, 2 Mar 2015 12:55:59 +0900 Subject: [PATCH 3/3] add try except to skip wrong partitions --- kafka/client.py | 19 ++++++++++++------- 1 file changed, 12 insertions(+), 7 deletions(-) diff --git a/kafka/client.py b/kafka/client.py index 39c89ba43..f09474d3e 100644 --- a/kafka/client.py +++ b/kafka/client.py @@ -269,13 +269,18 @@ def load_metadata_for_topics(self, *topics): self.topic_partitions[topic] = [] for partition, meta in partitions.items(): - 
self.topic_partitions[topic].append(partition) - topic_part = TopicAndPartition(topic, partition) - if meta.leader == -1: - log.warning('No leader for topic %s partition %s', topic, partition) - self.topics_to_brokers[topic_part] = None - else: - self.topics_to_brokers[topic_part] = brokers[meta.leader] + try: + + self.topic_partitions[topic].append(partition) + topic_part = TopicAndPartition(topic, partition) + if meta.leader == -1: + log.warning('No leader for topic %s partition %s', topic, partition) + self.topics_to_brokers[topic_part] = None + else: + self.topics_to_brokers[topic_part] = brokers[meta.leader] + + except Exception as e: + log.warning("Failed to get partition meta for topic %s,%s", topic, e) def send_produce_request(self, payloads=[], acks=1, timeout=1000, fail_on_error=True, callback=None):