From 4bb4363c2c24e2ae7bf431b25e4e9f15d083c066 Mon Sep 17 00:00:00 2001 From: karel rehor Date: Tue, 13 Aug 2024 17:12:01 +0200 Subject: [PATCH 1/6] feat: (WIP) adds example error_handling.py and tests of headers in error types --- examples/error_handling.py | 86 ++++++++++++++++++++++++++++ influxdb_client/client/exceptions.py | 2 + tests/test_InfluxDBClientAsync.py | 20 +++++++ tests/test_WriteApi.py | 37 ++++++++++++ 4 files changed, 145 insertions(+) create mode 100644 examples/error_handling.py diff --git a/examples/error_handling.py b/examples/error_handling.py new file mode 100644 index 00000000..6f632f19 --- /dev/null +++ b/examples/error_handling.py @@ -0,0 +1,86 @@ +import asyncio +import time +from datetime import datetime, timezone + +from influxdb_client.client.exceptions import InfluxDBError + +from influxdb_client.rest import ApiException + +from influxdb_client.client.write_api import SYNCHRONOUS + +from influxdb_client import WritePrecision, InfluxDBClient, Point +from influxdb_client.client.influxdb_client_async import InfluxDBClientAsync + + +class BatchCB(object): + + @staticmethod + def success(self, conf: (str, str, str), data: str): + print(f"Write success: {conf}, data: {data}") + + @staticmethod + def error(self, conf: (str, str, str), data: str, exception: InfluxDBError): + print(f"Write failed: {conf}, data: {data}, error: {exception.message}") + print(f" Date: {exception.headers.get("Date")}") + print(f" X-Influxdb-Build: {exception.headers.get("X-Influxdb-Build")}") + print(f" X-Influxdb-Version: {exception.headers.get("X-Influxdb-Version")}") + print(f" X-Platform-Error-Code: {exception.headers.get("X-Platform-Error-Code")}") + print(f" Retry-After: {exception.headers.get("Retry-After")}") + + @staticmethod + def retry(self, conf: (str, str, str), data: str, exception: InfluxDBError): + print(f"Write failed but retryable: {conf}, data: {data}, error: {exception}") + + +def report_ping(ping: bool): + if not ping: + raise ValueError("InfluxDB: Failed to ping server") + else: + print("InfluxDB: ready") + + +def use_sync(): + print("Using sync") + with InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org") as client: + report_ping(client.ping()) + try: + client.write_api(write_options=SYNCHRONOUS).write(bucket="my-bucket", record="cpu,location=G4 usage=") + except ApiException as ae: + print("\nCaught ae: ", ae.message) + print(" Date: ", ae.headers.get("Date")) + print(" X-Influxdb-Build: ", ae.headers.get("X-Influxdb-Build")) + print(" X-Influxdb-Version: ", ae.headers.get("X-Influxdb-Version")) + print(" X-Platform-Error-Code: ", ae.headers.get("X-Platform-Error-Code")) + print(" Retry-After: ", ae.headers.get("Retry-After")) # Should be None + + print("Sync write done") + + +def use_batch(): + print("Using batch") + with InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org") as client: + cb = BatchCB() + with client.write_api(success_callback=cb.success, + error_callback=cb.error, + retry_callback=cb.retry) as write_api: + write_api.write(bucket="my-bucket", record="cpu,location=G9 usage=") + print("Batch write sent") + + +async def use_async(): + print("Using async") + async with InfluxDBClientAsync(url="http://localhost:8086", token="my-token", org="my-org") as client: + report_ping(await client.ping()) + try: + await client.write_api().write(bucket="my-bucket", record="cpu,location=G7 usage=") + except InfluxDBError as ie: + print("\nCaught ie: ", ie) + print("Async write done") + + +if __name__ == "__main__": + use_sync() + print("\n Continuing...") + use_batch() + print("\n Continuing...") + asyncio.run(use_async()) diff --git a/influxdb_client/client/exceptions.py b/influxdb_client/client/exceptions.py index 2ca235c8..bfa453e2 100644 --- a/influxdb_client/client/exceptions.py +++ b/influxdb_client/client/exceptions.py @@ -16,8 +16,10 @@ def __init__(self, response: HTTPResponse = None, message: str = None): self.response = response self.message = self._get_message(response) if isinstance(response, HTTPResponse): # response is HTTPResponse + self.headers = response.headers self.retry_after = response.headers.get('Retry-After') else: # response is RESTResponse + self.headers = response.getheaders() self.retry_after = response.getheader('Retry-After') else: self.response = None diff --git a/tests/test_InfluxDBClientAsync.py b/tests/test_InfluxDBClientAsync.py index 20eabd7d..e0289846 100644 --- a/tests/test_InfluxDBClientAsync.py +++ b/tests/test_InfluxDBClientAsync.py @@ -1,5 +1,6 @@ import asyncio import logging +import re import unittest import os from datetime import datetime, timezone @@ -389,6 +390,25 @@ async def test_query_exception_propagation(self): with pytest.raises(InfluxDBError) as e: await self.client.query_api().query("buckets()", "my-org") self.assertEqual("unauthorized access", e.value.message) + print("DEBUG e.headers: ", e.value.headers) + + @async_test + async def test_write_exception_propagation(self): + await self.client.close() + self.client = InfluxDBClientAsync(url="http://localhost:8086", token="wrong", org="my-org") + + with pytest.raises(InfluxDBError) as e: + await self.client.write_api().write(bucket="my_bucket", + record="temperature,location=hic cels=") + self.assertEqual("unauthorized access", e.value.message) + headers = e.value.headers + self.assertIsNotNone(headers) + self.assertIsNotNone(headers.get("Content-Length")) + self.assertIsNotNone(headers.get("Date")) + self.assertIsNotNone(headers.get("X-Platform-Error-Code")) + self.assertIn("application/json", headers.get("Content-Type")) + self.assertTrue(re.compile("^v.*").match(headers.get("X-Influxdb-Version"))) + self.assertEqual("OSS", headers.get("X-Influxdb-Build")) @async_test @aioresponses() diff --git a/tests/test_WriteApi.py b/tests/test_WriteApi.py index 474bf394..b2cc7ca7 100644 --- a/tests/test_WriteApi.py +++ b/tests/test_WriteApi.py @@ -3,12 +3,16 @@ from __future__ import absolute_import import datetime +import json +import logging import os +import re import sys import unittest from collections import namedtuple from datetime import timedelta from multiprocessing.pool import ApplyResult +from types import SimpleNamespace import httpretty import pytest @@ -190,6 +194,17 @@ def test_write_error(self): self.assertEqual(400, exception.status) self.assertEqual("Bad Request", exception.reason) + # assert headers + self.assertIsNotNone(exception.headers) + self.assertIsNotNone(exception.headers.get("Content-Length")) + self.assertIsNotNone(exception.headers.get("Date")) + self.assertIsNotNone(exception.headers.get("X-Platform-Error-Code")) + self.assertIn("application/json", exception.headers.get("Content-Type")) + self.assertTrue(re.compile("^v.*").match(exception.headers.get("X-Influxdb-Version"))) + self.assertEqual("OSS", exception.headers.get("X-Influxdb-Build")) + # assert body + b = json.loads(exception.body, object_hook=lambda d: SimpleNamespace(**d)) + self.assertTrue(re.compile("^unable to parse.*invalid field format").match(b.message)) def test_write_dictionary(self): _bucket = self.create_test_bucket() @@ -609,6 +624,28 @@ def test_write_result(self): self.assertEqual(None, result.get()) self.delete_test_bucket(_bucket) + def test_write_error(self): + _bucket = self.create_test_bucket() + + _record = "h2o_feet,location=coyote_creek level\\ water_level=" + result = self.write_client.write(_bucket.name, self.org, _record) + + with self.assertRaises(ApiException) as cm: + result.get() + self.assertEqual(400, cm.exception.status) + self.assertEqual("Bad Request", cm.exception.reason) + # assert headers + self.assertIsNotNone(cm.exception.headers) + self.assertIsNotNone(cm.exception.headers.get("Content-Length")) + self.assertIsNotNone(cm.exception.headers.get("Date")) + self.assertIsNotNone(cm.exception.headers.get("X-Platform-Error-Code")) + self.assertIn("application/json", cm.exception.headers.get("Content-Type")) + self.assertTrue(re.compile("^v.*").match(cm.exception.headers.get("X-Influxdb-Version"))) + self.assertEqual("OSS", cm.exception.headers.get("X-Influxdb-Build")) + # assert body + b = json.loads(cm.exception.body, object_hook=lambda d: SimpleNamespace(**d)) + self.assertTrue(re.compile("^unable to parse.*missing field value").match(b.message)) + def test_write_dictionaries(self): bucket = self.create_test_bucket() From 5ade22b911f7c5b416f724be44c924dcff9568fe Mon Sep 17 00:00:00 2001 From: karel rehor Date: Wed, 14 Aug 2024 10:41:50 +0200 Subject: [PATCH 2/6] feat: refactor error_handling example --- examples/error_handling.py | 55 +++++++++++++++++++------------------- 1 file changed, 28 insertions(+), 27 deletions(-) diff --git a/examples/error_handling.py b/examples/error_handling.py index 6f632f19..b36a7aca 100644 --- a/examples/error_handling.py +++ b/examples/error_handling.py @@ -1,37 +1,28 @@ import asyncio -import time -from datetime import datetime, timezone +from typing import MutableMapping +from influxdb_client import InfluxDBClient from influxdb_client.client.exceptions import InfluxDBError - -from influxdb_client.rest import ApiException - -from influxdb_client.client.write_api import SYNCHRONOUS - -from influxdb_client import WritePrecision, InfluxDBClient, Point from influxdb_client.client.influxdb_client_async import InfluxDBClientAsync +from influxdb_client.client.write_api import SYNCHRONOUS +from influxdb_client.rest import ApiException +# To encapsulate functions used in batch writing class BatchCB(object): - @staticmethod def success(self, conf: (str, str, str), data: str): print(f"Write success: {conf}, data: {data}") - @staticmethod def error(self, conf: (str, str, str), data: str, exception: InfluxDBError): - print(f"Write failed: {conf}, data: {data}, error: {exception.message}") - print(f" Date: {exception.headers.get("Date")}") - print(f" X-Influxdb-Build: {exception.headers.get("X-Influxdb-Build")}") - print(f" X-Influxdb-Version: {exception.headers.get("X-Influxdb-Version")}") - print(f" X-Platform-Error-Code: {exception.headers.get("X-Platform-Error-Code")}") - print(f" Retry-After: {exception.headers.get("Retry-After")}") - - @staticmethod + print(f"\nBatch -> Write failed: {conf}, data: {data}, error: {exception.message}") + report_headers(exception.headers) + def retry(self, conf: (str, str, str), data: str, exception: InfluxDBError): print(f"Write failed but retryable: {conf}, data: {data}, error: {exception}") +# simple reporter that server is available def report_ping(ping: bool): if not ping: raise ValueError("InfluxDB: Failed to ping server") @@ -39,6 +30,16 @@ def report_ping(ping: bool): print("InfluxDB: ready") +# report some useful expected header fields +def report_headers(headers: MutableMapping[str, str]): + print(" Date: ", headers.get("Date")) + print(" X-Influxdb-Build: ", headers.get("X-Influxdb-Build")) + print(" X-Influxdb-Version: ", headers.get("X-Influxdb-Version")) + print(" X-Platform-Error-Code: ", headers.get("X-Platform-Error-Code")) + print(" Retry-After: ", headers.get("Retry-After")) # Should be None + + +# try a write using a synchronous call def use_sync(): print("Using sync") with InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org") as client: @@ -46,16 +47,13 @@ def use_sync(): try: client.write_api(write_options=SYNCHRONOUS).write(bucket="my-bucket", record="cpu,location=G4 usage=") except ApiException as ae: - print("\nCaught ae: ", ae.message) - print(" Date: ", ae.headers.get("Date")) - print(" X-Influxdb-Build: ", ae.headers.get("X-Influxdb-Build")) - print(" X-Influxdb-Version: ", ae.headers.get("X-Influxdb-Version")) - print(" X-Platform-Error-Code: ", ae.headers.get("X-Platform-Error-Code")) - print(" Retry-After: ", ae.headers.get("Retry-After")) # Should be None + print("\nSync -> Caught ApiException: ", ae.message) + report_headers(ae.headers) print("Sync write done") +# try a write using batch API def use_batch(): print("Using batch") with InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org") as client: @@ -65,8 +63,10 @@ def use_batch(): retry_callback=cb.retry) as write_api: write_api.write(bucket="my-bucket", record="cpu,location=G9 usage=") print("Batch write sent") + print("Batch write done") +# try a write using async.io async def use_async(): print("Using async") async with InfluxDBClientAsync(url="http://localhost:8086", token="my-token", org="my-org") as client: @@ -74,13 +74,14 @@ async def use_async(): try: await client.write_api().write(bucket="my-bucket", record="cpu,location=G7 usage=") except InfluxDBError as ie: - print("\nCaught ie: ", ie) + print("\nAsync -> Caught InfluxDBError: ", ie.message) + report_headers(ie.headers) print("Async write done") if __name__ == "__main__": use_sync() - print("\n Continuing...") + print("\n Continuing...\n") use_batch() - print("\n Continuing...") + print("\n Continuing...\n") asyncio.run(use_async()) From fd4a895e4a94cda72fc6e644baea20e407c303f2 Mon Sep 17 00:00:00 2001 From: karel rehor Date: Wed, 14 Aug 2024 10:48:07 +0200 Subject: [PATCH 3/6] fix: rename example to http_error_handling.py --- examples/README.md | 1 + examples/{error_handling.py => http_error_handling.py} | 0 2 files changed, 1 insertion(+) rename examples/{error_handling.py => http_error_handling.py} (100%) diff --git a/examples/README.md b/examples/README.md index 1678d00e..2b42ffd7 100644 --- a/examples/README.md +++ b/examples/README.md @@ -15,6 +15,7 @@ - manually download [NYC TLC Trip Record Data](https://www1.nyc.gov/site/tlc/about/tlc-trip-record-data.page) - install Apache Arrow `pip install pyarrow` dependency - [write_batching_by_bytes_count.py](write_batching_by_bytes_count.py) - How to use RxPY to prepare batches by maximum bytes count. +- [http_error_handling.py](http_error_handling.py) - How to leverage HttpHeader information when errors are returned on write. ## Queries - [query.py](query.py) - How to query data into `FluxTable`s, `Stream` and `CSV` diff --git a/examples/error_handling.py b/examples/http_error_handling.py similarity index 100% rename from examples/error_handling.py rename to examples/http_error_handling.py From 182fbffa682204fdb7dd34b41b41ffb458e0a7c1 Mon Sep 17 00:00:00 2001 From: karel rehor Date: Wed, 14 Aug 2024 11:11:17 +0200 Subject: [PATCH 4/6] chore: remove debug message from test --- tests/test_InfluxDBClientAsync.py | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/test_InfluxDBClientAsync.py b/tests/test_InfluxDBClientAsync.py index e0289846..7f8c6214 100644 --- a/tests/test_InfluxDBClientAsync.py +++ b/tests/test_InfluxDBClientAsync.py @@ -390,7 +390,6 @@ async def test_query_exception_propagation(self): with pytest.raises(InfluxDBError) as e: await self.client.query_api().query("buckets()", "my-org") self.assertEqual("unauthorized access", e.value.message) - print("DEBUG e.headers: ", e.value.headers) @async_test async def test_write_exception_propagation(self): From 7f3ee2f42e77cc416ac43705e6ef9b0f364675cf Mon Sep 17 00:00:00 2001 From: karel rehor Date: Wed, 14 Aug 2024 13:50:46 +0200 Subject: [PATCH 5/6] docs: updates CHANGELOG.md --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 923317ab..e22bf238 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,7 @@ ### Examples: 1. [#664](https://github.com/influxdata/influxdb-client-python/pull/664/): Multiprocessing example uses new source of data +1. [#665](https://github.com/influxdata/influxdb-client-python/pull/665): Shows how to leverage header fields in errors returned on write. ## 1.45.0 [2024-08-12] From db7df2071d9d122b5984d7519922bcd0d20c0666 Mon Sep 17 00:00:00 2001 From: karel rehor Date: Wed, 14 Aug 2024 17:09:50 +0200 Subject: [PATCH 6/6] chore: refactor example http_error_handling to use cloud and header trace-id --- examples/http_error_handling.py | 67 ++++++++++++++++++++++++++------- 1 file changed, 53 insertions(+), 14 deletions(-) diff --git a/examples/http_error_handling.py b/examples/http_error_handling.py index b36a7aca..c125a7ff 100644 --- a/examples/http_error_handling.py +++ b/examples/http_error_handling.py @@ -1,4 +1,16 @@ +""" +Illustrates getting header values from Errors that may occur on write. + +To test against cloud set the following environment variables: + INFLUX_URL + INFLUX_TOKEN + INFLUX_DATABASE + INFLUX_ORG + +...otherwise will run against a standard OSS endpoint. +""" import asyncio +import os from typing import MutableMapping from influxdb_client import InfluxDBClient @@ -8,6 +20,30 @@ from influxdb_client.rest import ApiException +def get_envar(key, default): + try: + return os.environ[key] + except: + return default + + +class Config(object): + + def __init__(self): + self.url = get_envar("INFLUX_URL", "http://localhost:8086") + self.token = get_envar("INFLUX_TOKEN", "my-token") + self.bucket = get_envar("INFLUX_DATABASE", "my-bucket") + self.org = get_envar("INFLUX_ORG", "my-org") + + def __str__(self): + return (f"config:\n" + f" url: {self.url}\n" + f" token: ****redacted*****\n" + f" bucket: {self.bucket}\n" + f" org: {self.org}\n" + ) + + # To encapsulate functions used in batch writing class BatchCB(object): @@ -34,18 +70,19 @@ def report_ping(ping: bool): def report_headers(headers: MutableMapping[str, str]): print(" Date: ", headers.get("Date")) print(" X-Influxdb-Build: ", headers.get("X-Influxdb-Build")) - print(" X-Influxdb-Version: ", headers.get("X-Influxdb-Version")) - print(" X-Platform-Error-Code: ", headers.get("X-Platform-Error-Code")) + print(" X-Influxdb-Version: ", headers.get("X-Influxdb-Version")) # OSS version, Cloud should be None + print(" X-Platform-Error-Code: ", headers.get("X-Platform-Error-Code")) # OSS invalid, Cloud should be None print(" Retry-After: ", headers.get("Retry-After")) # Should be None + print(" Trace-Id: ", headers.get("Trace-Id")) # OSS should be None, Cloud should return value # try a write using a synchronous call -def use_sync(): +def use_sync(conf: Config): print("Using sync") - with InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org") as client: + with InfluxDBClient(url=conf.url, token=conf.token, org=conf.org) as client: report_ping(client.ping()) try: - client.write_api(write_options=SYNCHRONOUS).write(bucket="my-bucket", record="cpu,location=G4 usage=") + client.write_api(write_options=SYNCHRONOUS).write(bucket=conf.bucket, record="cpu,location=G4 usage=") except ApiException as ae: print("\nSync -> Caught ApiException: ", ae.message) report_headers(ae.headers) @@ -54,25 +91,25 @@ def use_sync(): # try a write using batch API -def use_batch(): +def use_batch(conf: Config): print("Using batch") - with InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org") as client: + with InfluxDBClient(url=conf.url, token=conf.token, org=conf.org) as client: cb = BatchCB() with client.write_api(success_callback=cb.success, error_callback=cb.error, retry_callback=cb.retry) as write_api: - write_api.write(bucket="my-bucket", record="cpu,location=G9 usage=") + write_api.write(bucket=conf.bucket, record="cpu,location=G9 usage=") print("Batch write sent") print("Batch write done") # try a write using async.io -async def use_async(): +async def use_async(conf: Config): print("Using async") - async with InfluxDBClientAsync(url="http://localhost:8086", token="my-token", org="my-org") as client: + async with InfluxDBClientAsync(url=conf.url, token=conf.token, org=conf.org) as client: report_ping(await client.ping()) try: - await client.write_api().write(bucket="my-bucket", record="cpu,location=G7 usage=") + await client.write_api().write(bucket=conf.bucket, record="cpu,location=G7 usage=") except InfluxDBError as ie: print("\nAsync -> Caught InfluxDBError: ", ie.message) report_headers(ie.headers) @@ -80,8 +117,10 @@ async def use_async(): if __name__ == "__main__": - use_sync() + conf = Config() + print(conf) + use_sync(conf) print("\n Continuing...\n") - use_batch() + use_batch(conf) print("\n Continuing...\n") - asyncio.run(use_async()) + asyncio.run(use_async(conf))