|
9 | 9 |
|
10 | 10 | from .packet import BinLogPacketWrapper
|
11 | 11 | from .constants.BINLOG import TABLE_MAP_EVENT, ROTATE_EVENT
|
12 |
| -from .event import NotImplementedEvent |
13 | 12 | from .gtid import GtidSet
|
| 13 | +import event |
| 14 | +import row_event |
14 | 15 |
|
15 | 16 | try:
|
16 | 17 | from pymysql.constants.COMMAND import COM_BINLOG_DUMP_GTID
|
@@ -48,9 +49,7 @@ def __init__(self, connection_settings, server_id, resume_stream=False,
|
48 | 49 | self.__connected_ctl = False
|
49 | 50 | self.__resume_stream = resume_stream
|
50 | 51 | self.__blocking = blocking
|
51 |
| - self.__only_events = only_events |
52 |
| - self.__ignored_events = ignored_events |
53 |
| - self.__filter_non_implemented_events = filter_non_implemented_events |
| 52 | + self.__only_events = self._allowed_event_list(only_events, ignored_events, filter_non_implemented_events) |
54 | 53 | self.__server_id = server_id
|
55 | 54 | self.__use_checksum = False
|
56 | 55 |
|
@@ -156,7 +155,7 @@ def __connect_to_stream(self):
|
156 | 155 | # A gtid set looks like:
|
157 | 156 | # 19d69c1e-ae97-4b8c-a1ef-9e12ba966457:1-3:8-10,
|
158 | 157 | # 1c2aad49-ae92-409a-b4df-d05a03e4702e:42-47:80-100:130-140
|
159 |
| - # |
| 158 | + # |
160 | 159 | # In this particular gtid set, 19d69c1e-ae97-4b8c-a1ef-9e12ba966457:1-3:8-10
|
161 | 160 | # is the first member of the set, it is called a gtid.
|
162 | 161 | # In this gtid, 19d69c1e-ae97-4b8c-a1ef-9e12ba966457 is the sid
|
@@ -242,35 +241,45 @@ def fetchone(self):
|
242 | 241 | # wrong table schema.
|
243 | 242 | # The fix is to rely on the fact that MySQL will also rotate to a new binlog file every time it
|
244 | 243 | # restarts. That means every rotation we see *could* be a sign of restart and so potentially
|
245 |
| - # invalidates all our cached table id to schema mappings. This means we have to load them all |
| 244 | + # invalidates all our cached table id to schema mappings. This means we have to load them all |
246 | 245 | # again for each logfile which is potentially wasted effort but we can't really do much better
|
247 | 246 | # without being broken in restart case
|
248 | 247 | self.table_map = {}
|
249 | 248 | elif binlog_event.log_pos:
|
250 | 249 | self.log_pos = binlog_event.log_pos
|
251 | 250 |
|
252 |
| - if self.__filter_event(binlog_event.event): |
| 251 | + # event is none if we have filter it on packet level |
| 252 | + # we filter also not allowed events |
| 253 | + if binlog_event.event is None or not (binlog_event.event.__class__ in self.__only_events): |
253 | 254 | continue
|
254 | 255 |
|
255 | 256 | return binlog_event.event
|
256 | 257 |
|
257 |
| - def __filter_event(self, event): |
258 |
| - if self.__filter_non_implemented_events and isinstance(event, NotImplementedEvent): |
259 |
| - return True |
260 |
| - |
261 |
| - if self.__ignored_events is not None: |
262 |
| - for ignored_event in self.__ignored_events: |
263 |
| - if isinstance(event, ignored_event): |
264 |
| - return True |
265 |
| - |
266 |
| - if self.__only_events is not None: |
267 |
| - for allowed_event in self.__only_events: |
268 |
| - if isinstance(event, allowed_event): |
269 |
| - return False |
270 |
| - else: |
271 |
| - return True |
272 |
| - |
273 |
| - return False |
| 258 | + def _allowed_event_list(self, only_events, ignored_events, filter_non_implemented_events): |
| 259 | + if only_events is not None: |
| 260 | + events = set(only_events) |
| 261 | + else: |
| 262 | + events = set(( |
| 263 | + event.QueryEvent, |
| 264 | + event.RotateEvent, |
| 265 | + event.FormatDescriptionEvent, |
| 266 | + event.XidEvent, |
| 267 | + event.GtidEvent, |
| 268 | + # row_event |
| 269 | + row_event.UpdateRowsEvent, |
| 270 | + row_event.WriteRowsEvent, |
| 271 | + row_event.DeleteRowsEvent, |
| 272 | + row_event.TableMapEvent, |
| 273 | + event.NotImplementedEvent)) |
| 274 | + if ignored_events is not None: |
| 275 | + for e in ignored_events: |
| 276 | + events.remove(e) |
| 277 | + if filter_non_implemented_events: |
| 278 | + try: |
| 279 | + events.remove(event.NotImplementedEvent) |
| 280 | + except KeyError: |
| 281 | + pass |
| 282 | + return frozenset(events) |
274 | 283 |
|
275 | 284 | def __get_table_information(self, schema, table):
|
276 | 285 | for i in range(1, 3):
|
|
0 commit comments