Skip to content

FYR-11936: Changes for python3. #1

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open

FYR-11936: Changes for python3. #1

wants to merge 2 commits into from

Conversation

sachiin
Copy link
Owner

@sachiin sachiin commented Jun 11, 2019

No description provided.

@sachiin
Copy link
Owner Author

sachiin commented Jun 11, 2019

Changes not incorporated is fetch loop the messages in thread since it was failing the IT. For debugging, I put the logs. In logs I found that the messages are being put into the queue but reading the messages is giving empty queue error even after putting some time.sleep. Please let me know if its fine to have this code as it is.

@sachiin
Copy link
Owner Author

sachiin commented Jun 11, 2019

Changes not incorporated is fetch loop the messages in thread since it was failing the IT. For debugging, I put the logs. In logs I found that the messages are being put into the queue but reading the messages is giving empty queue error even after putting some time.sleep. Please let me know if its fine to have this code as it is.

Talking about the change on line 473 in consumer.py file. dpkp/kafka-python@master...Livefyre:master#diff-a2ba34b32ce95607a85319056b0dc3e9R301

@@ -439,6 +457,7 @@ def _fetch(self):
message)
continue
# Put the message in our queue
self.queue.put((partition, message))
meta = META(partition, high_water_mark)
self.queue.put((meta, message), block=True)
Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

check this.

@@ -1,8 +1,8 @@
# Some simple testing tasks (sorry, UNIX only).

FLAGS=
KAFKA_VERSION=0.11.0.2
SCALA_VERSION=2.12
KAFKA_VERSION=0.8.1.1
Copy link

@asomani5 asomani5 Jun 17, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did we need to reduce Kafka & scala version for Py3 support. Seems counter intutive, just curious

KAFKA_VERSION=0.11.0.2
SCALA_VERSION=2.12
KAFKA_VERSION=0.8.1.1
SCALA_VERSION=2.10

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

scala version has also been scaled down from 2.12 to 2.10

FETCH_BUFFER_SIZE_BYTES = 4096
MAX_FETCH_BUFFER_SIZE_BYTES = FETCH_BUFFER_SIZE_BYTES * 8
FETCH_BUFFER_SIZE_BYTES = 262144
MAX_FETCH_BUFFER_SIZE_BYTES = 157286400

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we have increased this ratio from 8 to 600, is this expected or may be some typo ?
Also, would you please use the previous format like FETCH_BUFFER_SIZE_BYTES * 600, so we can change only place.

log.info('executing "on_stop_callback"')
self.on_stop_callback()
except:
log.exception('There was an error executing "on_stop_callback"')

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If possible can we log some exception info in the logs as well.

offset, None))
resps = self.client.send_offset_commit_request(self.group, reqs)
for resp in resps:
check_error(resp)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there any mechanism by which if commit_offset fails. We can retry.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants