Skip to content

Commit cf1ef9b

Browse files
author
aviau
committed
Refactored ResultSet
1 parent 3e1a03c commit cf1ef9b

File tree

7 files changed

+125
-95
lines changed

7 files changed

+125
-95
lines changed

docs/source/resultset.rst

Lines changed: 16 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -7,33 +7,39 @@ Query response object: ResultSet
77

88
Using the ``InfluxDBClient.query()`` function will return a ``ResultSet`` Object.
99

10-
A ResultSet behaves like a dict. Its keys are series and values are points. However, it is a little bit smarter than a regular dict. Its ``__getitem__`` method can be used to query the ResultSet in several ways.
10+
A ResultSet can be browsed in several ways. Its ``get_points`` method can be used to retrieve points generators that filter either by measurement, tags, or both.
1111

12-
Filtering by serie name
13-
-----------------------
12+
Getting all points
13+
------------------
1414

15-
Using ``rs['cpu']`` will return a generator for all the points that are in a serie named ``cpu``, no matter the tags.
15+
Using ``rs.get_points()`` will return a generator for all the points in the ResultSet.
16+
17+
18+
Filtering by measurement
19+
------------------------
20+
21+
Using ``rs.get_points('cpu')`` will return a generator for all the points that are in a serie with measurement name ``cpu``, no matter the tags.
1622
::
1723

1824
rs = cli.query("SELECT * from cpu")
19-
cpu_points = list(rs['cpu'])
25+
cpu_points = list(rs.get_points(measurement='cpu')])
2026

2127
Filtering by tags
2228
-----------------
2329

24-
Using ``rs[{'host_name': 'influxdb.com'}]`` will return a generator for all the points that are tagged with the specified tags, no matter the serie name.
30+
Using ``rs.get_points(tags={'host_name': 'influxdb.com'})`` will return a generator for all the points that are tagged with the specified tags, no matter the measurement name.
2531
::
2632

2733
rs = cli.query("SELECT * from cpu")
2834
cpu_influxdb_com_points = list(rs[{"host_name": "influxdb.com"}])
2935

30-
Filtering by serie name and tags
31-
--------------------------------
36+
Filtering by measurement and tags
37+
---------------------------------
3238

33-
Using a tuple with a serie name and a dict will return a generator for all the points that are in a serie with the given name AND whose tags match the given tags.
39+
Using measurement name and tags will return a generator for all the points that are in a serie with the specified measurement name AND whose tags match the given tags.
3440
::
3541

3642
rs = cli.query("SELECT * from cpu")
37-
points = list(rs[('cpu', {'host_name': 'influxdb.com'})])
43+
points = list(rs.get_points(measurement='cpu', tags={'host_name': 'influxdb.com'}))
3844

3945
See the :ref:`api-documentation` page for more information.

influxdb/client.py

Lines changed: 12 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@
1212
from sys import version_info
1313

1414
from influxdb.resultset import ResultSet
15+
from .exceptions import InfluxDBClientError
16+
from .exceptions import InfluxDBServerError
1517

1618
try:
1719
xrange
@@ -24,23 +26,6 @@
2426
from urlparse import urlparse
2527

2628

27-
class InfluxDBClientError(Exception):
28-
"""Raised when an error occurs in the request."""
29-
def __init__(self, content, code):
30-
if isinstance(content, type(b'')):
31-
content = content.decode('UTF-8', errors='replace')
32-
super(InfluxDBClientError, self).__init__(
33-
"{0}: {1}".format(code, content))
34-
self.content = content
35-
self.code = code
36-
37-
38-
class InfluxDBServerError(Exception):
39-
"""Raised when a server error occurs."""
40-
def __init__(self, content):
41-
super(InfluxDBServerError, self).__init__(content)
42-
43-
4429
class InfluxDBClient(object):
4530
"""The :class:`~.InfluxDBClient` object holds information necessary to
4631
connect to InfluxDB. Requests can be made to InfluxDB directly through
@@ -310,7 +295,13 @@ def query(self,
310295

311296
data = response.json()
312297

313-
return ResultSet(data)
298+
results = [ResultSet(result) for result in data.get('results', [])]
299+
300+
# TODO(aviau): Always return a list. (This would be a breaking change)
301+
if len(results) == 1:
302+
return results[0]
303+
else:
304+
return results
314305

315306
def write_points(self,
316307
points,
@@ -420,7 +411,7 @@ def get_list_database(self):
420411
>>> dbs
421412
[{u'name': u'db1'}, {u'name': u'db2'}, {u'name': u'db3'}]
422413
"""
423-
return list(self.query("SHOW DATABASES")['databases'])
414+
return list(self.query("SHOW DATABASES").get_points())
424415

425416
def create_database(self, dbname):
426417
"""Create a new database in InfluxDB.
@@ -527,7 +518,7 @@ def get_list_retention_policies(self, database=None):
527518
rsp = self.query(
528519
"SHOW RETENTION POLICIES %s" % (database or self._database)
529520
)
530-
return list(rsp['results'])
521+
return list(rsp.get_points())
531522

532523
def get_list_series(self, database=None):
533524
"""Get the list of series for a database.
@@ -572,7 +563,7 @@ def get_list_users(self):
572563
{u'admin': False, u'user': u'user2'},
573564
{u'admin': False, u'user': u'user3'}]
574565
"""
575-
return list(self.query("SHOW USERS")["results"])
566+
return list(self.query("SHOW USERS").get_points())
576567

577568
def create_user(self, username, password):
578569
"""Create a new user in InfluxDB

influxdb/exceptions.py

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
class InfluxDBClientError(Exception):
2+
"""Raised when an error occurs in the request."""
3+
def __init__(self, content, code=None):
4+
if isinstance(content, type(b'')):
5+
content = content.decode('UTF-8', errors='replace')
6+
7+
if code is not None:
8+
message = "%s: %s" % (code, content)
9+
else:
10+
message = content
11+
12+
super(InfluxDBClientError, self).__init__(
13+
message
14+
)
15+
self.content = content
16+
self.code = code
17+
18+
19+
class InfluxDBServerError(Exception):
20+
"""Raised when a server error occurs."""
21+
def __init__(self, content):
22+
super(InfluxDBServerError, self).__init__(content)

influxdb/resultset.py

Lines changed: 17 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,19 @@
11
# -*- coding: utf-8 -*-
22

3+
from influxdb.exceptions import InfluxDBClientError
4+
35
_sentinel = object()
46

57

68
class ResultSet(object):
7-
"""A wrapper around series results """
9+
"""A wrapper around a single InfluxDB query result"""
810

911
def __init__(self, series):
1012
self._raw = series
1113

14+
if 'error' in self.raw:
15+
raise InfluxDBClientError(self.raw['error'])
16+
1217
@property
1318
def raw(self):
1419
"""Raw JSON from InfluxDB"""
@@ -46,23 +51,29 @@ def __getitem__(self, key):
4651
name = key
4752
tags = None
4853

49-
if not isinstance(name, (bytes, type(b''.decode()), type(None))):
50-
raise TypeError('serie_name must be an str or None')
54+
return self.get_points(name, tags)
55+
56+
def get_points(self, measurement=None, tags=None):
57+
58+
# Raise error if measurement is not str or bytes
59+
if not isinstance(measurement,
60+
(bytes, type(b''.decode()), type(None))):
61+
raise TypeError('measurement must be an str or None')
5162

5263
for serie in self._get_series():
5364
serie_name = serie.get('measurement', serie.get('name', 'results'))
5465
if serie_name is None:
5566
# this is a "system" query or a query which
5667
# doesn't return a name attribute.
5768
# like 'show retention policies' ..
58-
if key is None:
69+
if tags is None:
5970
for point in serie['values']:
6071
yield self.point_from_cols_vals(
6172
serie['columns'],
6273
point
6374
)
6475

65-
elif name in (None, serie_name):
76+
elif measurement in (None, serie_name):
6677
# by default if no tags was provided then
6778
# we will matches every returned serie
6879
serie_tags = serie.get('tags', {})
@@ -101,13 +112,7 @@ def _tag_matches(self, tags, filter):
101112

102113
def _get_series(self):
103114
"""Returns all series"""
104-
series = []
105-
try:
106-
for result in self.raw['results']:
107-
series.extend(result['series'])
108-
except KeyError:
109-
pass
110-
return series
115+
return self.raw.get('series', [])
111116

112117
def __len__(self):
113118
return len(self.keys())

tests/influxdb/client_test.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -337,7 +337,7 @@ def test_query(self):
337337
rs = self.cli.query('select * from foo')
338338

339339
self.assertListEqual(
340-
list(rs['cpu_load_short']),
340+
list(rs[0].get_points()),
341341
[{'value': 0.64, 'time': '2009-11-10T23:00:00Z'}]
342342
)
343343

tests/influxdb/client_test_with_server.py

Lines changed: 22 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@
2929
warnings.simplefilter('error', FutureWarning)
3030

3131
from influxdb import InfluxDBClient
32-
from influxdb.client import InfluxDBClientError
32+
from influxdb.exceptions import InfluxDBClientError
3333

3434
from tests.influxdb.misc import get_free_port, is_port_open
3535
from tests import skipIfPYpy, using_pypy
@@ -337,13 +337,11 @@ def test_create_database(self):
337337
[{'name': 'new_db_1'}, {'name': 'new_db_2'}]
338338
)
339339

340-
@unittest.skip("Broken as of 0.9.0-rc30")
341340
def test_create_database_fails(self):
342341
self.assertIsNone(self.cli.create_database('new_db'))
343342
with self.assertRaises(InfluxDBClientError) as ctx:
344343
self.cli.create_database('new_db')
345-
self.assertEqual(500, ctx.exception.code)
346-
self.assertEqual('{"results":[{"error":"database exists"}]}',
344+
self.assertEqual('database already exists',
347345
ctx.exception.content)
348346

349347
def test_get_list_series_empty(self):
@@ -360,20 +358,16 @@ def test_drop_database(self):
360358
self.assertIsNone(self.cli.drop_database('new_db_1'))
361359
self.assertEqual([{'name': 'new_db_2'}], self.cli.get_list_database())
362360

363-
@unittest.skip("Broken as of 0.9.0-rc30")
364361
def test_drop_database_fails(self):
365362
with self.assertRaises(InfluxDBClientError) as ctx:
366363
self.cli.drop_database('db')
367-
self.assertEqual(500, ctx.exception.code)
368-
self.assertIn('{"results":[{"error":"database not found: db',
364+
self.assertIn('database not found: db',
369365
ctx.exception.content)
370366

371-
@unittest.skip("Broken as of 0.9.0-rc30")
372367
def test_query_fail(self):
373368
with self.assertRaises(InfluxDBClientError) as ctx:
374369
self.cli.query('select column_one from foo')
375-
self.assertEqual(500, ctx.exception.code)
376-
self.assertIn('{"results":[{"error":"database not found: db',
370+
self.assertIn('database not found: db',
377371
ctx.exception.content)
378372

379373
def test_create_user(self):
@@ -388,6 +382,19 @@ def test_create_user_blank_password(self):
388382
self.assertIn({'user': 'test_user', 'admin': False},
389383
rsp)
390384

385+
def test_get_list_users_empty(self):
386+
rsp = self.cli.get_list_users()
387+
self.assertEqual([], rsp)
388+
389+
def test_get_list_users(self):
390+
self.cli.query("CREATE USER test WITH PASSWORD 'test'")
391+
rsp = self.cli.get_list_users()
392+
393+
self.assertEqual(
394+
[{'user': 'test', 'admin': False}],
395+
rsp
396+
)
397+
391398
def test_create_user_blank_username(self):
392399
with self.assertRaises(InfluxDBClientError) as ctx:
393400
self.cli.create_user('', 'secret_password')
@@ -414,12 +421,10 @@ def test_drop_user(self):
414421
users = list(self.cli.query("SHOW USERS")['results'])
415422
self.assertEqual(users, [])
416423

417-
@unittest.skip("Broken as of 0.9.0-rc30")
418424
def test_drop_user_nonexisting(self):
419425
with self.assertRaises(InfluxDBClientError) as ctx:
420426
self.cli.drop_user('test')
421-
self.assertEqual(500, ctx.exception.code)
422-
self.assertIn('{"results":[{"error":"user not found"}]}',
427+
self.assertIn('user not found',
423428
ctx.exception.content)
424429

425430
def test_drop_user_invalid(self):
@@ -550,7 +555,7 @@ def test_write_points_check_read(self):
550555
[[{'value': 0.64, 'time': '2009-11-10T23:00:00Z'}]]
551556
)
552557

553-
rsp2 = list(rsp['cpu_load_short'])
558+
rsp2 = list(rsp.get_points())
554559
self.assertEqual(len(rsp2), 1)
555560
pt = rsp2[0]
556561

@@ -634,10 +639,10 @@ def test_write_points_batch(self):
634639
batch_size=2)
635640
time.sleep(5)
636641
net_in = self.cli.query("SELECT value FROM network "
637-
"WHERE direction='in'").raw['results'][0]
642+
"WHERE direction='in'").raw
638643
net_out = self.cli.query("SELECT value FROM network "
639-
"WHERE direction='out'").raw['results'][0]
640-
cpu = self.cli.query("SELECT value FROM cpu_usage").raw['results'][0]
644+
"WHERE direction='out'").raw
645+
cpu = self.cli.query("SELECT value FROM cpu_usage").raw
641646
self.assertIn(123, net_in['series'][0]['values'][0])
642647
self.assertIn(12, net_out['series'][0]['values'][0])
643648
self.assertIn(12.34, cpu['series'][0]['values'][0])
@@ -796,19 +801,6 @@ def test_get_list_series_DF(self):
796801
columns=['_id', 'host', 'region'])
797802
assert_frame_equal(rsp['cpu_load_short'], expected)
798803

799-
def test_get_list_users_empty(self):
800-
rsp = self.cli.get_list_users()
801-
self.assertEqual([], rsp)
802-
803-
def test_get_list_users_non_empty(self):
804-
self.cli.query("CREATE USER test WITH PASSWORD 'test'")
805-
rsp = self.cli.get_list_users()
806-
807-
self.assertEqual(
808-
[{'user': 'test', 'admin': False}],
809-
rsp
810-
)
811-
812804
def test_default_retention_policy(self):
813805
rsp = self.cli.get_list_retention_policies()
814806
self.assertEqual(

0 commit comments

Comments
 (0)