Closed
Description
When a WriteApi instance is created, window_with_time_or_count is called, and it uses TimeoutScheduler by default. As a result, a new thread is created for every flush interval.
def __init__(self,
             influxdb_client,
             write_options: WriteOptions = WriteOptions(),
             point_settings: PointSettings = PointSettings(),
             **kwargs) -> None:
    ...
    if self._write_options.write_type is WriteType.batching:
        # Define Subject that listen incoming data and produces writes into InfluxDB
        self._subject = Subject()
        self._disposable = self._subject.pipe(
            # Split incoming data to windows by batch_size or flush_interval
            ops.window_with_time_or_count(count=write_options.batch_size,
                                          timespan=timedelta(milliseconds=write_options.flush_interval)),
            ...
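The thread-per-interval pattern described above can be illustrated with a standard-library analogy. This sketch does not use reactivex; it assumes, as the issue states, that TimeoutScheduler starts a new thread for each scheduled window, and mimics that by putting each flush on its own threading.Timer. FLUSH_INTERVAL_S and run_flushes_with_timers are hypothetical names chosen for illustration.

```python
import threading

FLUSH_INTERVAL_S = 0.01  # hypothetical flush interval, kept tiny for the demo


def run_flushes_with_timers(n_flushes: int) -> int:
    """Schedule each flush on its own threading.Timer and count the threads used."""
    threads_used = set()

    def flush() -> None:
        # Inside a Timer callback, current_thread() is that Timer's own thread.
        threads_used.add(threading.current_thread())

    for _ in range(n_flushes):
        timer = threading.Timer(FLUSH_INTERVAL_S, flush)
        timer.start()
        timer.join()  # flushes run strictly one after another
    return len(threads_used)


print(run_flushes_with_timers(5))  # 5: one new thread per flush interval
```

Even though the flushes never overlap, every interval pays for creating and tearing down a fresh thread, which adds up for a writer that runs for days.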
Instead of letting TimeoutScheduler create millions of threads, we can use ThreadPoolScheduler(1) to handle the window. TimeoutScheduler may not be a good fit for Python. The change is a single line of code:
ops.window_with_time_or_count(count=write_options.batch_size,
                              timespan=timedelta(milliseconds=write_options.flush_interval),
                              scheduler=ThreadPoolScheduler(1)),
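The idea behind this one-line change can also be sketched with the standard library. This is an analogy, not the reactivex implementation: it assumes ThreadPoolScheduler(1) runs scheduled work on a pool with a single reusable worker, modeled here with concurrent.futures.ThreadPoolExecutor(max_workers=1). FLUSH_INTERVAL_S and run_flushes_on_one_worker are hypothetical names.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor

FLUSH_INTERVAL_S = 0.01  # hypothetical flush interval, kept tiny for the demo


def run_flushes_on_one_worker(n_flushes: int) -> int:
    """Run every flush on a one-thread pool and count the threads used."""
    threads_used = set()

    def flush() -> None:
        time.sleep(FLUSH_INTERVAL_S)  # stand-in for waiting out the flush interval
        threads_used.add(threading.current_thread())

    with ThreadPoolExecutor(max_workers=1) as pool:
        for _ in range(n_flushes):
            pool.submit(flush).result()  # wait for each flush before the next
    return len(threads_used)


print(run_flushes_on_one_worker(5))  # 1: the same worker handles every flush
```

A pool of one keeps flushes serialized, as they are with batching anyway, while the thread is created once and reused for every window.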
Activity
Use ThreadPoolScheduler for WriteApi batch subject
goznauk commented on Mar 20, 2023
Quick Fix
fix: prevent creating unnecessary threads repeatedly
fix: prevent creating unnecessary threads repeatedly (#562)