Skip to content

Commit 54f2a2b

Browse files
committed
Merge pull request influxdata#224 from tzonghao/line-protocol-time-precision
Improve line-protocol (Thanks @tzonghao !)
2 parents a7ba64f + 57f8f75 commit 54f2a2b

File tree

5 files changed

+106
-58
lines changed

5 files changed

+106
-58
lines changed

influxdb/_dataframe_client.py

Lines changed: 16 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ def write_points(self, dataframe, measurement, tags=None,
3939
:param dataframe: data points in a DataFrame
4040
:param measurement: name of measurement
4141
:param tags: dictionary of tags, with string key-values
42-
:param time_precision: [Optional, default 's'] Either 's', 'ms', 'u'
42+
:param time_precision: [Optional, default None] Either 's', 'ms', 'u'
4343
or 'n'.
4444
:param batch_size: [Optional] Value to write the points in batches
4545
instead of all at one time. Useful for when doing data dumps from
@@ -55,15 +55,14 @@ def write_points(self, dataframe, measurement, tags=None,
5555
end_index = (batch + 1) * batch_size
5656
points = self._convert_dataframe_to_json(
5757
dataframe.ix[start_index:end_index].copy(),
58-
measurement,
59-
tags
58+
measurement, tags, time_precision
6059
)
6160
super(DataFrameClient, self).write_points(
6261
points, time_precision, database, retention_policy)
6362
return True
6463
else:
6564
points = self._convert_dataframe_to_json(
66-
dataframe, measurement, tags
65+
dataframe, measurement, tags, time_precision
6766
)
6867
super(DataFrameClient, self).write_points(
6968
points, time_precision, database, retention_policy)
@@ -116,7 +115,8 @@ def _to_dataframe(self, rs):
116115
result[key] = df
117116
return result
118117

119-
def _convert_dataframe_to_json(self, dataframe, measurement, tags=None):
118+
def _convert_dataframe_to_json(self, dataframe, measurement, tags=None,
119+
time_precision=None):
120120

121121
if not isinstance(dataframe, pd.DataFrame):
122122
raise TypeError('Must be DataFrame, but type was: {}.'
@@ -136,11 +136,18 @@ def _convert_dataframe_to_json(self, dataframe, measurement, tags=None):
136136
# Convert dtype for json serialization
137137
dataframe = dataframe.astype('object')
138138

139+
precision_factor = {
140+
"n": 1,
141+
"u": 1e3,
142+
"ms": 1e6,
143+
"s": 1e9
144+
}.get(time_precision, 1)
145+
139146
points = [
140147
{'measurement': measurement,
141148
'tags': tags if tags else {},
142149
'fields': rec,
143-
'time': ts.isoformat()
150+
'time': int(ts.value / precision_factor)
144151
}
145152
for ts, rec in zip(dataframe.index, dataframe.to_dict('record'))]
146153
return points
@@ -150,8 +157,8 @@ def _datetime_to_epoch(self, datetime, time_precision='s'):
150157
if time_precision == 's':
151158
return seconds
152159
elif time_precision == 'ms':
153-
return seconds * 10 ** 3
160+
return seconds * 1e3
154161
elif time_precision == 'u':
155-
return seconds * 10 ** 6
162+
return seconds * 1e6
156163
elif time_precision == 'n':
157-
return seconds * 10 ** 9
164+
return seconds * 1e9

influxdb/client.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -259,11 +259,16 @@ def write(self, data, params=None, expected_response_code=204):
259259
headers = self._headers
260260
headers['Content-type'] = 'application/octet-stream'
261261

262+
if params:
263+
precision = params.get('precision')
264+
else:
265+
precision = None
266+
262267
self.request(
263268
url="write",
264269
method='POST',
265270
params=params,
266-
data=make_lines(data).encode('utf-8'),
271+
data=make_lines(data, precision).encode('utf-8'),
267272
expected_response_code=expected_response_code,
268273
headers=headers
269274
)

influxdb/line_protocol.py

Lines changed: 36 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -6,23 +6,28 @@
66
from datetime import datetime
77

88
from dateutil.parser import parse
9-
from pytz import utc
109
from six import binary_type, text_type
1110

1211

13-
def _convert_timestamp(timestamp):
12+
def _convert_timestamp(timestamp, precision=None):
1413
if isinstance(timestamp, int):
15-
return timestamp
14+
return timestamp # assume precision is correct if timestamp is int
1615
if isinstance(_force_text(timestamp), text_type):
1716
timestamp = parse(timestamp)
1817
if isinstance(timestamp, datetime):
19-
if timestamp.tzinfo:
20-
timestamp = timestamp.astimezone(utc)
21-
timestamp.replace(tzinfo=None)
22-
return (
23-
timegm(timestamp.timetuple()) * 1e9 +
18+
ns = (
19+
timegm(timestamp.utctimetuple()) * 1e9 +
2420
timestamp.microsecond * 1e3
2521
)
22+
if precision is None or precision == 'n':
23+
return ns
24+
elif precision == 'u':
25+
return ns / 1e3
26+
elif precision == 'ms':
27+
return ns / 1e6
28+
elif precision == 's':
29+
return ns / 1e9
30+
2631
raise ValueError(timestamp)
2732

2833

@@ -58,46 +63,56 @@ def _force_text(data):
5863
return data
5964

6065

61-
def make_lines(data):
66+
def make_lines(data, precision=None):
6267
"""
6368
Extracts the points from the given dict and returns a Unicode string
6469
matching the line protocol introduced in InfluxDB 0.9.0.
6570
"""
66-
lines = ""
71+
lines = []
6772
static_tags = data.get('tags', None)
6873
for point in data['points']:
74+
elements = []
75+
6976
# add measurement name
70-
lines += _escape_tag(_force_text(
77+
measurement = _escape_tag(_force_text(
7178
point.get('measurement', data.get('measurement'))
72-
)) + ","
79+
))
80+
key_values = [measurement]
7381

7482
# add tags
7583
if static_tags is None:
7684
tags = point.get('tags', {})
7785
else:
7886
tags = copy(static_tags)
7987
tags.update(point.get('tags', {}))
88+
8089
# tags should be sorted client-side to take load off server
8190
for tag_key in sorted(tags.keys()):
8291
key = _escape_tag(tag_key)
8392
value = _escape_tag(tags[tag_key])
8493
if key != '' and value != '':
85-
lines += "{key}={value},".format(key=key, value=value)
86-
lines = lines[:-1] + " " # strip the trailing comma
94+
key_values.append("{key}={value}".format(key=key, value=value))
95+
key_values = ','.join(key_values)
96+
elements.append(key_values)
8797

8898
# add fields
99+
field_values = []
89100
for field_key in sorted(point['fields'].keys()):
90-
lines += "{key}={value},".format(
101+
field_values.append("{key}={value}".format(
91102
key=_escape_tag(field_key),
92103
value=_escape_value(point['fields'][field_key]),
93-
)
94-
lines = lines[:-1] # strip the trailing comma
104+
))
105+
field_values = ','.join(field_values)
106+
elements.append(field_values)
95107

96108
# add timestamp
97109
if 'time' in point:
98-
lines += " " + _force_text(str(int(
99-
_convert_timestamp(point['time'])
110+
timestamp = _force_text(str(int(
111+
_convert_timestamp(point['time'], precision)
100112
)))
113+
elements.append(timestamp)
101114

102-
lines += "\n"
103-
return lines
115+
line = ' '.join(elements)
116+
lines.append(line)
117+
lines = '\n'.join(lines)
118+
return lines + '\n'

influxdb/tests/client_test.py

Lines changed: 22 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -275,17 +275,35 @@ def test_write_points_with_precision(self):
275275
)
276276

277277
cli = InfluxDBClient(database='db')
278-
cli.write_points(
279-
self.dummy_points,
280-
time_precision='n'
281-
)
282278

279+
cli.write_points(self.dummy_points, time_precision='n')
283280
self.assertEqual(
284281
b"cpu_load_short,host=server01,region=us-west "
285282
b"value=0.64 1257894000000000000\n",
286283
m.last_request.body,
287284
)
288285

286+
cli.write_points(self.dummy_points, time_precision='u')
287+
self.assertEqual(
288+
b"cpu_load_short,host=server01,region=us-west "
289+
b"value=0.64 1257894000000000\n",
290+
m.last_request.body,
291+
)
292+
293+
cli.write_points(self.dummy_points, time_precision='ms')
294+
self.assertEqual(
295+
b"cpu_load_short,host=server01,region=us-west "
296+
b"value=0.64 1257894000000\n",
297+
m.last_request.body,
298+
)
299+
300+
cli.write_points(self.dummy_points, time_precision='s')
301+
self.assertEqual(
302+
b"cpu_load_short,host=server01,region=us-west "
303+
b"value=0.64 1257894000\n",
304+
m.last_request.body,
305+
)
306+
289307
def test_write_points_bad_precision(self):
290308
cli = InfluxDBClient()
291309
with self.assertRaisesRegexp(

influxdb/tests/dataframe_client_test.py

Lines changed: 26 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -119,38 +119,41 @@ def test_write_points_from_dataframe_with_time_precision(self):
119119
"http://localhost:8086/write",
120120
status_code=204)
121121

122-
points = {
123-
'database': 'db',
124-
'points': [
125-
{'time': '1970-01-01T00:00:00+00:00',
126-
'fields': {
127-
'column_one': '1',
128-
'column_three': 1.0,
129-
'column_two': 1},
130-
'tags': {},
131-
'measurement': 'foo'},
132-
{'time': '1970-01-01T01:00:00+00:00',
133-
'fields': {
134-
'column_one': '2',
135-
'column_three': 2.0,
136-
'column_two': 2},
137-
'tags': {},
138-
'measurement': 'foo'}]
139-
}
140-
141122
cli = DataFrameClient(database='db')
142123
measurement = "foo"
143124

144125
cli.write_points(dataframe, measurement, time_precision='s')
145126
self.assertEqual(m.last_request.qs['precision'], ['s'])
127+
self.assertEqual(
128+
b'foo column_one="1",column_three=1.0,column_two=1 0\nfoo '
129+
b'column_one="2",column_three=2.0,column_two=2 3600\n',
130+
m.last_request.body,
131+
)
146132

147-
cli.write_points(dataframe, measurement, time_precision='m')
148-
points.update(precision='m')
149-
self.assertEqual(m.last_request.qs['precision'], ['m'])
133+
cli.write_points(dataframe, measurement, time_precision='ms')
134+
self.assertEqual(m.last_request.qs['precision'], ['ms'])
135+
self.assertEqual(
136+
b'foo column_one="1",column_three=1.0,column_two=1 0\nfoo '
137+
b'column_one="2",column_three=2.0,column_two=2 3600000\n',
138+
m.last_request.body,
139+
)
150140

151141
cli.write_points(dataframe, measurement, time_precision='u')
152-
points.update(precision='u')
153142
self.assertEqual(m.last_request.qs['precision'], ['u'])
143+
self.assertEqual(
144+
b'foo column_one="1",column_three=1.0,column_two=1 0\nfoo '
145+
b'column_one="2",column_three=2.0,column_two=2 3600000000\n',
146+
m.last_request.body,
147+
)
148+
149+
cli.write_points(dataframe, measurement, time_precision='n')
150+
self.assertEqual(m.last_request.qs['precision'], ['n'])
151+
self.assertEqual(
152+
b'foo column_one="1",column_three=1.0,column_two=1 0\n'
153+
b'foo column_one="2",column_three=2.0,column_two=2 '
154+
b'3600000000000\n',
155+
m.last_request.body,
156+
)
154157

155158
@raises(TypeError)
156159
def test_write_points_from_dataframe_fails_without_time_index(self):

0 commit comments

Comments
 (0)