Skip to content

Commit f3860e7

Browse files
committed
DevAPI: XSession accepts a list of routers
XSession now accepts a list of routers. Based on the priority assigned to each of the routers, it cycles through the list of routers, and connects to one which is available. In the future, this will be used to failover during query execution. Tests were added for different scenarios.
1 parent 63b5977 commit f3860e7

File tree

3 files changed

+250
-29
lines changed

3 files changed

+250
-29
lines changed

lib/mysqlx/__init__.py

Lines changed: 106 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@
2323

2424
"""MySQL X DevAPI Python implementation"""
2525

26+
import re
27+
2628
from .compat import STRING_TYPES, urlparse
2729
from .connection import XSession, NodeSession
2830
from .crud import Schema, Collection, Table
@@ -40,6 +42,100 @@
4042
DropCollectionIndexStatement)
4143

4244

45+
def _parse_address_list(address_list):
46+
"""Parses a list of host, port pairs
47+
48+
Args:
49+
address_list: String containing a list of routers or just router
50+
51+
Returns:
52+
Returns a dict with parsed values of host, port and priority if
53+
specified.
54+
"""
55+
is_list = re.compile(r'^\[(?![^,]*\]).*]$')
56+
hst_list = re.compile(r',(?![^\(\)]*\))')
57+
pri_addr = re.compile(r'^\(address\s*=\s*(?P<address>.+)\s*,\s*priority\s*=\s*(?P<priority>\d+)\)$')
58+
59+
routers = []
60+
if is_list.match(address_list):
61+
address_list = address_list.strip("[]")
62+
address_list = hst_list.split(address_list)
63+
else:
64+
match = urlparse("//{0}".format(address_list))
65+
return {
66+
"host": match.hostname,
67+
"port": match.port
68+
}
69+
70+
while address_list:
71+
router = {}
72+
address = address_list.pop(0).strip()
73+
match = pri_addr.match(address)
74+
if match:
75+
address = match.group("address").strip()
76+
router["priority"] = int(match.group("priority"))
77+
78+
match = urlparse("//{0}".format(address))
79+
if not match.hostname:
80+
raise InterfaceError("Invalid address: {0}".format(address))
81+
82+
router["host"] = match.hostname
83+
router["port"] = match.port
84+
routers.append(router)
85+
86+
return { "routers": routers }
87+
88+
def _parse_connection_uri(uri):
89+
"""Parses the connection string and returns a dictionary with the
90+
connection settings.
91+
92+
Args:
93+
uri: mysqlx URI scheme to connect to a MySQL server/farm.
94+
95+
Returns:
96+
Returns a dict with parsed values of credentials and address of the
97+
MySQL server/farm.
98+
"""
99+
settings = {}
100+
101+
uri = "{0}{1}".format("" if uri.startswith("mysqlx://")
102+
else "mysqlx://", uri)
103+
parsed = urlparse(uri)
104+
if parsed.hostname is None or parsed.username is None \
105+
or parsed.password is None:
106+
raise InterfaceError("Malformed URI '{0}'".format(uri))
107+
settings = {
108+
"user": parsed.username,
109+
"password": parsed.password,
110+
"schema": parsed.path.lstrip("/")
111+
}
112+
113+
settings.update(_parse_address_list(parsed.netloc.split("@")[-1]))
114+
return settings
115+
116+
def _validate_settings(settings):
117+
"""Validates the settings to be passed to a Session object
118+
the port values are converted to int if specified or set to 33060
119+
otherwise. The priority values for each router is converted to int
120+
if specified.
121+
122+
Args:
123+
settings: dict containing connection settings.
124+
"""
125+
if "priority" in settings and settings["priority"]:
126+
try:
127+
settings["priority"] = int(settings["priority"])
128+
except NameError:
129+
raise InterfaceError("Invalid priority")
130+
131+
if "port" in settings and settings["port"]:
132+
try:
133+
settings["port"] = int(settings["port"])
134+
except NameError:
135+
raise InterfaceError("Invalid port")
136+
else:
137+
settings["port"] = 33060
138+
43139
def _get_connection_settings(*args, **kwargs):
44140
"""Parses the connection string and returns a dictionary with the
45141
connection settings.
@@ -57,38 +153,23 @@ def _get_connection_settings(*args, **kwargs):
57153
settings = {}
58154
if args:
59155
if isinstance(args[0], STRING_TYPES):
60-
uri = "{0}{1}".format("" if args[0].startswith("mysqlx://")
61-
else "mysqlx://", args[0])
62-
parsed = urlparse(uri)
63-
if parsed.hostname is None or parsed.username is None \
64-
or parsed.password is None:
65-
raise InterfaceError("Malformed URI '{0}'".format(args[0]))
66-
settings = {
67-
"host": parsed.hostname,
68-
"port": parsed.port,
69-
"user": parsed.username,
70-
"password": parsed.password,
71-
"schema": parsed.path.lstrip("/")
72-
}
156+
settings = _parse_connection_uri(args[0])
73157
elif isinstance(args[0], dict):
74-
settings = args[0]
158+
settings.update(args[0])
75159
elif kwargs:
76-
settings = kwargs
160+
settings.update(kwargs)
77161

78162
if not settings:
79163
raise InterfaceError("Settings not provided")
80164

81-
if "port" in settings and settings["port"]:
82-
try:
83-
settings["port"] = int(settings["port"])
84-
except NameError:
85-
raise InterfaceError("Invalid port")
165+
if "routers" in settings:
166+
for router in settings.get("routers"):
167+
_validate_settings(router)
86168
else:
87-
settings["port"] = 33060
169+
_validate_settings(settings)
88170

89171
return settings
90172

91-
92173
def get_session(*args, **kwargs):
93174
"""Creates a XSession instance using the provided connection data.
94175
@@ -120,6 +201,9 @@ def get_node_session(*args, **kwargs):
120201
mysqlx.XSession: XSession object.
121202
"""
122203
settings = _get_connection_settings(*args, **kwargs)
204+
if "routers" in settings:
205+
raise InterfaceError("NodeSession expects only one pair of host and port")
206+
123207
return NodeSession(settings)
124208

125209

lib/mysqlx/connection.py

Lines changed: 75 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -69,8 +69,6 @@ def close(self):
6969

7070
class Connection(object):
7171
def __init__(self, settings):
72-
self._host = settings.get("host", "localhost")
73-
self._port = settings.get("port", 33060)
7472
self._user = settings.get("user")
7573
self._password = settings.get("password")
7674
self._schema = settings.get("schema")
@@ -85,8 +83,8 @@ def fetch_active_result(self):
8583
self._active_result.fetch_all()
8684
self._active_result = None
8785

88-
def connect(self):
89-
self.stream.connect(self._host, self._port)
86+
def connect(self, host, port):
87+
self.stream.connect(host, port)
9088
self.reader_writer = MessageReaderWriter(self.stream)
9189
self.protocol = Protocol(self.reader_writer)
9290
self._handle_capabilities()
@@ -152,6 +150,75 @@ def close(self):
152150
self.stream.close()
153151

154152

153+
class XConnection(Connection):
154+
def __init__(self, settings):
155+
super(XConnection, self).__init__(settings)
156+
self._routers = settings.get("routers", [])
157+
158+
if 'host' in settings and settings['host']:
159+
self._routers.append({
160+
'host': settings.pop('host'),
161+
'port': settings.pop('port', None)
162+
})
163+
164+
self._ensure_priorities()
165+
self._routers.sort(key=lambda x: x['priority'], reverse=True)
166+
167+
def _ensure_priorities(self):
168+
priority_count = 0
169+
priority = 100
170+
171+
for router in self._routers:
172+
pri = router.get('priority', None)
173+
if pri is None:
174+
priority_count += 1
175+
router["priority"] = priority
176+
elif pri > 100:
177+
raise ProgrammingError("The priorities must be between 0 and "
178+
"100", 4007)
179+
priority -= 1
180+
181+
if 0 < priority_count < len(self._routers):
182+
raise ProgrammingError("You must either assign no priority to any "
183+
"of the routers or give a priority for every router", 4000)
184+
185+
def connect(self):
186+
# Reset all router availibilty
187+
if not all(x.get('available', None) for x in self._routers):
188+
for router in self._routers:
189+
router['available'] = True
190+
191+
# Loop and check
192+
error = None
193+
for router in self._routers:
194+
try:
195+
super(XConnection, self).connect(router.get("host"),
196+
router.get("port"))
197+
return
198+
except socket.error as err:
199+
router['available'] = False
200+
error = err
201+
202+
if len(self._routers) > 1:
203+
raise InterfaceError("Failed to connect to any of the routers.",
204+
4001)
205+
else:
206+
raise InterfaceError("Cannot connect to host: {0}".format(error))
207+
208+
209+
class NodeConnection(Connection):
210+
def __init__(self, settings):
211+
super(NodeConnection, self).__init__(settings)
212+
self._host = settings.get("host", "localhost")
213+
self._port = settings.get("port", 33060)
214+
215+
def connect(self):
216+
try:
217+
super(NodeConnection, self).connect(self._host, self._port)
218+
except socket.error as err:
219+
raise InterfaceError("Cannot connect to host: {0}".format(err))
220+
221+
155222
class BaseSession(object):
156223
"""Base functionality for Session classes through the X Protocol.
157224
@@ -168,8 +235,6 @@ class BaseSession(object):
168235
"""
169236
def __init__(self, settings):
170237
self._settings = settings
171-
self._connection = Connection(self._settings)
172-
self._connection.connect()
173238

174239
def is_open(self):
175240
return self._connection.stream._socket is not None
@@ -256,6 +321,8 @@ class XSession(BaseSession):
256321
"""
257322
def __init__(self, settings):
258323
super(XSession, self).__init__(settings)
324+
self._connection = XConnection(self._settings)
325+
self._connection.connect()
259326

260327

261328
class NodeSession(BaseSession):
@@ -274,6 +341,8 @@ class NodeSession(BaseSession):
274341
"""
275342
def __init__(self, settings):
276343
super(NodeSession, self).__init__(settings)
344+
self._connection = NodeConnection(self._settings)
345+
self._connection.connect()
277346

278347
def sql(self, sql):
279348
"""Creates a :class:`mysqlx.SqlStatement` object to allow running the

tests/test_mysqlx_connection.py

Lines changed: 69 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,19 @@
6868
)
6969

7070

71+
_ROUTER_LIST_RESULTS = ( # (uri, result)
72+
("áé'í'óú:unicode@127.0.0.1", {"schema": "", "host": "127.0.0.1", "port": 33060,
73+
"password": "unicode", "user": "áé'í'óú"}),
74+
("user:password@[127.0.0.1, localhost]", {"schema": "", "routers":
75+
[{"host": "127.0.0.1", "port": 33060}, {"host": "localhost", "port":
76+
33060}], "password": "password", "user": "user"}),
77+
("user:password@[(address=127.0.0.1, priority=99), (address=localhost,"
78+
"priority=98)]", {"schema": "", "routers": [{"host": "127.0.0.1",
79+
"port": 33060, "priority": 99}, {"host": "localhost", "port": 33060,
80+
"priority": 98}], "password": "password", "user": "user"}),
81+
)
82+
83+
7184
@unittest.skipIf(tests.MYSQL_VERSION < (5, 7, 12), "XPlugin not compatible")
7285
class MySQLxXSessionTests(tests.MySQLxTests):
7386

@@ -88,6 +101,61 @@ def test___init__(self):
88101
}
89102
self.assertRaises(TypeError, mysqlx.XSession, bad_config)
90103

104+
host = self.connect_kwargs["host"]
105+
port = self.connect_kwargs["port"]
106+
user = self.connect_kwargs["user"]
107+
password = self.connect_kwargs["password"]
108+
109+
# XSession to a farm using one of many routers (prios)
110+
# Loop during connect because of network error (succeed)
111+
uri = ("mysqlx://{0}:{1}@[(address=bad_host, priority=100),"
112+
"(address={2}:{3}, priority=98)]"
113+
"".format(user, password, host, port))
114+
session = mysqlx.get_session(uri)
115+
session.close()
116+
117+
# XSession to a farm using one of many routers (incomplete prios)
118+
uri = ("mysqlx://{0}:{1}@[(address=bad_host, priority=100), {2}:{3}]"
119+
"".format(user, password, host, port))
120+
self.assertRaises(mysqlx.errors.ProgrammingError,
121+
mysqlx.get_session, uri)
122+
try:
123+
session = mysqlx.get_session(uri)
124+
except mysqlx.errors.ProgrammingError as err:
125+
self.assertEqual(4000, err.errno)
126+
127+
# XSession to a farm using invalid priorities (out of range)
128+
uri = ("mysqlx://{0}:{1}@[(address=bad_host, priority=100), "
129+
"(address={2}:{3}, priority=101)]"
130+
"".format(user, password, host, port))
131+
self.assertRaises(mysqlx.errors.ProgrammingError,
132+
mysqlx.get_session, uri)
133+
try:
134+
session = mysqlx.get_session(uri)
135+
except mysqlx.errors.ProgrammingError as err:
136+
self.assertEqual(4007, err.errno)
137+
138+
# Establish an XSession to a farm using one of many routers (no prios)
139+
uri = ("mysqlx://{0}:{1}@[bad_host, {2}:{3}]"
140+
"".format(user, password, host, port))
141+
session = mysqlx.get_session(uri)
142+
session.close()
143+
144+
# Break loop during connect (non-network error)
145+
uri = ("mysqlx://{0}:{1}@[bad_host, {2}:{3}]"
146+
"".format(user, "bad_pass", host, port))
147+
self.assertRaises(mysqlx.errors.InterfaceError,
148+
mysqlx.get_session, uri)
149+
150+
# Break loop during connect (none left)
151+
uri = "mysqlx://{0}:{1}@[bad_host, another_bad_host]"
152+
self.assertRaises(mysqlx.errors.InterfaceError,
153+
mysqlx.get_session, uri)
154+
try:
155+
session = mysqlx.get_session(uri)
156+
except mysqlx.errors.InterfaceError as err:
157+
self.assertEqual(4001, err.errno)
158+
91159
def test_connection_uri(self):
92160
uri = ("mysqlx://{user}:{password}@{host}:{port}/{schema}"
93161
"".format(user=self.connect_kwargs["user"],
@@ -99,7 +167,7 @@ def test_connection_uri(self):
99167
self.assertIsInstance(session, mysqlx.XSession)
100168

101169
# Test URI parser function
102-
for uri, res in _URI_TEST_RESULTS:
170+
for uri, res in _ROUTER_LIST_RESULTS:
103171
try:
104172
settings = mysqlx._get_connection_settings(uri)
105173
self.assertEqual(res, settings)

0 commit comments

Comments
 (0)