@@ -57,6 +57,7 @@ def __connect_to_ctl(self):
57
57
self ._ctl_connection_settings ["cursorclass" ] = \
58
58
pymysql .cursors .DictCursor
59
59
self ._ctl_connection = pymysql .connect (** self ._ctl_connection_settings )
60
+ self ._ctl_connection ._get_table_information = self .__get_table_information
60
61
self .__connected_ctl = True
61
62
62
63
def __connect_to_stream (self ):
@@ -117,8 +118,7 @@ def fetchone(self):
117
118
if not pkt .is_ok_packet ():
118
119
continue
119
120
120
- binlog_event = BinLogPacketWrapper (pkt , self .table_map ,
121
- self ._ctl_connection )
121
+ binlog_event = BinLogPacketWrapper (pkt , self .table_map , self ._ctl_connection )
122
122
123
123
if binlog_event .event_type == TABLE_MAP_EVENT :
124
124
self .table_map [binlog_event .event .table_id ] = \
@@ -146,5 +146,32 @@ def __filter_event(self, event):
146
146
return True
147
147
return False
148
148
149
+ def __get_table_information (self , schema , table ):
150
+ for i in range (1 , 3 ):
151
+ try :
152
+ if not self .__connected_ctl :
153
+ self .__connect_to_ctl ()
154
+
155
+ cur = self ._ctl_connection .cursor ()
156
+ cur .execute ("""
157
+ SELECT
158
+ COLUMN_NAME, COLLATION_NAME, CHARACTER_SET_NAME,
159
+ COLUMN_COMMENT, COLUMN_TYPE
160
+ FROM
161
+ columns
162
+ WHERE
163
+ table_schema = %s AND table_name = %s
164
+ """ , (schema , table ))
165
+
166
+ return cur .fetchall ()
167
+ except pymysql .OperationalError as error :
168
+ code , message = error .args
169
+ # 2013: Connection Lost
170
+ if code == 2013 :
171
+ self .__connected_ctl = False
172
+ continue
173
+ else :
174
+ raise error
175
+
149
176
def __iter__ (self ):
150
177
return iter (self .fetchone , None )
0 commit comments