Skip to content

Commit 3de8598

Browse files
Improve speed on filtering event
1 parent 93be38b commit 3de8598

File tree

2 files changed

+39
-24
lines changed

2 files changed

+39
-24
lines changed

pymysqlreplication/binlogstream.py

Lines changed: 33 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,9 @@
99

1010
from .packet import BinLogPacketWrapper
1111
from .constants.BINLOG import TABLE_MAP_EVENT, ROTATE_EVENT
12-
from .event import NotImplementedEvent
1312
from .gtid import GtidSet
13+
import event
14+
import row_event
1415

1516
try:
1617
from pymysql.constants.COMMAND import COM_BINLOG_DUMP_GTID
@@ -48,9 +49,7 @@ def __init__(self, connection_settings, server_id, resume_stream=False,
4849
self.__connected_ctl = False
4950
self.__resume_stream = resume_stream
5051
self.__blocking = blocking
51-
self.__only_events = only_events
52-
self.__ignored_events = ignored_events
53-
self.__filter_non_implemented_events = filter_non_implemented_events
52+
self.__only_events = self._allowed_event_list(only_events, ignored_events, filter_non_implemented_events)
5453
self.__server_id = server_id
5554
self.__use_checksum = False
5655

@@ -156,7 +155,7 @@ def __connect_to_stream(self):
156155
# A gtid set looks like:
157156
# 19d69c1e-ae97-4b8c-a1ef-9e12ba966457:1-3:8-10,
158157
# 1c2aad49-ae92-409a-b4df-d05a03e4702e:42-47:80-100:130-140
159-
#
158+
#
160159
# In this particular gtid set, 19d69c1e-ae97-4b8c-a1ef-9e12ba966457:1-3:8-10
161160
# is the first member of the set, it is called a gtid.
162161
# In this gtid, 19d69c1e-ae97-4b8c-a1ef-9e12ba966457 is the sid
@@ -242,35 +241,45 @@ def fetchone(self):
242241
# wrong table schema.
243242
# The fix is to rely on the fact that MySQL will also rotate to a new binlog file every time it
244243
# restarts. That means every rotation we see *could* be a sign of restart and so potentially
245-
# invalidates all our cached table id to schema mappings. This means we have to load them all
244+
# invalidates all our cached table id to schema mappings. This means we have to load them all
246245
# again for each logfile which is potentially wasted effort but we can't really do much better
247246
# without being broken in restart case
248247
self.table_map = {}
249248
elif binlog_event.log_pos:
250249
self.log_pos = binlog_event.log_pos
251250

252-
if self.__filter_event(binlog_event.event):
251+
# event is none if we have filter it on packet level
252+
# we filter also not allowed events
253+
if binlog_event.event is None or not (binlog_event.event.__class__ in self.__only_events):
253254
continue
254255

255256
return binlog_event.event
256257

257-
def __filter_event(self, event):
258-
if self.__filter_non_implemented_events and isinstance(event, NotImplementedEvent):
259-
return True
260-
261-
if self.__ignored_events is not None:
262-
for ignored_event in self.__ignored_events:
263-
if isinstance(event, ignored_event):
264-
return True
265-
266-
if self.__only_events is not None:
267-
for allowed_event in self.__only_events:
268-
if isinstance(event, allowed_event):
269-
return False
270-
else:
271-
return True
272-
273-
return False
258+
def _allowed_event_list(self, only_events, ignored_events, filter_non_implemented_events):
259+
if only_events is not None:
260+
events = set(only_events)
261+
else:
262+
events = set((
263+
event.QueryEvent,
264+
event.RotateEvent,
265+
event.FormatDescriptionEvent,
266+
event.XidEvent,
267+
event.GtidEvent,
268+
# row_event
269+
row_event.UpdateRowsEvent,
270+
row_event.WriteRowsEvent,
271+
row_event.DeleteRowsEvent,
272+
row_event.TableMapEvent,
273+
event.NotImplementedEvent))
274+
if ignored_events is not None:
275+
for e in ignored_events:
276+
events.remove(e)
277+
if filter_non_implemented_events:
278+
try:
279+
events.remove(event.NotImplementedEvent)
280+
except KeyError:
281+
pass
282+
return frozenset(events)
274283

275284
def __get_table_information(self, schema, table):
276285
for i in range(1, 3):

pymysqlreplication/tests/test_basic.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,12 @@ class TestBasicBinLogStreamReader(base.PyMySQLReplicationTestCase):
1313
def ignoredEvents(self):
1414
return [GtidEvent]
1515

16+
def test_allowed_event_list(self):
17+
self.assertEqual(len(self.stream._allowed_event_list(None, None, False)), 10)
18+
self.assertEqual(len(self.stream._allowed_event_list(None, None, True)), 9)
19+
self.assertEqual(len(self.stream._allowed_event_list(None, [RotateEvent], False)), 9)
20+
self.assertEqual(len(self.stream._allowed_event_list([RotateEvent], None, False)), 1)
21+
1622
def test_read_query_event(self):
1723
query = "CREATE TABLE test (id INT NOT NULL AUTO_INCREMENT, data VARCHAR (50) NOT NULL, PRIMARY KEY (id))"
1824
self.execute(query)

0 commit comments

Comments
 (0)