diff --git a/CHANGELOG.md b/CHANGELOG.md index 923317ab..e22bf238 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,7 @@ ### Examples: 1. [#664](https://github.com/influxdata/influxdb-client-python/pull/664/): Multiprocessing example uses new source of data +1. [#665](https://github.com/influxdata/influxdb-client-python/pull/665): Shows how to leverage header fields in errors returned on write. ## 1.45.0 [2024-08-12] diff --git a/examples/README.md b/examples/README.md index 1678d00e..2b42ffd7 100644 --- a/examples/README.md +++ b/examples/README.md @@ -15,6 +15,7 @@ - manually download [NYC TLC Trip Record Data](https://www1.nyc.gov/site/tlc/about/tlc-trip-record-data.page) - install Apache Arrow `pip install pyarrow` dependency - [write_batching_by_bytes_count.py](write_batching_by_bytes_count.py) - How to use RxPY to prepare batches by maximum bytes count. +- [http_error_handling.py](http_error_handling.py) - How to leverage HttpHeader information when errors are returned on write. ## Queries - [query.py](query.py) - How to query data into `FluxTable`s, `Stream` and `CSV` diff --git a/examples/http_error_handling.py b/examples/http_error_handling.py new file mode 100644 index 00000000..c125a7ff --- /dev/null +++ b/examples/http_error_handling.py @@ -0,0 +1,126 @@ +""" +Illustrates getting header values from Errors that may occur on write. + +To test against cloud set the following environment variables: + INFLUX_URL + INFLUX_TOKEN + INFLUX_DATABASE + INFLUX_ORG + +...otherwise will run against a standard OSS endpoint. +""" +import asyncio +import os +from typing import MutableMapping + +from influxdb_client import InfluxDBClient +from influxdb_client.client.exceptions import InfluxDBError +from influxdb_client.client.influxdb_client_async import InfluxDBClientAsync +from influxdb_client.client.write_api import SYNCHRONOUS +from influxdb_client.rest import ApiException + + +def get_envar(key, default): + try: + return os.environ[key] + except: + return default + + +class Config(object): + + def __init__(self): + self.url = get_envar("INFLUX_URL", "http://localhost:8086") + self.token = get_envar("INFLUX_TOKEN", "my-token") + self.bucket = get_envar("INFLUX_DATABASE", "my-bucket") + self.org = get_envar("INFLUX_ORG", "my-org") + + def __str__(self): + return (f"config:\n" + f" url: {self.url}\n" + f" token: ****redacted*****\n" + f" bucket: {self.bucket}\n" + f" org: {self.org}\n" + ) + + +# To encapsulate functions used in batch writing +class BatchCB(object): + + def success(self, conf: (str, str, str), data: str): + print(f"Write success: {conf}, data: {data}") + + def error(self, conf: (str, str, str), data: str, exception: InfluxDBError): + print(f"\nBatch -> Write failed: {conf}, data: {data}, error: {exception.message}") + report_headers(exception.headers) + + def retry(self, conf: (str, str, str), data: str, exception: InfluxDBError): + print(f"Write failed but retryable: {conf}, data: {data}, error: {exception}") + + +# simple reporter that server is available +def report_ping(ping: bool): + if not ping: + raise ValueError("InfluxDB: Failed to ping server") + else: + print("InfluxDB: ready") + + +# report some useful expected header fields +def report_headers(headers: MutableMapping[str, str]): + print(" Date: ", headers.get("Date")) + print(" X-Influxdb-Build: ", headers.get("X-Influxdb-Build")) + print(" X-Influxdb-Version: ", headers.get("X-Influxdb-Version")) # OSS version, Cloud should be None + print(" X-Platform-Error-Code: ", headers.get("X-Platform-Error-Code")) # OSS invalid, Cloud should be None + print(" Retry-After: ", headers.get("Retry-After")) # Should be None + print(" Trace-Id: ", headers.get("Trace-Id")) # OSS should be None, Cloud should return value + + +# try a write using a synchronous call +def use_sync(conf: Config): + print("Using sync") + with InfluxDBClient(url=conf.url, token=conf.token, org=conf.org) as client: + report_ping(client.ping()) + try: + client.write_api(write_options=SYNCHRONOUS).write(bucket=conf.bucket, record="cpu,location=G4 usage=") + except ApiException as ae: + print("\nSync -> Caught ApiException: ", ae.message) + report_headers(ae.headers) + + print("Sync write done") + + +# try a write using batch API +def use_batch(conf: Config): + print("Using batch") + with InfluxDBClient(url=conf.url, token=conf.token, org=conf.org) as client: + cb = BatchCB() + with client.write_api(success_callback=cb.success, + error_callback=cb.error, + retry_callback=cb.retry) as write_api: + write_api.write(bucket=conf.bucket, record="cpu,location=G9 usage=") + print("Batch write sent") + print("Batch write done") + + +# try a write using async.io +async def use_async(conf: Config): + print("Using async") + async with InfluxDBClientAsync(url=conf.url, token=conf.token, org=conf.org) as client: + report_ping(await client.ping()) + try: + await client.write_api().write(bucket=conf.bucket, record="cpu,location=G7 usage=") + except InfluxDBError as ie: + print("\nAsync -> Caught InfluxDBError: ", ie.message) + report_headers(ie.headers) + print("Async write done") + + +if __name__ == "__main__": + conf = Config() + print(conf) + use_sync(conf) + print("\n Continuing...\n") + use_batch(conf) + print("\n Continuing...\n") + asyncio.run(use_async(conf)) diff --git a/influxdb_client/client/exceptions.py b/influxdb_client/client/exceptions.py index 2ca235c8..bfa453e2 100644 --- a/influxdb_client/client/exceptions.py +++ b/influxdb_client/client/exceptions.py @@ -16,8 +16,10 @@ def __init__(self, response: HTTPResponse = None, message: str = None): self.response = response self.message = self._get_message(response) if isinstance(response, HTTPResponse): # response is HTTPResponse + self.headers = response.headers self.retry_after = response.headers.get('Retry-After') else: # response is RESTResponse + self.headers = response.getheaders() self.retry_after = response.getheader('Retry-After') else: self.response = None diff --git a/tests/test_InfluxDBClientAsync.py b/tests/test_InfluxDBClientAsync.py index 20eabd7d..7f8c6214 100644 --- a/tests/test_InfluxDBClientAsync.py +++ b/tests/test_InfluxDBClientAsync.py @@ -1,5 +1,6 @@ import asyncio import logging +import re import unittest import os from datetime import datetime, timezone @@ -390,6 +391,24 @@ async def test_query_exception_propagation(self): await self.client.query_api().query("buckets()", "my-org") self.assertEqual("unauthorized access", e.value.message) + @async_test + async def test_write_exception_propagation(self): + await self.client.close() + self.client = InfluxDBClientAsync(url="http://localhost:8086", token="wrong", org="my-org") + + with pytest.raises(InfluxDBError) as e: + await self.client.write_api().write(bucket="my_bucket", + record="temperature,location=hic cels=") + self.assertEqual("unauthorized access", e.value.message) + headers = e.value.headers + self.assertIsNotNone(headers) + self.assertIsNotNone(headers.get("Content-Length")) + self.assertIsNotNone(headers.get("Date")) + self.assertIsNotNone(headers.get("X-Platform-Error-Code")) + self.assertIn("application/json", headers.get("Content-Type")) + self.assertTrue(re.compile("^v.*").match(headers.get("X-Influxdb-Version"))) + self.assertEqual("OSS", headers.get("X-Influxdb-Build")) + @async_test @aioresponses() async def test_parse_utf8_two_bytes_character(self, mocked): diff --git a/tests/test_WriteApi.py b/tests/test_WriteApi.py index 474bf394..b2cc7ca7 100644 --- a/tests/test_WriteApi.py +++ b/tests/test_WriteApi.py @@ -3,12 +3,16 @@ from __future__ import absolute_import import datetime +import json +import logging import os +import re import sys import unittest from collections import namedtuple from datetime import timedelta from multiprocessing.pool import ApplyResult +from types import SimpleNamespace import httpretty import pytest @@ -190,6 +194,17 @@ def test_write_error(self): self.assertEqual(400, exception.status) self.assertEqual("Bad Request", exception.reason) + # assert headers + self.assertIsNotNone(exception.headers) + self.assertIsNotNone(exception.headers.get("Content-Length")) + self.assertIsNotNone(exception.headers.get("Date")) + self.assertIsNotNone(exception.headers.get("X-Platform-Error-Code")) + self.assertIn("application/json", exception.headers.get("Content-Type")) + self.assertTrue(re.compile("^v.*").match(exception.headers.get("X-Influxdb-Version"))) + self.assertEqual("OSS", exception.headers.get("X-Influxdb-Build")) + # assert body + b = json.loads(exception.body, object_hook=lambda d: SimpleNamespace(**d)) + self.assertTrue(re.compile("^unable to parse.*invalid field format").match(b.message)) def test_write_dictionary(self): _bucket = self.create_test_bucket() @@ -609,6 +624,28 @@ def test_write_result(self): self.assertEqual(None, result.get()) self.delete_test_bucket(_bucket) + def test_write_error(self): + _bucket = self.create_test_bucket() + + _record = "h2o_feet,location=coyote_creek level\\ water_level=" + result = self.write_client.write(_bucket.name, self.org, _record) + + with self.assertRaises(ApiException) as cm: + result.get() + self.assertEqual(400, cm.exception.status) + self.assertEqual("Bad Request", cm.exception.reason) + # assert headers + self.assertIsNotNone(cm.exception.headers) + self.assertIsNotNone(cm.exception.headers.get("Content-Length")) + self.assertIsNotNone(cm.exception.headers.get("Date")) + self.assertIsNotNone(cm.exception.headers.get("X-Platform-Error-Code")) + self.assertIn("application/json", cm.exception.headers.get("Content-Type")) + self.assertTrue(re.compile("^v.*").match(cm.exception.headers.get("X-Influxdb-Version"))) + self.assertEqual("OSS", cm.exception.headers.get("X-Influxdb-Build")) + # assert body + b = json.loads(cm.exception.body, object_hook=lambda d: SimpleNamespace(**d)) + self.assertTrue(re.compile("^unable to parse.*missing field value").match(b.message)) + def test_write_dictionaries(self): bucket = self.create_test_bucket()