-
Notifications
You must be signed in to change notification settings - Fork 1
FYR-11936: Changes for python3. #1
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Conversation
Changes not incorporated is fetch loop the messages in thread since it was failing the IT. For debugging, I put the logs. In logs I found that the messages are being put into the queue but reading the messages is giving empty queue error even after putting some time.sleep. Please let me know if its fine to have this code as it is. |
Talking about the change on line 473 in consumer.py file. dpkp/kafka-python@master...Livefyre:master#diff-a2ba34b32ce95607a85319056b0dc3e9R301 |
kafka/consumer/simple.py
Outdated
@@ -439,6 +457,7 @@ def _fetch(self): | |||
message) | |||
continue | |||
# Put the message in our queue | |||
self.queue.put((partition, message)) | |||
meta = META(partition, high_water_mark) | |||
self.queue.put((meta, message), block=True) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
check this.
@@ -1,8 +1,8 @@ | |||
# Some simple testing tasks (sorry, UNIX only). | |||
|
|||
FLAGS= | |||
KAFKA_VERSION=0.11.0.2 | |||
SCALA_VERSION=2.12 | |||
KAFKA_VERSION=0.8.1.1 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Did we need to reduce Kafka & scala version for Py3 support. Seems counter intutive, just curious
KAFKA_VERSION=0.11.0.2 | ||
SCALA_VERSION=2.12 | ||
KAFKA_VERSION=0.8.1.1 | ||
SCALA_VERSION=2.10 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
scala version has also been scaled down from 2.12
to 2.10
FETCH_BUFFER_SIZE_BYTES = 4096 | ||
MAX_FETCH_BUFFER_SIZE_BYTES = FETCH_BUFFER_SIZE_BYTES * 8 | ||
FETCH_BUFFER_SIZE_BYTES = 262144 | ||
MAX_FETCH_BUFFER_SIZE_BYTES = 157286400 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we have increased this ratio from 8 to 600, is this expected or may be some typo ?
Also, would you please use the previous format like FETCH_BUFFER_SIZE_BYTES * 600
, so we can change only place.
log.info('executing "on_stop_callback"') | ||
self.on_stop_callback() | ||
except: | ||
log.exception('There was an error executing "on_stop_callback"') |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If possible can we log some exception info in the logs as well.
offset, None)) | ||
resps = self.client.send_offset_commit_request(self.group, reqs) | ||
for resp in resps: | ||
check_error(resp) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is there any mechanism by which if commit_offset fails. We can retry.
No description provided.