Skip to content

Commit b722d06

Browse files
author
Arthur Gautier
committed
Adds gtid ignore
1 parent 91c16a6 commit b722d06

File tree

5 files changed

+31
-4
lines changed

5 files changed

+31
-4
lines changed

pymysqlreplication/binlogstream.py

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,15 +21,18 @@ class BinLogStreamReader(object):
2121

2222
def __init__(self, connection_settings={}, resume_stream=False,
2323
blocking=False, only_events=None, server_id=255,
24-
log_file=None, log_pos=None, filter_non_implemented_events=True):
24+
log_file=None, log_pos=None, filter_non_implemented_events=True,
25+
ignored_events=[]):
2526
"""
2627
Attributes:
2728
resume_stream: Start for event from position or the latest event of
2829
binlog or from older available event
2930
blocking: Read on stream is blocking
3031
only_events: Array of allowed events
32+
ignored_events: Array of ignoreded events
3133
log_file: Set replication start log file
3234
log_pos: Set replication start log pos
35+
auto_position: Use master_auto_position gtid to set position
3336
"""
3437
self.__connection_settings = connection_settings
3538
self.__connection_settings["charset"] = "utf8"
@@ -39,6 +42,7 @@ def __init__(self, connection_settings={}, resume_stream=False,
3942
self.__resume_stream = resume_stream
4043
self.__blocking = blocking
4144
self.__only_events = only_events
45+
self.__ignored_events = ignored_events
4246
self.__filter_non_implemented_events = filter_non_implemented_events
4347
self.__server_id = server_id
4448
self.__use_checksum = False
@@ -182,6 +186,10 @@ def __filter_event(self, event):
182186
if self.__filter_non_implemented_events and isinstance(event, NotImplementedEvent):
183187
return True
184188

189+
for ignored_event in self.__ignored_events:
190+
if isinstance(event, ignored_event):
191+
return True
192+
185193
if self.__only_events is not None:
186194
for allowed_event in self.__only_events:
187195
if isinstance(event, allowed_event):

pymysqlreplication/tests/base.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,9 @@
1515
base = unittest.TestCase
1616

1717
class PyMySQLReplicationTestCase(base):
18+
def ignoredEvents(self):
19+
return []
20+
1821
def setUp(self):
1922
self.database = {
2023
"host": "localhost",
@@ -71,4 +74,5 @@ def resetBinLog(self):
7174
self.execute("RESET MASTER")
7275
if self.stream is not None:
7376
self.stream.close()
74-
self.stream = BinLogStreamReader(connection_settings=self.database)
77+
self.stream = BinLogStreamReader(connection_settings=self.database,
78+
ignored_events=self.ignoredEvents())

pymysqlreplication/tests/test_basic.py

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,9 @@
1010

1111

1212
class TestBasicBinLogStreamReader(base.PyMySQLReplicationTestCase):
13+
def ignoredEvents(self):
14+
return [GtidEvent]
15+
1316
def test_read_query_event(self):
1417
query = "CREATE TABLE test (id INT NOT NULL AUTO_INCREMENT, data VARCHAR (50) NOT NULL, PRIMARY KEY (id))"
1518
self.execute(query)
@@ -58,7 +61,8 @@ def test_reading_rotate_event(self):
5861
def test_connection_stream_lost_event(self):
5962
self.stream.close()
6063
self.stream = BinLogStreamReader(connection_settings=self.database,
61-
blocking=True)
64+
blocking=True,
65+
ignored_events=self.ignoredEvents())
6266

6367
query = "CREATE TABLE test (id INT NOT NULL AUTO_INCREMENT, data VARCHAR (50) NOT NULL, PRIMARY KEY (id))"
6468
self.execute(query)
@@ -202,7 +206,8 @@ def test_log_pos(self):
202206
connection_settings=self.database,
203207
resume_stream=True,
204208
log_file=log_file,
205-
log_pos=log_pos
209+
log_pos=log_pos,
210+
ignored_events=self.ignoredEvents()
206211
)
207212

208213
self.assertIsInstance(self.stream.fetchone(), RotateEvent)
@@ -243,6 +248,9 @@ def test_log_pos_handles_disconnects(self):
243248

244249

245250
class TestMultipleRowBinLogStreamReader(base.PyMySQLReplicationTestCase):
251+
def ignoredEvents(self):
252+
return [GtidEvent]
253+
246254
def test_insert_multiple_row_event(self):
247255
query = "CREATE TABLE test (id INT NOT NULL AUTO_INCREMENT, data VARCHAR (50) NOT NULL, PRIMARY KEY (id))"
248256
self.execute(query)

pymysqlreplication/tests/test_data_objects.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,17 @@
22

33
from pymysqlreplication.column import Column
44
from pymysqlreplication.table import Table
5+
from pymysqlreplication.event import GtidEvent
56

67
from pymysqlreplication.tests import base
78

89
__all__ = ["TestDataObjects"]
910

1011

1112
class TestDataObjects(base.PyMySQLReplicationTestCase):
13+
def ignoredEvents(self):
14+
return [GtidEvent]
15+
1216
def test_column_is_primary(self):
1317
col = Column(1,
1418
{"COLUMN_NAME": "test",

pymysqlreplication/tests/test_data_type.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,9 @@
1515

1616

1717
class TestDataType(base.PyMySQLReplicationTestCase):
18+
def ignoredEvents(self):
19+
return [GtidEvent]
20+
1821
def create_and_insert_value(self, create_query, insert_query):
1922
self.execute(create_query)
2023
self.execute(insert_query)

0 commit comments

Comments
 (0)