
New Feature: asyncio support for BinLogStreamReader #593

Open
@gongyisheng

Description


Hi python-mysql-replication community, I want to request asyncio support for BinLogStreamReader.

My use case is to use BinLogStreamReader to receive binlog messages and forward them to a message queue, where a consumer processes them. Here's an example of my code:

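import asyncio
import logging

from pymysqlreplication import BinLogStreamReader
from pymysqlreplication.row_event import (
    DeleteRowsEvent,
    UpdateRowsEvent,
    WriteRowsEvent,
)

# MYSQL_SETTINGS and message_queue (the external message queue client) are
# defined elsewhere; XXX is the memory-queue timeout in seconds.


def binlog_subscribe():
    stream = BinLogStreamReader(
        connection_settings=MYSQL_SETTINGS,
        server_id=2,
        only_events=[DeleteRowsEvent, UpdateRowsEvent, WriteRowsEvent],
        blocking=True,
        enable_logging=False,
        only_schemas={"test"},
        only_tables={"binlog_test"},
    )

    # Iterating the stream is blocking: each step waits on the MySQL connection.
    for binlogevent in stream:
        for row in binlogevent.rows:
            yield row


async def forward_message(memory_queue: asyncio.Queue):
    while True:
        try:
            message = await asyncio.wait_for(memory_queue.get(), timeout=XXX)
            await message_queue.push(message)
        except asyncio.TimeoutError:
            logging.debug("Memory queue is empty")


async def main():
    memory_queue = asyncio.Queue()
    forward_task = asyncio.create_task(forward_message(memory_queue))
    # This for loop blocks the event loop while it waits for the next binlog
    # event, so forward_task and any other tasks are starved.
    for message in binlog_subscribe():
        await memory_queue.put(message)


if __name__ == "__main__":
    asyncio.run(main())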

Because the BinLogStreamReader __iter__ method is blocking, iterating the stream blocks the asyncio event loop. This causes side effects such as:

  1. Background health checks with the message queue server are blocked under low QPS.
  2. CPU usage is limited under high QPS.
  3. Some messages in the memory queue are not sent out in time.
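Until the library supports asyncio natively, one possible workaround is to run the blocking iteration in a worker thread and hand each item to the event loop with loop.call_soon_threadsafe. The sketch below is illustrative only and makes some assumptions. blocking_events is a hypothetical stand-in for iterating a BinLogStreamReader, using time.sleep to simulate blocking reads. heartbeat is a hypothetical stand-in for a background task such as the health check. Neither is part of python-mysql-replication.

```python
import asyncio
import contextlib
import threading
import time
from typing import Iterator, List, Tuple

_DONE = object()  # sentinel marking the end of the stream


def blocking_events(n: int) -> Iterator[int]:
    """Hypothetical stand-in for iterating a BinLogStreamReader with
    blocking=True: each item arrives only after a blocking wait."""
    for i in range(n):
        time.sleep(0.05)  # simulates waiting on the MySQL connection
        yield i


def produce(loop: asyncio.AbstractEventLoop, queue: asyncio.Queue,
            events: Iterator[int]) -> None:
    # Runs in a worker thread, so the blocking iteration never stalls the loop.
    for event in events:
        # asyncio.Queue is not thread-safe: schedule the put on the loop thread.
        loop.call_soon_threadsafe(queue.put_nowait, event)
    loop.call_soon_threadsafe(queue.put_nowait, _DONE)


async def heartbeat(ticks: List[float]) -> None:
    # Hypothetical background task (e.g. a health check) that must keep running.
    while True:
        ticks.append(time.monotonic())
        await asyncio.sleep(0.01)


async def main() -> Tuple[List[int], int]:
    loop = asyncio.get_running_loop()
    queue: asyncio.Queue = asyncio.Queue()
    ticks: List[float] = []
    hb_task = asyncio.create_task(heartbeat(ticks))
    producer = threading.Thread(
        target=produce, args=(loop, queue, blocking_events(5)), daemon=True
    )
    producer.start()

    received: List[int] = []
    while (item := await queue.get()) is not _DONE:
        received.append(item)  # a real service would forward the item here

    hb_task.cancel()
    with contextlib.suppress(asyncio.CancelledError):
        await hb_task
    producer.join()
    return received, len(ticks)


received, tick_count = asyncio.run(main())
print(f"received={received}, heartbeat ticks={tick_count}")
```

The heartbeat keeps ticking while the producer thread is blocked. A single thread produces items in stream order and the queue is FIFO, so event order is preserved.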

Examples such as https://github.com/julien-duponchelle/python-mysql-replication/blob/main/examples/mysql_to_kafka.py show that running a forwarding service, which listens to the binlog stream and forwards events somewhere else, is a common use case.

Currently the package does not support asyncio. I think asyncio is a good fit for this package because reading the binlog is I/O-bound, and asyncio lets many I/O operations be handled concurrently. Open-source third-party libraries such as aiomysql and aiokafka already support asyncio, so I think there are use cases for it here too. As for ordering guarantees, keeping the iterator interface means events are still consumed one at a time, so it will not cause out-of-order messages.
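To illustrate the ordering argument, here is a minimal sketch of how an async iterator interface could wrap a blocking iterator. It assumes Python 3.9+ for asyncio.to_thread. AsyncIterAdapter is a hypothetical name, not an existing python-mysql-replication API, and range(5) stands in for a BinLogStreamReader instance. Each next() call is awaited before the following one starts, so items come out in the same order the underlying iterator yields them.

```python
import asyncio
from typing import Generic, Iterable, Iterator, List, TypeVar

T = TypeVar("T")
_END = object()  # sentinel returned by next() when the iterator is exhausted


class AsyncIterAdapter(Generic[T]):
    """Hypothetical adapter (not part of python-mysql-replication) that exposes
    a blocking iterator through the async iterator protocol."""

    def __init__(self, iterable: Iterable[T]) -> None:
        self._it: Iterator[T] = iter(iterable)

    def __aiter__(self) -> "AsyncIterAdapter[T]":
        return self

    async def __anext__(self) -> T:
        # next() runs in a worker thread, so a blocking read does not stall the
        # event loop. Calls are strictly sequential, which preserves order.
        item = await asyncio.to_thread(next, self._it, _END)
        if item is _END:
            raise StopAsyncIteration
        return item


async def collect() -> List[int]:
    # range(5) stands in for a BinLogStreamReader instance.
    return [event async for event in AsyncIterAdapter(range(5))]


events = asyncio.run(collect())
print(events)  # [0, 1, 2, 3, 4]
```

With a real reader, this might look like `async for binlogevent in AsyncIterAdapter(stream):`. Each pending next() would occupy one thread from the default executor while it waits for the next event.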

Let me know your idea!
