diff --git a/CHANGELOG.md b/CHANGELOG.md index d8315ded..3470d909 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,61 @@ +## 1.49.0 [unreleased] + +### Bug Fixes + +1. [#682](https://github.com/influxdata/influxdb-client-python/pull/682): Check core types when creating Authentication instances. + +### Examples + +1. [#682](https://github.com/influxdata/influxdb-client-python/pull/682): New example for working with Authentication API. + +## 1.48.0 [2024-11-27] + +### Bug Fixes + +1. [#679](https://github.com/influxdata/influxdb-client-python/pull/679): Add note to caught errors about need to check client timeout. + +## 1.47.0 [2024-10-22] + +### Bug Fixes + +1. [#672](https://github.com/influxdata/influxdb-client-python/pull/672): Adding type validation to url attribute in client object +2. [#674](https://github.com/influxdata/influxdb-client-python/pull/674): Add type linting to client.flux_table.FluxTable, remove duplicated `from pathlib import Path` at setup.py +3. [#675](https://github.com/influxdata/influxdb-client-python/pull/675): Ensures WritePrecision in Point is preferred to `DEFAULT_PRECISION` + +## 1.46.0 [2024-09-13] + +### Bug Fixes +1. [#667](https://github.com/influxdata/influxdb-client-python/pull/667): Missing `py.typed` in distribution package + +### Examples: +1. [#664](https://github.com/influxdata/influxdb-client-python/pull/664/): Multiprocessing example uses new source of data +1. [#665](https://github.com/influxdata/influxdb-client-python/pull/665): Shows how to leverage header fields in errors returned on write. + +## 1.45.0 [2024-08-12] + +### Bug Fixes +1. [#652](https://github.com/influxdata/influxdb-client-python/pull/652): Refactor to `timezone` specific `datetime` helpers to avoid use deprecated functions +1. [#663](https://github.com/influxdata/influxdb-client-python/pull/663): Accept HTTP 201 response to write request + +## 1.44.0 [2024-06-24] + +### Features +1. [#657](https://github.com/influxdata/influxdb-client-python/pull/657): Prefer datetime.fromisoformat over dateutil.parse in Python 3.11+ +1. [#658](https://github.com/influxdata/influxdb-client-python/pull/658): Add `find_buckets_iter` function that allow iterate through all pages of buckets. + +## 1.43.0 [2024-05-17] + +### Bug Fixes +1. [#655](https://github.com/influxdata/influxdb-client-python/pull/655): Replace deprecated `urllib` calls `HTTPResponse.getheaders()` and `HTTPResponse.getheader()`. + +### Others +1. [#654](https://github.com/influxdata/influxdb-client-python/pull/654): Enable packaging type information - `py.typed` + +## 1.42.0 [2024-04-17] + +### Bug Fixes +1. [#648](https://github.com/influxdata/influxdb-client-python/pull/648): Fix `DataFrame` serialization with `NaN` values + ## 1.41.0 [2024-03-01] ### Features diff --git a/README.md b/README.md index ce78bd00..5b541dcf 100644 --- a/README.md +++ b/README.md @@ -392,7 +392,7 @@ The batching is configurable by `write_options`: | **exponential_base** | the base for the exponential retry delay, the next delay is computed using random exponential backoff as a random value within the interval `retry_interval * exponential_base^(attempts-1)` and `retry_interval * exponential_base^(attempts)`. 
Example for `retry_interval=5_000, exponential_base=2, max_retry_delay=125_000, total=5` Retry delays are random distributed values within the ranges of `[5_000-10_000, 10_000-20_000, 20_000-40_000, 40_000-80_000, 80_000-125_000]` | `2` | ``` python -from datetime import datetime, timedelta +from datetime import datetime, timedelta, timezone import pandas as pd import reactivex as rx @@ -456,7 +456,7 @@ with InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org") """ Write Pandas DataFrame """ - _now = datetime.utcnow() + _now = datetime.now(tz=timezone.utc) _data_frame = pd.DataFrame(data=[["coyote_creek", 1.0], ["coyote_creek", 2.0]], index=[_now, _now + timedelta(hours=1)], columns=["location", "water_level"]) @@ -923,7 +923,7 @@ The last step is run a python script via: `python3 influx_cloud.py`. Connect to InfluxDB 2.0 - write data and query them """ -from datetime import datetime +from datetime import datetime, timezone from influxdb_client import Point, InfluxDBClient from influxdb_client.client.write_api import SYNCHRONOUS @@ -945,7 +945,7 @@ try: """ Write data by Point structure """ - point = Point(kind).tag('host', host).tag('device', device).field('value', 25.3).time(time=datetime.utcnow()) + point = Point(kind).tag('host', host).tag('device', device).field('value', 25.3).time(time=datetime.now(tz=timezone.utc)) print(f'Writing to InfluxDB cloud: {point.to_line_protocol()} ...') @@ -1313,6 +1313,13 @@ All async APIs are available via `influxdb_client.client.influxdb_client_async.I and also check to readiness of the InfluxDB via `/ping` endpoint: +The `InfluxDBClientAsync` constructor accepts a number of __configuration properties__. Most useful among these are: + +* `connection_pool_maxsize` - The total number of simultaneous connections. Defaults to `multiprocessing.cpu_count() * 5`. +* `enable_gzip` - enable gzip compression during `write` and `query` calls. Defaults to `false`. +* `proxy` - URL of an HTTP proxy to be used. +* `timeout` - The maximum number of milliseconds for handling HTTP requests from initial handshake to handling response data. This is passed directly to the underlying transport library. If large amounts of data are anticipated, for example from `query_api.query_stream(...)`, this should be increased to avoid `TimeoutError` or `CancelledError`. Defaults to 10_000 ms. 
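+A minimal sketch of how these properties fit together when constructing the async client (the concrete values below are placeholders chosen for illustration, not recommended defaults):
+
+``` python
+import asyncio
+
+from influxdb_client.client.influxdb_client_async import InfluxDBClientAsync
+
+
+async def main():
+    # Placeholder values: raise the timeout for large streamed queries and
+    # enable gzip compression of write/query payloads.
+    async with InfluxDBClientAsync(url="http://localhost:8086", token="my-token", org="my-org",
+                                   connection_pool_maxsize=25,
+                                   enable_gzip=True,
+                                   timeout=30_000) as client:
+        ready = await client.ping()
+        print(f"InfluxDB ready: {ready}")
+
+
+asyncio.run(main())
+```
+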
+ > ``` python > import asyncio > @@ -1407,7 +1414,7 @@ The `influxdb_client.client.query_api_async.QueryApiAsync` supports retrieve dat > > async def main(): > async with InfluxDBClientAsync(url="http://localhost:8086", token="my-token", org="my-org") as client: -> start = datetime.utcfromtimestamp(0) +> start = datetime.fromtimestamp(0) > stop = datetime.now() > # Delete data with location = 'Prague' > successfully = await client.delete_api().delete(start=start, stop=stop, bucket="my-bucket", diff --git a/conda/meta.yaml b/conda/meta.yaml index da95cce4..af5027bd 100644 --- a/conda/meta.yaml +++ b/conda/meta.yaml @@ -1,5 +1,5 @@ {% set name = "influxdb_client" %} -{% set version = "1.40.0" %} +{% set version = "1.48.0" %} package: @@ -7,8 +7,8 @@ package: version: {{ version }} source: - url: https://files.pythonhosted.org/packages/63/11/07ed82352a28e4e8b623a487337befec77d5bd18293dcc940d769e633f82/influxdb_client-1.40.0.tar.gz - sha256: 027f970af1518479d8806f1cdf5ba20280f943e1b621c2acdbf9ca8dc9bdf1cb + url: https://files.pythonhosted.org/packages/11/47/b756380917cb4b968bd871fc006128e2cc9897fb1ab4bcf7d108f9601e78/influxdb_client-1.48.0.tar.gz + sha256: 414d5b5eff7d2b6b453f33e2826ea9872ea04a11996ba9c8604b0c1df57c8559 build: number: 0 diff --git a/examples/README.md b/examples/README.md index 1678d00e..7d3a5eea 100644 --- a/examples/README.md +++ b/examples/README.md @@ -15,6 +15,7 @@ - manually download [NYC TLC Trip Record Data](https://www1.nyc.gov/site/tlc/about/tlc-trip-record-data.page) - install Apache Arrow `pip install pyarrow` dependency - [write_batching_by_bytes_count.py](write_batching_by_bytes_count.py) - How to use RxPY to prepare batches by maximum bytes count. +- [http_error_handling.py](http_error_handling.py) - How to leverage HttpHeader information when errors are returned on write. ## Queries - [query.py](query.py) - How to query data into `FluxTable`s, `Stream` and `CSV` @@ -27,6 +28,7 @@ - [monitoring_and_alerting.py](monitoring_and_alerting.py) - How to create the Check with Slack notification. - [task_example.py](task_example.py) - How to create a Task by API - [templates_management.py](templates_management.py) - How to use Templates and Stack API +- [authorizations.py](authorizations.py) - How to create and use authorizations. 
## InfluxDB Cloud diff --git a/examples/asynchronous.py b/examples/asynchronous.py index 4205d461..ad0b876c 100644 --- a/examples/asynchronous.py +++ b/examples/asynchronous.py @@ -76,7 +76,7 @@ async def main(): Delete data """ print(f"\n------- Delete data with location = 'Prague' -------\n") - successfully = await client.delete_api().delete(start=datetime.utcfromtimestamp(0), stop=datetime.now(), + successfully = await client.delete_api().delete(start=datetime.fromtimestamp(0), stop=datetime.now(), predicate="location = \"Prague\"", bucket="my-bucket") print(f" > successfully: {successfully}") diff --git a/examples/authorizations.py b/examples/authorizations.py new file mode 100644 index 00000000..5857f624 --- /dev/null +++ b/examples/authorizations.py @@ -0,0 +1,103 @@ +import os + +from influxdb_client import InfluxDBClient, BucketRetentionRules, PermissionResource, Permission, Authorization, \ + WriteOptions +from influxdb_client.client.write_api import WriteType +from influxdb_client.rest import ApiException + +HOST_URL = os.environ.get("INFLUX_HOST") if os.environ.get("INFLUX_HOST") is not None else "http://localhost:8086" +TOKEN = os.environ.get("INFLUX_TOKEN") if os.environ.get("INFLUX_TOKEN") is not None else "my-token" +ORG = os.environ.get("INFLUX_ORG") if os.environ.get("INFLUX_ORG") is not None else "my-org" +SYS_BUCKET = os.environ.get("INFLUX_DB") if os.environ.get("INFLUX_DB") is not None else "my-bucket" +BUCKET = "special-bucket" + + +def create_auths(): + # Create authorizations with an initial client using all-access permissions + with InfluxDBClient(url=HOST_URL, token=TOKEN, org=ORG, debug=False) as globalClient: + bucket_rules = BucketRetentionRules(type="expire", every_seconds=3600) + bucket = globalClient.buckets_api().create_bucket(bucket_name=BUCKET, + retention_rules=bucket_rules, + org=ORG) + + bucket_permission_resource_r = PermissionResource(org=ORG, + org_id=bucket.org_id, + type="buckets", + id=bucket.id) + bucket_permission_resource_w = PermissionResource(org=ORG, + org_id=bucket.org_id, + type="buckets", + id=bucket.id) + read_bucket = Permission(action="read", resource=bucket_permission_resource_r) + write_bucket = Permission(action="write", resource=bucket_permission_resource_w) + permissions = [read_bucket, write_bucket] + auth_payload = Authorization(org_id=bucket.org_id, + permissions=permissions, + description="Shared bucket auth from Authorization object", + id="auth1_base") + auth_api = globalClient.authorizations_api() + # use keyword arguments + auth1 = auth_api.create_authorization(authorization=auth_payload) + # or use positional arguments + auth2 = auth_api.create_authorization(bucket.org_id, permissions) + + return auth1, auth2 + + +def try_sys_bucket(client): + print("starting to write") + + w_api = client.write_api(write_options=WriteOptions(write_type=WriteType.synchronous)) + try: + w_api.write(bucket=SYS_BUCKET, record="cpu,host=r2d2 use=3.14") + except ApiException as ae: + print(f"Write to {SYS_BUCKET} failed (as expected) due to:") + print(ae) + + +def try_restricted_bucket(client): + print("starting to write") + w_api = client.write_api(write_options=WriteOptions(write_type=WriteType.synchronous)) + + w_api.write(bucket=BUCKET, record="cpu,host=r2d2 usage=3.14") + print("written") + print("now query") + q_api = client.query_api() + query = f''' + from(bucket:"{BUCKET}") + |> range(start: -5m) + |> filter(fn: (r) => r["_measurement"] == "cpu")''' + + tables = q_api.query(query=query, org=ORG) + for table in tables: + for record in 
table.records: + print(record["_time"].isoformat(sep="T") + " | " + record["host"] + " | " + record["_field"] + "=" + str(record["_value"])) + + +def main(): + """ + a1 is generated using a local Authorization instance + a2 is generated using local permissions and an internally created Authorization + :return: void + """ + print("=== Setting up authorizations ===") + a1, a2 = create_auths() + + print("=== Using a1 authorization ===") + client1 = InfluxDBClient(url=HOST_URL, token=a1.token, org=ORG, debug=False) + print(" --- Try System Bucket ---") + try_sys_bucket(client1) + print(" --- Try Special Bucket ---") + try_restricted_bucket(client1) + print() + + print("=== Using a2 authorization ===") + client2 = InfluxDBClient(url=HOST_URL, token=a2.token, org=ORG, debug=False) + print(" --- Try System Bucket ---") + try_sys_bucket(client2) + print(" --- Try Special Bucket ---") + try_restricted_bucket(client2) + + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/examples/buckets_management.py b/examples/buckets_management.py index cc81b58f..c2a24092 100644 --- a/examples/buckets_management.py +++ b/examples/buckets_management.py @@ -36,7 +36,7 @@ List all Buckets """ print(f"\n------- List -------\n") - buckets = buckets_api.find_buckets().buckets + buckets = buckets_api.find_buckets_iter() print("\n".join([f" ---\n ID: {bucket.id}\n Name: {bucket.name}\n Retention: {bucket.retention_rules}" for bucket in buckets])) print("---") diff --git a/examples/example.py b/examples/example.py index 0082ade1..f6ac61f6 100644 --- a/examples/example.py +++ b/examples/example.py @@ -1,5 +1,5 @@ import codecs -from datetime import datetime +from datetime import datetime, timezone from influxdb_client import WritePrecision, InfluxDBClient, Point from influxdb_client.client.write_api import SYNCHRONOUS @@ -7,8 +7,8 @@ with InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org", debug=False) as client: query_api = client.query_api() - p = Point("my_measurement").tag("location", "Prague").field("temperature", 25.3).time(datetime.utcnow(), - WritePrecision.MS) + p = Point("my_measurement").tag("location", "Prague").field("temperature", 25.3) \ + .time(datetime.now(tz=timezone.utc), WritePrecision.MS) write_api = client.write_api(write_options=SYNCHRONOUS) # write using point structure diff --git a/examples/http_error_handling.py b/examples/http_error_handling.py new file mode 100644 index 00000000..c125a7ff --- /dev/null +++ b/examples/http_error_handling.py @@ -0,0 +1,126 @@ +""" +Illustrates getting header values from Errors that may occur on write. + +To test against cloud set the following environment variables: + INFLUX_URL + INFLUX_TOKEN + INFLUX_DATABASE + INFLUX_ORG + +...otherwise will run against a standard OSS endpoint. 
+""" +import asyncio +import os +from typing import MutableMapping + +from influxdb_client import InfluxDBClient +from influxdb_client.client.exceptions import InfluxDBError +from influxdb_client.client.influxdb_client_async import InfluxDBClientAsync +from influxdb_client.client.write_api import SYNCHRONOUS +from influxdb_client.rest import ApiException + + +def get_envar(key, default): + try: + return os.environ[key] + except: + return default + + +class Config(object): + + def __init__(self): + self.url = get_envar("INFLUX_URL", "http://localhost:8086") + self.token = get_envar("INFLUX_TOKEN", "my-token") + self.bucket = get_envar("INFLUX_DATABASE", "my-bucket") + self.org = get_envar("INFLUX_ORG", "my-org") + + def __str__(self): + return (f"config:\n" + f" url: {self.url}\n" + f" token: ****redacted*****\n" + f" bucket: {self.bucket}\n" + f" org: {self.org}\n" + ) + + +# To encapsulate functions used in batch writing +class BatchCB(object): + + def success(self, conf: (str, str, str), data: str): + print(f"Write success: {conf}, data: {data}") + + def error(self, conf: (str, str, str), data: str, exception: InfluxDBError): + print(f"\nBatch -> Write failed: {conf}, data: {data}, error: {exception.message}") + report_headers(exception.headers) + + def retry(self, conf: (str, str, str), data: str, exception: InfluxDBError): + print(f"Write failed but retryable: {conf}, data: {data}, error: {exception}") + + +# simple reporter that server is available +def report_ping(ping: bool): + if not ping: + raise ValueError("InfluxDB: Failed to ping server") + else: + print("InfluxDB: ready") + + +# report some useful expected header fields +def report_headers(headers: MutableMapping[str, str]): + print(" Date: ", headers.get("Date")) + print(" X-Influxdb-Build: ", headers.get("X-Influxdb-Build")) + print(" X-Influxdb-Version: ", headers.get("X-Influxdb-Version")) # OSS version, Cloud should be None + print(" X-Platform-Error-Code: ", headers.get("X-Platform-Error-Code")) # OSS invalid, Cloud should be None + print(" Retry-After: ", headers.get("Retry-After")) # Should be None + print(" Trace-Id: ", headers.get("Trace-Id")) # OSS should be None, Cloud should return value + + +# try a write using a synchronous call +def use_sync(conf: Config): + print("Using sync") + with InfluxDBClient(url=conf.url, token=conf.token, org=conf.org) as client: + report_ping(client.ping()) + try: + client.write_api(write_options=SYNCHRONOUS).write(bucket=conf.bucket, record="cpu,location=G4 usage=") + except ApiException as ae: + print("\nSync -> Caught ApiException: ", ae.message) + report_headers(ae.headers) + + print("Sync write done") + + +# try a write using batch API +def use_batch(conf: Config): + print("Using batch") + with InfluxDBClient(url=conf.url, token=conf.token, org=conf.org) as client: + cb = BatchCB() + with client.write_api(success_callback=cb.success, + error_callback=cb.error, + retry_callback=cb.retry) as write_api: + write_api.write(bucket=conf.bucket, record="cpu,location=G9 usage=") + print("Batch write sent") + print("Batch write done") + + +# try a write using async.io +async def use_async(conf: Config): + print("Using async") + async with InfluxDBClientAsync(url=conf.url, token=conf.token, org=conf.org) as client: + report_ping(await client.ping()) + try: + await client.write_api().write(bucket=conf.bucket, record="cpu,location=G7 usage=") + except InfluxDBError as ie: + print("\nAsync -> Caught InfluxDBError: ", ie.message) + report_headers(ie.headers) + print("Async write done") + + +if 
__name__ == "__main__": + conf = Config() + print(conf) + use_sync(conf) + print("\n Continuing...\n") + use_batch(conf) + print("\n Continuing...\n") + asyncio.run(use_async(conf)) diff --git a/examples/import_data_set_multiprocessing.py b/examples/import_data_set_multiprocessing.py index 60de64c5..b20b6174 100644 --- a/examples/import_data_set_multiprocessing.py +++ b/examples/import_data_set_multiprocessing.py @@ -4,6 +4,7 @@ https://github.com/toddwschneider/nyc-taxi-data """ import concurrent.futures +import gzip import io import multiprocessing from collections import OrderedDict @@ -92,10 +93,10 @@ def parse_row(row: OrderedDict): return Point("taxi-trip-data") \ .tag("dispatching_base_num", row['dispatching_base_num']) \ - .tag("PULocationID", row['PULocationID']) \ - .tag("DOLocationID", row['DOLocationID']) \ + .tag("PULocationID", row['PUlocationID']) \ + .tag("DOLocationID", row['DOlocationID']) \ .tag("SR_Flag", row['SR_Flag']) \ - .field("dropoff_datetime", row['dropoff_datetime']) \ + .field("dropoff_datetime", row['dropOff_datetime']) \ .time(row['pickup_datetime']) \ .to_line_protocol() @@ -113,7 +114,7 @@ def parse_rows(rows, total_size): counter_.value += len(_parsed_rows) if counter_.value % 10_000 == 0: print('{0:8}{1}'.format(counter_.value, ' - {0:.2f} %' - .format(100 * float(progress_.value) / float(int(total_size))) if total_size else "")) + .format(float(progress_.value) / float(int(total_size))) if total_size else "")) pass queue_.put(_parsed_rows) @@ -141,80 +142,80 @@ def init_counter(counter, progress, queue): progress_ = Value('i', 0) startTime = datetime.now() - url = "https://s3.amazonaws.com/nyc-tlc/trip+data/fhv_tripdata_2019-01.csv" - # url = "file:///Users/bednar/Developer/influxdata/influxdb-client-python/examples/fhv_tripdata_2019-01.csv" + url = "https://github.com/DataTalksClub/nyc-tlc-data/releases/download/fhv/fhv_tripdata_2019-01.csv.gz" """ Open URL and for stream data """ response = urlopen(url) - if response.headers: - content_length = response.headers['Content-length'] - io_wrapper = ProgressTextIOWrapper(response) - io_wrapper.progress = progress_ + # we can't get content length from response because the gzip stream content length is unknown + # so we set it to this value, just for progress display + content_length = 23143223 """ - Start writer as a new process + Open GZIP stream """ - writer = InfluxDBWriter(queue_) - writer.start() + with gzip.open(response, 'rb') as stream: + io_wrapper = ProgressTextIOWrapper(stream, encoding='utf-8') + io_wrapper.progress = progress_ - """ - Create process pool for parallel encoding into LineProtocol - """ - cpu_count = multiprocessing.cpu_count() - with concurrent.futures.ProcessPoolExecutor(cpu_count, initializer=init_counter, - initargs=(counter_, progress_, queue_)) as executor: """ - Converts incoming HTTP stream into sequence of LineProtocol + Start writer as a new process """ - data = rx \ - .from_iterable(DictReader(io_wrapper)) \ - .pipe(ops.buffer_with_count(10_000), - # Parse 10_000 rows into LineProtocol on subprocess - ops.flat_map(lambda rows: executor.submit(parse_rows, rows, content_length))) + writer = InfluxDBWriter(queue_) + writer.start() """ - Write data into InfluxDB + Create process pool for parallel encoding into LineProtocol """ - data.subscribe(on_next=lambda x: None, on_error=lambda ex: print(f'Unexpected error: {ex}')) - - """ - Terminate Writer - """ - queue_.put(None) - queue_.join() + cpu_count = multiprocessing.cpu_count() + with 
concurrent.futures.ProcessPoolExecutor(cpu_count, initializer=init_counter, + initargs=(counter_, progress_, queue_)) as executor: + """ + Converts incoming HTTP stream into sequence of LineProtocol + """ + data = rx \ + .from_iterable(DictReader(io_wrapper)) \ + .pipe(ops.buffer_with_count(10_000), + # Parse 10_000 rows into LineProtocol on subprocess + ops.map(lambda rows: executor.submit(parse_rows, rows, content_length))) + + """ + Write data into InfluxDB + """ + data.subscribe(on_next=lambda x: None, on_error=lambda ex: print(f'Unexpected error: {ex}')) - print() - print(f'Import finished in: {datetime.now() - startTime}') - print() - - """ - Querying 10 pickups from dispatching 'B00008' - """ - query = 'from(bucket:"my-bucket")' \ - '|> range(start: 2019-01-01T00:00:00Z, stop: now()) ' \ - '|> filter(fn: (r) => r._measurement == "taxi-trip-data")' \ - '|> filter(fn: (r) => r.dispatching_base_num == "B00008")' \ - '|> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")' \ - '|> rename(columns: {_time: "pickup_datetime"})' \ - '|> drop(columns: ["_start", "_stop"])|> limit(n:10, offset: 0)' - - client = InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org", debug=False) - result = client.query_api().query(query=query) + """ + Terminate Writer + """ + queue_.put(None) + queue_.join() - """ - Processing results - """ - print() - print("=== Querying 10 pickups from dispatching 'B00008' ===") - print() - for table in result: - for record in table.records: - print( - f'Dispatching: {record["dispatching_base_num"]} pickup: {record["pickup_datetime"]} dropoff: {record["dropoff_datetime"]}') + print() + print(f'Import finished in: {datetime.now() - startTime}') + print() - """ - Close client - """ - client.close() + """ + Querying 10 pickups from dispatching 'B00008' + """ + query = 'from(bucket:"my-bucket")' \ + '|> range(start: 2019-01-01T00:00:00Z, stop: now()) ' \ + '|> filter(fn: (r) => r._measurement == "taxi-trip-data")' \ + '|> filter(fn: (r) => r.dispatching_base_num == "B00008")' \ + '|> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")' \ + '|> rename(columns: {_time: "pickup_datetime"})' \ + '|> drop(columns: ["_start", "_stop"])|> limit(n:10, offset: 0)' + + with InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org", debug=False) as client: + result = client.query_api().query(query=query) + + """ + Processing results + """ + print() + print("=== Querying 10 pickups from dispatching 'B00008' ===") + print() + for table in result: + for record in table.records: + print( + f'Dispatching: {record["dispatching_base_num"]} pickup: {record["pickup_datetime"]} dropoff: {record["dropoff_datetime"]}') diff --git a/examples/influx_cloud.py b/examples/influx_cloud.py index 6c8ed6f2..96b0fc3c 100644 --- a/examples/influx_cloud.py +++ b/examples/influx_cloud.py @@ -2,7 +2,7 @@ Connect to InfluxDB 2.0 - write data and query them """ -from datetime import datetime +from datetime import datetime, timezone from influxdb_client import Point, InfluxDBClient from influxdb_client.client.write_api import SYNCHRONOUS @@ -23,7 +23,8 @@ """ Write data by Point structure """ - point = Point(kind).tag('host', host).tag('device', device).field('value', 25.3).time(time=datetime.utcnow()) + point = Point(kind).tag('host', host).tag('device', device).field('value', 25.3) \ + .time(time=datetime.now(tz=timezone.utc)) print(f'Writing to InfluxDB cloud: {point.to_line_protocol()} ...') diff --git a/examples/logging_handler.py 
b/examples/logging_handler.py index 08f2ae05..6f875f7b 100644 --- a/examples/logging_handler.py +++ b/examples/logging_handler.py @@ -45,7 +45,7 @@ def use_logger(): Point('my-measurement') .tag('host', 'host1') .field('temperature', 25.3) - .time(datetime.datetime.utcnow(), WritePrecision.MS) + .time(datetime.datetime.now(tz=datetime.timezone.utc), WritePrecision.MS) ) diff --git a/examples/write_structured_data.py b/examples/write_structured_data.py index 26a904f3..14a4e8ae 100644 --- a/examples/write_structured_data.py +++ b/examples/write_structured_data.py @@ -1,6 +1,6 @@ from collections import namedtuple from dataclasses import dataclass -from datetime import datetime +from datetime import datetime, timezone from influxdb_client import InfluxDBClient from influxdb_client.client.write_api import SYNCHRONOUS @@ -37,7 +37,7 @@ class Car: version="2021.06.05.5874", pressure=125, temperature=10, - timestamp=datetime.utcnow()) + timestamp=datetime.now(tz=timezone.utc)) print(sensor) """ diff --git a/influxdb_client/client/_base.py b/influxdb_client/client/_base.py index 8dcf75e9..d4f17901 100644 --- a/influxdb_client/client/_base.py +++ b/influxdb_client/client/_base.py @@ -53,6 +53,8 @@ def __init__(self, url, token, debug=None, timeout=10_000, enable_gzip=False, or self.default_tags = default_tags self.conf = _Configuration() + if not isinstance(self.url, str): + raise ValueError('"url" attribute is not str instance') if self.url.endswith("/"): self.conf.host = self.url[:-1] else: diff --git a/influxdb_client/client/_pages.py b/influxdb_client/client/_pages.py new file mode 100644 index 00000000..5e418427 --- /dev/null +++ b/influxdb_client/client/_pages.py @@ -0,0 +1,66 @@ + + +class _Page: + def __init__(self, values, has_next, next_after): + self.has_next = has_next + self.values = values + self.next_after = next_after + + @staticmethod + def empty(): + return _Page([], False, None) + + @staticmethod + def initial(after): + return _Page([], True, after) + + +class _PageIterator: + def __init__(self, page: _Page, get_next_page): + self.page = page + self.get_next_page = get_next_page + + def __iter__(self): + return self + + def __next__(self): + if not self.page.values: + if self.page.has_next: + self.page = self.get_next_page(self.page) + if not self.page.values: + raise StopIteration + return self.page.values.pop(0) + + +class _Paginated: + def __init__(self, paginated_getter, pluck_page_resources_from_response): + self.paginated_getter = paginated_getter + self.pluck_page_resources_from_response = pluck_page_resources_from_response + + def find_iter(self, **kwargs): + """Iterate over resources with pagination. + + :key str org: The organization name. + :key str org_id: The organization ID. + :key str after: The last resource ID from which to seek from (but not including). 
+ :key int limit: the maximum number of items per page + :return: resources iterator + """ + + def get_next_page(page: _Page): + return self._find_next_page(page, **kwargs) + + return iter(_PageIterator(_Page.initial(kwargs.get('after')), get_next_page)) + + def _find_next_page(self, page: _Page, **kwargs): + if not page.has_next: + return _Page.empty() + + kw_args = {**kwargs, 'after': page.next_after} if page.next_after is not None else kwargs + response = self.paginated_getter(**kw_args) + + resources = self.pluck_page_resources_from_response(response) + has_next = response.links.next is not None + last_id = resources[-1].id if resources else None + + return _Page(resources, has_next, last_id) diff --git a/influxdb_client/client/authorizations_api.py b/influxdb_client/client/authorizations_api.py index b7179b62..05be6ecd 100644 --- a/influxdb_client/client/authorizations_api.py +++ b/influxdb_client/client/authorizations_api.py @@ -11,7 +11,7 @@ def __init__(self, influxdb_client): self._influxdb_client = influxdb_client self._authorizations_service = AuthorizationsService(influxdb_client.api_client) - def create_authorization(self, org_id=None, permissions: list = None, + def create_authorization(self, org_id: str = None, permissions: list = None, authorization: Authorization = None) -> Authorization: """ Create an authorization. @@ -23,6 +23,8 @@ def create_authorization(self, org_id=None, permissions: list = None, """ if authorization is not None: + if not isinstance(authorization, Authorization): + raise TypeError(f"Attempt to use non-Authorization value for authorization: {authorization}") return self._authorizations_service.post_authorizations(authorization_post_request=authorization) # if org_id is not None and permissions is not None: diff --git a/influxdb_client/client/bucket_api.py b/influxdb_client/client/bucket_api.py index 47763bee..684da767 100644 --- a/influxdb_client/client/bucket_api.py +++ b/influxdb_client/client/bucket_api.py @@ -8,6 +8,7 @@ from influxdb_client import BucketsService, Bucket, PostBucketRequest, PatchBucketRequest from influxdb_client.client.util.helpers import get_org_query_param +from influxdb_client.client._pages import _Paginated class BucketsApi(object): @@ -117,3 +118,15 @@ def find_buckets(self, **kwargs): :return: Buckets """ return self._buckets_service.get_buckets(**kwargs) + + def find_buckets_iter(self, **kwargs): + """Iterate over all buckets with pagination. + + :key str name: Only returns buckets with the specified name + :key str org: The organization name. + :key str org_id: The organization ID. + :key str after: The last resource ID from which to seek from (but not including). 
+ :key int limit: the maximum number of buckets in one page + :return: Buckets iterator + """ + return _Paginated(self._buckets_service.get_buckets, lambda response: response.buckets).find_iter(**kwargs) diff --git a/influxdb_client/client/exceptions.py b/influxdb_client/client/exceptions.py index 48681add..bfa453e2 100644 --- a/influxdb_client/client/exceptions.py +++ b/influxdb_client/client/exceptions.py @@ -15,7 +15,12 @@ def __init__(self, response: HTTPResponse = None, message: str = None): if response is not None: self.response = response self.message = self._get_message(response) - self.retry_after = response.getheader('Retry-After') + if isinstance(response, HTTPResponse): # response is HTTPResponse + self.headers = response.headers + self.retry_after = response.headers.get('Retry-After') + else: # response is RESTResponse + self.headers = response.getheaders() + self.retry_after = response.getheader('Retry-After') else: self.response = None self.message = message or 'no response' diff --git a/influxdb_client/client/flux_csv_parser.py b/influxdb_client/client/flux_csv_parser.py index 7a73e3f8..99e68094 100644 --- a/influxdb_client/client/flux_csv_parser.py +++ b/influxdb_client/client/flux_csv_parser.py @@ -1,6 +1,5 @@ """Parsing response from InfluxDB to FluxStructures or DataFrame.""" - import base64 import codecs import csv as csv_parser @@ -147,6 +146,11 @@ async def _parse_flux_response_async(self): df = self._prepare_data_frame() if not self._is_profiler_table(metadata.table): yield df + except BaseException as e: + e_type = type(e).__name__ + if "CancelledError" in e_type or "TimeoutError" in e_type: + e.add_note("Stream cancelled during read. Recommended: Check Influxdb client `timeout` setting.") + raise finally: self._close() diff --git a/influxdb_client/client/flux_table.py b/influxdb_client/client/flux_table.py index 98a83159..5fd9a061 100644 --- a/influxdb_client/client/flux_table.py +++ b/influxdb_client/client/flux_table.py @@ -46,8 +46,8 @@ class FluxTable(FluxStructure): def __init__(self) -> None: """Initialize defaults.""" - self.columns = [] - self.records = [] + self.columns: List[FluxColumn] = [] + self.records: List[FluxRecord] = [] def get_group_key(self): """ diff --git a/influxdb_client/client/influxdb_client.py b/influxdb_client/client/influxdb_client.py index 6079aac0..cbae75a9 100644 --- a/influxdb_client/client/influxdb_client.py +++ b/influxdb_client/client/influxdb_client.py @@ -265,7 +265,7 @@ def retry(self, conf: (str, str, str), data: str, exception: InfluxDBError): :param write_options: Write API configuration :param point_settings: settings to store default tags - :key success_callback: The callable ``callback`` to run after successfully writen a batch. + :key success_callback: The callable ``callback`` to run after having successfully written a batch. The callable must accept two arguments: - `Tuple`: ``(bucket, organization, precision)`` @@ -273,7 +273,7 @@ def retry(self, conf: (str, str, str), data: str, exception: InfluxDBError): **[batching mode]** - :key error_callback: The callable ``callback`` to run after unsuccessfully writen a batch. + :key error_callback: The callable ``callback`` to run after having unsuccessfully written a batch. 
The callable must accept three arguments: - `Tuple`: ``(bucket, organization, precision)`` diff --git a/influxdb_client/client/tasks_api.py b/influxdb_client/client/tasks_api.py index 9edb2ec9..5ca18fbd 100644 --- a/influxdb_client/client/tasks_api.py +++ b/influxdb_client/client/tasks_api.py @@ -9,38 +9,7 @@ from influxdb_client import TasksService, Task, TaskCreateRequest, TaskUpdateRequest, LabelResponse, LabelMapping, \ AddResourceMemberRequestBody, RunManually, Run, LogEvent - - -class _Page: - def __init__(self, values, has_next, next_after): - self.has_next = has_next - self.values = values - self.next_after = next_after - - @staticmethod - def empty(): - return _Page([], False, None) - - @staticmethod - def initial(after): - return _Page([], True, after) - - -class _PageIterator: - def __init__(self, page: _Page, get_next_page): - self.page = page - self.get_next_page = get_next_page - - def __iter__(self): - return self - - def __next__(self): - if not self.page.values: - if self.page.has_next: - self.page = self.get_next_page(self.page) - if not self.page.values: - raise StopIteration - return self.page.values.pop(0) +from influxdb_client.client._pages import _Paginated class TasksApi(object): @@ -80,11 +49,7 @@ def find_tasks_iter(self, **kwargs): :key int limit: the number of tasks in one page :return: Tasks iterator """ - - def get_next_page(page: _Page): - return self._find_tasks_next_page(page, **kwargs) - - return iter(_PageIterator(_Page.initial(kwargs.get('after')), get_next_page)) + return _Paginated(self._service.get_tasks, lambda response: response.tasks).find_iter(**kwargs) def create_task(self, task: Task = None, task_create_request: TaskCreateRequest = None) -> Task: """Create a new task.""" @@ -259,16 +224,3 @@ def get_logs(self, task_id: str) -> List['LogEvent']: def find_tasks_by_user(self, task_user_id): """List all tasks by user.""" return self.find_tasks(user=task_user_id) - - def _find_tasks_next_page(self, page: _Page, **kwargs): - if not page.has_next: - return _Page.empty() - - args = {**kwargs, 'after': page.next_after} if page.next_after is not None else kwargs - tasks_response = self._service.get_tasks(**args) - - tasks = tasks_response.tasks - has_next = tasks_response.links.next is not None - last_id = tasks[-1].id if tasks else None - - return _Page(tasks, has_next, last_id) diff --git a/influxdb_client/client/util/date_utils.py b/influxdb_client/client/util/date_utils.py index 11baecb5..7b6750c8 100644 --- a/influxdb_client/client/util/date_utils.py +++ b/influxdb_client/client/util/date_utils.py @@ -1,5 +1,6 @@ """Utils to get right Date parsing function.""" import datetime +from sys import version_info import threading from datetime import timezone as tz @@ -78,7 +79,8 @@ def get_date_helper() -> DateHelper: """ Return DateHelper with proper implementation. - If there is a 'ciso8601' than use 'ciso8601.parse_datetime' else use 'dateutil.parse'. + If there is a 'ciso8601' than use 'ciso8601.parse_datetime' else + use 'datetime.fromisoformat' (Python >= 3.11) or 'dateutil.parse' (Python < 3.11). 
""" global date_helper if date_helper is None: @@ -90,7 +92,10 @@ def get_date_helper() -> DateHelper: import ciso8601 _date_helper.parse_date = ciso8601.parse_datetime except ModuleNotFoundError: - _date_helper.parse_date = parser.parse + if (version_info.major, version_info.minor) >= (3, 11): + _date_helper.parse_date = datetime.datetime.fromisoformat + else: + _date_helper.parse_date = parser.parse date_helper = _date_helper return date_helper diff --git a/influxdb_client/client/write/dataframe_serializer.py b/influxdb_client/client/write/dataframe_serializer.py index 6121171f..ccc198ac 100644 --- a/influxdb_client/client/write/dataframe_serializer.py +++ b/influxdb_client/client/write/dataframe_serializer.py @@ -19,14 +19,6 @@ def _itertuples(data_frame): return zip(data_frame.index, *cols) -def _not_nan(x): - return x == x - - -def _any_not_nan(p, indexes): - return any(map(lambda x: _not_nan(p[x]), indexes)) - - class DataframeSerializer: """Serialize DataFrame into LineProtocols.""" @@ -77,7 +69,7 @@ def __init__(self, data_frame, point_settings, precision=DEFAULT_WRITE_PRECISION # When NaNs are present, the expression looks like this (split # across two lines to satisfy the code-style checker) # - # lambda p: f"""{measurement_name} {"" if math.isnan(p[1]) + # lambda p: f"""{measurement_name} {"" if pd.isna(p[1]) # else f"{keys[0]}={p[1]}"},{keys[1]}={p[2]}i {p[0].value}""" # # When there's a NaN value in column a, we'll end up with a comma at the start of the @@ -175,7 +167,7 @@ def __init__(self, data_frame, point_settings, precision=DEFAULT_WRITE_PRECISION # This column is a tag column. if null_columns.iloc[index]: key_value = f"""{{ - '' if {val_format} == '' or type({val_format}) == float and math.isnan({val_format}) else + '' if {val_format} == '' or pd.isna({val_format}) else f',{key_format}={{str({val_format}).translate(_ESCAPE_STRING)}}' }}""" else: @@ -192,19 +184,16 @@ def __init__(self, data_frame, point_settings, precision=DEFAULT_WRITE_PRECISION # field column has no nulls, we don't run the comma-removal # regexp substitution step. 
sep = '' if len(field_indexes) == 0 else ',' - if issubclass(value.type, np.integer): - field_value = f"{sep}{key_format}={{{val_format}}}i" - elif issubclass(value.type, np.bool_): - field_value = f'{sep}{key_format}={{{val_format}}}' - elif issubclass(value.type, np.floating): + if issubclass(value.type, np.integer) or issubclass(value.type, np.floating) or issubclass(value.type, np.bool_): # noqa: E501 + suffix = 'i' if issubclass(value.type, np.integer) else '' if null_columns.iloc[index]: - field_value = f"""{{"" if math.isnan({val_format}) else f"{sep}{key_format}={{{val_format}}}"}}""" + field_value = f"""{{"" if pd.isna({val_format}) else f"{sep}{key_format}={{{val_format}}}{suffix}"}}""" # noqa: E501 else: - field_value = f'{sep}{key_format}={{{val_format}}}' + field_value = f"{sep}{key_format}={{{val_format}}}{suffix}" else: if null_columns.iloc[index]: field_value = f"""{{ - '' if type({val_format}) == float and math.isnan({val_format}) else + '' if pd.isna({val_format}) else f'{sep}{key_format}="{{str({val_format}).translate(_ESCAPE_STRING)}}"' }}""" else: @@ -229,17 +218,21 @@ def __init__(self, data_frame, point_settings, precision=DEFAULT_WRITE_PRECISION '_ESCAPE_KEY': _ESCAPE_KEY, '_ESCAPE_STRING': _ESCAPE_STRING, 'keys': keys, - 'math': math, + 'pd': pd, }) for k, v in dict(data_frame.dtypes).items(): if k in data_frame_tag_columns: data_frame = data_frame.replace({k: ''}, np.nan) + def _any_not_nan(p, indexes): + return any(map(lambda x: not pd.isna(p[x]), indexes)) + self.data_frame = data_frame self.f = f self.field_indexes = field_indexes self.first_field_maybe_null = null_columns.iloc[field_indexes[0] - 1] + self._any_not_nan = _any_not_nan # # prepare chunks @@ -266,7 +259,7 @@ def serialize(self, chunk_idx: int = None): # When the first field is null (None/NaN), we'll have # a spurious leading comma which needs to be removed. 
lp = (re.sub('^(( |[^ ])* ),([a-zA-Z0-9])(.*)', '\\1\\3\\4', self.f(p)) - for p in filter(lambda x: _any_not_nan(x, self.field_indexes), _itertuples(chunk))) + for p in filter(lambda x: self._any_not_nan(x, self.field_indexes), _itertuples(chunk))) return list(lp) else: return list(map(self.f, _itertuples(chunk))) diff --git a/influxdb_client/client/write/point.py b/influxdb_client/client/write/point.py index 31d44d5c..cc95d204 100644 --- a/influxdb_client/client/write/point.py +++ b/influxdb_client/client/write/point.py @@ -10,7 +10,7 @@ from influxdb_client.client.util.date_utils import get_date_helper from influxdb_client.domain.write_precision import WritePrecision -EPOCH = datetime.utcfromtimestamp(0).replace(tzinfo=timezone.utc) +EPOCH = datetime.fromtimestamp(0, tz=timezone.utc) DEFAULT_WRITE_PRECISION = WritePrecision.NS diff --git a/influxdb_client/client/write_api_async.py b/influxdb_client/client/write_api_async.py index 2f32802f..38937eca 100644 --- a/influxdb_client/client/write_api_async.py +++ b/influxdb_client/client/write_api_async.py @@ -1,5 +1,6 @@ """Collect and async write time series data to InfluxDB Cloud or InfluxDB OSS.""" import logging +from asyncio import ensure_future, gather from collections import defaultdict from typing import Union, Iterable, NamedTuple @@ -114,12 +115,20 @@ async def write(self, bucket: str, org: str = None, self._append_default_tags(record) payloads = defaultdict(list) - self._serialize(record, write_precision, payloads, precision_from_point=False, **kwargs) - - # joint list by \n - body = b'\n'.join(payloads[write_precision]) - response = await self._write_service.post_write_async(org=org, bucket=bucket, body=body, - precision=write_precision, async_req=False, - _return_http_data_only=False, - content_type="text/plain; charset=utf-8") - return response[1] == 204 + self._serialize(record, write_precision, payloads, precision_from_point=True, **kwargs) + + futures = [] + for payload_precision, payload_line in payloads.items(): + futures.append(ensure_future + (self._write_service.post_write_async(org=org, bucket=bucket, + body=b'\n'.join(payload_line), + precision=payload_precision, async_req=False, + _return_http_data_only=False, + content_type="text/plain; charset=utf-8"))) + + results = await gather(*futures, return_exceptions=True) + for result in results: + if isinstance(result, Exception): + raise result + + return False not in [re[1] in (201, 204) for re in results] diff --git a/influxdb_client/domain/authorization.py b/influxdb_client/domain/authorization.py index 67a0bfd3..aef38d9c 100644 --- a/influxdb_client/domain/authorization.py +++ b/influxdb_client/domain/authorization.py @@ -82,8 +82,12 @@ def __init__(self, created_at=None, updated_at=None, org_id=None, permissions=No if updated_at is not None: self.updated_at = updated_at if org_id is not None: + if not isinstance(org_id, str): + raise TypeError("org_id must be a string.") self.org_id = org_id if permissions is not None: + if not isinstance(permissions, list): + raise TypeError("permissions must be a list.") self.permissions = permissions if id is not None: self.id = id diff --git a/influxdb_client/py.typed b/influxdb_client/py.typed new file mode 100644 index 00000000..e69de29b diff --git a/influxdb_client/rest.py b/influxdb_client/rest.py index 8f50e51a..cd4dbff4 100644 --- a/influxdb_client/rest.py +++ b/influxdb_client/rest.py @@ -13,7 +13,7 @@ import logging from typing import Dict - +from urllib3 import HTTPResponse from influxdb_client.client.exceptions import 
InfluxDBError from influxdb_client.configuration import Configuration @@ -34,7 +34,10 @@ def __init__(self, status=None, reason=None, http_resp=None): self.status = http_resp.status self.reason = http_resp.reason self.body = http_resp.data - self.headers = http_resp.getheaders() + if isinstance(http_resp, HTTPResponse): # response is HTTPResponse + self.headers = http_resp.headers + else: # response is RESTResponse + self.headers = http_resp.getheaders() else: self.status = status self.reason = reason diff --git a/influxdb_client/version.py b/influxdb_client/version.py index d9a0312b..a4ac1780 100644 --- a/influxdb_client/version.py +++ b/influxdb_client/version.py @@ -1,3 +1,3 @@ """Version of the Client that is used in User-Agent header.""" -VERSION = '1.41.0' +VERSION = '1.49.0dev0' diff --git a/setup.py b/setup.py index ac1154d3..cda0d087 100644 --- a/setup.py +++ b/setup.py @@ -27,7 +27,7 @@ 'aioresponses>=0.7.3', 'sphinx==1.8.5', 'sphinx_rtd_theme', - 'jinja2==3.1.3' + 'jinja2>=3.1.4' ] extra_requires = [ @@ -44,7 +44,6 @@ 'aiocsv>=1.2.2' ] -from pathlib import Path this_directory = Path(__file__).parent long_description = (this_directory / "README.md").read_text() @@ -66,6 +65,7 @@ extras_require={'extra': extra_requires, 'ciso': ciso_requires, 'async': async_requires, 'test': test_requires}, long_description_content_type="text/markdown", packages=find_packages(exclude=('tests*',)), + package_data={'influxdb_client': ['py.typed']}, test_suite='tests', python_requires='>=3.7', include_package_data=True, diff --git a/tests/test_AuthorizationApi.py b/tests/test_AuthorizationApi.py index 8b1850d9..036f0d60 100644 --- a/tests/test_AuthorizationApi.py +++ b/tests/test_AuthorizationApi.py @@ -45,6 +45,25 @@ def test_createAuthorization(self): self.assertEqual(authorization.links["user"], "/api/v2/users/" + self.user.id) + def test_AuthorizationTypeAssert(self): + self.assertRaisesRegex(TypeError, "org_id must be a string.", Authorization, org_id={}) + self.assertRaisesRegex(TypeError, "permissions must be a list.", Authorization, permissions={}) + + def test_createAuthorizationWrongTypes(self): + user_resource = PermissionResource(org_id=self.organization.id, type="users") + read_users = Permission(action="read", resource=user_resource) + + org_resource = PermissionResource(org_id=self.organization.id, type="orgs") + write_organizations = Permission(action="write", resource=org_resource) + + permissions = [read_users, write_organizations] + self.assertRaisesRegex(TypeError, "org_id must be a string.", + self.authorizations_api.create_authorization, permissions) + self.assertRaisesRegex(TypeError, "permissions must be a list", + self.authorizations_api.create_authorization, "123456789ABCDEF0", "Foo") + self.assertRaisesRegex(TypeError, "Attempt to use non-Authorization value for authorization: Foo", + self.authorizations_api.create_authorization, "123456789ABCDEF0", permissions, "Foo") + def test_authorizationDescription(self): organization = self.my_organization diff --git a/tests/test_BucketsApi.py b/tests/test_BucketsApi.py index db7e28d1..58bbd280 100644 --- a/tests/test_BucketsApi.py +++ b/tests/test_BucketsApi.py @@ -83,26 +83,65 @@ def test_create_bucket_retention_list(self): self.delete_test_bucket(my_bucket) - def test_pagination(self): + def test_find_buckets(self): my_org = self.find_my_org() - buckets = self.buckets_api.find_buckets().buckets + buckets = self.buckets_api.find_buckets(limit=100).buckets size = len(buckets) # create 2 buckets 
self.buckets_api.create_bucket(bucket_name=generate_bucket_name(), org=my_org) self.buckets_api.create_bucket(bucket_name=generate_bucket_name(), org=my_org) - buckets = self.buckets_api.find_buckets().buckets + buckets = self.buckets_api.find_buckets(limit=size + 2).buckets self.assertEqual(size + 2, len(buckets)) # offset 1 - buckets = self.buckets_api.find_buckets(offset=1).buckets + buckets = self.buckets_api.find_buckets(offset=1, limit=size + 2).buckets self.assertEqual(size + 1, len(buckets)) # count 1 buckets = self.buckets_api.find_buckets(limit=1).buckets self.assertEqual(1, len(buckets)) + def test_find_buckets_iter(self): + def count_unique_ids(items): + return len(set(map(lambda item: item.id, items))) + + my_org = self.find_my_org() + more_buckets = 10 + num_of_buckets = count_unique_ids(self.buckets_api.find_buckets_iter()) + more_buckets + + a_bucket_name = None + for _ in range(more_buckets): + bucket_name = self.generate_name("it find_buckets_iter") + self.buckets_api.create_bucket(bucket_name=bucket_name, org=my_org) + a_bucket_name = bucket_name + + # get no buckets + buckets = self.buckets_api.find_buckets_iter(name=a_bucket_name + "blah") + self.assertEqual(count_unique_ids(buckets), 0) + + # get bucket by name + buckets = self.buckets_api.find_buckets_iter(name=a_bucket_name) + self.assertEqual(count_unique_ids(buckets), 1) + + # get buckets in 3-4 batches + buckets = self.buckets_api.find_buckets_iter(limit=num_of_buckets // 3) + self.assertEqual(count_unique_ids(buckets), num_of_buckets) + + # get buckets in one batch + buckets = self.buckets_api.find_buckets_iter(limit=num_of_buckets) + self.assertEqual(count_unique_ids(buckets), num_of_buckets) + + # get buckets in one batch, requesting too much + buckets = self.buckets_api.find_buckets_iter(limit=num_of_buckets + 1) + self.assertEqual(count_unique_ids(buckets), num_of_buckets) + + # skip some buckets + *_, skip_bucket = self.buckets_api.find_buckets(limit=num_of_buckets // 3).buckets + buckets = self.buckets_api.find_buckets_iter(after=skip_bucket.id) + self.assertEqual(count_unique_ids(buckets), num_of_buckets - num_of_buckets // 3) + def test_update_bucket(self): my_org = self.find_my_org() diff --git a/tests/test_InfluxDBClient.py b/tests/test_InfluxDBClient.py index 7fdf834f..228f391b 100644 --- a/tests/test_InfluxDBClient.py +++ b/tests/test_InfluxDBClient.py @@ -323,6 +323,35 @@ def test_version(self): version = self.client.version() self.assertTrue(len(version) > 0) + def test_url_attribute(self): + # Wrong URL attribute + wrong_types = [ + None, + True, False, + 123, 123.5, + dict({"url" : "http://localhost:8086"}), + list(["http://localhost:8086"]), + tuple(("http://localhost:8086")) + ] + correct_types = [ + "http://localhost:8086" + ] + for url_type in wrong_types: + try: + client_not_running = InfluxDBClient(url=url_type, token="my-token", debug=True) + status = True + except ValueError as e: + status = False + self.assertFalse(status) + for url_type in correct_types: + try: + client_not_running = InfluxDBClient(url=url_type, token="my-token", debug=True) + status = True + except ValueError as e: + status = False + self.assertTrue(status) + + def test_build(self): build = self.client.build() self.assertEqual('oss', build.lower()) diff --git a/tests/test_InfluxDBClientAsync.py b/tests/test_InfluxDBClientAsync.py index 123967a7..cb0586b9 100644 --- a/tests/test_InfluxDBClientAsync.py +++ b/tests/test_InfluxDBClientAsync.py @@ -1,10 +1,15 @@ import asyncio +import dateutil.parser import logging +import 
math +import re +import time import unittest import os -from datetime import datetime +from datetime import datetime, timezone from io import StringIO +import pandas import pytest import warnings from aioresponses import aioresponses @@ -198,37 +203,159 @@ async def test_write_empty_data(self): self.assertEqual(True, response) + def gen_fractional_utc(self, nano, precision) -> str: + raw_sec = nano / 1_000_000_000 + if precision == WritePrecision.NS: + rem = f"{nano % 1_000_000_000}".rjust(9,"0").rstrip("0") + return (datetime.fromtimestamp(math.floor(raw_sec), tz=timezone.utc) + .isoformat() + .replace("+00:00", "") + f".{rem}Z") + #f".{rem}Z")) + elif precision == WritePrecision.US: + # rem = f"{round(nano / 1_000) % 1_000_000}"#.ljust(6,"0") + return (datetime.fromtimestamp(round(raw_sec,6), tz=timezone.utc) + .isoformat() + .replace("+00:00","") + .strip("0") + "Z" + ) + elif precision == WritePrecision.MS: + #rem = f"{round(nano / 1_000_000) % 1_000}".rjust(3, "0") + return (datetime.fromtimestamp(round(raw_sec,3), tz=timezone.utc) + .isoformat() + .replace("+00:00","") + .strip("0") + "Z" + ) + elif precision == WritePrecision.S: + return (datetime.fromtimestamp(round(raw_sec), tz=timezone.utc) + .isoformat() + .replace("+00:00","Z")) + else: + raise ValueError(f"Unknown precision: {precision}") + + @async_test async def test_write_points_different_precision(self): + now_ns = time.time_ns() + now_us = now_ns / 1_000 + now_ms = now_us / 1_000 + now_s = now_ms / 1_000 + + now_date_s = self.gen_fractional_utc(now_ns, WritePrecision.S) + now_date_ms = self.gen_fractional_utc(now_ns, WritePrecision.MS) + now_date_us = self.gen_fractional_utc(now_ns, WritePrecision.US) + now_date_ns = self.gen_fractional_utc(now_ns, WritePrecision.NS) + + points = { + WritePrecision.S: [], + WritePrecision.MS: [], + WritePrecision.US: [], + WritePrecision.NS: [] + } + + expected = {} + measurement = generate_name("measurement") - _point1 = Point(measurement).tag("location", "Prague").field("temperature", 25.3) \ - .time(datetime.utcfromtimestamp(0), write_precision=WritePrecision.S) - _point2 = Point(measurement).tag("location", "New York").field("temperature", 24.3) \ - .time(datetime.utcfromtimestamp(1), write_precision=WritePrecision.MS) - _point3 = Point(measurement).tag("location", "Berlin").field("temperature", 24.3) \ - .time(datetime.utcfromtimestamp(2), write_precision=WritePrecision.NS) - await self.client.write_api().write(bucket="my-bucket", record=[_point1, _point2, _point3], + # basic date-time value + points[WritePrecision.S].append(Point(measurement).tag("method", "SecDateTime").field("temperature", 25.3) \ + .time(datetime.fromtimestamp(round(now_s), tz=timezone.utc), write_precision=WritePrecision.S)) + expected['SecDateTime'] = now_date_s + points[WritePrecision.MS].append(Point(measurement).tag("method", "MilDateTime").field("temperature", 24.3) \ + .time(datetime.fromtimestamp(round(now_s,3), tz=timezone.utc), write_precision=WritePrecision.MS)) + expected['MilDateTime'] = now_date_ms + points[WritePrecision.US].append(Point(measurement).tag("method", "MicDateTime").field("temperature", 24.3) \ + .time(datetime.fromtimestamp(round(now_s,6), tz=timezone.utc), write_precision=WritePrecision.US)) + expected['MicDateTime'] = now_date_us + # N.B. 
datetime does not handle nanoseconds +# points[WritePrecision.NS].append(Point(measurement).tag("method", "NanDateTime").field("temperature", 24.3) \ +# .time(datetime.fromtimestamp(now_s, tz=timezone.utc), write_precision=WritePrecision.NS)) + + # long timestamps based on POSIX time + points[WritePrecision.S].append(Point(measurement).tag("method", "SecPosix").field("temperature", 24.3) \ + .time(round(now_s), write_precision=WritePrecision.S)) + expected['SecPosix'] = now_date_s + points[WritePrecision.MS].append(Point(measurement).tag("method", "MilPosix").field("temperature", 24.3) \ + .time(round(now_ms), write_precision=WritePrecision.MS)) + expected['MilPosix'] = now_date_ms + points[WritePrecision.US].append(Point(measurement).tag("method", "MicPosix").field("temperature", 24.3) \ + .time(round(now_us), write_precision=WritePrecision.US)) + expected['MicPosix'] = now_date_us + points[WritePrecision.NS].append(Point(measurement).tag("method", "NanPosix").field("temperature", 24.3) \ + .time(now_ns, write_precision=WritePrecision.NS)) + expected['NanPosix'] = now_date_ns + + # ISO Zulu datetime with ms, us and ns e.g. "2024-09-27T13:17:16.412399728Z" + points[WritePrecision.S].append(Point(measurement).tag("method", "SecDTZulu").field("temperature", 24.3) \ + .time(now_date_s, write_precision=WritePrecision.S)) + expected['SecDTZulu'] = now_date_s + points[WritePrecision.MS].append(Point(measurement).tag("method", "MilDTZulu").field("temperature", 24.3) \ + .time(now_date_ms, write_precision=WritePrecision.MS)) + expected['MilDTZulu'] = now_date_ms + points[WritePrecision.US].append(Point(measurement).tag("method", "MicDTZulu").field("temperature", 24.3) \ + .time(now_date_us, write_precision=WritePrecision.US)) + expected['MicDTZulu'] = now_date_us + # This keeps resulting in micro second resolution in response +# points[WritePrecision.NS].append(Point(measurement).tag("method", "NanDTZulu").field("temperature", 24.3) \ +# .time(now_date_ns, write_precision=WritePrecision.NS)) + + recs = [x for x in [v for v in points.values()]] + + await self.client.write_api().write(bucket="my-bucket", record=recs, write_precision=WritePrecision.NS) query = f''' from(bucket:"my-bucket") |> range(start: 0) |> filter(fn: (r) => r["_measurement"] == "{measurement}") - |> keep(columns: ["_time"]) + |> keep(columns: ["method","_time"]) ''' query_api = self.client.query_api() + # ensure calls fully processed on server + await asyncio.sleep(1) + raw = await query_api.query_raw(query) - self.assertEqual(8, len(raw.splitlines())) - self.assertEqual(',,0,1970-01-01T00:00:02Z', raw.splitlines()[4]) - self.assertEqual(',,0,1970-01-01T00:00:01Z', raw.splitlines()[5]) - self.assertEqual(',,0,1970-01-01T00:00:00Z', raw.splitlines()[6]) + linesRaw = raw.splitlines()[4:] + + lines = [] + for lnr in linesRaw: + lines.append(lnr[2:].split(",")) + + def get_time_for_method(lines, method): + for l in lines: + if l[2] == method: + return l[1] + return "" + + self.assertEqual(15, len(raw.splitlines())) + + for key in expected: + t = get_time_for_method(lines,key) + comp_time = dateutil.parser.isoparse(get_time_for_method(lines,key)) + target_time = dateutil.parser.isoparse(expected[key]) + self.assertEqual(target_time.date(), comp_time.date()) + self.assertEqual(target_time.hour, comp_time.hour) + self.assertEqual(target_time.second,comp_time.second) + dif = abs(target_time.microsecond - comp_time.microsecond) + if key[:3] == "Sec": + # Already tested + pass + elif key[:3] == "Mil": + # may be slight rounding 
differences + self.assertLess(dif, 1500, f"failed to match timestamp for {key} {target_time} != {comp_time}") + elif key[:3] == "Mic": + # may be slight rounding differences + self.assertLess(dif, 150, f"failed to match timestamp for {key} {target_time} != {comp_time}") + elif key[:3] == "Nan": + self.assertEqual(expected[key], get_time_for_method(lines, key)) + else: + raise Exception(f"Unhandled key {key}") @async_test async def test_delete_api(self): measurement = generate_name("measurement") await self._prepare_data(measurement) - successfully = await self.client.delete_api().delete(start=datetime.utcfromtimestamp(0), stop=datetime.utcnow(), + successfully = await self.client.delete_api().delete(start=datetime.fromtimestamp(0), + stop=datetime.now(tz=timezone.utc), predicate="location = \"Prague\"", bucket="my-bucket") self.assertEqual(True, successfully) query = f''' @@ -389,6 +516,24 @@ async def test_query_exception_propagation(self): await self.client.query_api().query("buckets()", "my-org") self.assertEqual("unauthorized access", e.value.message) + @async_test + async def test_write_exception_propagation(self): + await self.client.close() + self.client = InfluxDBClientAsync(url="http://localhost:8086", token="wrong", org="my-org") + + with pytest.raises(InfluxDBError) as e: + await self.client.write_api().write(bucket="my_bucket", + record="temperature,location=hic cels=") + self.assertEqual("unauthorized access", e.value.message) + headers = e.value.headers + self.assertIsNotNone(headers) + self.assertIsNotNone(headers.get("Content-Length")) + self.assertIsNotNone(headers.get("Date")) + self.assertIsNotNone(headers.get("X-Platform-Error-Code")) + self.assertIn("application/json", headers.get("Content-Type")) + self.assertTrue(re.compile("^v.*").match(headers.get("X-Influxdb-Version"))) + self.assertEqual("OSS", headers.get("X-Influxdb-Build")) + @async_test @aioresponses() async def test_parse_utf8_two_bytes_character(self, mocked): diff --git a/tests/test_MultiprocessingWriter.py b/tests/test_MultiprocessingWriter.py index 940ae6ec..e7996b5f 100644 --- a/tests/test_MultiprocessingWriter.py +++ b/tests/test_MultiprocessingWriter.py @@ -1,6 +1,6 @@ import os import unittest -from datetime import datetime +from datetime import datetime, timezone from influxdb_client import WritePrecision, InfluxDBClient from influxdb_client.client.util.date_utils import get_date_helper @@ -53,7 +53,7 @@ def test_use_context_manager(self): self.assertIsNotNone(writer) def test_pass_parameters(self): - unique = get_date_helper().to_nanoseconds(datetime.utcnow() - datetime.utcfromtimestamp(0)) + unique = get_date_helper().to_nanoseconds(datetime.now(tz=timezone.utc) - datetime.fromtimestamp(0, tz=timezone.utc)) # write data with MultiprocessingWriter(url=self.url, token=self.token, org=self.org, write_options=SYNCHRONOUS) as writer: @@ -69,4 +69,4 @@ def test_pass_parameters(self): self.assertIsNotNone(record) self.assertEqual("a", record["tag"]) self.assertEqual(5, record["_value"]) - self.assertEqual(get_date_helper().to_utc(datetime.utcfromtimestamp(10)), record["_time"]) + self.assertEqual(get_date_helper().to_utc(datetime.fromtimestamp(10, tz=timezone.utc)), record["_time"]) diff --git a/tests/test_PandasDateTimeHelper.py b/tests/test_PandasDateTimeHelper.py index 60017172..2c7e4ce5 100644 --- a/tests/test_PandasDateTimeHelper.py +++ b/tests/test_PandasDateTimeHelper.py @@ -23,7 +23,7 @@ def test_parse_date(self): def test_to_nanoseconds(self): date = 
self.helper.parse_date('2020-08-07T06:21:57.331249158Z').replace(tzinfo=timezone.utc) - nanoseconds = self.helper.to_nanoseconds(date - datetime.utcfromtimestamp(0).replace(tzinfo=timezone.utc)) + nanoseconds = self.helper.to_nanoseconds(date - datetime.fromtimestamp(0, tz=timezone.utc)) self.assertEqual(nanoseconds, 1596781317331249158) diff --git a/tests/test_WriteApi.py b/tests/test_WriteApi.py index 474bf394..b2cc7ca7 100644 --- a/tests/test_WriteApi.py +++ b/tests/test_WriteApi.py @@ -3,12 +3,16 @@ from __future__ import absolute_import import datetime +import json +import logging import os +import re import sys import unittest from collections import namedtuple from datetime import timedelta from multiprocessing.pool import ApplyResult +from types import SimpleNamespace import httpretty import pytest @@ -190,6 +194,17 @@ def test_write_error(self): self.assertEqual(400, exception.status) self.assertEqual("Bad Request", exception.reason) + # assert headers + self.assertIsNotNone(exception.headers) + self.assertIsNotNone(exception.headers.get("Content-Length")) + self.assertIsNotNone(exception.headers.get("Date")) + self.assertIsNotNone(exception.headers.get("X-Platform-Error-Code")) + self.assertIn("application/json", exception.headers.get("Content-Type")) + self.assertTrue(re.compile("^v.*").match(exception.headers.get("X-Influxdb-Version"))) + self.assertEqual("OSS", exception.headers.get("X-Influxdb-Build")) + # assert body + b = json.loads(exception.body, object_hook=lambda d: SimpleNamespace(**d)) + self.assertTrue(re.compile("^unable to parse.*invalid field format").match(b.message)) def test_write_dictionary(self): _bucket = self.create_test_bucket() @@ -609,6 +624,28 @@ def test_write_result(self): self.assertEqual(None, result.get()) self.delete_test_bucket(_bucket) + def test_write_error(self): + _bucket = self.create_test_bucket() + + _record = "h2o_feet,location=coyote_creek level\\ water_level=" + result = self.write_client.write(_bucket.name, self.org, _record) + + with self.assertRaises(ApiException) as cm: + result.get() + self.assertEqual(400, cm.exception.status) + self.assertEqual("Bad Request", cm.exception.reason) + # assert headers + self.assertIsNotNone(cm.exception.headers) + self.assertIsNotNone(cm.exception.headers.get("Content-Length")) + self.assertIsNotNone(cm.exception.headers.get("Date")) + self.assertIsNotNone(cm.exception.headers.get("X-Platform-Error-Code")) + self.assertIn("application/json", cm.exception.headers.get("Content-Type")) + self.assertTrue(re.compile("^v.*").match(cm.exception.headers.get("X-Influxdb-Version"))) + self.assertEqual("OSS", cm.exception.headers.get("X-Influxdb-Build")) + # assert body + b = json.loads(cm.exception.body, object_hook=lambda d: SimpleNamespace(**d)) + self.assertTrue(re.compile("^unable to parse.*missing field value").match(b.message)) + def test_write_dictionaries(self): bucket = self.create_test_bucket() diff --git a/tests/test_WriteApiDataFrame.py b/tests/test_WriteApiDataFrame.py index 6ea7a98b..1e1f0ad3 100644 --- a/tests/test_WriteApiDataFrame.py +++ b/tests/test_WriteApiDataFrame.py @@ -159,6 +159,32 @@ def test_write_object_field_nan(self): self.assertEqual("measurement val=2i 1586046600000000000", points[1]) + def test_write_missing_values(self): + from influxdb_client.extras import pd + + data_frame = pd.DataFrame({ + "a_bool": [True, None, False], + "b_int": [None, 1, 2], + "c_float": [1.0, 2.0, None], + "d_str": ["a", "b", None], + }) + + data_frame['a_bool'] = 
data_frame['a_bool'].astype(pd.BooleanDtype())
+        data_frame['b_int'] = data_frame['b_int'].astype(pd.Int64Dtype())
+        data_frame['c_float'] = data_frame['c_float'].astype(pd.Float64Dtype())
+        data_frame['d_str'] = data_frame['d_str'].astype(pd.StringDtype())
+
+        # missing entries (pd.NA) in the nullable columns should be omitted from the generated line protocol
+        points = data_frame_to_list_of_points(
+            data_frame=data_frame,
+            point_settings=PointSettings(),
+            data_frame_measurement_name='measurement')
+
+        self.assertEqual(3, len(points))
+        self.assertEqual("measurement a_bool=True,c_float=1.0,d_str=\"a\" 0", points[0])
+        self.assertEqual("measurement b_int=1i,c_float=2.0,d_str=\"b\" 1", points[1])
+        self.assertEqual("measurement a_bool=False,b_int=2i 2", points[2])
+
     def test_write_field_bool(self):
         from influxdb_client.extras import pd
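Editor's aside, not part of the patch itself: the `test_write_missing_values` case above relies on the DataFrame serializer dropping `pd.NA` cells from the emitted line protocol. The following is a minimal standalone sketch of that behaviour under the same assumptions the test makes (pandas available via the client's `extras` module); the column names and values are placeholder data, not taken from the patch.

``` python
# Sketch of the behaviour exercised by test_write_missing_values:
# cells that are pd.NA in nullable pandas dtypes are left out of the
# serialized line protocol. Column names and values are placeholders.
from influxdb_client.client.write.dataframe_serializer import data_frame_to_list_of_points
from influxdb_client.client.write_api import PointSettings
from influxdb_client.extras import pd

df = pd.DataFrame({
    "a_bool": pd.array([True, None, False], dtype="boolean"),
    "b_int": pd.array([None, 1, 2], dtype="Int64"),
})

points = data_frame_to_list_of_points(
    data_frame=df,
    point_settings=PointSettings(),
    data_frame_measurement_name="measurement")

# With the default RangeIndex the rows should serialize roughly as:
# ['measurement a_bool=True 0', 'measurement b_int=1i 1', 'measurement a_bool=False,b_int=2i 2']
print(points)
```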