Skip to content

feat: add headers field to InfluxDBError and add example of use #665

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Aug 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

### Examples:
1. [#664](https://github.com/influxdata/influxdb-client-python/pull/664/): Multiprocessing example uses new source of data
1. [#665](https://github.com/influxdata/influxdb-client-python/pull/665): Shows how to leverage header fields in errors returned on write.

## 1.45.0 [2024-08-12]

Expand Down
1 change: 1 addition & 0 deletions examples/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
- manually download [NYC TLC Trip Record Data](https://www1.nyc.gov/site/tlc/about/tlc-trip-record-data.page)
- install Apache Arrow `pip install pyarrow` dependency
- [write_batching_by_bytes_count.py](write_batching_by_bytes_count.py) - How to use RxPY to prepare batches by maximum bytes count.
- [http_error_handling.py](http_error_handling.py) - How to leverage HttpHeader information when errors are returned on write.

## Queries
- [query.py](query.py) - How to query data into `FluxTable`s, `Stream` and `CSV`
Expand Down
126 changes: 126 additions & 0 deletions examples/http_error_handling.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
"""
Illustrates getting header values from Errors that may occur on write.

To test against cloud set the following environment variables:
INFLUX_URL
INFLUX_TOKEN
INFLUX_DATABASE
INFLUX_ORG

...otherwise will run against a standard OSS endpoint.
"""
import asyncio
import os
from typing import MutableMapping

from influxdb_client import InfluxDBClient
from influxdb_client.client.exceptions import InfluxDBError
from influxdb_client.client.influxdb_client_async import InfluxDBClientAsync
from influxdb_client.client.write_api import SYNCHRONOUS
from influxdb_client.rest import ApiException


def get_envar(key, default):
try:
return os.environ[key]
except:
return default


class Config(object):

def __init__(self):
self.url = get_envar("INFLUX_URL", "http://localhost:8086")
self.token = get_envar("INFLUX_TOKEN", "my-token")
self.bucket = get_envar("INFLUX_DATABASE", "my-bucket")
self.org = get_envar("INFLUX_ORG", "my-org")

def __str__(self):
return (f"config:\n"
f" url: {self.url}\n"
f" token: ****redacted*****\n"
f" bucket: {self.bucket}\n"
f" org: {self.org}\n"
)


# To encapsulate functions used in batch writing
class BatchCB(object):

def success(self, conf: (str, str, str), data: str):
print(f"Write success: {conf}, data: {data}")

def error(self, conf: (str, str, str), data: str, exception: InfluxDBError):
print(f"\nBatch -> Write failed: {conf}, data: {data}, error: {exception.message}")
report_headers(exception.headers)

def retry(self, conf: (str, str, str), data: str, exception: InfluxDBError):
print(f"Write failed but retryable: {conf}, data: {data}, error: {exception}")


# simple reporter that server is available
def report_ping(ping: bool):
if not ping:
raise ValueError("InfluxDB: Failed to ping server")
else:
print("InfluxDB: ready")


# report some useful expected header fields
def report_headers(headers: MutableMapping[str, str]):
print(" Date: ", headers.get("Date"))
print(" X-Influxdb-Build: ", headers.get("X-Influxdb-Build"))
print(" X-Influxdb-Version: ", headers.get("X-Influxdb-Version")) # OSS version, Cloud should be None
print(" X-Platform-Error-Code: ", headers.get("X-Platform-Error-Code")) # OSS invalid, Cloud should be None
print(" Retry-After: ", headers.get("Retry-After")) # Should be None
print(" Trace-Id: ", headers.get("Trace-Id")) # OSS should be None, Cloud should return value


# try a write using a synchronous call
def use_sync(conf: Config):
print("Using sync")
with InfluxDBClient(url=conf.url, token=conf.token, org=conf.org) as client:
report_ping(client.ping())
try:
client.write_api(write_options=SYNCHRONOUS).write(bucket=conf.bucket, record="cpu,location=G4 usage=")
except ApiException as ae:
print("\nSync -> Caught ApiException: ", ae.message)
report_headers(ae.headers)

print("Sync write done")


# try a write using batch API
def use_batch(conf: Config):
print("Using batch")
with InfluxDBClient(url=conf.url, token=conf.token, org=conf.org) as client:
cb = BatchCB()
with client.write_api(success_callback=cb.success,
error_callback=cb.error,
retry_callback=cb.retry) as write_api:
write_api.write(bucket=conf.bucket, record="cpu,location=G9 usage=")
print("Batch write sent")
print("Batch write done")


# try a write using async.io
async def use_async(conf: Config):
print("Using async")
async with InfluxDBClientAsync(url=conf.url, token=conf.token, org=conf.org) as client:
report_ping(await client.ping())
try:
await client.write_api().write(bucket=conf.bucket, record="cpu,location=G7 usage=")
except InfluxDBError as ie:
print("\nAsync -> Caught InfluxDBError: ", ie.message)
report_headers(ie.headers)
print("Async write done")


if __name__ == "__main__":
conf = Config()
print(conf)
use_sync(conf)
print("\n Continuing...\n")
use_batch(conf)
print("\n Continuing...\n")
asyncio.run(use_async(conf))
2 changes: 2 additions & 0 deletions influxdb_client/client/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,10 @@ def __init__(self, response: HTTPResponse = None, message: str = None):
self.response = response
self.message = self._get_message(response)
if isinstance(response, HTTPResponse): # response is HTTPResponse
self.headers = response.headers
self.retry_after = response.headers.get('Retry-After')
else: # response is RESTResponse
self.headers = response.getheaders()
self.retry_after = response.getheader('Retry-After')
else:
self.response = None
Expand Down
19 changes: 19 additions & 0 deletions tests/test_InfluxDBClientAsync.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import asyncio
import logging
import re
import unittest
import os
from datetime import datetime, timezone
Expand Down Expand Up @@ -390,6 +391,24 @@ async def test_query_exception_propagation(self):
await self.client.query_api().query("buckets()", "my-org")
self.assertEqual("unauthorized access", e.value.message)

@async_test
async def test_write_exception_propagation(self):
await self.client.close()
self.client = InfluxDBClientAsync(url="http://localhost:8086", token="wrong", org="my-org")

with pytest.raises(InfluxDBError) as e:
await self.client.write_api().write(bucket="my_bucket",
record="temperature,location=hic cels=")
self.assertEqual("unauthorized access", e.value.message)
headers = e.value.headers
self.assertIsNotNone(headers)
self.assertIsNotNone(headers.get("Content-Length"))
self.assertIsNotNone(headers.get("Date"))
self.assertIsNotNone(headers.get("X-Platform-Error-Code"))
self.assertIn("application/json", headers.get("Content-Type"))
self.assertTrue(re.compile("^v.*").match(headers.get("X-Influxdb-Version")))
self.assertEqual("OSS", headers.get("X-Influxdb-Build"))

@async_test
@aioresponses()
async def test_parse_utf8_two_bytes_character(self, mocked):
Expand Down
37 changes: 37 additions & 0 deletions tests/test_WriteApi.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,16 @@
from __future__ import absolute_import

import datetime
import json
import logging
import os
import re
import sys
import unittest
from collections import namedtuple
from datetime import timedelta
from multiprocessing.pool import ApplyResult
from types import SimpleNamespace

import httpretty
import pytest
Expand Down Expand Up @@ -190,6 +194,17 @@ def test_write_error(self):

self.assertEqual(400, exception.status)
self.assertEqual("Bad Request", exception.reason)
# assert headers
self.assertIsNotNone(exception.headers)
self.assertIsNotNone(exception.headers.get("Content-Length"))
self.assertIsNotNone(exception.headers.get("Date"))
self.assertIsNotNone(exception.headers.get("X-Platform-Error-Code"))
self.assertIn("application/json", exception.headers.get("Content-Type"))
self.assertTrue(re.compile("^v.*").match(exception.headers.get("X-Influxdb-Version")))
self.assertEqual("OSS", exception.headers.get("X-Influxdb-Build"))
# assert body
b = json.loads(exception.body, object_hook=lambda d: SimpleNamespace(**d))
self.assertTrue(re.compile("^unable to parse.*invalid field format").match(b.message))

def test_write_dictionary(self):
_bucket = self.create_test_bucket()
Expand Down Expand Up @@ -609,6 +624,28 @@ def test_write_result(self):
self.assertEqual(None, result.get())
self.delete_test_bucket(_bucket)

def test_write_error(self):
_bucket = self.create_test_bucket()

_record = "h2o_feet,location=coyote_creek level\\ water_level="
result = self.write_client.write(_bucket.name, self.org, _record)

with self.assertRaises(ApiException) as cm:
result.get()
self.assertEqual(400, cm.exception.status)
self.assertEqual("Bad Request", cm.exception.reason)
# assert headers
self.assertIsNotNone(cm.exception.headers)
self.assertIsNotNone(cm.exception.headers.get("Content-Length"))
self.assertIsNotNone(cm.exception.headers.get("Date"))
self.assertIsNotNone(cm.exception.headers.get("X-Platform-Error-Code"))
self.assertIn("application/json", cm.exception.headers.get("Content-Type"))
self.assertTrue(re.compile("^v.*").match(cm.exception.headers.get("X-Influxdb-Version")))
self.assertEqual("OSS", cm.exception.headers.get("X-Influxdb-Build"))
# assert body
b = json.loads(cm.exception.body, object_hook=lambda d: SimpleNamespace(**d))
self.assertTrue(re.compile("^unable to parse.*missing field value").match(b.message))

def test_write_dictionaries(self):
bucket = self.create_test_bucket()

Expand Down