11
11
import requests .exceptions
12
12
from sys import version_info
13
13
14
+ from influxdb .line_protocol import make_lines
14
15
from influxdb .resultset import ResultSet
15
16
from .exceptions import InfluxDBClientError
16
17
from .exceptions import InfluxDBServerError
@@ -96,7 +97,8 @@ def __init__(self,
96
97
97
98
self ._headers = {
98
99
'Content-type' : 'application/json' ,
99
- 'Accept' : 'text/plain' }
100
+ 'Accept' : 'text/plain'
101
+ }
100
102
101
103
@staticmethod
102
104
def from_DSN (dsn , ** kwargs ):
@@ -182,7 +184,7 @@ def switch_user(self, username, password):
182
184
self ._password = password
183
185
184
186
def request (self , url , method = 'GET' , params = None , data = None ,
185
- expected_response_code = 200 ):
187
+ expected_response_code = 200 , headers = None ):
186
188
"""Make a HTTP request to the InfluxDB API.
187
189
188
190
:param url: the path of the HTTP request, e.g. write, query, etc.
@@ -203,17 +205,13 @@ def request(self, url, method='GET', params=None, data=None,
203
205
"""
204
206
url = "{0}/{1}" .format (self ._baseurl , url )
205
207
208
+ if headers is None :
209
+ headers = self ._headers
210
+
206
211
if params is None :
207
212
params = {}
208
213
209
- auth = {
210
- 'u' : self ._username ,
211
- 'p' : self ._password
212
- }
213
-
214
- params .update (auth )
215
-
216
- if data is not None and not isinstance (data , str ):
214
+ if isinstance (data , (dict , list )):
217
215
data = json .dumps (data )
218
216
219
217
# Try to send the request a maximum of three times. (see #103)
@@ -223,9 +221,10 @@ def request(self, url, method='GET', params=None, data=None,
223
221
response = self ._session .request (
224
222
method = method ,
225
223
url = url ,
224
+ auth = (self ._username , self ._password ),
226
225
params = params ,
227
226
data = data ,
228
- headers = self . _headers ,
227
+ headers = headers ,
229
228
verify = self ._verify_ssl ,
230
229
timeout = self ._timeout
231
230
)
@@ -254,18 +253,24 @@ def write(self, data, params=None, expected_response_code=204):
254
253
:returns: True, if the write operation is successful
255
254
:rtype: bool
256
255
"""
256
+
257
+ headers = self ._headers
258
+ headers ['Content-type' ] = 'application/octet-stream'
259
+
257
260
self .request (
258
261
url = "write" ,
259
262
method = 'POST' ,
260
263
params = params ,
261
- data = data ,
262
- expected_response_code = expected_response_code
264
+ data = make_lines (data ).encode ('utf-8' ),
265
+ expected_response_code = expected_response_code ,
266
+ headers = headers
263
267
)
264
268
return True
265
269
266
270
def query (self ,
267
271
query ,
268
272
params = {},
273
+ epoch = None ,
269
274
expected_response_code = 200 ,
270
275
database = None ,
271
276
raise_errors = True ):
@@ -294,6 +299,9 @@ def query(self,
294
299
params ['q' ] = query
295
300
params ['db' ] = database or self ._database
296
301
302
+ if epoch is not None :
303
+ params ['epoch' ] = epoch
304
+
297
305
response = self .request (
298
306
url = "query" ,
299
307
method = 'GET' ,
@@ -391,22 +399,25 @@ def _write_points(self,
391
399
'points' : points
392
400
}
393
401
394
- if time_precision :
395
- data ['precision ' ] = time_precision
402
+ if tags is not None :
403
+ data ['tags ' ] = tags
396
404
397
- if retention_policy :
398
- data ['retentionPolicy' ] = retention_policy
405
+ params = {
406
+ 'db' : database or self ._database
407
+ }
399
408
400
- if tags :
401
- data [ 'tags ' ] = tags
409
+ if time_precision is not None :
410
+ params [ 'precision ' ] = time_precision
402
411
403
- data ['database' ] = database or self ._database
412
+ if retention_policy is not None :
413
+ params ['rp' ] = retention_policy
404
414
405
415
if self .use_udp :
406
416
self .send_packet (data )
407
417
else :
408
418
self .write (
409
419
data = data ,
420
+ params = params ,
410
421
expected_response_code = 204
411
422
)
412
423
@@ -578,15 +589,20 @@ def get_list_users(self):
578
589
"""
579
590
return list (self .query ("SHOW USERS" ).get_points ())
580
591
581
- def create_user (self , username , password ):
592
+ def create_user (self , username , password , admin = False ):
582
593
"""Create a new user in InfluxDB
583
594
584
595
:param username: the new username to create
585
596
:type username: str
586
597
:param password: the password for the new user
587
598
:type password: str
599
+ :param admin: whether the user should have cluster administration
600
+ privileges or not
601
+ :type admin: boolean
588
602
"""
589
603
text = "CREATE USER {} WITH PASSWORD '{}'" .format (username , password )
604
+ if admin :
605
+ text += ' WITH ALL PRIVILEGES'
590
606
self .query (text )
591
607
592
608
def drop_user (self , username ):
@@ -609,29 +625,27 @@ def set_user_password(self, username, password):
609
625
text = "SET PASSWORD FOR {} = '{}'" .format (username , password )
610
626
self .query (text )
611
627
612
- def delete_series (self , id , database = None ):
613
- """Delete series from a database.
628
+ def delete_series (self , database = None , measurement = None , tags = None ):
629
+ """Delete series from a database. Series can be filtered by
630
+ measurement and tags.
614
631
615
- :param id: the id of the series to be deleted
616
- :type id: int
632
+ :param measurement: Delete all series from a measurement
633
+ :type id: string
634
+ :param tags: Delete all series that match given tags
635
+ :type id: dict
617
636
:param database: the database from which the series should be
618
637
deleted, defaults to client's current database
619
638
:type database: str
620
639
"""
621
640
database = database or self ._database
622
- self .query ('DROP SERIES %s' % id , database = database )
623
-
624
- def grant_admin_privileges (self , username ):
625
- """Grant cluster administration privileges to an user.
626
-
627
- :param username: the username to grant privileges to
628
- :type username: str
641
+ query_str = 'DROP SERIES'
642
+ if measurement :
643
+ query_str += ' FROM "{}"' .format (measurement )
629
644
630
- .. note:: Only a cluster administrator can create/ drop databases
631
- and manage users.
632
- """
633
- text = "GRANT ALL PRIVILEGES TO {}" .format (username )
634
- self .query (text )
645
+ if tags :
646
+ query_str += ' WHERE ' + ' and ' .join (["{}='{}'" .format (k , v )
647
+ for k , v in tags .items ()])
648
+ self .query (query_str , database = database )
635
649
636
650
def revoke_admin_privileges (self , username ):
637
651
"""Revoke cluster administration privileges from an user.
@@ -683,9 +697,8 @@ def send_packet(self, packet):
683
697
:param packet: the packet to be sent
684
698
:type packet: dict
685
699
"""
686
- data = json .dumps (packet )
687
- byte = data .encode ('utf-8' )
688
- self .udp_socket .sendto (byte , (self ._host , self .udp_port ))
700
+ data = make_lines (packet ).encode ('utf-8' )
701
+ self .udp_socket .sendto (data , (self ._host , self .udp_port ))
689
702
690
703
691
704
class InfluxDBClusterClient (object ):
0 commit comments