Skip to content

Commit 3b468ff

Browse files
author
aviau
committed
Merge SeriesHelper
2 parents 30251ae + 6c0fc43 commit 3b468ff

File tree

4 files changed

+333
-0
lines changed

4 files changed

+333
-0
lines changed

influxdb/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,13 @@
11
# -*- coding: utf-8 -*-
22
from .client import InfluxDBClient
33
from .dataframe_client import DataFrameClient
4+
from .helper import SeriesHelper
45

56

67
__all__ = [
78
'InfluxDBClient',
89
'DataFrameClient',
10+
'SeriesHelper',
911
]
1012

1113

influxdb/helper.py

Lines changed: 161 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,161 @@
1+
# -*- coding: utf-8 -*-
2+
"""
3+
Helper class for InfluxDB
4+
"""
5+
from collections import namedtuple, defaultdict
6+
from warnings import warn
7+
8+
import six
9+
10+
11+
class SeriesHelper(object):
12+
13+
'''
14+
Subclassing this helper eases writing data points in bulk.
15+
All data points are immutable, insuring they do not get overwritten.
16+
Each subclass can write to its own database.
17+
The time series names can also be based on one or more defined fields.
18+
19+
Annotated example:
20+
```
21+
class MySeriesHelper(SeriesHelper):
22+
class Meta:
23+
# Meta class stores time series helper configuration.
24+
series_name = 'events.stats.{server_name}'
25+
# Series name must be a string, curly brackets for dynamic use.
26+
fields = ['time', 'server_name']
27+
# Defines all the fields in this time series.
28+
### Following attributes are optional. ###
29+
client = TestSeriesHelper.client
30+
# Client should be an instance of InfluxDBClient.
31+
:warning: Only used if autocommit is True.
32+
bulk_size = 5
33+
# Defines the number of data points to write simultaneously.
34+
:warning: Only applicable if autocommit is True.
35+
autocommit = True
36+
# If True and no bulk_size, then will set bulk_size to 1.
37+
38+
# The following will create *five* (immutable) data points.
39+
# Since bulk_size is set to 5, upon the fifth construction call, all data
40+
# points will be written on the wire via MySeriesHelper.Meta.client.
41+
MySeriesHelper(server_name='us.east-1', time=159)
42+
MySeriesHelper(server_name='us.east-1', time=158)
43+
MySeriesHelper(server_name='us.east-1', time=157)
44+
MySeriesHelper(server_name='us.east-1', time=156)
45+
MySeriesHelper(server_name='us.east-1', time=155)
46+
47+
# If autocommit None or False, one must call commit to write datapoints.
48+
# To manually submit data points which are not yet written, call commit:
49+
MySeriesHelper.commit()
50+
51+
# To inspect the JSON which will be written, call _json_body_():
52+
MySeriesHelper._json_body_()
53+
```
54+
'''
55+
__initialized__ = False
56+
57+
def __new__(cls, *args, **kwargs):
58+
'''
59+
Initializes class attributes for subsequent constructor calls.
60+
:note: *args and **kwargs are not explicitly used in this function,
61+
but needed for Python 2 compatibility.
62+
'''
63+
if not cls.__initialized__:
64+
cls.__initialized__ = True
65+
try:
66+
_meta = getattr(cls, 'Meta')
67+
except AttributeError:
68+
raise AttributeError(
69+
'Missing Meta class in {}.'.format(
70+
cls.__name__))
71+
72+
for attr in ['series_name', 'fields']:
73+
try:
74+
setattr(cls, '_' + attr, getattr(_meta, attr))
75+
except AttributeError:
76+
raise AttributeError(
77+
'Missing {} in {} Meta class.'.format(
78+
attr,
79+
cls.__name__))
80+
81+
cls._autocommit = getattr(_meta, 'autocommit', False)
82+
83+
cls._client = getattr(_meta, 'client', None)
84+
if cls._autocommit and not cls._client:
85+
raise AttributeError(
86+
'In {}, autocommit is set to True, but no client is set.'
87+
.format(cls.__name__))
88+
89+
try:
90+
cls._bulk_size = getattr(_meta, 'bulk_size')
91+
if cls._bulk_size < 1 and cls._autocommit:
92+
warn(
93+
'Definition of bulk_size in {} forced to 1, '
94+
'was less than 1.'.format(cls.__name__))
95+
cls._bulk_size = 1
96+
except AttributeError:
97+
cls._bulk_size = -1
98+
else:
99+
if not cls._autocommit:
100+
warn(
101+
'Definition of bulk_size in {} has no affect because'
102+
' autocommit is false.'.format(cls.__name__))
103+
104+
cls._datapoints = defaultdict(list)
105+
cls._type = namedtuple(cls.__name__, cls._fields)
106+
107+
return super(SeriesHelper, cls).__new__(cls)
108+
109+
def __init__(self, **kw):
110+
'''
111+
Constructor call creates a new data point. All fields must be present.
112+
:note: Data points written when `bulk_size` is reached per Helper.
113+
:warning: Data points are *immutable* (`namedtuples`).
114+
'''
115+
cls = self.__class__
116+
117+
if sorted(cls._fields) != sorted(kw.keys()):
118+
raise NameError(
119+
'Expected {0}, got {1}.'.format(
120+
cls._fields,
121+
kw.keys()))
122+
123+
cls._datapoints[cls._series_name.format(**kw)].append(cls._type(**kw))
124+
125+
if cls._autocommit and len(cls._datapoints) >= cls._bulk_size:
126+
cls.commit()
127+
128+
@classmethod
129+
def commit(cls, client=None):
130+
'''
131+
Commit everything from datapoints via the client.
132+
:param client: InfluxDBClient instance for writing points to InfluxDB.
133+
:attention: any provided client will supersede the class client.
134+
:return result of client.write_points.
135+
'''
136+
if not client:
137+
client = cls._client
138+
rtn = client.write_points(cls._json_body_())
139+
cls._reset_()
140+
return rtn
141+
142+
@classmethod
143+
def _json_body_(cls):
144+
'''
145+
:return: JSON body of these datapoints.
146+
'''
147+
json = []
148+
for series_name, data in six.iteritems(cls._datapoints):
149+
json.append({'name': series_name,
150+
'columns': cls._fields,
151+
'points': [[point.__dict__[k] for k in cls._fields]
152+
for point in data]
153+
})
154+
return json
155+
156+
@classmethod
157+
def _reset_(cls):
158+
'''
159+
Reset data storage.
160+
'''
161+
cls._datapoints = defaultdict(list)

requirements.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1 +1,2 @@
11
requests>=1.0.3
2+
six==1.9.0

tests/influxdb/helper_test.py

Lines changed: 169 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,169 @@
1+
# -*- coding: utf-8 -*-
2+
3+
import unittest
4+
import warnings
5+
6+
from influxdb import SeriesHelper, InfluxDBClient
7+
from requests.exceptions import ConnectionError
8+
9+
10+
class TestSeriesHelper(unittest.TestCase):
11+
12+
@classmethod
13+
def setUpClass(cls):
14+
super(TestSeriesHelper, cls).setUpClass()
15+
16+
TestSeriesHelper.client = InfluxDBClient(
17+
'host',
18+
8086,
19+
'username',
20+
'password',
21+
'database')
22+
23+
class MySeriesHelper(SeriesHelper):
24+
25+
class Meta:
26+
client = TestSeriesHelper.client
27+
series_name = 'events.stats.{server_name}'
28+
fields = ['time', 'server_name']
29+
bulk_size = 5
30+
autocommit = True
31+
32+
TestSeriesHelper.MySeriesHelper = MySeriesHelper
33+
34+
def testSingleSeriesName(self):
35+
'''
36+
Tests JSON conversion when there is only one series name.
37+
'''
38+
TestSeriesHelper.MySeriesHelper(server_name='us.east-1', time=159)
39+
TestSeriesHelper.MySeriesHelper(server_name='us.east-1', time=158)
40+
TestSeriesHelper.MySeriesHelper(server_name='us.east-1', time=157)
41+
TestSeriesHelper.MySeriesHelper(server_name='us.east-1', time=156)
42+
expectation = [{'points': [[159, 'us.east-1'],
43+
[158, 'us.east-1'],
44+
[157, 'us.east-1'],
45+
[156, 'us.east-1']],
46+
'name': 'events.stats.us.east-1',
47+
'columns': ['time', 'server_name']}]
48+
49+
rcvd = TestSeriesHelper.MySeriesHelper._json_body_()
50+
self.assertTrue(all([el in expectation for el in rcvd]) and
51+
all([el in rcvd for el in expectation]),
52+
'Invalid JSON body of time series returned from '
53+
'_json_body_ for one series name: {}.'.format(rcvd))
54+
TestSeriesHelper.MySeriesHelper._reset_()
55+
self.assertEqual(
56+
TestSeriesHelper.MySeriesHelper._json_body_(),
57+
[],
58+
'Resetting helper did not empty datapoints.')
59+
60+
def testSeveralSeriesNames(self):
61+
'''
62+
Tests JSON conversion when there is only one series name.
63+
'''
64+
TestSeriesHelper.MySeriesHelper(server_name='us.east-1', time=159)
65+
TestSeriesHelper.MySeriesHelper(server_name='fr.paris-10', time=158)
66+
TestSeriesHelper.MySeriesHelper(server_name='lu.lux', time=157)
67+
TestSeriesHelper.MySeriesHelper(server_name='uk.london', time=156)
68+
expectation = [{'points': [[157, 'lu.lux']],
69+
'name': 'events.stats.lu.lux',
70+
'columns': ['time', 'server_name']},
71+
{'points': [[156, 'uk.london']],
72+
'name': 'events.stats.uk.london',
73+
'columns': ['time', 'server_name']},
74+
{'points': [[158, 'fr.paris-10']],
75+
'name': 'events.stats.fr.paris-10',
76+
'columns': ['time', 'server_name']},
77+
{'points': [[159, 'us.east-1']],
78+
'name': 'events.stats.us.east-1',
79+
'columns': ['time', 'server_name']}]
80+
81+
rcvd = TestSeriesHelper.MySeriesHelper._json_body_()
82+
self.assertTrue(all([el in expectation for el in rcvd]) and
83+
all([el in rcvd for el in expectation]),
84+
'Invalid JSON body of time series returned from '
85+
'_json_body_ for several series names: {}.'
86+
.format(rcvd))
87+
TestSeriesHelper.MySeriesHelper._reset_()
88+
self.assertEqual(
89+
TestSeriesHelper.MySeriesHelper._json_body_(),
90+
[],
91+
'Resetting helper did not empty datapoints.')
92+
93+
def testInvalidHelpers(self):
94+
'''
95+
Tests errors in invalid helpers.
96+
'''
97+
class MissingMeta(SeriesHelper):
98+
pass
99+
100+
class MissingClient(SeriesHelper):
101+
102+
class Meta:
103+
series_name = 'events.stats.{server_name}'
104+
fields = ['time', 'server_name']
105+
autocommit = True
106+
107+
class MissingSeriesName(SeriesHelper):
108+
109+
class Meta:
110+
fields = ['time', 'server_name']
111+
112+
class MissingFields(SeriesHelper):
113+
114+
class Meta:
115+
series_name = 'events.stats.{server_name}'
116+
117+
for cls in [MissingMeta, MissingClient, MissingFields,
118+
MissingSeriesName]:
119+
self.assertRaises(
120+
AttributeError, cls, **{'time': 159,
121+
'server_name': 'us.east-1'})
122+
123+
def testWarnBulkSizeZero(self):
124+
'''
125+
Tests warning for an invalid bulk size.
126+
'''
127+
class WarnBulkSizeZero(SeriesHelper):
128+
129+
class Meta:
130+
client = TestSeriesHelper.client
131+
series_name = 'events.stats.{server_name}'
132+
fields = ['time', 'server_name']
133+
bulk_size = 0
134+
autocommit = True
135+
136+
with warnings.catch_warnings(record=True) as w:
137+
warnings.simplefilter("always")
138+
try:
139+
WarnBulkSizeZero(time=159, server_name='us.east-1')
140+
except ConnectionError:
141+
# Server defined in the client is invalid, we're testing
142+
# the warning only.
143+
pass
144+
self.assertEqual(len(w), 1,
145+
'{} call should have generated one warning.'
146+
.format(WarnBulkSizeZero))
147+
self.assertIn('forced to 1', str(w[-1].message),
148+
'Warning message did not contain "forced to 1".')
149+
150+
def testWarnBulkSizeNoEffect(self):
151+
'''
152+
Tests warning for a set bulk size but autocommit False.
153+
'''
154+
class WarnBulkSizeNoEffect(SeriesHelper):
155+
156+
class Meta:
157+
series_name = 'events.stats.{server_name}'
158+
fields = ['time', 'server_name']
159+
bulk_size = 5
160+
autocommit = False
161+
162+
with warnings.catch_warnings(record=True) as w:
163+
warnings.simplefilter("always")
164+
WarnBulkSizeNoEffect(time=159, server_name='us.east-1')
165+
self.assertEqual(len(w), 1,
166+
'{} call should have generated one warning.'
167+
.format(WarnBulkSizeNoEffect))
168+
self.assertIn('has no affect', str(w[-1].message),
169+
'Warning message did not contain "has not affect".')

0 commit comments

Comments
 (0)