Description
Hi,
I have been working on gevent support, much like what @jeffwidman did a while ago. Obviously, for a proper gevent Python program, avoiding patching thread is neither realistic nor reasonable, because other parts of a gevent program may depend on thread being patched.
Previous important discussions about kafka-python and gevent include #1687 and #1515.
I spent some time digging into the issues kafka-python has with gevent. The issues mostly concern Kafka consumers, since consumers involve a heartbeat thread. My investigation is based on kafka-python version 1.4.6 (the most recent release).
1. Starvation of heartbeat thread
My observation does not agree with @jeffwidman's observation in #1515 (given that #1515 was created a year ago, it is possible that things have changed since). The problem is not other greenlets eating up CPU and starving the heartbeat; it is the conflict between gevent's cooperative multitasking and the way kafka-python uses locks. The hang / starvation is due to the heartbeat thread failing to acquire the client lock.
Let's assume that the heartbeat thread is starved (I'll explain the cause later). Why does it hang the consumer (main thread)? In the consumer, https://github.com/dpkp/kafka-python/blob/1.4.6/kafka/consumer/group.py#L1088 shortcuts the fetch when time.time() > timeout_at, with timeout_at = self._coordinator.time_to_next_poll() + time.time(), which is 0 + time.time() when the heartbeat is starved. Since timeout_at is evaluated first, the next call to time.time() is almost guaranteed to be larger than the earlier 0 + time.time(); that is why the fetcher is never reached. (There are some rare cases where the second time.time() call is not greater than the first. In those cases the consumer can reach the fetcher and fetch one message, then gets "internal iterator timeout - breaking for poll", and goes back to the time.time() lottery. The heartbeat thread remains starved.)
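Here is a toy reproduction of that arithmetic (not kafka-python code; time_to_next_poll is stubbed to return 0, as it would for an overdue heartbeat):

```python
import time

def time_to_next_poll():
    return 0  # an overdue heartbeat makes the coordinator report "poll now"

timeout_at = time_to_next_poll() + time.time()  # equals time.time() at this instant

# ...a few instructions later, inside the iterator loop...
if time.time() > timeout_at:  # almost always True, since the clock has advanced
    print("shortcut taken: fetcher is skipped, iterator breaks for poll")
else:
    print("rare case: one message can be fetched before the next lottery")
```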
Now let's look at why the heartbeat thread is starved. The heartbeat thread requires the client (KafkaClient) lock: https://github.com/dpkp/kafka-python/blob/1.4.6/kafka/coordinator/base.py#L964 and https://github.com/dpkp/kafka-python/blob/1.4.6/kafka/coordinator/base.py#L997. In the consumer's _message_generator, there is almost no opportunity to yield to the heartbeat thread's greenlet. Gevent automatically yields to other greenlets when the monkey patch is applied and the patched network functions are called, but the network calls (e.g. _poll) inside KafkaClient are protected by the client's _lock. In the areas not protected by the lock, there are no calls that yield cooperatively. This is how the heartbeat thread is starved.
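A schematic of the two competitors for the lock, heavily paraphrased from the 1.4.6 code rather than copied from it:

```python
import threading

class ClientSketch:
    """Paraphrase of KafkaClient's locking around network I/O."""
    def __init__(self):
        self._lock = threading.RLock()  # becomes a gevent-aware lock once monkey-patched

    def poll(self, timeout_ms=None):
        with self._lock:
            # The patched socket I/O inside _poll is the only place gevent can
            # switch greenlets -- and the lock is still held at that point.
            return self._poll(timeout_ms)

    def _poll(self, timeout_ms):
        ...  # select()/recv() on the broker sockets in the real client


class HeartbeatSketch:
    """Paraphrase of the heartbeat thread's loop: it needs the same client lock."""
    def __init__(self, client):
        self._client = client

    def _run_once(self):
        # By the time this greenlet is scheduled, the consumer greenlet has
        # usually re-acquired client._lock already, so this blocks again.
        with self._client._lock:
            self._client.poll(timeout_ms=0)
```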
This can be solved by adding time.sleep(0) before acquiring the lock in KafkaClient (https://github.com/dpkp/kafka-python/blob/1.4.6/kafka/client_async.py#L570). This gives the heartbeat thread a chance to run and acquire the lock.
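A minimal sketch of that fix, assuming gevent's monkey patch has replaced time.sleep so that sleep(0) acts as a cooperative yield point (the poll body is abbreviated; only the added line matters):

```python
import time

class KafkaClient:  # sketch of kafka/client_async.py, not the full class
    def poll(self, timeout_ms=None, future=None):
        # Under gevent, time.sleep(0) yields to other greenlets, giving the
        # heartbeat greenlet that is blocked on self._lock a chance to win
        # the lock before we take it again.
        time.sleep(0)
        with self._lock:
            ...  # existing _poll / network I/O, unchanged
```

Under plain threads, sleep(0) is close to a no-op, so the non-gevent path should not be affected.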
2. (Past issue, fixed by commit 9f0b518, not yet in a release) Heartbeat thread consuming fetch responses, making the fetching thread hang
Due to a race condition, the heartbeat thread sometimes consumes a fetch response. I am not sure whether this happens without gevent.
In these unfortunate cases, the fetching thread waits in poll for the fetch response (which has already been consumed by the heartbeat thread) for a long time (a few minutes), as determined by https://github.com/dpkp/kafka-python/blob/1.4.6/kafka/client_async.py#L586 when timeout_ms is inf (_consumer_timeout). timeout_ms is inf when in_flight_fetches() returns True before acquiring the lock and False after acquiring the lock, because the heartbeat thread consumes the fetch response before the main thread acquires the lock.
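To make the window concrete, here is a rough paraphrase (not verbatim code) of the relevant part of _message_generator in 1.4.6; short_backoff_ms is a placeholder for whichever backoff setting actually caps the timeout:

```python
poll_ms = 1000 * (self._consumer_timeout - time.time())  # inf unless consumer_timeout_ms is set
if not self._fetcher.in_flight_fetches():                # checked WITHOUT the client lock
    poll_ms = min(poll_ms, short_backoff_ms)             # placeholder for the short backoff cap
# The race window: the heartbeat greenlet grabs the client lock first and
# consumes the fetch response right here, so nothing is outstanding any more...
self._client.poll(timeout_ms=poll_ms)                    # ...yet this can block for poll_ms
```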
With commit 9f0b518, the in_flight_request_count of KafkaClient is checked before actually doing a network call, so the client will not poll with a timeout of a few minutes. The root cause of this issue is that the self._fetcher.in_flight_fetches() check in the consumer's _message_generator is not protected by a lock. That problematic unprotected check also turns out to be redundant after commit 9f0b518.
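Based on the behaviour described above (I am paraphrasing, not quoting the commit's diff), the guard amounts to something like the following inside KafkaClient.poll, with retry_backoff_ms standing in for whatever short timeout the commit actually uses:

```python
with self._lock:
    if self.in_flight_request_count() == 0:
        # Nothing is outstanding, so never block for the caller's huge timeout.
        timeout_ms = min(timeout_ms, self.config['retry_backoff_ms'])
    responses = self._poll(timeout_ms / 1000.0)
```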
Conclusion
In conclusion, I would like to see official support for gevent in kafka-python. I have yet to find any problems apart from the ones mentioned above, and it is actually not that hard to make it work. The key remaining part is making sure it works reliably, which means more testing and code review. My current understanding of the codebase is limited to this investigation, but I am willing to work on this if the maintainers accept the suggestion.
The motivation is that my production services, which use gevent, are moving away from pykafka for stability reasons. Given that kafka-python is a mature and popular Python Kafka library, it would be great if kafka-python were compatible with gevent.