Skip to content

Commit d6394f6

Browse files
Merge pull request julien-duponchelle#43 from noplay/handle_ctl_disconnects
Handle disconnects for the ctl connection
2 parents eb41f5a + a40ceb8 commit d6394f6

File tree

2 files changed

+30
-17
lines changed

2 files changed

+30
-17
lines changed

pymysqlreplication/binlogstream.py

Lines changed: 29 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ def __connect_to_ctl(self):
5757
self._ctl_connection_settings["cursorclass"] = \
5858
pymysql.cursors.DictCursor
5959
self._ctl_connection = pymysql.connect(**self._ctl_connection_settings)
60+
self._ctl_connection._get_table_information = self.__get_table_information
6061
self.__connected_ctl = True
6162

6263
def __connect_to_stream(self):
@@ -117,8 +118,7 @@ def fetchone(self):
117118
if not pkt.is_ok_packet():
118119
continue
119120

120-
binlog_event = BinLogPacketWrapper(pkt, self.table_map,
121-
self._ctl_connection)
121+
binlog_event = BinLogPacketWrapper(pkt, self.table_map, self._ctl_connection)
122122

123123
if binlog_event.event_type == TABLE_MAP_EVENT:
124124
self.table_map[binlog_event.event.table_id] = \
@@ -146,5 +146,32 @@ def __filter_event(self, event):
146146
return True
147147
return False
148148

149+
def __get_table_information(self, schema, table):
150+
for i in range(1, 3):
151+
try:
152+
if not self.__connected_ctl:
153+
self.__connect_to_ctl()
154+
155+
cur = self._ctl_connection.cursor()
156+
cur.execute("""
157+
SELECT
158+
COLUMN_NAME, COLLATION_NAME, CHARACTER_SET_NAME,
159+
COLUMN_COMMENT, COLUMN_TYPE
160+
FROM
161+
columns
162+
WHERE
163+
table_schema = %s AND table_name = %s
164+
""", (schema, table))
165+
166+
return cur.fetchall()
167+
except pymysql.OperationalError as error:
168+
code, message = error.args
169+
# 2013: Connection Lost
170+
if code == 2013:
171+
self.__connected_ctl = False
172+
continue
173+
else:
174+
raise error
175+
149176
def __iter__(self):
150177
return iter(self.fetchone, None)

pymysqlreplication/row_event.py

Lines changed: 1 addition & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -477,8 +477,7 @@ def __init__(self, from_packet, event_size, table_map, ctl_connection):
477477
if self.table_id in table_map:
478478
self.column_schemas = table_map[self.table_id].column_schemas
479479
else:
480-
self.column_schemas = self.__get_table_information(self.schema,
481-
self.table)
480+
self.column_schemas = self._ctl_connection._get_table_information(self.schema, self.table)
482481

483482
# Read columns meta data
484483
column_types = list(self.packet.read(self.column_count))
@@ -495,19 +494,6 @@ def __init__(self, from_packet, event_size, table_map, ctl_connection):
495494
# TODO: get this informations instead of trashing data
496495
# n NULL-bitmask, length: (column-length * 8) / 7
497496

498-
def __get_table_information(self, schema, table):
499-
cur = self._ctl_connection.cursor()
500-
cur.execute("""
501-
SELECT
502-
COLUMN_NAME, COLLATION_NAME, CHARACTER_SET_NAME,
503-
COLUMN_COMMENT, COLUMN_TYPE
504-
FROM
505-
columns
506-
WHERE
507-
table_schema = %s AND table_name = %s
508-
""", (schema, table))
509-
return cur.fetchall()
510-
511497
def get_table(self):
512498
return self.table_obj
513499

0 commit comments

Comments
 (0)