Skip to content
This repository was archived by the owner on Oct 29, 2024. It is now read-only.

Add time parameter to SeriesHelper #306

Merged
merged 9 commits into from
Apr 21, 2016
44 changes: 16 additions & 28 deletions influxdb/helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,9 @@ class SeriesHelper(object):
All data points are immutable, insuring they do not get overwritten.
Each subclass can write to its own database.
The time series names can also be based on one or more defined fields.

A field "time" can be used to write data points at a specific time,
rather than the default current time. The time field can take any of
the following forms:
* An integer unix timestamp in nanoseconds, assumed to be in UTC.
* A string in the ISO time format, including a timezone.
* A naive python datetime, which will be treated as UTC.
* A localized python datetime, which will use the chosen timezone.
If no time field is provided, the current UTC system time in microseconds
at the time of assembling the point data will be used.
The field "time" can be specified when creating a point, and may be any of
the time types supported by the client (i.e. str, datetime, int).
If the time is not specified, the current system time (utc) will be used.

Annotated example::

Expand Down Expand Up @@ -98,8 +91,11 @@ def __new__(cls, *args, **kwargs):
' autocommit is false.'.format(cls.__name__))

cls._datapoints = defaultdict(list)
cls._type = namedtuple(cls.__name__, cls._fields + cls._tags)

if 'time' in cls._fields:
cls._fields.remove('time')
cls._type = namedtuple(cls.__name__,
cls._fields + cls._tags + ['time'])
return super(SeriesHelper, cls).__new__(cls)

def __init__(self, **kw):
Expand All @@ -110,14 +106,17 @@ def __init__(self, **kw):
:warning: Data points are *immutable* (`namedtuples`).
"""
cls = self.__class__
timestamp = kw.pop('time', self._current_timestamp())

if sorted(cls._fields + cls._tags) != sorted(kw.keys()):
raise NameError(
'Expected {0}, got {1}.'.format(
sorted(cls._fields + cls._tags),
kw.keys()))

cls._datapoints[cls._series_name.format(**kw)].append(cls._type(**kw))
cls._datapoints[cls._series_name.format(**kw)].append(
cls._type(time=timestamp, **kw)
)

if cls._autocommit and \
sum(len(series) for series in cls._datapoints.values()) \
Expand Down Expand Up @@ -151,25 +150,11 @@ def _json_body_(cls):
"measurement": series_name,
"fields": {},
"tags": {},
"time": getattr(point, "time")
}

ts = getattr(point, 'time', None)
if not ts:
# No time provided. Use current UTC time.
ts = datetime.utcnow().isoformat() + "+00:00"
elif isinstance(ts, datetime):
if ts.tzinfo is None or ts.tzinfo.utcoffset(ts) is None:
# Assuming naive datetime provided. Format with UTC tz.
ts = ts.isoformat() + "+00:00"
else:
# Assuming localized datetime provided.
ts = ts.isoformat()
# Neither of the above match. Assuming correct string or int.
json_point['time'] = ts

for field in cls._fields:
if field != 'time':
json_point['fields'][field] = getattr(point, field)
json_point['fields'][field] = getattr(point, field)

for tag in cls._tags:
json_point['tags'][tag] = getattr(point, tag)
Expand All @@ -183,3 +168,6 @@ def _reset_(cls):
Reset data storage.
"""
cls._datapoints = defaultdict(list)

def _current_timestamp(self):
return datetime.utcnow()
144 changes: 90 additions & 54 deletions influxdb/tests/helper_test.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
# -*- coding: utf-8 -*-

import datetime
import pytz
import sys
if sys.version_info < (2, 7):
import unittest2 as unittest
Expand All @@ -10,6 +8,7 @@
import warnings

import mock
from datetime import datetime, timedelta
from influxdb import SeriesHelper, InfluxDBClient
from requests.exceptions import ConnectionError

Expand Down Expand Up @@ -40,17 +39,13 @@ class Meta:

TestSeriesHelper.MySeriesHelper = MySeriesHelper

class MySeriesTimeHelper(SeriesHelper):

class Meta:
client = TestSeriesHelper.client
series_name = 'events.stats.{server_name}'
fields = ['time', 'some_stat']
tags = ['server_name', 'other_tag']
bulk_size = 5
autocommit = True

TestSeriesHelper.MySeriesTimeHelper = MySeriesTimeHelper
def tearDown(self):
super(TestSeriesHelper, self).tearDown()
TestSeriesHelper.MySeriesHelper._reset_()
self.assertEqual(
TestSeriesHelper.MySeriesHelper._json_body_(),
[],
'Resetting helper did not empty datapoints.')

def test_auto_commit(self):
"""
Expand All @@ -76,24 +71,20 @@ class Meta:
AutoCommitTest(server_name='us.east-1', some_stat=3443, other_tag='gg')
self.assertTrue(fake_write_points.called)

def testSingleSeriesName(self):
@mock.patch('influxdb.helper.SeriesHelper._current_timestamp')
def testSingleSeriesName(self, current_timestamp):
"""
Tests JSON conversion when there is only one series name.
"""
dt = datetime.datetime(2016, 1, 2, 3, 4, 5, 678912)
ts1 = dt
ts2 = "2016-10-11T01:02:03.123456789-04:00"
ts3 = 1234567890123456789
ts4 = pytz.timezone("Europe/Berlin").localize(dt)

TestSeriesHelper.MySeriesTimeHelper(
time=ts1, server_name='us.east-1', other_tag='ello', some_stat=159)
TestSeriesHelper.MySeriesTimeHelper(
time=ts2, server_name='us.east-1', other_tag='ello', some_stat=158)
TestSeriesHelper.MySeriesTimeHelper(
time=ts3, server_name='us.east-1', other_tag='ello', some_stat=157)
TestSeriesHelper.MySeriesTimeHelper(
time=ts4, server_name='us.east-1', other_tag='ello', some_stat=156)
current_timestamp.return_value = current_date = datetime.today()
TestSeriesHelper.MySeriesHelper(
server_name='us.east-1', other_tag='ello', some_stat=159)
TestSeriesHelper.MySeriesHelper(
server_name='us.east-1', other_tag='ello', some_stat=158)
TestSeriesHelper.MySeriesHelper(
server_name='us.east-1', other_tag='ello', some_stat=157)
TestSeriesHelper.MySeriesHelper(
server_name='us.east-1', other_tag='ello', some_stat=156)
expectation = [
{
"measurement": "events.stats.us.east-1",
Expand All @@ -104,7 +95,7 @@ def testSingleSeriesName(self):
"fields": {
"some_stat": 159
},
"time": "2016-01-02T03:04:05.678912+00:00",
"time": current_date,
},
{
"measurement": "events.stats.us.east-1",
Expand All @@ -115,7 +106,7 @@ def testSingleSeriesName(self):
"fields": {
"some_stat": 158
},
"time": "2016-10-11T01:02:03.123456789-04:00",
"time": current_date,
},
{
"measurement": "events.stats.us.east-1",
Expand All @@ -126,7 +117,7 @@ def testSingleSeriesName(self):
"fields": {
"some_stat": 157
},
"time": 1234567890123456789,
"time": current_date,
},
{
"measurement": "events.stats.us.east-1",
Expand All @@ -137,25 +128,22 @@ def testSingleSeriesName(self):
"fields": {
"some_stat": 156
},
"time": "2016-01-02T03:04:05.678912+01:00",
"time": current_date,
}
]

rcvd = TestSeriesHelper.MySeriesTimeHelper._json_body_()
rcvd = TestSeriesHelper.MySeriesHelper._json_body_()
self.assertTrue(all([el in expectation for el in rcvd]) and
all([el in rcvd for el in expectation]),
'Invalid JSON body of time series returned from '
'_json_body_ for one series name: {0}.'.format(rcvd))
TestSeriesHelper.MySeriesTimeHelper._reset_()
self.assertEqual(
TestSeriesHelper.MySeriesTimeHelper._json_body_(),
[],
'Resetting helper did not empty datapoints.')

def testSeveralSeriesNames(self):
'''
@mock.patch('influxdb.helper.SeriesHelper._current_timestamp')
def testSeveralSeriesNames(self, current_timestamp):
"""
Tests JSON conversion when there are multiple series names.
'''
"""
current_timestamp.return_value = current_date = datetime.today()
TestSeriesHelper.MySeriesHelper(
server_name='us.east-1', some_stat=159, other_tag='ello')
TestSeriesHelper.MySeriesHelper(
Expand All @@ -173,7 +161,8 @@ def testSeveralSeriesNames(self):
'tags': {
'other_tag': 'ello',
'server_name': 'lu.lux'
}
},
"time": current_date,
},
{
'fields': {
Expand All @@ -183,7 +172,8 @@ def testSeveralSeriesNames(self):
'tags': {
'other_tag': 'ello',
'server_name': 'uk.london'
}
},
"time": current_date,
},
{
'fields': {
Expand All @@ -193,7 +183,8 @@ def testSeveralSeriesNames(self):
'tags': {
'other_tag': 'ello',
'server_name': 'fr.paris-10'
}
},
"time": current_date,
},
{
'fields': {
Expand All @@ -203,25 +194,70 @@ def testSeveralSeriesNames(self):
'tags': {
'other_tag': 'ello',
'server_name': 'us.east-1'
}
},
"time": current_date,
}
]

rcvd = TestSeriesHelper.MySeriesHelper._json_body_()
for r in rcvd:
self.assertTrue(r.get('time'),
"No time field in received JSON body.")
del(r["time"])
self.assertTrue(all([el in expectation for el in rcvd]) and
all([el in rcvd for el in expectation]),
'Invalid JSON body of time series returned from '
'_json_body_ for several series names: {0}.'
.format(rcvd))
TestSeriesHelper.MySeriesHelper._reset_()
self.assertEqual(
TestSeriesHelper.MySeriesHelper._json_body_(),
[],
'Resetting helper did not empty datapoints.')

@mock.patch('influxdb.helper.SeriesHelper._current_timestamp')
def testSeriesWithoutTimeField(self, current_timestamp):
"""
Tests that time is optional on a series without a time field.
"""
current_date = datetime.today()
yesterday = current_date - timedelta(days=1)
current_timestamp.return_value = yesterday
TestSeriesHelper.MySeriesHelper(
server_name='us.east-1', other_tag='ello',
some_stat=159, time=current_date
)
TestSeriesHelper.MySeriesHelper(
server_name='us.east-1', other_tag='ello',
some_stat=158,
)
point1, point2 = TestSeriesHelper.MySeriesHelper._json_body_()
self.assertTrue('time' in point1 and 'time' in point2)
self.assertEqual(point1['time'], current_date)
self.assertEqual(point2['time'], yesterday)

@mock.patch('influxdb.helper.SeriesHelper._current_timestamp')
def testSeriesWithTimeField(self, current_timestamp):
"""
Test that time is optional on a series with a time field.
"""
current_date = datetime.today()
yesterday = current_date - timedelta(days=1)
current_timestamp.return_value = yesterday

class MyTimeFieldSeriesHelper(SeriesHelper):

class Meta:
client = TestSeriesHelper.client
series_name = 'events.stats.{server_name}'
fields = ['some_stat', 'time']
tags = ['server_name', 'other_tag']
bulk_size = 5
autocommit = True

MyTimeFieldSeriesHelper(
server_name='us.east-1', other_tag='ello',
some_stat=159, time=current_date
)
MyTimeFieldSeriesHelper(
server_name='us.east-1', other_tag='ello',
some_stat=158,
)
point1, point2 = MyTimeFieldSeriesHelper._json_body_()
self.assertTrue('time' in point1 and 'time' in point2)
self.assertEqual(point1['time'], current_date)
self.assertEqual(point2['time'], yesterday)

def testInvalidHelpers(self):
'''
Expand Down
3 changes: 1 addition & 2 deletions test-requirements.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
nose
nose-cov
mock
requests-mock
pytz
requests-mock