
The error asyncio.exceptions.CancelledError occurred while parsing the CSV. #671


Closed
kivdev opened this issue Sep 17, 2024 · 5 comments · Fixed by #679

@kivdev

kivdev commented Sep 17, 2024

Specifications

  • Client Version: 1.46.0
  • InfluxDB Version: 1.8.10
  • Platform: Debian (docker)

Code sample to reproduce problem

import asyncio

from influxdb_client.client.influxdb_client_async import InfluxDBClientAsync


async def main():
    async with InfluxDBClientAsync(url="http://localhost:8086", token="my-token", org="my-org") as client:
        # Stream of FluxRecords
        query_api = client.query_api()
        records = await query_api.query_stream(
            (
                'from(bucket:"{bucket}") |> range(start: {start}, stop: {stop})'
                ' |> filter(fn: (r) => r["_measurement"] == "{measurement}")'
                ' |> filter(fn: (r) => r.name == "{metric}")'
            ).format(
                bucket='metrics', start='2023-01-01T00:00:00Z', stop='2023-12-31T23:59:59Z',
                metric='http_request_total_count', measurement='http',
            )
        )
        async for record in records:
            print(record)


asyncio.run(main())

Expected behavior

Executed without errors.

Actual behavior

File "/app/my_app/service.py", line 51, in get_influxdb_data
    async for record in records:
  File "/usr/local/lib/python3.11/site-packages/influxdb_client/client/flux_csv_parser.py", line 141, in _parse_flux_response_async
    async for csv in self._reader:
  File "/usr/local/lib/python3.11/site-packages/aiocsv/readers.py", line 54, in __anext__
    return await self._parser.__anext__()
asyncio.exceptions.CancelledError

Additional info

The error occurs only when there is a lot of data (251,395 records with JSON content).

@kivdev kivdev added the bug Something isn't working label Sep 17, 2024
@bednar
Contributor

bednar commented Sep 25, 2024

Hi @kivdev,

Thank you for reaching out with your issue. To better understand and address the problem you're experiencing, it would be incredibly helpful if you could share an example of how your data looks. An anonymized export from the InfluxDB UI would be ideal.

This information will allow us to accurately simulate your scenario and work towards a resolution.

Thanks a lot for your cooperation. Looking forward to your response.

Best Regards.

@kivdev
Author

kivdev commented Sep 27, 2024

Hi @bednar,

The data is stored as a string in the format [{"client": str, "requests": int, "date_report": str}, ....]. Unfortunately, I cannot provide a slice from the database.
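
Purely for illustration (not real data), a single anonymized record in that shape might look like:

[{"client": "client-001", "requests": 1517, "date_report": "2023-09-01"}, ...]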

@nastynaz

nastynaz commented Oct 22, 2024

I'm also having this error. I'm returning a query with 300k rows. Any updates? @bednar

The exact code I'm using is:

result = await influxdb_connector.query_data_frame(query, use_extension_dtypes=True)

The flux query is:

  bucket = from(bucket: "quotes")
    |> range(start: {start_rfc3339}, stop: {end_rfc3339})
    |> filter(fn: (r) => r["_measurement"] == "quote")
    |> filter(fn: (r) => r["coin"] == "{coin}")
    |> filter(fn: (r) => r["_field"] =~ /^(mid_price|bid_price|ask_price)$/ )

  bucket
    |> group(columns: ["exchange"])
    |> drop(columns: ["_start", "_stop", "_measurement", "fiat", "coin"])
    |> rename(columns: {{_time: "time"}})
    |> yield(name: "result")

The bottom of the exception output:

File ~/dev/brain/analysis/src/data_loader/influxdb_loader.py:16, in InfluxDBLoader.load_data(query)
    14 try:
    15   logger.debug(f"executing influxdb query {query}")
---> 16   result = await influxdb_connector.query_data_frame(query, use_extension_dtypes=True)
    17   return result
    18 except Exception as e:

File ~/dev/brain/analysis/.venv/lib/python3.11/site-packages/influxdb_client/client/query_api_async.py:161, in QueryApiAsync.query_data_frame(self, query, org, data_frame_index, params, use_extension_dtypes)
   157 _generator = await self.query_data_frame_stream(query, org=org, data_frame_index=data_frame_index,
   158                                                 params=params, use_extension_dtypes=use_extension_dtypes)
   160 dataframes = []
--> 161 async for dataframe in _generator:
   162     dataframes.append(dataframe)
   164 return self._to_data_frames(dataframes)

File ~/dev/brain/analysis/.venv/lib/python3.11/site-packages/influxdb_client/client/flux_csv_parser.py:141, in FluxCsvParser._parse_flux_response_async(self)
   138 metadata = _FluxCsvParserMetadata()
   140 try:
--> 141     async for csv in self._reader:
   142         for val in self._parse_flux_response_row(metadata, csv):
   143             yield val

File ~/dev/brain/analysis/.venv/lib/python3.11/site-packages/aiocsv/readers.py:54, in AsyncReader.__anext__(self)
    53 async def __anext__(self) -> List[str]:
---> 54     return await self._parser.__anext__()

CancelledError:

Edit:
Breaking the query into smaller time ranges works (a sketch of that windowed approach follows). The upper limit for a single fetch seems to be around 2.4M rows before it fails.
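
A minimal sketch of that windowed workaround, assuming a hypothetical build_query(start, stop) helper that renders the Flux query above and the same influxdb_connector; the window size would need tuning against the ~2.4M-row ceiling observed:

from datetime import datetime, timedelta

import pandas as pd

# Sketch: fetch one bounded time window at a time and concatenate the frames.
# `build_query` and `influxdb_connector` are placeholders for the query
# builder and connector from the snippet above; start/stop are naive UTC.
async def load_in_windows(start: datetime, stop: datetime, step: timedelta = timedelta(hours=6)):
    frames = []
    cursor = start
    while cursor < stop:
        window_stop = min(cursor + step, stop)
        query = build_query(cursor.isoformat() + "Z", window_stop.isoformat() + "Z")
        frames.append(await influxdb_connector.query_data_frame(query, use_extension_dtypes=True))
        cursor = window_stop
    return pd.concat(frames, ignore_index=True)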

@karel-rehor
Contributor

I've managed to reproduce this issue with the attached script. exploreIssue671.py.txt

$ python examples/exploreIssue671.py seed size 100000
$ python examples/exploreIssue671.py query

Will investigate further.

@karel-rehor
Contributor

After further investigation, it appears the root cause is in the aiohttp library, which raises this exception when the timeout for the transaction has been exceeded. A simple solution is to add a timeout parameter when instantiating the async client whenever a large batch of response records is anticipated. The default timeout is currently 10_000 ms.

For example

async with InfluxDBClientAsync(url="http://localhost:8086", token="my-token", org="my-org", timeout=60_000) as client:
   ...

I'm opening a PR to handle this exception more clearly in this situation.
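
Until that lands, a caller-side sketch of the same idea — translating the cancellation into a clearer error — might look like this (hypothetical helper; note the CancelledError here comes from aiohttp's timeout rather than task cancellation, so intercepting it is a narrow workaround, not a general asyncio pattern):

import asyncio

from influxdb_client.client.influxdb_client_async import InfluxDBClientAsync

async def query_with_clear_timeout(url, token, org, flux, timeout_ms=60_000):
    # Sketch: re-raise the client-side timeout as a TimeoutError instead of
    # letting a bare CancelledError escape. Keep the except close to the query
    # call so genuine task cancellation elsewhere is not masked.
    async with InfluxDBClientAsync(url=url, token=token, org=org, timeout=timeout_ms) as client:
        try:
            return await client.query_api().query(flux)
        except asyncio.CancelledError as exc:
            raise TimeoutError(
                f"query exceeded the client timeout of {timeout_ms} ms; "
                "raise the timeout or narrow the query's time range"
            ) from exc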
