Skip to content

Commit c25ec08

Browse files
committed
Allow setting the time of a point manually.
Point can be specified as either a number of nanoseconds, a python datetime object (with or without timezone) or a string in ISO datetime format. If a time is not specified, the Helper sets the time at the time of assembling the point fields so that multiple unique points with the same tags can be committed simultaneously without them failing to add due to all being assigned the same automatic time by the InfluxDB server. This fix is based upon the discussion in influxdata#130 but also includes the outstanding items for it to be merged. I'm happy to receive suggestions for further ways to add test coverage to this change. This also fixes influxdata#264 and fixes influxdata#259.
1 parent 1e8e3dd commit c25ec08

File tree

3 files changed

+69
-14
lines changed

3 files changed

+69
-14
lines changed

influxdb/helper.py

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
Helper class for InfluxDB
44
"""
55
from collections import namedtuple, defaultdict
6+
from datetime import datetime
67
from warnings import warn
78

89
import six
@@ -16,6 +17,16 @@ class SeriesHelper(object):
1617
Each subclass can write to its own database.
1718
The time series names can also be based on one or more defined fields.
1819
20+
A field "time" can be used to write data points at a specific time,
21+
rather than the default current time. The time field can take any of
22+
the following forms:
23+
* An integer unix timestamp in nanoseconds, assumed to be in UTC.
24+
* A string in the ISO time format, including a timezone.
25+
* A naive python datetime, which will be treated as UTC.
26+
* A localized python datetime, which will use the chosen timezone.
27+
If no time field is provided, the current UTC system time in microseconds
28+
at the time of assembling the point data will be used.
29+
1930
Annotated example::
2031
2132
class MySeriesHelper(SeriesHelper):
@@ -142,8 +153,23 @@ def _json_body_(cls):
142153
"tags": {},
143154
}
144155

156+
ts = getattr(point, 'time', None)
157+
if not ts:
158+
# No time provided. Use current UTC time.
159+
ts = datetime.utcnow().isoformat() + "+00:00"
160+
elif isinstance(ts, datetime):
161+
if ts.tzinfo is None or ts.tzinfo.utcoffset(ts) is None:
162+
# Assuming naive datetime provided. Format with UTC tz.
163+
ts = ts.isoformat() + "+00:00"
164+
else:
165+
# Assuming localized datetime provided.
166+
ts = ts.isoformat()
167+
# Neither of the above match. Assuming correct string or int.
168+
json_point['time'] = ts
169+
145170
for field in cls._fields:
146-
json_point['fields'][field] = getattr(point, field)
171+
if field != 'time':
172+
json_point['fields'][field] = getattr(point, field)
147173

148174
for tag in cls._tags:
149175
json_point['tags'][tag] = getattr(point, tag)

influxdb/tests/helper_test.py

Lines changed: 40 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
# -*- coding: utf-8 -*-
22

3+
import datetime
4+
import pytz
35
import sys
46
if sys.version_info < (2, 7):
57
import unittest2 as unittest
@@ -38,6 +40,18 @@ class Meta:
3840

3941
TestSeriesHelper.MySeriesHelper = MySeriesHelper
4042

43+
class MySeriesTimeHelper(SeriesHelper):
44+
45+
class Meta:
46+
client = TestSeriesHelper.client
47+
series_name = 'events.stats.{server_name}'
48+
fields = ['time', 'some_stat']
49+
tags = ['server_name', 'other_tag']
50+
bulk_size = 5
51+
autocommit = True
52+
53+
TestSeriesHelper.MySeriesTimeHelper = MySeriesTimeHelper
54+
4155
def test_auto_commit(self):
4256
"""
4357
Tests that write_points is called after the right number of events
@@ -66,14 +80,20 @@ def testSingleSeriesName(self):
6680
"""
6781
Tests JSON conversion when there is only one series name.
6882
"""
69-
TestSeriesHelper.MySeriesHelper(
70-
server_name='us.east-1', other_tag='ello', some_stat=159)
71-
TestSeriesHelper.MySeriesHelper(
72-
server_name='us.east-1', other_tag='ello', some_stat=158)
73-
TestSeriesHelper.MySeriesHelper(
74-
server_name='us.east-1', other_tag='ello', some_stat=157)
75-
TestSeriesHelper.MySeriesHelper(
76-
server_name='us.east-1', other_tag='ello', some_stat=156)
83+
dt = datetime.datetime(2016, 1, 2, 3, 4, 5, 678912)
84+
ts1 = dt
85+
ts2 = "2016-10-11T01:02:03.123456789-04:00"
86+
ts3 = 1234567890123456789
87+
ts4 = pytz.timezone("Europe/Berlin").localize(dt)
88+
89+
TestSeriesHelper.MySeriesTimeHelper(
90+
time=ts1, server_name='us.east-1', other_tag='ello', some_stat=159)
91+
TestSeriesHelper.MySeriesTimeHelper(
92+
time=ts2, server_name='us.east-1', other_tag='ello', some_stat=158)
93+
TestSeriesHelper.MySeriesTimeHelper(
94+
time=ts3, server_name='us.east-1', other_tag='ello', some_stat=157)
95+
TestSeriesHelper.MySeriesTimeHelper(
96+
time=ts4, server_name='us.east-1', other_tag='ello', some_stat=156)
7797
expectation = [
7898
{
7999
"measurement": "events.stats.us.east-1",
@@ -84,6 +104,7 @@ def testSingleSeriesName(self):
84104
"fields": {
85105
"some_stat": 159
86106
},
107+
"time": "2016-01-02T03:04:05.678912+00:00",
87108
},
88109
{
89110
"measurement": "events.stats.us.east-1",
@@ -94,6 +115,7 @@ def testSingleSeriesName(self):
94115
"fields": {
95116
"some_stat": 158
96117
},
118+
"time": "2016-10-11T01:02:03.123456789-04:00",
97119
},
98120
{
99121
"measurement": "events.stats.us.east-1",
@@ -104,6 +126,7 @@ def testSingleSeriesName(self):
104126
"fields": {
105127
"some_stat": 157
106128
},
129+
"time": 1234567890123456789,
107130
},
108131
{
109132
"measurement": "events.stats.us.east-1",
@@ -114,23 +137,24 @@ def testSingleSeriesName(self):
114137
"fields": {
115138
"some_stat": 156
116139
},
140+
"time": "2016-01-02T03:04:05.678912+01:00",
117141
}
118142
]
119143

120-
rcvd = TestSeriesHelper.MySeriesHelper._json_body_()
144+
rcvd = TestSeriesHelper.MySeriesTimeHelper._json_body_()
121145
self.assertTrue(all([el in expectation for el in rcvd]) and
122146
all([el in rcvd for el in expectation]),
123147
'Invalid JSON body of time series returned from '
124148
'_json_body_ for one series name: {0}.'.format(rcvd))
125-
TestSeriesHelper.MySeriesHelper._reset_()
149+
TestSeriesHelper.MySeriesTimeHelper._reset_()
126150
self.assertEqual(
127-
TestSeriesHelper.MySeriesHelper._json_body_(),
151+
TestSeriesHelper.MySeriesTimeHelper._json_body_(),
128152
[],
129153
'Resetting helper did not empty datapoints.')
130154

131155
def testSeveralSeriesNames(self):
132156
'''
133-
Tests JSON conversion when there is only one series name.
157+
Tests JSON conversion when there are multiple series names.
134158
'''
135159
TestSeriesHelper.MySeriesHelper(
136160
server_name='us.east-1', some_stat=159, other_tag='ello')
@@ -184,6 +208,10 @@ def testSeveralSeriesNames(self):
184208
]
185209

186210
rcvd = TestSeriesHelper.MySeriesHelper._json_body_()
211+
for r in rcvd:
212+
self.assertTrue(r.get('time'),
213+
"No time field in received JSON body.")
214+
del(r["time"])
187215
self.assertTrue(all([el in expectation for el in rcvd]) and
188216
all([el in rcvd for el in expectation]),
189217
'Invalid JSON body of time series returned from '

test-requirements.txt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
nose
22
nose-cov
33
mock
4-
requests-mock
4+
requests-mock
5+
pytz

0 commit comments

Comments
 (0)