From 52bbaf8fb93dffd2ac486e4aa2f93a154e552514 Mon Sep 17 00:00:00 2001 From: Josh Powers Date: Thu, 5 Jan 2023 13:13:02 -0700 Subject: [PATCH 1/6] feat: adds the ability to query with SQL This creates a new SQL Query API that allows for the use of SQL queries along with InfluxDB that has SQL enabled. Without further testing and usage it should be expected that this could break easily. The current model will take the user's URL so they do not need to provide two different URLs and create two different clients and will parse out the hostname. This is for use with the gRPC client. There are no tests as this largely wraps the FlightSQLClient library. --- README.rst | 24 +++++++++++++ examples/README.md | 9 ++--- examples/query_sql.py | 8 +++++ influxdb_client/__init__.py | 1 + influxdb_client/client/influxdb_client.py | 9 +++++ influxdb_client/client/query_sql_api.py | 44 +++++++++++++++++++++++ setup.py | 12 ++++++- 7 files changed, 102 insertions(+), 5 deletions(-) create mode 100644 examples/query_sql.py create mode 100644 influxdb_client/client/query_sql_api.py diff --git a/README.rst b/README.rst index 5e69b75e..61d2e18c 100644 --- a/README.rst +++ b/README.rst @@ -84,6 +84,7 @@ InfluxDB 2.0 client features - `Nanosecond precision`_ - `Delete data`_ - `Handling Errors`_ + - `SQL Support`_ - `Logging`_ Installation @@ -1579,6 +1580,29 @@ Client automatically follows HTTP redirects. The default redirect policy is to f .. marker-asyncio-end +SQL Support +^^^^^^^^^^^ +.. marker-sql-support-start + +The ability to query InfluxDB with SQL was introduced with the IOX backend. +To make use of the SQL support users can make use of the SQL Query API: + +.. code-block:: python + + from influxdb_client import InfluxDBClient + + with InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org", debug=False) as client: + query_sql_api = client.query_sql_api() + reader = query_sql_api.query("my-bucket", "select * from cpu limit 10") + print(reader.read_all()) + +.. 
warning:: + + The ``QuerySQLApi`` only works with InfluxDB that has SQL support enabled. + This does not apply to all InfluxDB versions. + +.. marker-sql-support-end + Logging ^^^^^^^ .. marker-logging-start diff --git a/examples/README.md b/examples/README.md index 1678d00e..e3631881 100644 --- a/examples/README.md +++ b/examples/README.md @@ -10,9 +10,9 @@ - [write_api_callbacks.py](write_api_callbacks.py) - How to handle batch events - [write_structured_data.py](write_structured_data.py) - How to write structured data - [NamedTuple](https://docs.python.org/3/library/collections.html#collections.namedtuple), [Data Classes](https://docs.python.org/3/library/dataclasses.html) - (_requires Python v3.8+_) - [logging_handler.py](logging_handler.py) - How to set up a python native logging handler that writes to InfluxDB -- [import_parquet.py](import_parquet.py) - How to import [Apache Parquet](https://parquet.apache.org/) data files, - the example requires: - - manually download [NYC TLC Trip Record Data](https://www1.nyc.gov/site/tlc/about/tlc-trip-record-data.page) +- [import_parquet.py](import_parquet.py) - How to import [Apache Parquet](https://parquet.apache.org/) data files, + the example requires: + - manually download [NYC TLC Trip Record Data](https://www1.nyc.gov/site/tlc/about/tlc-trip-record-data.page) - install Apache Arrow `pip install pyarrow` dependency - [write_batching_by_bytes_count.py](write_batching_by_bytes_count.py) - How to use RxPY to prepare batches by maximum bytes count. 
@@ -35,6 +35,7 @@ - [influx_cloud.py](influx_cloud.py) - How to connect to InfluxDB 2 Cloud - [invokable_scripts.py](invokable_scripts.py) - How to use Invokable scripts Cloud API to create custom endpoints that query data - [bucket_schemas.py](bucket_schemas.py) - How to manage explicit bucket schemas to enforce column names, tags, fields, and data types for your data +- [query_sql.py](query_sql.py) - How to query buckets with SQL ## Others - [influxdb_18_example.py](influxdb_18_example.py) - How to connect to InfluxDB 1.8 @@ -46,4 +47,4 @@ - [asynchronous_management.py](asynchronous_management.py) - How to use asynchronous Management API - [asynchronous_batching.py](asynchronous_batching.py) - How to use [RxPY](https://rxpy.readthedocs.io/en/latest/) to prepare batches - [asynchronous_retry.py](asynchronous_retry.py) - How to use [aiohttp-retry](https://github.com/inyutin/aiohttp_retry) to configure retries - + diff --git a/examples/query_sql.py b/examples/query_sql.py new file mode 100644 index 00000000..1de67a86 --- /dev/null +++ b/examples/query_sql.py @@ -0,0 +1,8 @@ +#!/usr/bin/env python3 +"""Demonstrate how to query data from InfluxDB.""" +from influxdb_client import InfluxDBClient + +with InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org", debug=False) as client: + query_sql_api = client.query_sql_api() + result = query_sql_api.query("my-bucket", "select * from cpu limit 10") + print(result.read_all()) diff --git a/influxdb_client/__init__.py b/influxdb_client/__init__.py index a1009511..d951c74d 100644 --- a/influxdb_client/__init__.py +++ b/influxdb_client/__init__.py @@ -384,6 +384,7 @@ from influxdb_client.client.labels_api import LabelsApi from influxdb_client.client.organizations_api import OrganizationsApi from influxdb_client.client.query_api import QueryApi +from influxdb_client.client.query_sql_api import QuerySQLApi from influxdb_client.client.tasks_api import TasksApi from influxdb_client.client.users_api import 
UsersApi from influxdb_client.client.write_api import WriteApi, WriteOptions diff --git a/influxdb_client/client/influxdb_client.py b/influxdb_client/client/influxdb_client.py index ca8dcb36..6bfbb4bc 100644 --- a/influxdb_client/client/influxdb_client.py +++ b/influxdb_client/client/influxdb_client.py @@ -14,6 +14,7 @@ from influxdb_client.client.labels_api import LabelsApi from influxdb_client.client.organizations_api import OrganizationsApi from influxdb_client.client.query_api import QueryApi, QueryOptions +from influxdb_client.client.query_sql_api import QuerySQLApi from influxdb_client.client.tasks_api import TasksApi from influxdb_client.client.users_api import UsersApi from influxdb_client.client.write_api import WriteApi, WriteOptions, PointSettings @@ -301,6 +302,14 @@ def query_api(self, query_options: QueryOptions = QueryOptions()) -> QueryApi: """ return QueryApi(self, query_options) + def query_sql_api(self) -> QuerySQLApi: + """ + Create an Query SQL API instance. + + :return: Query SQL API instance + """ + return QuerySQLApi(self) + def invokable_scripts_api(self) -> InvokableScriptsApi: """ Create an InvokableScripts API instance. diff --git a/influxdb_client/client/query_sql_api.py b/influxdb_client/client/query_sql_api.py new file mode 100644 index 00000000..1c672c08 --- /dev/null +++ b/influxdb_client/client/query_sql_api.py @@ -0,0 +1,44 @@ +"""Query InfluxDB with SQL.""" +from urllib.parse import urlparse + +from influxdb_client.client._base import _BaseQueryApi + + +class QuerySQLApi(_BaseQueryApi): + """Implementation for grpc+TLS client for SQL queries.""" + + def __init__(self, influxdb_client): + """ + Initialize SQL query client. + + To complete SQL requests, a different client is + + :param influxdb_client: influxdb client + """ + super().__init__(influxdb_client=influxdb_client) + + def query(self, bucket: str, query: str): + """Execute synchronous SQL query and return result as an Arrow reader. 
+ + :param str, bucket: the Flux query + :param str, query: the SQL query to execute + :return: Arrow reader + + .. code-block:: python + + from influxdb_client import InfluxDBClient + + with InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org") as client: + query_sql_api = client.query_sql_api() + result = query_sql_api.query("test", "select * from cpu limit 10") + + """ # noqa: E501 + from flightsql import FlightSQLClient + + client = FlightSQLClient( + host=urlparse(self._influxdb_client.url).hostname, + token=self._influxdb_client.token, + metadata={"bucket-name": bucket}, + ) + info = client.execute(query) + return client.do_get(info.endpoints[0].ticket) diff --git a/setup.py b/setup.py index 2afe906c..6e7ef344 100644 --- a/setup.py +++ b/setup.py @@ -43,6 +43,10 @@ 'aiocsv>=1.2.2' ] +sql_requires = [ + 'flightsql-dbapi@git+https://github.com/influxdata/flightsql-dbapi.git@fbc9fc1618528cd442a7e22ea11663856b0ecd5d' +] + with open('README.rst', 'r') as f: # Remove `class` text role as it's not allowed on PyPI lines = [] @@ -66,7 +70,13 @@ keywords=["InfluxDB", "InfluxDB Python Client"], tests_require=test_requires, install_requires=requires, - extras_require={'extra': extra_requires, 'ciso': ciso_requires, 'async': async_requires, 'test': test_requires}, + extras_require={ + 'async': async_requires, + 'ciso': ciso_requires, + 'extra': extra_requires, + 'sql': sql_requires, + 'test': test_requires, + }, long_description_content_type="text/x-rst", packages=find_packages(exclude=('tests*',)), test_suite='tests', From 600c3edec6a309bd2efb7ad422b4967cd67ffd3b Mon Sep 17 00:00:00 2001 From: Josh Powers Date: Fri, 13 Jan 2023 14:50:36 -0700 Subject: [PATCH 2/6] Major refactor post-review * Rename to "sql_client" remove API from class/examples * Constructor now sets up gRPC client for re-use * Constructor now takes a bucket, one client per bucket * Add close function * Add schemas function * Add tables function --- README.rst | 24 +++-- 
examples/query_sql.py | 8 -- examples/sql_client.py | 33 +++++++ influxdb_client/__init__.py | 2 +- influxdb_client/client/_base.py | 6 ++ influxdb_client/client/influxdb_client.py | 10 +- influxdb_client/client/query_sql_api.py | 44 --------- influxdb_client/client/sql_client.py | 108 ++++++++++++++++++++++ setup.py | 3 +- 9 files changed, 171 insertions(+), 67 deletions(-) delete mode 100644 examples/query_sql.py create mode 100644 examples/sql_client.py delete mode 100644 influxdb_client/client/query_sql_api.py create mode 100644 influxdb_client/client/sql_client.py diff --git a/README.rst b/README.rst index 61d2e18c..c6918c6f 100644 --- a/README.rst +++ b/README.rst @@ -84,7 +84,7 @@ InfluxDB 2.0 client features - `Nanosecond precision`_ - `Delete data`_ - `Handling Errors`_ - - `SQL Support`_ + - `SQL client support`_ - `Logging`_ Installation @@ -114,7 +114,7 @@ The python package is hosted on `PyPI QueryApi: """ return QueryApi(self, query_options) - def query_sql_api(self) -> QuerySQLApi: + def sql_client(self, bucket: str) -> SQLClient: """ - Create an Query SQL API instance. + Create an SQL client instance. - :return: Query SQL API instance + :return: SQL client instance """ - return QuerySQLApi(self) + return SQLClient(self, bucket) def invokable_scripts_api(self) -> InvokableScriptsApi: """ diff --git a/influxdb_client/client/query_sql_api.py b/influxdb_client/client/query_sql_api.py deleted file mode 100644 index 1c672c08..00000000 --- a/influxdb_client/client/query_sql_api.py +++ /dev/null @@ -1,44 +0,0 @@ -"""Query InfluxDB with SQL.""" -from urllib.parse import urlparse - -from influxdb_client.client._base import _BaseQueryApi - - -class QuerySQLApi(_BaseQueryApi): - """Implementation for grpc+TLS client for SQL queries.""" - - def __init__(self, influxdb_client): - """ - Initialize SQL query client. 
- - To complete SQL requests, a different client is - - :param influxdb_client: influxdb client - """ - super().__init__(influxdb_client=influxdb_client) - - def query(self, bucket: str, query: str): - """Execute synchronous SQL query and return result as an Arrow reader. - - :param str, bucket: the Flux query - :param str, query: the SQL query to execute - :return: Arrow reader - - .. code-block:: python - - from influxdb_client import InfluxDBClient - - with InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org") as client: - query_sql_api = client.query_sql_api() - result = query_sql_api.query("test", "select * from cpu limit 10") - - """ # noqa: E501 - from flightsql import FlightSQLClient - - client = FlightSQLClient( - host=urlparse(self._influxdb_client.url).hostname, - token=self._influxdb_client.token, - metadata={"bucket-name": bucket}, - ) - info = client.execute(query) - return client.do_get(info.endpoints[0].ticket) diff --git a/influxdb_client/client/sql_client.py b/influxdb_client/client/sql_client.py new file mode 100644 index 00000000..b07f879c --- /dev/null +++ b/influxdb_client/client/sql_client.py @@ -0,0 +1,108 @@ +"""InfluxDB SQL Client.""" +from urllib.parse import urlparse + +from influxdb_client.client._base import _BaseSQLClient + + +class SQLClient(_BaseSQLClient): + """ + Implementation for gRPC+TLS client for SQL. + + This class provides basic operations for interacting with InfluxDB via SQL. + """ + + def __init__(self, influxdb_client, bucket): + """ + Initialize SQL client. + + Unlike the previous APIs, this client is is produced for a specific + bucket to query against. Queries to different buckets require different + clients. + + To complete SQL requests, a different client is used. The rest of this + client library utilizes REST requests against the published API. + However, for SQL support connections are handled over gRPC+TLS. 
As such + this client takes the host and client and creates a new client + connection for SQL operations. + + :param influxdb_client: influxdb client + """ + super().__init__(influxdb_client=influxdb_client) + + from flightsql import FlightSQLClient + self._client = FlightSQLClient( + host=urlparse(self._influxdb_client.url).hostname, + token=self._influxdb_client.token, + metadata={"bucket-name": bucket}, + ) + + def close(self): + """Close the client connection.""" + self._client.close() + + def query(self, query: str): + """ + Execute synchronous SQL query and return result as an Arrow reader. + + :param str, query: the SQL query to execute + :return: PyArrow RecordBatchReader + + .. code-block:: python + + with InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org") as client: + # Each connection is specific to a bucket + sql_client = client.sql_client("my-bucket") + + # The returned result is a stream of data. For large result-sets users can + # iterate through those one-by-one to avoid using large chunks of memory. + with sql_client.query("select * from cpu") as result: + for r in result: + print(r) + + # For smaller results you might want to read the results at once. You + # can do so by using the `read_all()` method. + with sql_client.query("select * from cpu limit 10") as result: + data = result.read_all() + print(data) + + # To get your data into a Pandas DataFrame use the following helper function + df = data.to_pandas() + + """ # noqa: E501 + return self._get_ticket_info(self._client.execute(query)) + + def schemas(self): + """ + Returns the schema of the specified bucket. + + :return: PyArrow Table + + .. 
code-block:: python + + with InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org") as client: + sql_client = client.sql_client("my-bucket") + print(sql_client.schemas()) + + """ # noqa: E501 + return self._get_ticket_info(self._client.get_db_schemas()).read_all() + + def tables(self): + """ + Return tables available from the specified bucket. + + :return: PyArrow Table + + .. code-block:: python + + with InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org") as client: + sql_client = client.sql_client("my-bucket") + print(sql_client.tables()) + + """ # noqa: E501 + return self._get_ticket_info(self._client.get_table_types()).read_all() + + def _get_ticket_info(self, flightInfo): + """Helper function to collect results from FlightInfo.""" + if len(flightInfo.endpoints) == 0: + raise ValueError("no endpoints received") + return self._client.do_get(flightInfo.endpoints[0].ticket).to_reader() diff --git a/setup.py b/setup.py index 6e7ef344..021bc941 100644 --- a/setup.py +++ b/setup.py @@ -44,7 +44,8 @@ ] sql_requires = [ - 'flightsql-dbapi@git+https://github.com/influxdata/flightsql-dbapi.git@fbc9fc1618528cd442a7e22ea11663856b0ecd5d' + 'flightsql-dbapi@git+https://github.com/influxdata/flightsql-dbapi.git@fbc9fc1618528cd442a7e22ea11663856b0ecd5d', + 'pandas>=0.25.3', ] with open('README.rst', 'r') as f: From 3db88eec193119e2fbb1d5519d67dfd9bcc8e725 Mon Sep 17 00:00:00 2001 From: Josh Powers Date: Fri, 13 Jan 2023 14:55:30 -0700 Subject: [PATCH 3/6] fix pydoc linter --- influxdb_client/client/sql_client.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/influxdb_client/client/sql_client.py b/influxdb_client/client/sql_client.py index b07f879c..dfff7572 100644 --- a/influxdb_client/client/sql_client.py +++ b/influxdb_client/client/sql_client.py @@ -73,7 +73,7 @@ def query(self, query: str): def schemas(self): """ - Returns the schema of the specified bucket. + Return the schema of the specified bucket. 
:return: PyArrow Table @@ -102,7 +102,7 @@ def tables(self): return self._get_ticket_info(self._client.get_table_types()).read_all() def _get_ticket_info(self, flightInfo): - """Helper function to collect results from FlightInfo.""" + """Collect results from FlightInfo.""" if len(flightInfo.endpoints) == 0: raise ValueError("no endpoints received") return self._client.do_get(flightInfo.endpoints[0].ticket).to_reader() From 99d55f26ac322e46beac472f4eb97a5e7a08940d Mon Sep 17 00:00:00 2001 From: Josh Powers Date: Fri, 13 Jan 2023 14:56:07 -0700 Subject: [PATCH 4/6] fix twine/rst linter --- README.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.rst b/README.rst index c6918c6f..65aec023 100644 --- a/README.rst +++ b/README.rst @@ -1589,7 +1589,7 @@ Client automatically follows HTTP redirects. The default redirect policy is to f .. marker-asyncio-end SQL Client Support -^^^^^^^^^^^ +^^^^^^^^^^^^^^^^^^ .. marker-sql-support-start The ability to query InfluxDB with SQL was introduced with the IOX backend. From 0f69180e2bab9cbdbb7121b26f9600830aab5f58 Mon Sep 17 00:00:00 2001 From: Josh Powers Date: Tue, 17 Jan 2023 08:40:01 -0700 Subject: [PATCH 5/6] readme: update sql example with bucket in client --- README.rst | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/README.rst b/README.rst index 65aec023..8c1c4a0e 100644 --- a/README.rst +++ b/README.rst @@ -1600,8 +1600,8 @@ To make use of the SQL support users can create a SQL Client with this library: from influxdb_client import InfluxDBClient with InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org", debug=False) as client: - sql_client = client.sql_client() - reader = sql_client.query("my-bucket", "select * from cpu limit 10") + sql_client = client.sql_client("my-bucket") + reader = sql_client.query("select * from cpu limit 10") print(reader.read_all()) .. 
warning:: From 3915a4736aedb31190da1da6234d1a97f42cc2ad Mon Sep 17 00:00:00 2001 From: Sven Rebhan Date: Thu, 16 Mar 2023 15:29:24 +0100 Subject: [PATCH 6/6] Improve SQL-API implementation --- influxdb_client/client/sql_client.py | 31 ++++++++++++++++++++++++---- 1 file changed, 27 insertions(+), 4 deletions(-) diff --git a/influxdb_client/client/sql_client.py b/influxdb_client/client/sql_client.py index dfff7572..3d72136a 100644 --- a/influxdb_client/client/sql_client.py +++ b/influxdb_client/client/sql_client.py @@ -11,7 +11,7 @@ class SQLClient(_BaseSQLClient): This class provides basic operations for interacting with InfluxDB via SQL. """ - def __init__(self, influxdb_client, bucket): + def __init__(self, influxdb_client, bucket, **kwargs): """ Initialize SQL client. @@ -30,12 +30,35 @@ def __init__(self, influxdb_client, bucket): super().__init__(influxdb_client=influxdb_client) from flightsql import FlightSQLClient + + namespace = f'{influxdb_client.org}_{bucket}' + url = urlparse(self._influxdb_client.url) + port = url.port if url.port else 443 self._client = FlightSQLClient( - host=urlparse(self._influxdb_client.url).hostname, - token=self._influxdb_client.token, - metadata={"bucket-name": bucket}, + host=url.hostname, + port=port, + metadata={ + "bucket-name": bucket, # for cloud + "iox-namespace-name": namespace, # for local instance + }, + **kwargs ) + def __enter__(self): + """ + Enter the runtime context related to this object. + + It will bind this method’s return value to the target(s) + specified in the `as` clause of the statement. + + :return: self instance + """ + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + """Exit the runtime context related to this object and close the SQLClient.""" + self.close() + def close(self): """Close the client connection.""" self._client.close()