Skip to content

Commit 5a8d75f

Browse files
Filter schemas
1 parent ad88669 commit 5a8d75f

File tree

4 files changed

+19
-5
lines changed

4 files changed

+19
-5
lines changed

pymysqlreplication/binlogstream.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ def __init__(self, connection_settings, server_id, resume_stream=False,
3131
blocking=False, only_events=None, log_file=None, log_pos=None,
3232
filter_non_implemented_events=True,
3333
ignored_events=None, auto_position=None,
34-
only_tables = None):
34+
only_tables = None, only_schemas = None):
3535
"""
3636
Attributes:
3737
resume_stream: Start for event from position or the latest event of
@@ -43,6 +43,7 @@ def __init__(self, connection_settings, server_id, resume_stream=False,
4343
log_pos: Set replication start log pos
4444
auto_position: Use master_auto_position gtid to set position
4545
only_tables: An array with the tables you want to watch
46+
only_schemas: An array with the schemas you want to watch
4647
"""
4748
self.__connection_settings = connection_settings
4849
self.__connection_settings["charset"] = "utf8"
@@ -53,6 +54,7 @@ def __init__(self, connection_settings, server_id, resume_stream=False,
5354
self.__blocking = blocking
5455

5556
self.__only_tables = only_tables
57+
self.__only_schemas = only_schemas
5658
self.__allowed_events = self._allowed_event_list(only_events, ignored_events, filter_non_implemented_events)
5759

5860
# We can't filter on packet level TABLE_MAP and rotate event because we need
@@ -238,7 +240,8 @@ def fetchone(self):
238240
self._ctl_connection,
239241
self.__use_checksum,
240242
allowed_events = self.__allowed_events_in_packet,
241-
only_tables = self.__only_tables)
243+
only_tables = self.__only_tables,
244+
only_schemas = self.__only_schemas)
242245
if binlog_event.event_type == TABLE_MAP_EVENT:
243246
self.table_map[binlog_event.event.table_id] = \
244247
binlog_event.event.get_table()

pymysqlreplication/event.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,9 @@
77

88

99
class BinLogEvent(object):
10-
def __init__(self, from_packet, event_size, table_map, ctl_connection, only_tables = None):
10+
def __init__(self, from_packet, event_size, table_map, ctl_connection,
11+
only_tables = None,
12+
only_schemas = None):
1113
self.packet = from_packet
1214
self.table_map = table_map
1315
self.event_type = self.packet.event_type

pymysqlreplication/packet.py

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,10 @@ class BinLogPacketWrapper(object):
4848

4949
}
5050

51-
def __init__(self, from_packet, table_map, ctl_connection, use_checksum, allowed_events = None, only_tables = None):
51+
def __init__(self, from_packet, table_map, ctl_connection, use_checksum,
52+
allowed_events = None,
53+
only_tables = None,
54+
only_schemas = None):
5255
# -1 because we ignore the ok byte
5356
self.read_bytes = 0
5457
# Used when we want to override a value in the data buffer
@@ -86,7 +89,9 @@ def __init__(self, from_packet, table_map, ctl_connection, use_checksum, allowed
8689
if event_class not in allowed_events:
8790
return
8891
self.event = event_class(self, event_size_without_header, table_map,
89-
ctl_connection, only_tables = only_tables)
92+
ctl_connection,
93+
only_tables = only_tables,
94+
only_schemas = only_schemas)
9095
if self.event._processed == False:
9196
self.event = None
9297

pymysqlreplication/row_event.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs)
1919
ctl_connection, **kwargs)
2020
self.__rows = None
2121
self.__only_tables = kwargs["only_tables"]
22+
self.__only_schemas = kwargs["only_schemas"]
2223

2324
#Header
2425
self.table_id = self._read_table_id()
@@ -31,6 +32,9 @@ def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs)
3132
if self.__only_tables is not None and self.table not in self.__only_tables:
3233
self._processed = False
3334
return
35+
if self.__only_schemas is not None and self.schema not in self.__only_schemas:
36+
self._processed = False
37+
return
3438

3539
#Event V2
3640
if self.event_type == BINLOG.WRITE_ROWS_EVENT_V2 or \

0 commit comments

Comments
 (0)