Skip to content

Commit 815f7f9

Browse files
committed
Merge pull request influxdata#211 from influxdb/line-protocol
Line protocol
2 parents 3425780 + 1775b24 commit 815f7f9

8 files changed

+242
-311
lines changed

.travis.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ env:
1212
install:
1313
- sudo pip install tox
1414
- sudo pip install coveralls
15-
- wget http://get.influxdb.org/influxdb_0.9.0_amd64.deb && sudo dpkg -i influxdb_0.9.0_amd64.deb
15+
- wget https://s3.amazonaws.com/influxdb/influxdb_0.9.1_amd64.deb && sudo dpkg -i influxdb_0.9.1_amd64.deb
1616
script:
1717
- travis_wait 30 tox -e $TOX_ENV
1818
after_success:

influxdb/client.py

100755100644
Lines changed: 53 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import requests.exceptions
1212
from sys import version_info
1313

14+
from influxdb.line_protocol import make_lines
1415
from influxdb.resultset import ResultSet
1516
from .exceptions import InfluxDBClientError
1617
from .exceptions import InfluxDBServerError
@@ -96,7 +97,8 @@ def __init__(self,
9697

9798
self._headers = {
9899
'Content-type': 'application/json',
99-
'Accept': 'text/plain'}
100+
'Accept': 'text/plain'
101+
}
100102

101103
@staticmethod
102104
def from_DSN(dsn, **kwargs):
@@ -182,7 +184,7 @@ def switch_user(self, username, password):
182184
self._password = password
183185

184186
def request(self, url, method='GET', params=None, data=None,
185-
expected_response_code=200):
187+
expected_response_code=200, headers=None):
186188
"""Make a HTTP request to the InfluxDB API.
187189
188190
:param url: the path of the HTTP request, e.g. write, query, etc.
@@ -203,17 +205,13 @@ def request(self, url, method='GET', params=None, data=None,
203205
"""
204206
url = "{0}/{1}".format(self._baseurl, url)
205207

208+
if headers is None:
209+
headers = self._headers
210+
206211
if params is None:
207212
params = {}
208213

209-
auth = {
210-
'u': self._username,
211-
'p': self._password
212-
}
213-
214-
params.update(auth)
215-
216-
if data is not None and not isinstance(data, str):
214+
if isinstance(data, (dict, list)):
217215
data = json.dumps(data)
218216

219217
# Try to send the request a maximum of three times. (see #103)
@@ -223,9 +221,10 @@ def request(self, url, method='GET', params=None, data=None,
223221
response = self._session.request(
224222
method=method,
225223
url=url,
224+
auth=(self._username, self._password),
226225
params=params,
227226
data=data,
228-
headers=self._headers,
227+
headers=headers,
229228
verify=self._verify_ssl,
230229
timeout=self._timeout
231230
)
@@ -254,18 +253,24 @@ def write(self, data, params=None, expected_response_code=204):
254253
:returns: True, if the write operation is successful
255254
:rtype: bool
256255
"""
256+
257+
headers = self._headers
258+
headers['Content-type'] = 'application/octet-stream'
259+
257260
self.request(
258261
url="write",
259262
method='POST',
260263
params=params,
261-
data=data,
262-
expected_response_code=expected_response_code
264+
data=make_lines(data).encode('utf-8'),
265+
expected_response_code=expected_response_code,
266+
headers=headers
263267
)
264268
return True
265269

266270
def query(self,
267271
query,
268272
params={},
273+
epoch=None,
269274
expected_response_code=200,
270275
database=None,
271276
raise_errors=True):
@@ -294,6 +299,9 @@ def query(self,
294299
params['q'] = query
295300
params['db'] = database or self._database
296301

302+
if epoch is not None:
303+
params['epoch'] = epoch
304+
297305
response = self.request(
298306
url="query",
299307
method='GET',
@@ -391,22 +399,25 @@ def _write_points(self,
391399
'points': points
392400
}
393401

394-
if time_precision:
395-
data['precision'] = time_precision
402+
if tags is not None:
403+
data['tags'] = tags
396404

397-
if retention_policy:
398-
data['retentionPolicy'] = retention_policy
405+
params = {
406+
'db': database or self._database
407+
}
399408

400-
if tags:
401-
data['tags'] = tags
409+
if time_precision is not None:
410+
params['precision'] = time_precision
402411

403-
data['database'] = database or self._database
412+
if retention_policy is not None:
413+
params['rp'] = retention_policy
404414

405415
if self.use_udp:
406416
self.send_packet(data)
407417
else:
408418
self.write(
409419
data=data,
420+
params=params,
410421
expected_response_code=204
411422
)
412423

@@ -578,15 +589,20 @@ def get_list_users(self):
578589
"""
579590
return list(self.query("SHOW USERS").get_points())
580591

581-
def create_user(self, username, password):
592+
def create_user(self, username, password, admin=False):
582593
"""Create a new user in InfluxDB
583594
584595
:param username: the new username to create
585596
:type username: str
586597
:param password: the password for the new user
587598
:type password: str
599+
:param admin: whether the user should have cluster administration
600+
privileges or not
601+
:type admin: boolean
588602
"""
589603
text = "CREATE USER {} WITH PASSWORD '{}'".format(username, password)
604+
if admin:
605+
text += ' WITH ALL PRIVILEGES'
590606
self.query(text)
591607

592608
def drop_user(self, username):
@@ -609,29 +625,27 @@ def set_user_password(self, username, password):
609625
text = "SET PASSWORD FOR {} = '{}'".format(username, password)
610626
self.query(text)
611627

612-
def delete_series(self, id, database=None):
613-
"""Delete series from a database.
628+
def delete_series(self, database=None, measurement=None, tags=None):
629+
"""Delete series from a database. Series can be filtered by
630+
measurement and tags.
614631
615-
:param id: the id of the series to be deleted
616-
:type id: int
632+
:param measurement: Delete all series from a measurement
633+
:type id: string
634+
:param tags: Delete all series that match given tags
635+
:type id: dict
617636
:param database: the database from which the series should be
618637
deleted, defaults to client's current database
619638
:type database: str
620639
"""
621640
database = database or self._database
622-
self.query('DROP SERIES %s' % id, database=database)
623-
624-
def grant_admin_privileges(self, username):
625-
"""Grant cluster administration privileges to an user.
626-
627-
:param username: the username to grant privileges to
628-
:type username: str
641+
query_str = 'DROP SERIES'
642+
if measurement:
643+
query_str += ' FROM "{}"'.format(measurement)
629644

630-
.. note:: Only a cluster administrator can create/ drop databases
631-
and manage users.
632-
"""
633-
text = "GRANT ALL PRIVILEGES TO {}".format(username)
634-
self.query(text)
645+
if tags:
646+
query_str += ' WHERE ' + ' and '.join(["{}='{}'".format(k, v)
647+
for k, v in tags.items()])
648+
self.query(query_str, database=database)
635649

636650
def revoke_admin_privileges(self, username):
637651
"""Revoke cluster administration privileges from an user.
@@ -683,9 +697,8 @@ def send_packet(self, packet):
683697
:param packet: the packet to be sent
684698
:type packet: dict
685699
"""
686-
data = json.dumps(packet)
687-
byte = data.encode('utf-8')
688-
self.udp_socket.sendto(byte, (self._host, self.udp_port))
700+
data = make_lines(packet).encode('utf-8')
701+
self.udp_socket.sendto(data, (self._host, self.udp_port))
689702

690703

691704
class InfluxDBClusterClient(object):

influxdb/line_protocol.py

Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
# -*- coding: utf-8 -*-
2+
from __future__ import unicode_literals
3+
4+
from calendar import timegm
5+
from copy import copy
6+
from datetime import datetime
7+
8+
from dateutil.parser import parse
9+
from pytz import utc
10+
from six import binary_type, text_type
11+
12+
13+
def _convert_timestamp(timestamp):
14+
if isinstance(timestamp, int):
15+
return timestamp
16+
if isinstance(_force_text(timestamp), text_type):
17+
timestamp = parse(timestamp)
18+
if isinstance(timestamp, datetime):
19+
if timestamp.tzinfo:
20+
timestamp = timestamp.astimezone(utc)
21+
timestamp.replace(tzinfo=None)
22+
return (
23+
timegm(timestamp.timetuple()) * 1e9 +
24+
timestamp.microsecond * 1e3
25+
)
26+
raise ValueError(timestamp)
27+
28+
29+
def _escape_tag(tag):
30+
return tag.replace(
31+
"\\", "\\\\"
32+
).replace(
33+
" ", "\\ "
34+
).replace(
35+
",", "\\,"
36+
).replace(
37+
"=", "\\="
38+
)
39+
40+
41+
def _escape_value(value):
42+
value = _force_text(value)
43+
if isinstance(value, text_type):
44+
return "\"{}\"".format(value.replace(
45+
"\"", "\\\""
46+
))
47+
else:
48+
return str(value)
49+
50+
51+
def _force_text(data):
52+
"""
53+
Try to return a text aka unicode object from the given data.
54+
"""
55+
if isinstance(data, binary_type):
56+
return data.decode('utf-8')
57+
else:
58+
return data
59+
60+
61+
def make_lines(data):
62+
"""
63+
Extracts the points from the given dict and returns a Unicode string
64+
matching the line protocol introduced in InfluxDB 0.9.0.
65+
"""
66+
lines = ""
67+
static_tags = data.get('tags', None)
68+
for point in data['points']:
69+
# add measurement name
70+
lines += _escape_tag(_force_text(
71+
point.get('measurement', data.get('measurement'))
72+
)) + ","
73+
74+
# add tags
75+
if static_tags is None:
76+
tags = point.get('tags', {})
77+
else:
78+
tags = copy(static_tags)
79+
tags.update(point.get('tags', {}))
80+
# tags should be sorted client-side to take load off server
81+
for tag_key in sorted(tags.keys()):
82+
lines += "{key}={value},".format(
83+
key=_escape_tag(tag_key),
84+
value=_escape_tag(tags[tag_key]),
85+
)
86+
lines = lines[:-1] + " " # strip the trailing comma
87+
88+
# add fields
89+
for field_key in sorted(point['fields'].keys()):
90+
lines += "{key}={value},".format(
91+
key=_escape_tag(field_key),
92+
value=_escape_value(point['fields'][field_key]),
93+
)
94+
lines = lines[:-1] # strip the trailing comma
95+
96+
# add timestamp
97+
if 'time' in point:
98+
lines += " " + _force_text(str(int(
99+
_convert_timestamp(point['time'])
100+
)))
101+
102+
lines += "\n"
103+
return lines

requirements.txt

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,4 @@
1+
python-dateutil>=2.0.0
2+
pytz
13
requests>=1.0.3
2-
six==1.9.0
4+
six==1.9.0

0 commit comments

Comments
 (0)