From 3386cda852ba6e99b81572ce9e5987982c06f68c Mon Sep 17 00:00:00 2001 From: Bartek Ogryczak Date: Thu, 28 May 2015 18:34:20 -0700 Subject: [PATCH] Adding support for skipping the binlog until reaching specified timestamp. --- pymysqlreplication/binlogstream.py | 7 ++++++- pymysqlreplication/tests/test_basic.py | 22 ++++++++++++++++++++++ 2 files changed, 28 insertions(+), 1 deletion(-) diff --git a/pymysqlreplication/binlogstream.py b/pymysqlreplication/binlogstream.py index 13e1981e..ff5b296e 100644 --- a/pymysqlreplication/binlogstream.py +++ b/pymysqlreplication/binlogstream.py @@ -40,7 +40,7 @@ def __init__(self, connection_settings, server_id, resume_stream=False, filter_non_implemented_events=True, ignored_events=None, auto_position=None, only_tables=None, only_schemas=None, - freeze_schema=False): + freeze_schema=False, skip_to_timestamp=None): """ Attributes: resume_stream: Start for event from position or the latest event of @@ -54,6 +54,7 @@ def __init__(self, connection_settings, server_id, resume_stream=False, only_tables: An array with the tables you want to watch only_schemas: An array with the schemas you want to watch freeze_schema: If true do not support ALTER TABLE. It's faster. + skip_to_timestamp: Ignore all events until reaching specified timestamp. """ self.__connection_settings = connection_settings self.__connection_settings["charset"] = "utf8" @@ -82,6 +83,7 @@ def __init__(self, connection_settings, server_id, resume_stream=False, self.log_pos = log_pos self.log_file = log_file self.auto_position = auto_position + self.skip_to_timestamp = skip_to_timestamp def close(self): if self.__connected_stream: @@ -259,6 +261,9 @@ def fetchone(self): self.__only_schemas, self.__freeze_schema) + if self.skip_to_timestamp and binlog_event.timestamp < self.skip_to_timestamp: + continue + if binlog_event.event_type == TABLE_MAP_EVENT and \ binlog_event.event is not None: self.table_map[binlog_event.event.table_id] = \ diff --git a/pymysqlreplication/tests/test_basic.py b/pymysqlreplication/tests/test_basic.py index d242eac7..83993bea 100644 --- a/pymysqlreplication/tests/test_basic.py +++ b/pymysqlreplication/tests/test_basic.py @@ -1,5 +1,7 @@ # -*- coding: utf-8 -*- +import time + from pymysqlreplication.tests import base from pymysqlreplication import BinLogStreamReader from pymysqlreplication.event import * @@ -404,6 +406,26 @@ def test_log_pos_handles_disconnects(self): self.assertGreater(self.stream.log_pos, 0) + def test_skip_to_timestamp(self): + self.stream.close() + query = "CREATE TABLE test_1 (id INT NOT NULL AUTO_INCREMENT, data VARCHAR (50) NOT NULL, PRIMARY KEY (id))" + self.execute(query) + time.sleep(1) + query = "SELECT UNIX_TIMESTAMP();" + timestamp = self.execute(query).fetchone()[0] + query2 = "CREATE TABLE test_2 (id INT NOT NULL AUTO_INCREMENT, data VARCHAR (50) NOT NULL, PRIMARY KEY (id))" + self.execute(query2) + + self.stream = BinLogStreamReader( + self.database, + server_id=1024, + skip_to_timestamp=timestamp, + ignored_events=self.ignoredEvents(), + ) + event = self.stream.fetchone() + self.assertIsInstance(event, QueryEvent) + self.assertEqual(event.query, query2) + class TestMultipleRowBinLogStreamReader(base.PyMySQLReplicationTestCase): def ignoredEvents(self):