Skip to content

BinLogStreamReader is not resuming from last position (log_file, log_pos not working). #618

Open
@zygias

Description

@zygias

Hello, i face issue, that BinLogStreamReader is not resuming from last position.
I have hardcoded 1146 position, but process takes 875 position too. It should resume log stream from >1146 position.

    stream = BinLogStreamReader(
        connection_settings=config,
        server_id=1,
        only_events=[WriteRowsEvent, UpdateRowsEvent, DeleteRowsEvent],
        is_mariadb=True,
        blocking=True,
        resume_stream=True,
        # log_file=log_file,
        # log_pos=log_pos,
        log_file="mysqld-bin.000003",
        log_pos=1146,
    )

MariaDB version: 10.3.39
Package version: mysql-replication=="0.45.1" (Works with 10.3.39 MariaDB version)

my.cnf:

[mysqld]
log-bin
binlog-format=ROW
server-id=1
binlog_row_image=FULL

Process logs:

INFO:root:Resuming from mysqld-bin.000003:1146
INFO:root:Starting BinLogStreamReader with log_file=mysqld-bin.000003 and log_pos=1146
DEBUG:root:BinLogStreamReader initialized with log_file=mysqld-bin.000003 and log_pos=1146
INFO:root: -> Insert into mydatabase.example_table: {'id': 1, 'name': 'Alice', 'created_at': datetime.datetime(2024, 6, 28, 0, 34, 17), '_deleted': False}
INFO:root:Saved logs: mysqld-bin.000003:875
INFO:root: -> Insert into mydatabase.example_table: {'id': 2, 'name': 'Alice', 'created_at': datetime.datetime(2024, 6, 28, 0, 56, 39), '_deleted': False}
INFO:root:Saved logs: mysqld-bin.000003:1146

Python code:

import time
# mysql-replication=="0.45.1"
from pymysqlreplication import BinLogStreamReader
from pymysqlreplication.row_event import WriteRowsEvent, UpdateRowsEvent, DeleteRowsEvent
from pymysqlreplication.event import RotateEvent
import logging

# Database connection parameters
config = {
    'host': 'localhost',
    'port': 3307,
    'user': 'root',
    'password': 'rootpassword',
    'database': 'mydatabase'
}

# Setup logging
logging.basicConfig(level=logging.DEBUG)


def save_binlog_position(log_file, log_pos):
    with open("binlog_position.txt", "w") as f:
        f.write(f"{log_file},{log_pos}")
        logging.info(f"Saved logs: {log_file}:{log_pos}")


def load_binlog_position():
    try:
        with open("binlog_position.txt", "r") as f:
            log_file, log_pos = f.read().strip().split(',')
            logging.info(f"Resuming from {log_file}:{log_pos}")
            return log_file, int(log_pos)
    except FileNotFoundError:
        logging.info("Starting from beginning")
        return None, None


def get_values_from_logs(stream):
    for event in stream:
        table_name = event.table
        if isinstance(event, WriteRowsEvent):
            for row in event.rows:
                row['values'].update({"_deleted": False})
                logging.info(f" -> Insert into {event.schema}.{event.table}: {row['values']}")
        elif isinstance(event, UpdateRowsEvent):
            for row in event.rows:
                row['after_values'].update({"_deleted": False})
                logging.info(
                    f" -> Update {event.schema}.{event.table}: {row['before_values']} -> {row['after_values']}")
        elif isinstance(event, DeleteRowsEvent):
            for row in event.rows:
                row['values'].update({"_deleted": True})
                logging.info(f" -> Delete from {event.schema}.{event.table}: {row['values']}")
        log_file, log_pos = stream.log_file, stream.log_pos
        save_binlog_position(log_file, log_pos)


# Function to parse binary log events
def parse_binlog_events():
    log_file, log_pos = load_binlog_position()

    logging.info(f"Starting BinLogStreamReader with log_file={log_file} and log_pos={log_pos}")

    stream = BinLogStreamReader(
        connection_settings=config,
        server_id=1,
        only_events=[WriteRowsEvent, UpdateRowsEvent, DeleteRowsEvent],
        is_mariadb=True,
        blocking=True,
        resume_stream=True,
        # log_file=log_file,
        # log_pos=log_pos,
        log_file="mysqld-bin.000003",
        log_pos=1146,
    )

    logging.debug(f"BinLogStreamReader initialized with log_file={stream.log_file} and log_pos={stream.log_pos}")
    get_values_from_logs(stream)
    stream.close()


if __name__ == "__main__":
    parse_binlog_events()

Docker-compose:

version: '3'

services:
  mariadb:
    image: mariadb:10.3.39
    ports:
      - "3307:3306"
    environment:
      MYSQL_ROOT_PASSWORD: rootpassword
      MYSQL_DATABASE: mydatabase
      MYSQL_USER: myuser
      MYSQL_PASSWORD: mypassword
    volumes:
      - ./mariadb/my.cnf:/etc/mysql/conf.d/my.cnf

Anyone could help? Thank you in advance!

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions