-
Notifications
You must be signed in to change notification settings - Fork 1
FYR-11936: Changes for python3. #1
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,8 +1,8 @@ | ||
# Some simple testing tasks (sorry, UNIX only). | ||
|
||
FLAGS= | ||
KAFKA_VERSION=0.11.0.2 | ||
SCALA_VERSION=2.12 | ||
KAFKA_VERSION=0.8.1.1 | ||
SCALA_VERSION=2.10 | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. The Scala version has also been scaled down, from 2.12 to 2.10. |
||
|
||
setup: | ||
pip install -r requirements-dev.txt | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -12,7 +12,6 @@ | |
OffsetRequestPayload, OffsetCommitRequestPayload, OffsetFetchRequestPayload) | ||
from kafka.util import ReentrantTimer | ||
|
||
|
||
log = logging.getLogger('kafka.consumer') | ||
|
||
AUTO_COMMIT_MSG_COUNT = 100 | ||
|
@@ -21,8 +20,8 @@ | |
FETCH_DEFAULT_BLOCK_TIMEOUT = 1 | ||
FETCH_MAX_WAIT_TIME = 100 | ||
FETCH_MIN_BYTES = 4096 | ||
FETCH_BUFFER_SIZE_BYTES = 4096 | ||
MAX_FETCH_BUFFER_SIZE_BYTES = FETCH_BUFFER_SIZE_BYTES * 8 | ||
FETCH_BUFFER_SIZE_BYTES = 262144 | ||
MAX_FETCH_BUFFER_SIZE_BYTES = 157286400 | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. We have increased this ratio from 8 to 600 — is this expected, or maybe a typo? |
||
|
||
ITER_TIMEOUT_SECONDS = 60 | ||
NO_MESSAGES_WAIT_TIME_SECONDS = 0.1 | ||
|
@@ -66,7 +65,7 @@ def __init__(self, client, group, topic, partitions=None, auto_commit=True, | |
self.auto_commit = auto_commit | ||
self.auto_commit_every_n = auto_commit_every_n | ||
self.auto_commit_every_t = auto_commit_every_t | ||
|
||
self.on_stop_callback = None | ||
# Set up the auto-commit timer | ||
if auto_commit is True and auto_commit_every_t is not None: | ||
self.commit_timer = ReentrantTimer(auto_commit_every_t, | ||
|
@@ -88,6 +87,10 @@ def cleanup(obj): | |
|
||
self.partition_info = False # Do not return partition info in msgs | ||
|
||
def register_on_stop_callback(self, fn): | ||
if self.on_stop_callback is None: | ||
self.on_stop_callback = fn | ||
|
||
def provide_partition_info(self): | ||
""" | ||
Indicates that partition info must be returned by the consumer | ||
|
@@ -149,11 +152,11 @@ def commit(self, partitions=None): | |
if partitions is None: # commit all partitions | ||
partitions = list(self.offsets.keys()) | ||
|
||
log.debug('Committing new offsets for %s, partitions %s', | ||
log.info('Committing new offsets for %s, partitions %s', | ||
self.topic, partitions) | ||
for partition in partitions: | ||
offset = self.offsets[partition] | ||
log.debug('Commit offset %d in SimpleConsumer: ' | ||
log.info('Commit offset %d in SimpleConsumer: ' | ||
'group=%s, topic=%s, partition=%s', | ||
offset, self.group, self.topic, partition) | ||
|
||
|
@@ -185,7 +188,12 @@ def stop(self): | |
if self.commit_timer is not None: | ||
self.commit_timer.stop() | ||
self.commit() | ||
|
||
if not self.auto_commit and self.on_stop_callback: | ||
try: | ||
log.info('executing "on_stop_callback"') | ||
self.on_stop_callback() | ||
except: | ||
log.exception('There was an error executing "on_stop_callback"') | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. If possible, can we log some exception info as well? |
||
if hasattr(self, '_cleanup_func'): | ||
# Remove cleanup handler now that we've stopped | ||
|
||
|
@@ -206,6 +214,19 @@ def stop(self): | |
|
||
del self._cleanup_func | ||
|
||
|
||
def commit_offsets(self, offsets): | ||
assert not self.auto_commit, 'cannot manually commit offsets if autocommit is True' | ||
with self.commit_lock: | ||
reqs = [] | ||
for partition, offset in offsets.iteritems(): | ||
reqs.append(OffsetCommitRequestPayload(self.topic, partition, | ||
offset, None)) | ||
resps = self.client.send_offset_commit_request(self.group, reqs) | ||
for resp in resps: | ||
check_error(resp) | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Is there any mechanism by which we can retry if commit_offsets fails? |
||
self.count_since_commit = 0 | ||
|
||
def pending(self, partitions=None): | ||
""" | ||
Gets the pending message count | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Did we need to reduce the Kafka & Scala versions for Py3 support? Seems counterintuitive — just curious.