Open
Description
Hello, I am facing an issue where BinLogStreamReader does not resume from the last saved position.
I have hard-coded position 1146, but the process also delivers the event at position 875. It should only stream events located after position 1146.
# Reader construction used in the script below, with the start position pinned.
stream = BinLogStreamReader(
    # Connection and identity.
    connection_settings=config,
    server_id=1,
    is_mariadb=True,
    # Only row-level changes are of interest; wait for new events forever.
    only_events=[WriteRowsEvent, UpdateRowsEvent, DeleteRowsEvent],
    blocking=True,
    # Start position: the values loaded from disk are disabled in favour of
    # hard-coded coordinates.
    # log_file=log_file,
    # log_pos=log_pos,
    resume_stream=True,
    log_file="mysqld-bin.000003",
    log_pos=1146,
)
MariaDB version: 10.3.39
Package version: mysql-replication=="0.45.1"
(Works with 10.3.39 MariaDB version)
my.cnf:
[mysqld]
log-bin
binlog-format=ROW
server-id=1
binlog_row_image=FULL
Process logs:
INFO:root:Resuming from mysqld-bin.000003:1146
INFO:root:Starting BinLogStreamReader with log_file=mysqld-bin.000003 and log_pos=1146
DEBUG:root:BinLogStreamReader initialized with log_file=mysqld-bin.000003 and log_pos=1146
INFO:root: -> Insert into mydatabase.example_table: {'id': 1, 'name': 'Alice', 'created_at': datetime.datetime(2024, 6, 28, 0, 34, 17), '_deleted': False}
INFO:root:Saved logs: mysqld-bin.000003:875
INFO:root: -> Insert into mydatabase.example_table: {'id': 2, 'name': 'Alice', 'created_at': datetime.datetime(2024, 6, 28, 0, 56, 39), '_deleted': False}
INFO:root:Saved logs: mysqld-bin.000003:1146
Python code:
import time
# mysql-replication=="0.45.1"
from pymysqlreplication import BinLogStreamReader
from pymysqlreplication.row_event import WriteRowsEvent, UpdateRowsEvent, DeleteRowsEvent
from pymysqlreplication.event import RotateEvent
import logging
# Connection settings handed to BinLogStreamReader.
config = dict(
    host='localhost',
    port=3307,
    user='root',
    password='rootpassword',
    database='mydatabase',
)

# Emit everything from DEBUG upwards on the root logger.
logging.basicConfig(level=logging.DEBUG)
def save_binlog_position(log_file, log_pos):
    """Persist the binlog coordinates to ``binlog_position.txt``.

    The record is written as ``<log_file>,<log_pos>``. It is first written
    to a sibling temporary file and then moved over the real file, so a
    process that is killed mid-write never leaves a truncated or empty
    checkpoint behind.

    Args:
        log_file: name of the binlog file.
        log_pos: position inside that file.
    """
    # Local import keeps this function self-contained; repeated imports are
    # served from the module cache.
    import os

    position_path = "binlog_position.txt"
    scratch_path = position_path + ".tmp"
    with open(scratch_path, "w") as f:
        f.write(f"{log_file},{log_pos}")
    # os.replace swaps the finished file into place in a single step.
    os.replace(scratch_path, position_path)
    logging.info(f"Saved logs: {log_file}:{log_pos}")
def load_binlog_position():
    """Return the saved ``(log_file, log_pos)`` pair, or ``(None, None)``.

    Reads ``binlog_position.txt`` from the working directory, which is
    expected to hold a single ``<log_file>,<log_pos>`` record.

    Returns:
        ``(log_file, log_pos)`` with ``log_pos`` converted to ``int``, or
        ``(None, None)`` when the file is missing or empty.

    Raises:
        ValueError: if the file has content that is not a
            ``<log_file>,<log_pos>`` record.
    """
    try:
        with open("binlog_position.txt", "r") as f:
            record = f.read().strip()
    except FileNotFoundError:
        logging.info("Starting from beginning")
        return None, None

    if not record:
        # An empty file carries no usable checkpoint (for example the process
        # stopped right after the file was truncated for rewriting).
        logging.warning("binlog_position.txt is empty; ignoring it")
        logging.info("Starting from beginning")
        return None, None

    # Split on the LAST comma so only the trailing field is read as the position.
    log_file, separator, raw_pos = record.rpartition(",")
    if not separator or not log_file:
        raise ValueError(
            f"Malformed binlog_position.txt content {record!r}: "
            "expected '<log_file>,<log_pos>'"
        )
    try:
        log_pos = int(raw_pos)
    except ValueError as err:
        raise ValueError(
            f"Malformed binlog_position.txt content {record!r}: "
            "log_pos is not an integer"
        ) from err

    logging.info(f"Resuming from {log_file}:{log_pos}")
    return log_file, log_pos
def get_values_from_logs(stream):
    """Log every row change read from *stream* and checkpoint after each event.

    Each row of an insert/update/delete event is tagged in place with a
    ``_deleted`` flag (True only for deletes) and logged. After every event
    the stream's current ``log_file`` / ``log_pos`` are persisted through
    ``save_binlog_position``.

    Args:
        stream: iterable of events that also exposes ``log_file`` and
            ``log_pos`` attributes (a BinLogStreamReader in this script).
    """
    for event in stream:
        # Attributes such as ``event.table`` are only touched inside the
        # branches below, so an event of any other type passes through
        # without an attribute lookup that it might not support.
        if isinstance(event, WriteRowsEvent):
            for row in event.rows:
                row['values']["_deleted"] = False
                logging.info(f" -> Insert into {event.schema}.{event.table}: {row['values']}")
        elif isinstance(event, UpdateRowsEvent):
            for row in event.rows:
                row['after_values']["_deleted"] = False
                logging.info(
                    f" -> Update {event.schema}.{event.table}: {row['before_values']} -> {row['after_values']}")
        elif isinstance(event, DeleteRowsEvent):
            for row in event.rows:
                row['values']["_deleted"] = True
                logging.info(f" -> Delete from {event.schema}.{event.table}: {row['values']}")

        # Checkpoint once per event, after all of its rows were handled.
        # NOTE(review): this records the reader's position after every row
        # event; confirm that resuming from such an offset is valid when one
        # transaction spans several row events.
        save_binlog_position(stream.log_file, stream.log_pos)
# Function to parse binary log events
def parse_binlog_events():
    """Stream row events from the server, starting at the saved position.

    Loads the last persisted coordinates with ``load_binlog_position()``,
    opens a blocking BinLogStreamReader restricted to insert/update/delete
    row events, and hands it to ``get_values_from_logs()``. The stream is
    closed on the way out even if processing raises or is interrupted.
    """
    log_file, log_pos = load_binlog_position()
    # load_binlog_position() yields (None, None) when nothing was saved.
    have_position = log_file is not None and log_pos is not None
    logging.info(f"Starting BinLogStreamReader with log_file={log_file} and log_pos={log_pos}")
    stream = BinLogStreamReader(
        connection_settings=config,
        # NOTE(review): the my.cnf in this report also sets server-id=1;
        # replication clients are normally expected to use an id distinct
        # from the server's own. TODO confirm and pick another value.
        server_id=1,
        only_events=[WriteRowsEvent, UpdateRowsEvent, DeleteRowsEvent],
        # NOTE(review): the process logs in this report show an event located
        # before log_pos being delivered while is_mariadb=True. Check whether
        # the installed pymysqlreplication honours log_file/log_pos in MariaDB
        # mode, or only resumes via auto_position (GTID) there. TODO confirm.
        is_mariadb=True,
        blocking=True,
        # Pass the coordinates that were actually loaded instead of fixed
        # ones, and only ask to resume when there is a position to resume
        # from (assumes log_pos is honoured only with resume_stream=True,
        # per the library docs; verify).
        resume_stream=have_position,
        log_file=log_file,
        log_pos=log_pos,
    )
    try:
        logging.debug(f"BinLogStreamReader initialized with log_file={stream.log_file} and log_pos={stream.log_pos}")
        get_values_from_logs(stream)
    finally:
        # blocking=True means the loop normally ends through an exception or
        # an interrupt, so the connection must be released here.
        stream.close()


if __name__ == "__main__":
    parse_binlog_events()
Docker-compose:
version: '3'
services:
  mariadb:
    image: mariadb:10.3.39
    ports:
      - "3307:3306"
    environment:
      MYSQL_ROOT_PASSWORD: rootpassword
      MYSQL_DATABASE: mydatabase
      MYSQL_USER: myuser
      MYSQL_PASSWORD: mypassword
    volumes:
      - ./mariadb/my.cnf:/etc/mysql/conf.d/my.cnf
Could anyone help? Thank you in advance!
Metadata
Metadata
Assignees
Labels
No labels