diff --git a/influxdb/__init__.py b/influxdb/__init__.py index 4d92aba9..bd0205b6 100644 --- a/influxdb/__init__.py +++ b/influxdb/__init__.py @@ -1,5 +1,5 @@ # -*- coding: utf-8 -*- -from influxdb.client import InfluxDBClient +from influxdb.client import InfluxDBClient, InfluxDBClientError __all__ = ['InfluxDBClient'] diff --git a/influxdb/client.py b/influxdb/client.py index 1f3892a0..8ab8f809 100644 --- a/influxdb/client.py +++ b/influxdb/client.py @@ -14,6 +14,9 @@ def __init__(self, message, code): self.message = message self.code = code + def __str__(self): + return "{}: {}".format(self.code, self.message) + class InfluxDBClient(object): """ @@ -34,58 +37,34 @@ def __init__(self, """ Initialize client """ - self._host = host - self._port = port - self._username = username - self._password = password - self._database = database - self._timeout = timeout + self.host = host + self.port = port + self.username = username + self.password = password + self.database = database + self.timeout = timeout - self._verify_ssl = verify_ssl + self.verify_ssl = verify_ssl self.use_udp = use_udp self.udp_port = udp_port if use_udp: self.udp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) - self._scheme = "http" + self.scheme = "http" if ssl is True: - self._scheme = "https" + self.scheme = "https" self._baseurl = "{0}://{1}:{2}".format( - self._scheme, - self._host, - self._port) + self.scheme, + self.host, + self.port) self._headers = { 'Content-type': 'application/json', 'Accept': 'text/plain'} - # Change member variables - - def switch_db(self, database): - """ - Change client database - - Parameters - ---------- - database : string - """ - self._database = database - - def switch_user(self, username, password): - """ - Change client username - - Parameters - ---------- - username : string - password : string - """ - self._username = username - self._password = password - def request(self, url, method='GET', params=None, data=None, status_code=200): """ @@ -96,12 +75,12 @@ def request(self, url, method='GET', params=None, data=None, if params is None: params = {} - auth = { + args = { 'u': self._username, 'p': self._password } - params.update(auth) + args.update(params) if data is not None and not isinstance(data, str): data = json.dumps(data) @@ -109,30 +88,25 @@ def request(self, url, method='GET', params=None, data=None, response = session.request( method=method, url=url, - params=params, + params=args, data=data, headers=self._headers, verify=self._verify_ssl, timeout=self._timeout ) - if response.status_code == status_code: - return response + if response.status_code != status_code: + raise InfluxDBClientError(response.content, + response.status_code) - else: - error = InfluxDBClientError( - "{0}: {1}".format(response.status_code, response.content), - response.status_code - ) - raise error + return response - # Writing Data - # - # Assuming you have a database named foo_production you can write data - # by doing a POST to /db/foo_production/series?u=some_user&p=some_password - # with a JSON body of points. + def send_packet(self, packet): + data = json.dumps(packet) + byte = data.encode('utf-8') + self.udp_socket.sendto(byte, (self._host, self.udp_port)) - def write_points(self, *args, **kwargs): + def write_points(self, data, batch_size=None, time_precision="s"): """ Write to multiple time series names @@ -143,48 +117,23 @@ def write_points(self, *args, **kwargs): Useful for when doing data dumps from one database to another or when doing a massive write operation """ + assert time_precision in "smu" - def list_chunks(l, n): - """ Yield successive n-sized chunks from l. - """ - for i in xrange(0, len(l), n): - yield l[i:i+n] - - batch_size = kwargs.get('batch_size') if batch_size: - for data in kwargs.get('data'): - name = data.get('name') - columns = data.get('columns') - point_list = data.get('points') - - for batch in list_chunks(point_list, batch_size): - data = [{ - "points": batch, - "name": name, - "columns": columns + for dataset in data: + points = dataset["points"] + for i in xrange(0, len(points), batch_size): + batch = [{ + "name": dataset["name"], + "columns": dataset["columns"], + "points": points[i:i+n], }] - time_precision = kwargs.get('time_precision', 's') - self.write_points_with_precision( - data=data, + self.write_points(data=batch, time_precision=time_precision) - - return True - - return self.write_points_with_precision(*args, **kwargs) - - def write_points_with_precision(self, data, time_precision='s'): - """ - Write to multiple time series names - """ - if time_precision not in ['s', 'm', 'u']: - raise Exception( - "Invalid time precision is given. (use 's','m' or 'u')") + return url = "db/{0}/series".format(self._database) - - params = { - 'time_precision': time_precision - } + params = {"time_precision": time_precision} if self.use_udp: self.send_packet(data) @@ -197,85 +146,18 @@ def write_points_with_precision(self, data, time_precision='s'): status_code=200 ) - return True - - # One Time Deletes - - def delete_points(self, name): - """ - Delete an entire series - """ - url = "db/{0}/series/{1}".format(self._database, name) - - self.request( - url=url, - method='DELETE', - status_code=204 - ) - - return True - - # Regularly Scheduled Deletes - - def create_scheduled_delete(self, json_body): - """ - TODO: Create scheduled delete - - 2013-11-08: This endpoint has not been implemented yet in ver0.0.8, - but it is documented in http://influxdb.org/docs/api/http.html. - See also: src/api/http/api.go:l57 - """ - raise NotImplementedError() - - # get list of deletes - # curl http://localhost:8086/db/site_dev/scheduled_deletes - # - # remove a regularly scheduled delete - # curl -X DELETE http://localhost:8086/db/site_dev/scheduled_deletes/:id - - def get_list_scheduled_delete(self): - """ - TODO: Get list of scheduled deletes - - 2013-11-08: This endpoint has not been implemented yet in ver0.0.8, - but it is documented in http://influxdb.org/docs/api/http.html. - See also: src/api/http/api.go:l57 - """ - raise NotImplementedError() - - def remove_scheduled_delete(self, delete_id): - """ - TODO: Remove scheduled delete - - 2013-11-08: This endpoint has not been implemented yet in ver0.0.8, - but it is documented in http://influxdb.org/docs/api/http.html. - See also: src/api/http/api.go:l57 - """ - raise NotImplementedError() - - # Querying Data - # - # GET db/:name/series. It takes five parameters def query(self, query, time_precision='s', chunked=False): """ Quering data """ - if time_precision not in ['s', 'm', 'u']: - raise Exception( - "Invalid time precision is given. (use 's','m' or 'u')") + assert time_precision in "smu" - if chunked is True: - chunked_param = 'true' - else: - chunked_param = 'false' - - # Build the URL of the serie to query url = "db/{0}/series".format(self._database) params = { 'q': query, 'time_precision': time_precision, - 'chunked': chunked_param + 'chunked': chunked } response = self.request( @@ -293,14 +175,6 @@ def query(self, query, time_precision='s', chunked=False): return res - # Creating and Dropping Databases - # - # ### create a database - # curl -X POST http://localhost:8086/db -d '{"name": "site_development"}' - # - # ### drop a database - # curl -X DELETE http://localhost:8086/db/site_development - def create_database(self, database): """ Create a database @@ -321,8 +195,6 @@ def create_database(self, database): status_code=201 ) - return True - def delete_database(self, database): """ Drop a database @@ -340,11 +212,6 @@ def delete_database(self, database): status_code=204 ) - return True - - # ### get list of databases - # curl -X GET http://localhost:8086/db - def get_database_list(self): """ Get the list of databases @@ -379,39 +246,6 @@ def delete_series(self, series): status_code=204 ) - return True - - # Security - # get list of cluster admins - # curl http://localhost:8086/cluster_admins?u=root&p=root - - # add cluster admin - # curl -X POST http://localhost:8086/cluster_admins?u=root&p=root \ - # -d '{"name": "paul", "password": "i write teh docz"}' - - # update cluster admin password - # curl -X POST http://localhost:8086/cluster_admins/paul?u=root&p=root \ - # -d '{"password": "new pass"}' - - # delete cluster admin - # curl -X DELETE http://localhost:8086/cluster_admins/paul?u=root&p=root - - # Database admins, with a database name of site_dev - # get list of database admins - # curl http://localhost:8086/db/site_dev/admins?u=root&p=root - - # add database admin - # curl -X POST http://localhost:8086/db/site_dev/admins?u=root&p=root \ - # -d '{"name": "paul", "password": "i write teh docz"}' - - # update database admin password - # curl -X POST http://localhost:8086/db/site_dev/admins/paul?u=root&p=root\ - # -d '{"password": "new pass"}' - - # delete database admin - # curl -X DELETE \ - # http://localhost:8086/db/site_dev/admins/paul?u=root&p=root - def get_list_cluster_admins(self): """ Get list of cluster admins @@ -440,8 +274,6 @@ def add_cluster_admin(self, new_username, new_password): status_code=200 ) - return True - def update_cluster_admin_password(self, username, new_password): """ Update cluster admin password @@ -459,8 +291,6 @@ def update_cluster_admin_password(self, username, new_password): status_code=200 ) - return True - def delete_cluster_admin(self, username): """ Delete cluster admin @@ -473,8 +303,6 @@ def delete_cluster_admin(self, username): status_code=204 ) - return True - def set_database_admin(self, username): """ Set user as database admin @@ -499,66 +327,6 @@ def alter_database_admin(self, username, is_admin): status_code=200 ) - return True - - def get_list_database_admins(self): - """ - TODO: Get list of database admins - - 2013-11-08: This endpoint has not been implemented yet in ver0.0.8, - but it is documented in http://influxdb.org/docs/api/http.html. - See also: src/api/http/api.go:l57 - """ - raise NotImplementedError() - - def add_database_admin(self, new_username, new_password): - """ - TODO: Add cluster admin - - 2013-11-08: This endpoint has not been implemented yet in ver0.0.8, - but it is documented in http://influxdb.org/docs/api/http.html. - See also: src/api/http/api.go:l57 - """ - raise NotImplementedError() - - def update_database_admin_password(self, username, new_password): - """ - TODO: Update database admin password - - 2013-11-08: This endpoint has not been implemented yet in ver0.0.8, - but it is documented in http://influxdb.org/docs/api/http.html. - See also: src/api/http/api.go:l57 - """ - raise NotImplementedError() - - def delete_database_admin(self, username): - """ - TODO: Delete database admin - - 2013-11-08: This endpoint has not been implemented yet in ver0.0.8, - but it is documented in http://influxdb.org/docs/api/http.html. - See also: src/api/http/api.go:l57 - """ - raise NotImplementedError() - - ### - # Limiting User Access - - # Database users - # get list of database users - # curl http://localhost:8086/db/site_dev/users?u=root&p=root - - # add database user - # curl -X POST http://localhost:8086/db/site_dev/users?u=root&p=root \ - # -d '{"name": "paul", "password": "i write teh docz"}' - - # update database user password - # curl -X POST http://localhost:8086/db/site_dev/users/paul?u=root&p=root \ - # -d '{"password": "new pass"}' - - # delete database user - # curl -X DELETE http://localhost:8086/db/site_dev/users/paul?u=root&p=root - def get_database_users(self): """ Get list of database users @@ -591,8 +359,6 @@ def add_database_user(self, new_username, new_password): status_code=200 ) - return True - def update_database_user_password(self, username, new_password): """ Update password @@ -613,8 +379,6 @@ def update_database_user_password(self, username, new_password): if username == self._username: self._password = new_password - return True - def delete_database_user(self, username): """ Delete database user @@ -627,21 +391,4 @@ def delete_database_user(self, username): status_code=200 ) - return True - - # update the user by POSTing to db/site_dev/users/paul - - def update_permission(self, username, json_body): - """ - TODO: Update read/write permission - 2013-11-08: This endpoint has not been implemented yet in ver0.0.8, - but it is documented in http://influxdb.org/docs/api/http.html. - See also: src/api/http/api.go:l57 - """ - raise NotImplementedError() - - def send_packet(self, packet): - data = json.dumps(packet) - byte = data.encode('utf-8') - self.udp_socket.sendto(byte, (self._host, self.udp_port))