Skip to content

Commit 078350e

Browse files
committed
initial implementation of line protocol
as introduced in influxdata/influxdb#2696
1 parent 3e1a03c commit 078350e

File tree

3 files changed

+114
-16
lines changed

3 files changed

+114
-16
lines changed

influxdb/client.py

Lines changed: 8 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import requests.exceptions
1212
from sys import version_info
1313

14+
from influxdb.line_protocol import make_lines
1415
from influxdb.resultset import ResultSet
1516

1617
try:
@@ -221,14 +222,7 @@ def request(self, url, method='GET', params=None, data=None,
221222
if params is None:
222223
params = {}
223224

224-
auth = {
225-
'u': self._username,
226-
'p': self._password
227-
}
228-
229-
params.update(auth)
230-
231-
if data is not None and not isinstance(data, str):
225+
if isinstance(data, dict) or isinstance(data, list):
232226
data = json.dumps(data)
233227

234228
# Try to send the request a maximum of three times. (see #103)
@@ -238,6 +232,7 @@ def request(self, url, method='GET', params=None, data=None,
238232
response = self._session.request(
239233
method=method,
240234
url=url,
235+
auth=(self._username, self._password),
241236
params=params,
242237
data=data,
243238
headers=self._headers,
@@ -270,10 +265,10 @@ def write(self, data, params=None, expected_response_code=204):
270265
:rtype: bool
271266
"""
272267
self.request(
273-
url="write",
268+
url="write_points",
274269
method='POST',
275270
params=params,
276-
data=data,
271+
data=make_lines(data).encode('utf-8'),
277272
expected_response_code=expected_response_code
278273
)
279274
return True
@@ -396,13 +391,12 @@ def _write_points(self,
396391
if tags:
397392
data['tags'] = tags
398393

399-
data['database'] = database or self._database
400-
401394
if self.use_udp:
402395
self.send_packet(data)
403396
else:
404397
self.write(
405398
data=data,
399+
params={'db': database or self._database},
406400
expected_response_code=204
407401
)
408402

@@ -679,9 +673,8 @@ def send_packet(self, packet):
679673
:param packet: the packet to be sent
680674
:type packet: dict
681675
"""
682-
data = json.dumps(packet)
683-
byte = data.encode('utf-8')
684-
self.udp_socket.sendto(byte, (self._host, self.udp_port))
676+
data = make_lines(packet).encode('utf-8')
677+
self.udp_socket.sendto(data, (self._host, self.udp_port))
685678

686679

687680
class InfluxDBClusterClient(object):

influxdb/line_protocol.py

Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
# -*- coding: utf-8 -*-
2+
from __future__ import unicode_literals
3+
4+
from copy import copy
5+
from datetime import datetime
6+
from time import mktime
7+
8+
from dateutil.parser import parse
9+
from pytz import utc
10+
from six import binary_type, text_type
11+
12+
13+
def _convert_timestamp(timestamp):
14+
if isinstance(timestamp, int):
15+
return timestamp
16+
if isinstance(_force_text(timestamp), text_type):
17+
timestamp = parse(timestamp)
18+
if isinstance(timestamp, datetime):
19+
if timestamp.tzinfo:
20+
timestamp = timestamp.astimezone(utc)
21+
timestamp.replace(tzinfo=None)
22+
return (
23+
mktime(timestamp.timetuple()) * 1e9 +
24+
timestamp.microsecond * 1e3
25+
)
26+
raise ValueError(timestamp)
27+
28+
29+
def _escape_tag(tag):
30+
return tag.replace(
31+
"\\", "\\\\"
32+
).replace(
33+
" ", "\\ "
34+
).replace(
35+
",", "\\,"
36+
).replace(
37+
"=", "\\="
38+
)
39+
40+
41+
def _escape_value(value):
42+
value = _force_text(value)
43+
if isinstance(value, text_type):
44+
return "\"{}\"".format(value.replace(
45+
"\"", "\\\""
46+
))
47+
else:
48+
return str(value)
49+
50+
51+
def _force_text(data):
52+
"""
53+
Try to return a text aka unicode object from the given data.
54+
"""
55+
if isinstance(data, binary_type):
56+
return data.decode('utf-8', 'replace')
57+
else:
58+
return data
59+
60+
61+
def make_lines(data):
62+
"""
63+
Extracts the points from the given dict and returns a Unicode string
64+
matching the line protocol introduced in InfluxDB 0.9.0.
65+
"""
66+
lines = ""
67+
static_tags = data.get('tags', None)
68+
for point in data['points']:
69+
# add measurement name
70+
lines += _escape_tag(_force_text(
71+
point.get('measurement', data.get('measurement'))
72+
)) + ","
73+
74+
# add tags
75+
if static_tags is None:
76+
tags = point.get('tags', {})
77+
else:
78+
tags = copy(static_tags)
79+
tags.update(point.get('tags', {}))
80+
# tags should be sorted client-side to take load off server
81+
for tag_key in sorted(tags.keys()):
82+
lines += "{key}={value},".format(
83+
key=_escape_tag(tag_key),
84+
value=_escape_tag(tags[tag_key]),
85+
)
86+
lines = lines[:-1] + " " # strip the trailing comma
87+
88+
# add fields
89+
for field_key in sorted(point['fields'].keys()):
90+
lines += "{key}={value},".format(
91+
key=_escape_tag(field_key),
92+
value=_escape_value(point['fields'][field_key]),
93+
)
94+
lines = lines[:-1] # strip the trailing comma
95+
96+
# add timestamp
97+
if 'timestamp' in point:
98+
lines += " " + _force_text(str(int(
99+
_convert_timestamp(point['timestamp'])
100+
)))
101+
102+
lines += "\n"
103+
return lines

requirements.txt

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,4 @@
1+
python-dateutil>=2.0.0
2+
pytz
13
requests>=1.0.3
2-
six==1.9.0
4+
six==1.9.0

0 commit comments

Comments
 (0)