31
31
from hashlib import md5
32
32
import logging
33
33
import socket
34
+ import collections
34
35
35
36
# pylint: disable=F0401,E0611
36
37
try :
135
136
136
137
_LOGGER = logging .getLogger ('myconnpy-fabric' )
137
138
139
+ class FabricResponse (object ):
140
+ """Class used to parse a response got from Fabric.
141
+ """
142
+
143
+ SUPPORTED_VERSION = 1
144
+
145
+ def __init__ (self , data ):
146
+ """Initialize the FabricResponse object
147
+ """
148
+ (format_version , fabric_uuid_str , ttl , error , rows ) = data
149
+ if error :
150
+ raise InterfaceError (error )
151
+ if format_version != FabricResponse .SUPPORTED_VERSION :
152
+ raise InterfaceError (
153
+ "Supported protocol has version {sversion}. Got a response "
154
+ "from MySQL Fabric with version {gversion}." .format (
155
+ sversion = FabricResponse .SUPPORTED_VERSION ,
156
+ gversion = format_version )
157
+ )
158
+ self .format_version = format_version
159
+ self .fabric_uuid_str = fabric_uuid_str
160
+ self .ttl = ttl
161
+ self .coded_rows = rows
162
+
163
+ class FabricSet (FabricResponse ):
164
+ """Iterator to navigate through the result set returned from Fabric
165
+ """
166
+ def __init__ (self , data ):
167
+ """Initialize the FabricSet object.
168
+ """
169
+ super (FabricSet , self ).__init__ (data )
170
+ assert len (self .coded_rows ) == 1
171
+ self .__names = self .coded_rows [0 ]['info' ]['names' ]
172
+ self .__rows = self .coded_rows [0 ]['rows' ]
173
+ assert all (len (self .__names ) == len (row ) for row in self .__rows ) or \
174
+ len (self .__rows ) == 0
175
+ self .__result = collections .namedtuple ('ResultSet' , self .__names )
176
+
177
+ def rowcount (self ):
178
+ """The number of rows in the result set.
179
+ """
180
+ return len (self .__rows )
181
+
182
+ def rows (self ):
183
+ """Iterate over the rows of the result set.
184
+
185
+ Each row is a named tuple.
186
+ """
187
+ for row in self .__rows :
188
+ yield self .__result (* row )
189
+
190
+ def row (self , index ):
191
+ """Indexing method for a row.
192
+
193
+ Each row is a named tuple.
194
+ """
195
+ return self .__result (* self .__rows [index ])
196
+
138
197
139
198
def extra_failure_report (error_codes ):
140
199
"""Add MySQL error to be reported to Fabric
@@ -448,8 +507,10 @@ def report_failure(self, server_uuid, errno):
448
507
server_uuid )
449
508
inst = self .get_instance ()
450
509
try :
451
- inst .proxy .threat .report_failure (server_uuid , current_host ,
452
- errno )
510
+ data = inst .proxy .threat .report_failure (
511
+ server_uuid , current_host , errno
512
+ )
513
+ FabricResponse (data )
453
514
except (Fault , socket .error ) as exc :
454
515
_LOGGER .debug ("Failed reporting server to Fabric (%s)" ,
455
516
str (exc ))
@@ -473,15 +534,10 @@ def get_fabric_servers(self, fabric_cnx=None):
473
534
result = []
474
535
err_msg = "Looking up Fabric servers failed using {host}:{port}: {err}"
475
536
try :
476
- (fabric_uuid_str , fabric_version , ttl , addr_list ) = \
477
- inst .proxy .dump .fabric_nodes ()
478
- for addr in addr_list :
479
- try :
480
- host , port = addr .split (':' , 2 )
481
- port = int (port )
482
- except ValueError :
483
- host , port = addr , MYSQL_FABRIC_PORT
484
- result .append ({'host' : host , 'port' : port })
537
+ data = inst .proxy .dump .fabric_nodes ('protocol.xmlrpc' )
538
+ fset = FabricSet (data )
539
+ for row in fset .rows ():
540
+ result .append ({'host' : row .host , 'port' : row .port })
485
541
except (Fault , socket .error ) as exc :
486
542
msg = err_msg .format (err = str (exc ), host = inst .host , port = inst .port )
487
543
raise InterfaceError (msg )
@@ -492,11 +548,13 @@ def get_fabric_servers(self, fabric_cnx=None):
492
548
raise InterfaceError (msg )
493
549
494
550
try :
495
- fabric_uuid = uuid .UUID ('{' + fabric_uuid_str + '}' )
551
+ fabric_uuid = uuid .UUID (fset . fabric_uuid_str )
496
552
except TypeError :
497
553
fabric_uuid = uuid .uuid4 ()
498
554
499
- return fabric_uuid , fabric_version , ttl , result
555
+ fabric_version = 0
556
+
557
+ return fabric_uuid , fabric_version , fset .ttl , result
500
558
501
559
def get_group_servers (self , group , use_cache = True ):
502
560
"""Get all MySQL servers in a group
@@ -519,19 +577,21 @@ def get_group_servers(self, group, use_cache=True):
519
577
inst = self .get_instance ()
520
578
result = []
521
579
try :
522
- servers = inst .proxy .dump .servers (
523
- self . _version_token , group )[ 3 ]
580
+ data = inst .proxy .dump .servers (self . _version_token , group )
581
+ fset = FabricSet ( data )
524
582
except (Fault , socket .error ) as exc :
525
583
msg = ("Looking up MySQL servers failed for group "
526
584
"{group}: {error}" ).format (error = str (exc ), group = group )
527
585
raise InterfaceError (msg )
528
586
529
587
weights = []
530
- for server in servers :
588
+ for row in fset . rows () :
531
589
# We make sure, when using local groups, we skip the global group
532
- if server [1 ] == group :
533
- server [3 ] = int (server [3 ]) # port should be an int
534
- mysqlserver = FabricMySQLServer (* server )
590
+ if row .group_id == group :
591
+ mysqlserver = FabricMySQLServer (
592
+ row .server_uuid , row .group_id , row .host , row .port ,
593
+ row .mode , row .status , row .weight
594
+ )
535
595
result .append (mysqlserver )
536
596
if mysqlserver .status == STATUS_SECONDARY :
537
597
weights .append ((mysqlserver .uuid , mysqlserver .weight ))
@@ -639,17 +699,22 @@ def get_sharding_information(self, tables=None, database=None):
639
699
patterns .append ("{0}.{1}" .format (dbase , tbl ))
640
700
641
701
inst = self .get_instance ()
642
-
643
702
try :
644
- result = inst .proxy .dump .sharding_information (
645
- self ._version_token , ',' .join (patterns ))
703
+ data = inst .proxy .dump .sharding_information (
704
+ self ._version_token , ',' .join (patterns )
705
+ )
706
+ fset = FabricSet (data )
646
707
except (Fault , socket .error ) as exc :
647
708
msg = "Looking up sharding information failed : {error}" .format (
648
709
error = str (exc ))
649
710
raise InterfaceError (msg )
650
711
651
- for info in result [3 ]:
652
- self ._cache .sharding_cache_table (FabricShard (* info ))
712
+ for row in fset .rows ():
713
+ self ._cache .sharding_cache_table (
714
+ FabricShard (row .schema_name , row .table_name , row .column_name ,
715
+ row .lower_bound , row .shard_id , row .type_name ,
716
+ row .group_id , row .global_group )
717
+ )
653
718
654
719
def get_shard_server (self , tables , key , scope = SCOPE_LOCAL , mode = None ):
655
720
"""Get MySQL server information for a particular shard
0 commit comments