diff --git a/influxdb/helper.py b/influxdb/helper.py index 900df8d7..7f64de46 100644 --- a/influxdb/helper.py +++ b/influxdb/helper.py @@ -16,16 +16,9 @@ class SeriesHelper(object): All data points are immutable, insuring they do not get overwritten. Each subclass can write to its own database. The time series names can also be based on one or more defined fields. - - A field "time" can be used to write data points at a specific time, - rather than the default current time. The time field can take any of - the following forms: - * An integer unix timestamp in nanoseconds, assumed to be in UTC. - * A string in the ISO time format, including a timezone. - * A naive python datetime, which will be treated as UTC. - * A localized python datetime, which will use the chosen timezone. - If no time field is provided, the current UTC system time in microseconds - at the time of assembling the point data will be used. + The field "time" can be specified when creating a point, and may be any of + the time types supported by the client (i.e. str, datetime, int). + If the time is not specified, the current system time (utc) will be used. Annotated example:: @@ -98,8 +91,11 @@ def __new__(cls, *args, **kwargs): ' autocommit is false.'.format(cls.__name__)) cls._datapoints = defaultdict(list) - cls._type = namedtuple(cls.__name__, cls._fields + cls._tags) + if 'time' in cls._fields: + cls._fields.remove('time') + cls._type = namedtuple(cls.__name__, + cls._fields + cls._tags + ['time']) return super(SeriesHelper, cls).__new__(cls) def __init__(self, **kw): @@ -110,6 +106,7 @@ def __init__(self, **kw): :warning: Data points are *immutable* (`namedtuples`). """ cls = self.__class__ + timestamp = kw.pop('time', self._current_timestamp()) if sorted(cls._fields + cls._tags) != sorted(kw.keys()): raise NameError( @@ -117,7 +114,9 @@ def __init__(self, **kw): sorted(cls._fields + cls._tags), kw.keys())) - cls._datapoints[cls._series_name.format(**kw)].append(cls._type(**kw)) + cls._datapoints[cls._series_name.format(**kw)].append( + cls._type(time=timestamp, **kw) + ) if cls._autocommit and \ sum(len(series) for series in cls._datapoints.values()) \ @@ -151,25 +150,11 @@ def _json_body_(cls): "measurement": series_name, "fields": {}, "tags": {}, + "time": getattr(point, "time") } - ts = getattr(point, 'time', None) - if not ts: - # No time provided. Use current UTC time. - ts = datetime.utcnow().isoformat() + "+00:00" - elif isinstance(ts, datetime): - if ts.tzinfo is None or ts.tzinfo.utcoffset(ts) is None: - # Assuming naive datetime provided. Format with UTC tz. - ts = ts.isoformat() + "+00:00" - else: - # Assuming localized datetime provided. - ts = ts.isoformat() - # Neither of the above match. Assuming correct string or int. - json_point['time'] = ts - for field in cls._fields: - if field != 'time': - json_point['fields'][field] = getattr(point, field) + json_point['fields'][field] = getattr(point, field) for tag in cls._tags: json_point['tags'][tag] = getattr(point, tag) @@ -183,3 +168,6 @@ def _reset_(cls): Reset data storage. """ cls._datapoints = defaultdict(list) + + def _current_timestamp(self): + return datetime.utcnow() diff --git a/influxdb/tests/helper_test.py b/influxdb/tests/helper_test.py index ac2872f1..405b5ed8 100644 --- a/influxdb/tests/helper_test.py +++ b/influxdb/tests/helper_test.py @@ -1,7 +1,5 @@ # -*- coding: utf-8 -*- -import datetime -import pytz import sys if sys.version_info < (2, 7): import unittest2 as unittest @@ -10,6 +8,7 @@ import warnings import mock +from datetime import datetime, timedelta from influxdb import SeriesHelper, InfluxDBClient from requests.exceptions import ConnectionError @@ -40,17 +39,13 @@ class Meta: TestSeriesHelper.MySeriesHelper = MySeriesHelper - class MySeriesTimeHelper(SeriesHelper): - - class Meta: - client = TestSeriesHelper.client - series_name = 'events.stats.{server_name}' - fields = ['time', 'some_stat'] - tags = ['server_name', 'other_tag'] - bulk_size = 5 - autocommit = True - - TestSeriesHelper.MySeriesTimeHelper = MySeriesTimeHelper + def tearDown(self): + super(TestSeriesHelper, self).tearDown() + TestSeriesHelper.MySeriesHelper._reset_() + self.assertEqual( + TestSeriesHelper.MySeriesHelper._json_body_(), + [], + 'Resetting helper did not empty datapoints.') def test_auto_commit(self): """ @@ -76,24 +71,20 @@ class Meta: AutoCommitTest(server_name='us.east-1', some_stat=3443, other_tag='gg') self.assertTrue(fake_write_points.called) - def testSingleSeriesName(self): + @mock.patch('influxdb.helper.SeriesHelper._current_timestamp') + def testSingleSeriesName(self, current_timestamp): """ Tests JSON conversion when there is only one series name. """ - dt = datetime.datetime(2016, 1, 2, 3, 4, 5, 678912) - ts1 = dt - ts2 = "2016-10-11T01:02:03.123456789-04:00" - ts3 = 1234567890123456789 - ts4 = pytz.timezone("Europe/Berlin").localize(dt) - - TestSeriesHelper.MySeriesTimeHelper( - time=ts1, server_name='us.east-1', other_tag='ello', some_stat=159) - TestSeriesHelper.MySeriesTimeHelper( - time=ts2, server_name='us.east-1', other_tag='ello', some_stat=158) - TestSeriesHelper.MySeriesTimeHelper( - time=ts3, server_name='us.east-1', other_tag='ello', some_stat=157) - TestSeriesHelper.MySeriesTimeHelper( - time=ts4, server_name='us.east-1', other_tag='ello', some_stat=156) + current_timestamp.return_value = current_date = datetime.today() + TestSeriesHelper.MySeriesHelper( + server_name='us.east-1', other_tag='ello', some_stat=159) + TestSeriesHelper.MySeriesHelper( + server_name='us.east-1', other_tag='ello', some_stat=158) + TestSeriesHelper.MySeriesHelper( + server_name='us.east-1', other_tag='ello', some_stat=157) + TestSeriesHelper.MySeriesHelper( + server_name='us.east-1', other_tag='ello', some_stat=156) expectation = [ { "measurement": "events.stats.us.east-1", @@ -104,7 +95,7 @@ def testSingleSeriesName(self): "fields": { "some_stat": 159 }, - "time": "2016-01-02T03:04:05.678912+00:00", + "time": current_date, }, { "measurement": "events.stats.us.east-1", @@ -115,7 +106,7 @@ def testSingleSeriesName(self): "fields": { "some_stat": 158 }, - "time": "2016-10-11T01:02:03.123456789-04:00", + "time": current_date, }, { "measurement": "events.stats.us.east-1", @@ -126,7 +117,7 @@ def testSingleSeriesName(self): "fields": { "some_stat": 157 }, - "time": 1234567890123456789, + "time": current_date, }, { "measurement": "events.stats.us.east-1", @@ -137,25 +128,22 @@ def testSingleSeriesName(self): "fields": { "some_stat": 156 }, - "time": "2016-01-02T03:04:05.678912+01:00", + "time": current_date, } ] - rcvd = TestSeriesHelper.MySeriesTimeHelper._json_body_() + rcvd = TestSeriesHelper.MySeriesHelper._json_body_() self.assertTrue(all([el in expectation for el in rcvd]) and all([el in rcvd for el in expectation]), 'Invalid JSON body of time series returned from ' '_json_body_ for one series name: {0}.'.format(rcvd)) - TestSeriesHelper.MySeriesTimeHelper._reset_() - self.assertEqual( - TestSeriesHelper.MySeriesTimeHelper._json_body_(), - [], - 'Resetting helper did not empty datapoints.') - def testSeveralSeriesNames(self): - ''' + @mock.patch('influxdb.helper.SeriesHelper._current_timestamp') + def testSeveralSeriesNames(self, current_timestamp): + """ Tests JSON conversion when there are multiple series names. - ''' + """ + current_timestamp.return_value = current_date = datetime.today() TestSeriesHelper.MySeriesHelper( server_name='us.east-1', some_stat=159, other_tag='ello') TestSeriesHelper.MySeriesHelper( @@ -173,7 +161,8 @@ def testSeveralSeriesNames(self): 'tags': { 'other_tag': 'ello', 'server_name': 'lu.lux' - } + }, + "time": current_date, }, { 'fields': { @@ -183,7 +172,8 @@ def testSeveralSeriesNames(self): 'tags': { 'other_tag': 'ello', 'server_name': 'uk.london' - } + }, + "time": current_date, }, { 'fields': { @@ -193,7 +183,8 @@ def testSeveralSeriesNames(self): 'tags': { 'other_tag': 'ello', 'server_name': 'fr.paris-10' - } + }, + "time": current_date, }, { 'fields': { @@ -203,25 +194,70 @@ def testSeveralSeriesNames(self): 'tags': { 'other_tag': 'ello', 'server_name': 'us.east-1' - } + }, + "time": current_date, } ] rcvd = TestSeriesHelper.MySeriesHelper._json_body_() - for r in rcvd: - self.assertTrue(r.get('time'), - "No time field in received JSON body.") - del(r["time"]) self.assertTrue(all([el in expectation for el in rcvd]) and all([el in rcvd for el in expectation]), 'Invalid JSON body of time series returned from ' '_json_body_ for several series names: {0}.' .format(rcvd)) - TestSeriesHelper.MySeriesHelper._reset_() - self.assertEqual( - TestSeriesHelper.MySeriesHelper._json_body_(), - [], - 'Resetting helper did not empty datapoints.') + + @mock.patch('influxdb.helper.SeriesHelper._current_timestamp') + def testSeriesWithoutTimeField(self, current_timestamp): + """ + Tests that time is optional on a series without a time field. + """ + current_date = datetime.today() + yesterday = current_date - timedelta(days=1) + current_timestamp.return_value = yesterday + TestSeriesHelper.MySeriesHelper( + server_name='us.east-1', other_tag='ello', + some_stat=159, time=current_date + ) + TestSeriesHelper.MySeriesHelper( + server_name='us.east-1', other_tag='ello', + some_stat=158, + ) + point1, point2 = TestSeriesHelper.MySeriesHelper._json_body_() + self.assertTrue('time' in point1 and 'time' in point2) + self.assertEqual(point1['time'], current_date) + self.assertEqual(point2['time'], yesterday) + + @mock.patch('influxdb.helper.SeriesHelper._current_timestamp') + def testSeriesWithTimeField(self, current_timestamp): + """ + Test that time is optional on a series with a time field. + """ + current_date = datetime.today() + yesterday = current_date - timedelta(days=1) + current_timestamp.return_value = yesterday + + class MyTimeFieldSeriesHelper(SeriesHelper): + + class Meta: + client = TestSeriesHelper.client + series_name = 'events.stats.{server_name}' + fields = ['some_stat', 'time'] + tags = ['server_name', 'other_tag'] + bulk_size = 5 + autocommit = True + + MyTimeFieldSeriesHelper( + server_name='us.east-1', other_tag='ello', + some_stat=159, time=current_date + ) + MyTimeFieldSeriesHelper( + server_name='us.east-1', other_tag='ello', + some_stat=158, + ) + point1, point2 = MyTimeFieldSeriesHelper._json_body_() + self.assertTrue('time' in point1 and 'time' in point2) + self.assertEqual(point1['time'], current_date) + self.assertEqual(point2['time'], yesterday) def testInvalidHelpers(self): ''' diff --git a/test-requirements.txt b/test-requirements.txt index 9e18b7d2..cbc6add3 100644 --- a/test-requirements.txt +++ b/test-requirements.txt @@ -1,5 +1,4 @@ nose nose-cov mock -requests-mock -pytz +requests-mock \ No newline at end of file