
FYR-11936: Changes for python3. #1


Open · wants to merge 2 commits into master
4 changes: 2 additions & 2 deletions Makefile
@@ -1,8 +1,8 @@
# Some simple testing tasks (sorry, UNIX only).

FLAGS=
KAFKA_VERSION=0.11.0.2
SCALA_VERSION=2.12
KAFKA_VERSION=0.8.1.1
@asomani5 commented on Jun 17, 2019

Did we need to reduce the Kafka and Scala versions for Python 3 support? That seems counter-intuitive; just curious.

SCALA_VERSION=2.10

The Scala version has also been downgraded from 2.12 to 2.10.


setup:
pip install -r requirements-dev.txt
2 changes: 1 addition & 1 deletion build_integration.sh
@@ -1,6 +1,6 @@
#!/bin/bash

: ${ALL_RELEASES:="0.8.2.2 0.9.0.1 0.10.1.1 0.10.2.1 0.11.0.2 1.0.2 1.1.1 2.0.1"}
: ${ALL_RELEASES:="0.8.1.1 0.8.2.2 0.9.0.1 0.10.1.1 0.10.2.1 0.11.0.2 1.0.2 1.1.1 2.0.1"}
: ${SCALA_VERSION:=2.11}
: ${DIST_BASE_URL:=https://archive.apache.org/dist/kafka/}
: ${KAFKA_SRC_GIT:=https://github.com/apache/kafka.git}
35 changes: 28 additions & 7 deletions kafka/consumer/base.py
@@ -12,7 +12,6 @@
OffsetRequestPayload, OffsetCommitRequestPayload, OffsetFetchRequestPayload)
from kafka.util import ReentrantTimer


log = logging.getLogger('kafka.consumer')

AUTO_COMMIT_MSG_COUNT = 100
@@ -21,8 +20,8 @@
FETCH_DEFAULT_BLOCK_TIMEOUT = 1
FETCH_MAX_WAIT_TIME = 100
FETCH_MIN_BYTES = 4096
FETCH_BUFFER_SIZE_BYTES = 4096
MAX_FETCH_BUFFER_SIZE_BYTES = FETCH_BUFFER_SIZE_BYTES * 8
FETCH_BUFFER_SIZE_BYTES = 262144
MAX_FETCH_BUFFER_SIZE_BYTES = 157286400

We have increased this ratio from 8 to 600; is this expected, or maybe a typo?
Also, could you please keep the previous format, e.g. FETCH_BUFFER_SIZE_BYTES * 600, so we only need to change one place?
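For reference, a minimal sketch of the format the comment above asks for; the multiplier 600 is simply derived from the two values in this diff and is assumed to be intentional:

```python
FETCH_BUFFER_SIZE_BYTES = 262144                              # 256 KiB
MAX_FETCH_BUFFER_SIZE_BYTES = FETCH_BUFFER_SIZE_BYTES * 600   # 157286400 (150 MiB)
```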


ITER_TIMEOUT_SECONDS = 60
NO_MESSAGES_WAIT_TIME_SECONDS = 0.1
@@ -66,7 +65,7 @@ def __init__(self, client, group, topic, partitions=None, auto_commit=True,
self.auto_commit = auto_commit
self.auto_commit_every_n = auto_commit_every_n
self.auto_commit_every_t = auto_commit_every_t

self.on_stop_callback = None
# Set up the auto-commit timer
if auto_commit is True and auto_commit_every_t is not None:
self.commit_timer = ReentrantTimer(auto_commit_every_t,
@@ -88,6 +87,10 @@ def cleanup(obj):

self.partition_info = False # Do not return partition info in msgs

def register_on_stop_callback(self, fn):
if self.on_stop_callback is None:
self.on_stop_callback = fn

def provide_partition_info(self):
"""
Indicates that partition info must be returned by the consumer
@@ -149,11 +152,11 @@ def commit(self, partitions=None):
if partitions is None: # commit all partitions
partitions = list(self.offsets.keys())

log.debug('Committing new offsets for %s, partitions %s',
log.info('Committing new offsets for %s, partitions %s',
self.topic, partitions)
for partition in partitions:
offset = self.offsets[partition]
log.debug('Commit offset %d in SimpleConsumer: '
log.info('Commit offset %d in SimpleConsumer: '
'group=%s, topic=%s, partition=%s',
offset, self.group, self.topic, partition)

@@ -185,7 +188,12 @@ def stop(self):
if self.commit_timer is not None:
self.commit_timer.stop()
self.commit()

if not self.auto_commit and self.on_stop_callback:
try:
log.info('executing "on_stop_callback"')
self.on_stop_callback()
except:
log.exception('There was an error executing "on_stop_callback"')

If possible, can we log some exception info as well?

if hasattr(self, '_cleanup_func'):
# Remove cleanup handler now that we've stopped

@@ -206,6 +214,19 @@ def stop(self):

del self._cleanup_func


def commit_offsets(self, offsets):
assert not self.auto_commit, 'cannot manually commit offsets if autocommit is True'
with self.commit_lock:
reqs = []
for partition, offset in offsets.items():
reqs.append(OffsetCommitRequestPayload(self.topic, partition,
offset, None))
resps = self.client.send_offset_commit_request(self.group, reqs)
for resp in resps:
check_error(resp)

Is there any mechanism to retry if commit_offsets fails?
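One possible answer, purely illustrative and not part of this diff: a bounded-retry wrapper a caller could put around the new commit_offsets() method. Everything except commit_offsets and KafkaError is hypothetical.

```python
import logging
import time

from kafka.errors import KafkaError

log = logging.getLogger(__name__)

def commit_with_retry(consumer, offsets, attempts=3, backoff_seconds=1.0):
    """Try consumer.commit_offsets() a few times before giving up."""
    for attempt in range(1, attempts + 1):
        try:
            consumer.commit_offsets(offsets)
            return True
        except KafkaError:
            # check_error() raises KafkaError subclasses on a failed commit
            log.exception('commit_offsets failed (attempt %d/%d)', attempt, attempts)
            time.sleep(backoff_seconds * attempt)
    return False
```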

self.count_since_commit = 0

def pending(self, partitions=None):
"""
Gets the pending message count
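A minimal usage sketch of the two hooks this file adds, register_on_stop_callback() and commit_offsets(). The consumer setup, import paths (pre-2.0 kafka-python API), and offset bookkeeping are assumptions for illustration, not part of the diff:

```python
from kafka import SimpleClient, SimpleConsumer  # import paths assumed

client = SimpleClient('localhost:9092')
# commit_offsets() asserts that auto_commit is False, so offsets are tracked manually.
consumer = SimpleConsumer(client, 'my-group', 'my-topic', auto_commit=False)

processed_offsets = {}

def flush_offsets():
    # Invoked once from stop() when auto_commit is False and a callback is registered.
    if processed_offsets:
        consumer.commit_offsets(processed_offsets)

consumer.register_on_stop_callback(flush_offsets)

for message in consumer.get_messages(count=10, block=True, timeout=1):
    processed_offsets[0] = message.offset + 1  # single-partition example
consumer.stop()  # runs flush_offsets() via on_stop_callback
```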
10 changes: 5 additions & 5 deletions kafka/consumer/multiprocess.py
@@ -222,12 +222,12 @@ def __iter__(self):
# a chance to run and put some messages in the queue
# TODO: This is a hack and will make the consumer block for
# at least one second. Need to find a better way of doing this
partition, message = self.queue.get(block=True, timeout=1)
meta, message = self.queue.get(block=True, timeout=1)
except queue.Empty:
break

# Count, check and commit messages if necessary
self.offsets[partition] = message.offset + 1
self.offsets[meta.partition] = message.offset + 1
self.events.start.clear()
self.count_since_commit += 1
self._auto_commit()
@@ -271,14 +271,14 @@ def get_messages(self, count=1, block=True, timeout=10):

block_next_call = block is True or block > len(messages)
try:
partition, message = self.queue.get(block_next_call,
meta, message = self.queue.get(block_next_call,
timeout)
except queue.Empty:
break

_msg = (partition, message) if self.partition_info else message
_msg = (meta, message) if self.partition_info else message
messages.append(_msg)
new_offsets[partition] = message.offset + 1
new_offsets[meta.partition] = message.offset + 1
count -= 1
if timeout is not None:
timeout = max_time - time.time()
48 changes: 33 additions & 15 deletions kafka/consumer/simple.py
@@ -27,14 +27,15 @@
from kafka.errors import (
KafkaError, ConsumerFetchSizeTooSmall,
UnknownTopicOrPartitionError, NotLeaderForPartitionError,
OffsetOutOfRangeError, FailedPayloadsError, check_error
OffsetOutOfRangeError, FailedPayloadsError, check_error, BufferTooLargeError
)
from kafka.protocol.message import PartialMessage
from kafka.structs import FetchRequestPayload, OffsetRequestPayload

from collections import namedtuple

log = logging.getLogger(__name__)

MAX_QUEUE_SIZE = 10 * 1024
META = namedtuple("meta", ["partition", "high_water_mark"])
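Both consumers now put a (meta, message) tuple on the internal queue instead of (partition, message), so callers that enable the new partition_info flag receive the partition together with the broker's high-water mark. A hedged caller-side sketch; the client setup and import paths are assumed:

```python
from kafka import SimpleClient, SimpleConsumer  # import paths assumed

client = SimpleClient('localhost:9092')
# partition_info=True is the constructor flag added in this diff.
consumer = SimpleConsumer(client, 'my-group', 'my-topic', partition_info=True)

for meta, message in consumer.get_messages(count=10, block=True, timeout=1):
    # META carries the partition and the high-water mark reported in the fetch response.
    lag = meta.high_water_mark - (message.offset + 1)
    print(meta.partition, message.offset, lag)
```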

class FetchContext(object):
"""
@@ -118,7 +119,9 @@ def __init__(self, client, group, topic, auto_commit=True, partitions=None,
buffer_size=FETCH_BUFFER_SIZE_BYTES,
max_buffer_size=MAX_FETCH_BUFFER_SIZE_BYTES,
iter_timeout=None,
auto_offset_reset='largest'):
auto_offset_reset='largest',
partition_info=False,
skip_buffer_size_error=True):
warnings.warn('deprecated - this class will be removed in a future'
' release. Use KafkaConsumer instead.',
DeprecationWarning)
@@ -134,13 +137,16 @@ def __init__(self, client, group, topic, auto_commit=True, partitions=None,
'max_buffer_size (%d)' %
(buffer_size, max_buffer_size))
self.buffer_size = buffer_size
self.partition_info = partition_info
self.max_buffer_size = max_buffer_size
self.fetch_max_wait_time = FETCH_MAX_WAIT_TIME
self.fetch_min_bytes = fetch_size_bytes
self.fetch_offsets = self.offsets.copy()
self.iter_timeout = iter_timeout
self.auto_offset_reset = auto_offset_reset
self.queue = queue.Queue()
self.queue = queue.Queue(maxsize=MAX_QUEUE_SIZE)
self.skip_buffer_size_error = skip_buffer_size_error


def __repr__(self):
return '<SimpleConsumer group=%s, topic=%s, partitions=%s>' % \
@@ -255,7 +261,7 @@ def seek(self, offset, whence=None, partition=None):
if self.auto_commit:
self.commit()

self.queue = queue.Queue()
self.queue = queue.Queue(maxsize=MAX_QUEUE_SIZE)

def get_messages(self, count=1, block=True, timeout=0.1):
"""
@@ -289,10 +295,10 @@ def get_messages(self, count=1, block=True, timeout=0.1):
continue
break

partition, message = result
_msg = (partition, message) if self.partition_info else message
meta, message = result
_msg = result if self.partition_info else message
messages.append(_msg)
new_offsets[partition] = message.offset + 1
new_offsets[meta.partition] = message.offset + 1

# Update and commit offsets if necessary
self.offsets.update(new_offsets)
@@ -318,16 +324,14 @@ def _get_message(self, block=True, timeout=0.1, get_partition_info=None,
log.debug('internal queue empty, fetching more messages')
with FetchContext(self, block, timeout):
self._fetch()

if not block or time.time() > (start_at + timeout):
break

try:
partition, message = self.queue.get_nowait()

meta, message = self.queue.get_nowait()
if update_offset:
# Update partition offset
self.offsets[partition] = message.offset + 1
self.offsets[meta.partition] = message.offset + 1

# Count, check and commit messages if necessary
self.count_since_commit += 1
@@ -336,13 +340,16 @@
if get_partition_info is None:
get_partition_info = self.partition_info
if get_partition_info:
return partition, message
return meta, message
else:
return message
except queue.Empty:
log.debug('internal queue empty after fetch - returning None')
return None

def stop(self):
super(SimpleConsumer, self).stop()

def __iter__(self):
if self.iter_timeout is None:
timeout = ITER_TIMEOUT_SECONDS
@@ -410,6 +417,7 @@ def _fetch(self):
continue

partition = resp.partition
high_water_mark = resp.highwaterMark
buffer_size = partitions[partition]

# Check for partial message
@@ -426,6 +434,15 @@

if self.max_buffer_size is None:
buffer_size *= 2
if self.skip_buffer_size_error:
if self.buffer_size > MAX_FETCH_BUFFER_SIZE_BYTES:
log.error('Message size exceeded maximum allowed of {0}'.format(MAX_FETCH_BUFFER_SIZE_BYTES))
log.error('Current buffer_size is: {0}'.format(self.buffer_size))
log.error('topic: {0}, partition: {1}, offset:{2}'.format(self.topic, partition, self.fetch_offsets[partition]))
old_offset = self.fetch_offsets[partition]
self.seek(1, 1, partition=partition)
log.error('Incremented offset. New offset is: {0}'.format(self.offsets[partition]))
raise BufferTooLargeError(self.topic, partition, old_offset, self.offsets[partition])
else:
buffer_size = min(buffer_size * 2, self.max_buffer_size)
log.warning('Fetch size too small, increase to %d (2x) '
@@ -439,6 +456,7 @@
message)
continue
# Put the message in our queue
self.queue.put((partition, message))
meta = META(partition, high_water_mark)
self.queue.put((meta, message))
self.fetch_offsets[partition] = message.offset + 1
partitions = retry_partitions
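With skip_buffer_size_error enabled (the new default in this diff), a single message larger than MAX_FETCH_BUFFER_SIZE_BYTES is skipped by seeking past it, and BufferTooLargeError is raised so the caller can decide what to do. A hedged caller-side sketch; the processing loop and handle() callback are assumptions for illustration:

```python
import logging

from kafka.errors import BufferTooLargeError

log = logging.getLogger(__name__)

def consume_forever(consumer, handle):  # handle() is a hypothetical message handler
    while True:
        try:
            for msg in consumer.get_messages(count=100, block=True, timeout=1):
                handle(msg)
        except BufferTooLargeError as err:
            # The consumer has already advanced past the oversized message;
            # log the topic/partition/offset details carried by the exception and keep going.
            log.error('skipped oversized message: %r', err)
```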
12 changes: 12 additions & 0 deletions kafka/errors.py
@@ -497,6 +497,18 @@ class KafkaConfigurationError(KafkaError):
class QuotaViolationError(KafkaError):
pass

class BufferTooLargeError(KafkaError):

def __init__(self, topic, partition, old_offset, new_offset):
self.topic = topic
self.partition = partition
self.old_offset = old_offset
self.new_offset = new_offset

def __repr__(self):
return 'topic: {0}, partition: {1}, old_offset: {2}, new_offset: {3}'.format(self.topic, self.partition, self.old_offset, self.new_offset)



class AsyncProducerQueueFull(KafkaError):
def __init__(self, failed_msgs, *args):