Skip to content

Commit 4294703

Browse files
author
Geert Vanderkelen
committed
Merge branch 'WL7955' into develop
2 parents cd74ae0 + dd8f9d2 commit 4294703

File tree

6 files changed

+194
-761
lines changed

6 files changed

+194
-761
lines changed

lib/mysql/connector/fabric/__init__.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,8 @@
5050
STATUS_PRIMARY, STATUS_SECONDARY,
5151
SCOPE_GLOBAL, SCOPE_LOCAL,
5252
Fabric, FabricConnection,
53-
MySQLFabricConnection
53+
MySQLFabricConnection,
54+
FabricSet,
5455
)
5556

5657

@@ -71,4 +72,5 @@ def connect(**kwargs):
7172
'Fabric',
7273
'FabricConnection',
7374
'MySQLFabricConnection',
75+
'FabricSet',
7476
]

lib/mysql/connector/fabric/caching.py

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ class CacheShardTable(CacheEntry):
7878

7979
def __init__(self, shard, version=None, fabric_uuid=None):
8080
if not isinstance(shard, FabricShard):
81-
ValueError("shard argument must be a FabricShard instance")
81+
raise ValueError("shard argument must be a FabricShard instance")
8282
super(CacheShardTable, self).__init__(version=version,
8383
fabric_uuid=fabric_uuid)
8484
self.partitioning = {}
@@ -94,6 +94,21 @@ def add_partition(self, key, group):
9494
"""Add sharding information for a group"""
9595
if self.shard_type == 'RANGE':
9696
key = int(key)
97+
elif self.shard_type == 'RANGE_DATETIME':
98+
try:
99+
if ':' in key:
100+
key = datetime.strptime(key, "%Y-%m-%d %H:%M:%S")
101+
else:
102+
key = datetime.strptime(key, "%Y-%m-%d").date()
103+
except:
104+
raise ValueError(
105+
"RANGE_DATETIME key could not be parsed, was: {0}".format(
106+
key
107+
))
108+
else:
109+
raise ValueError("Unsupported sharding type {0}".format(
110+
self.shard_type
111+
))
97112
self.partitioning[key] = {
98113
'group': group,
99114
}

lib/mysql/connector/fabric/connection.py

Lines changed: 44 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
"""Implementing communication with MySQL Fabric"""
2525

2626
import sys
27+
import datetime
2728
import time
2829
import uuid
2930
from base64 import b16decode
@@ -121,7 +122,8 @@
121122
_CNX_PROPERTIES = {
122123
# name: ((valid_types), description, default)
123124
'group': ((str,), "Name of group of servers", None),
124-
'key': ((int, str), "Sharding key", None),
125+
'key': ((int, str, datetime.datetime, datetime.date),
126+
"Sharding key", None),
125127
'tables': ((tuple, list), "List of tables in query", None),
126128
'mode': ((int,), "Read-Only, Write-Only or Read-Write", MODE_READWRITE),
127129
'shard': ((str,), "Identity of the shard for direct connection", None),
@@ -753,6 +755,17 @@ def get_shard_server(self, tables, key, scope=SCOPE_LOCAL, mode=None):
753755
partitions = sorted(entry.partitioning.keys())
754756
index = partitions[bisect(partitions, int(key)) - 1]
755757
partition = entry.partitioning[index]
758+
elif entry.shard_type == 'RANGE_DATETIME':
759+
if not isinstance(key, (datetime.date, datetime.datetime)):
760+
raise ValueError(
761+
"Key must be datetime.date or datetime.datetime for "
762+
"RANGE_DATETIME")
763+
partition_keys = sorted(entry.partitioning.keys(), reverse=True)
764+
for partkey in partition_keys:
765+
if key >= partkey:
766+
index = partkey
767+
break
768+
partition = entry.partitioning[index]
756769
elif entry.shard_type == 'HASH':
757770
md5key = md5(str(key))
758771
partition_keys = sorted(
@@ -774,6 +787,36 @@ def get_shard_server(self, tables, key, scope=SCOPE_LOCAL, mode=None):
774787

775788
return self.get_group_server(groups[0], mode=mode)
776789

790+
def execute(self, group, command, *args, **kwargs):
791+
"""Execute a Fabric command from given group
792+
793+
This method will execute the given Fabric command from the given group
794+
using the given arguments. It returns an instance of FabricSet.
795+
796+
Raises ValueError when group.command is not valid and raises
797+
InterfaceError when an error occurs while executing.
798+
799+
Returns FabricSet.
800+
"""
801+
inst = self.get_instance()
802+
try:
803+
grp = getattr(inst.proxy, group)
804+
cmd = getattr(grp, command)
805+
except AttributeError as exc:
806+
raise ValueError("{group}.{command} not available ({err})".format(
807+
group=group, command=command, err=str(exc)))
808+
809+
fab_set = None
810+
try:
811+
data = cmd(*args, **kwargs)
812+
fab_set = FabricSet(data)
813+
except (Fault, socket.error, InterfaceError) as exc:
814+
msg = "Executing {group}.{command} failed: {error}".format(
815+
group=group, command=command, error=str(exc))
816+
raise InterfaceError(msg)
817+
818+
return fab_set
819+
777820

778821
class FabricConnection(object):
779822

tests/__init__.py

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,7 @@
108108
TEST_BUILD_DIR = None
109109

110110
DJANGO_VERSION = None
111+
FABRIC_CONFIG = None
111112

112113
__all__ = [
113114
'MySQLConnectorTests',
@@ -205,9 +206,6 @@ def get_test_modules():
205206
# Skip django testing completely when Django is not available.
206207
LOGGER.warning("Django tests will not run: Django not available")
207208
continue
208-
if 'fabric' in module:
209-
LOGGER.warning("Fabric tests are disabled in full run")
210-
continue
211209
testcases.append(
212210
'tests.{module}'.format(module=module))
213211
LOGGER.debug('Added tests.{module}'.format(module=module))

0 commit comments

Comments
 (0)