Skip to content

Commit 59be3a5

Browse files
author
Geert Vanderkelen
committed
Disabled Fabric tests; updated Fabric code
1 parent d7397b0 commit 59be3a5

File tree

3 files changed

+93
-25
lines changed

3 files changed

+93
-25
lines changed

cpyint

lib/mysql/connector/fabric/connection.py

Lines changed: 89 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
from hashlib import md5
3232
import logging
3333
import socket
34+
import collections
3435

3536
# pylint: disable=F0401,E0611
3637
try:
@@ -135,6 +136,64 @@
135136

136137
_LOGGER = logging.getLogger('myconnpy-fabric')
137138

139+
class FabricResponse(object):
140+
"""Class used to parse a response got from Fabric.
141+
"""
142+
143+
SUPPORTED_VERSION = 1
144+
145+
def __init__(self, data):
146+
"""Initialize the FabricResponse object
147+
"""
148+
(format_version, fabric_uuid_str, ttl, error, rows) = data
149+
if error:
150+
raise InterfaceError(error)
151+
if format_version != FabricResponse.SUPPORTED_VERSION:
152+
raise InterfaceError(
153+
"Supported protocol has version {sversion}. Got a response "
154+
"from MySQL Fabric with version {gversion}.".format(
155+
sversion=FabricResponse.SUPPORTED_VERSION,
156+
gversion=format_version)
157+
)
158+
self.format_version = format_version
159+
self.fabric_uuid_str = fabric_uuid_str
160+
self.ttl = ttl
161+
self.coded_rows = rows
162+
163+
class FabricSet(FabricResponse):
164+
"""Iterator to navigate through the result set returned from Fabric
165+
"""
166+
def __init__(self, data):
167+
"""Initialize the FabricSet object.
168+
"""
169+
super(FabricSet, self).__init__(data)
170+
assert len(self.coded_rows) == 1
171+
self.__names = self.coded_rows[0]['info']['names']
172+
self.__rows = self.coded_rows[0]['rows']
173+
assert all(len(self.__names) == len(row) for row in self.__rows) or \
174+
len(self.__rows) == 0
175+
self.__result = collections.namedtuple('ResultSet', self.__names)
176+
177+
def rowcount(self):
178+
"""The number of rows in the result set.
179+
"""
180+
return len(self.__rows)
181+
182+
def rows(self):
183+
"""Iterate over the rows of the result set.
184+
185+
Each row is a named tuple.
186+
"""
187+
for row in self.__rows:
188+
yield self.__result(*row)
189+
190+
def row(self, index):
191+
"""Indexing method for a row.
192+
193+
Each row is a named tuple.
194+
"""
195+
return self.__result(*self.__rows[index])
196+
138197

139198
def extra_failure_report(error_codes):
140199
"""Add MySQL error to be reported to Fabric
@@ -448,8 +507,10 @@ def report_failure(self, server_uuid, errno):
448507
server_uuid)
449508
inst = self.get_instance()
450509
try:
451-
inst.proxy.threat.report_failure(server_uuid, current_host,
452-
errno)
510+
data = inst.proxy.threat.report_failure(
511+
server_uuid, current_host, errno
512+
)
513+
FabricResponse(data)
453514
except (Fault, socket.error) as exc:
454515
_LOGGER.debug("Failed reporting server to Fabric (%s)",
455516
str(exc))
@@ -473,15 +534,10 @@ def get_fabric_servers(self, fabric_cnx=None):
473534
result = []
474535
err_msg = "Looking up Fabric servers failed using {host}:{port}: {err}"
475536
try:
476-
(fabric_uuid_str, fabric_version, ttl, addr_list) = \
477-
inst.proxy.dump.fabric_nodes()
478-
for addr in addr_list:
479-
try:
480-
host, port = addr.split(':', 2)
481-
port = int(port)
482-
except ValueError:
483-
host, port = addr, MYSQL_FABRIC_PORT
484-
result.append({'host': host, 'port': port})
537+
data = inst.proxy.dump.fabric_nodes('protocol.xmlrpc')
538+
fset = FabricSet(data)
539+
for row in fset.rows():
540+
result.append({'host': row.host, 'port': row.port})
485541
except (Fault, socket.error) as exc:
486542
msg = err_msg.format(err=str(exc), host=inst.host, port=inst.port)
487543
raise InterfaceError(msg)
@@ -492,11 +548,13 @@ def get_fabric_servers(self, fabric_cnx=None):
492548
raise InterfaceError(msg)
493549

494550
try:
495-
fabric_uuid = uuid.UUID('{' + fabric_uuid_str + '}')
551+
fabric_uuid = uuid.UUID(fset.fabric_uuid_str)
496552
except TypeError:
497553
fabric_uuid = uuid.uuid4()
498554

499-
return fabric_uuid, fabric_version, ttl, result
555+
fabric_version = 0
556+
557+
return fabric_uuid, fabric_version, fset.ttl, result
500558

501559
def get_group_servers(self, group, use_cache=True):
502560
"""Get all MySQL servers in a group
@@ -519,19 +577,21 @@ def get_group_servers(self, group, use_cache=True):
519577
inst = self.get_instance()
520578
result = []
521579
try:
522-
servers = inst.proxy.dump.servers(
523-
self._version_token, group)[3]
580+
data = inst.proxy.dump.servers(self._version_token, group)
581+
fset = FabricSet(data)
524582
except (Fault, socket.error) as exc:
525583
msg = ("Looking up MySQL servers failed for group "
526584
"{group}: {error}").format(error=str(exc), group=group)
527585
raise InterfaceError(msg)
528586

529587
weights = []
530-
for server in servers:
588+
for row in fset.rows():
531589
# We make sure, when using local groups, we skip the global group
532-
if server[1] == group:
533-
server[3] = int(server[3]) # port should be an int
534-
mysqlserver = FabricMySQLServer(*server)
590+
if row.group_id == group:
591+
mysqlserver = FabricMySQLServer(
592+
row.server_uuid, row.group_id, row.host, row.port,
593+
row.mode, row.status, row.weight
594+
)
535595
result.append(mysqlserver)
536596
if mysqlserver.status == STATUS_SECONDARY:
537597
weights.append((mysqlserver.uuid, mysqlserver.weight))
@@ -639,17 +699,22 @@ def get_sharding_information(self, tables=None, database=None):
639699
patterns.append("{0}.{1}".format(dbase, tbl))
640700

641701
inst = self.get_instance()
642-
643702
try:
644-
result = inst.proxy.dump.sharding_information(
645-
self._version_token, ','.join(patterns))
703+
data = inst.proxy.dump.sharding_information(
704+
self._version_token, ','.join(patterns)
705+
)
706+
fset = FabricSet(data)
646707
except (Fault, socket.error) as exc:
647708
msg = "Looking up sharding information failed : {error}".format(
648709
error=str(exc))
649710
raise InterfaceError(msg)
650711

651-
for info in result[3]:
652-
self._cache.sharding_cache_table(FabricShard(*info))
712+
for row in fset.rows():
713+
self._cache.sharding_cache_table(
714+
FabricShard(row.schema_name, row.table_name, row.column_name,
715+
row.lower_bound, row.shard_id, row.type_name,
716+
row.group_id, row.global_group)
717+
)
653718

654719
def get_shard_server(self, tables, key, scope=SCOPE_LOCAL, mode=None):
655720
"""Get MySQL server information for a particular shard

tests/__init__.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -205,6 +205,9 @@ def get_test_modules():
205205
# Skip django testing completely when Django is not available.
206206
LOGGER.warning("Django tests will not run: Django not available")
207207
continue
208+
if 'fabric' in module:
209+
LOGGER.warning("Fabric tests are disabled in full run")
210+
continue
208211
testcases.append(
209212
'tests.{module}'.format(module=module))
210213
LOGGER.debug('Added tests.{module}'.format(module=module))

0 commit comments

Comments
 (0)