
WriteApi should use ThreadPoolScheduler for batching #561

Closed
@goznauk

Description


When a WriteApi instance is created, window_with_time_or_count is called, and it uses TimeoutScheduler by default. This creates a new thread for every flush interval.
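To illustrate the difference, here is a stdlib-only sketch that does not use reactivex. As far as I can tell from the reactivex 4.x source, TimeoutScheduler starts a threading.Timer (a brand-new thread) for each scheduled action, and ThreadPoolScheduler is backed by a concurrent.futures.ThreadPoolExecutor. The first function below mimics the former and the second mimics ThreadPoolScheduler(1); both function names are hypothetical.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor


def flush() -> str:
    """Stand-in for one flush of a batching window; returns the running thread's name."""
    return threading.current_thread().name


def timer_per_interval(intervals: int, delay: float = 0.01) -> set:
    """Mimic TimeoutScheduler (assumption): one threading.Timer, i.e. one new thread, per flush."""
    names = []

    def record() -> None:
        names.append(flush())

    for _ in range(intervals):
        timer = threading.Timer(delay, record)
        timer.daemon = True
        timer.start()
        timer.join()  # run flushes one after another, like consecutive windows
    return set(names)


def single_worker_pool(intervals: int, delay: float = 0.01) -> set:
    """Mimic ThreadPoolScheduler(1): every flush runs on the same pooled worker thread."""
    with ThreadPoolExecutor(max_workers=1) as pool:
        futures = []
        for _ in range(intervals):
            time.sleep(delay)
            futures.append(pool.submit(flush))
        return {future.result() for future in futures}


if __name__ == "__main__":
    print(len(timer_per_interval(5)))  # 5: a fresh thread for every flush
    print(len(single_worker_pool(5)))  # 1: the same worker thread is reused
```

Over a long-running process the first pattern keeps creating threads at the rate of one per flush interval, while the second never grows past one worker.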

    def __init__(self,
                 influxdb_client,
                 write_options: WriteOptions = WriteOptions(),
                 point_settings: PointSettings = PointSettings(),
                 **kwargs) -> None:
   
        ...

        if self._write_options.write_type is WriteType.batching:
            # Define Subject that listen incoming data and produces writes into InfluxDB
            self._subject = Subject()

            self._disposable = self._subject.pipe(
                # Split incoming data to windows by batch_size or flush_interval
                ops.window_with_time_or_count(count=write_options.batch_size,
                                              timespan=timedelta(milliseconds=write_options.flush_interval)),
                ...
  

Instead of letting TimeoutScheduler create a new thread for every window (which adds up to millions of threads over the lifetime of a long-running writer), we can use ThreadPoolScheduler(1) to handle the windowing. TimeoutScheduler may not make sense for Python. The change is a single line:

    ops.window_with_time_or_count(count=write_options.batch_size,
                                  timespan=timedelta(milliseconds=write_options.flush_interval),
                                  scheduler=ThreadPoolScheduler(1)),
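The one-line change keeps the same time-or-count windowing but runs it on a single pooled thread. The sketch below shows that idea with only the standard library; it is not how reactivex implements window_with_time_or_count, and SingleThreadBatcher, write, close and on_flush are hypothetical names. A batch is closed either when it reaches batch_size records or when flush_interval seconds have passed since its first record, and every flush happens on one long-lived worker thread.

```python
import queue
import threading
import time
from typing import Callable, List, Optional


class SingleThreadBatcher:
    """Hypothetical time-or-count batcher in which one long-lived thread does all flushing.

    A batch is flushed when it reaches batch_size records or when flush_interval
    seconds have passed since its first record, whichever happens first.
    """

    def __init__(self, batch_size: int, flush_interval: float,
                 on_flush: Callable[[List[str]], None]) -> None:
        self._batch_size = batch_size
        self._flush_interval = flush_interval
        self._on_flush = on_flush
        self._queue: "queue.Queue[object]" = queue.Queue()
        self._stop = object()  # sentinel: flush whatever is left, then exit
        # The only thread this batcher ever creates (compare ThreadPoolScheduler(1)).
        self._worker = threading.Thread(target=self._run, name="batch-worker", daemon=True)
        self._worker.start()

    def write(self, record: str) -> None:
        self._queue.put(record)

    def close(self) -> None:
        self._queue.put(self._stop)
        self._worker.join()

    def _run(self) -> None:
        batch: List[str] = []
        deadline: Optional[float] = None
        while True:
            # Block until the next record arrives or the current batch's deadline passes.
            timeout = None if deadline is None else max(0.0, deadline - time.monotonic())
            try:
                item = self._queue.get(timeout=timeout)
            except queue.Empty:
                item = None  # flush interval elapsed while a partial batch was waiting
            if item is self._stop:
                if batch:
                    self._on_flush(batch)
                return
            if item is not None:
                if not batch:
                    deadline = time.monotonic() + self._flush_interval
                batch.append(item)
            if batch and (len(batch) >= self._batch_size or time.monotonic() >= deadline):
                self._on_flush(batch)
                batch, deadline = [], None
```

Because a single worker does everything, a slow on_flush delays the next batch instead of spawning more threads; presumably the same trade-off applies to ThreadPoolScheduler(1).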
         

Activity

added a commit that references this issue on Feb 22, 2023
goznauk

goznauk commented on Mar 20, 2023


Quick Fix

    from datetime import timedelta

    from influxdb_client import WriteOptions
    from influxdb_client.client.write_api import WriteType, _BatchItem, _body_reduce
    from reactivex import operators as ops
    from reactivex.scheduler import ThreadPoolScheduler
    from reactivex.subject import Subject


    # noinspection PyProtectedMember
    def monkey_patch_no_timeout_scheduler(write_api):
        # https://github.com/influxdata/influxdb-client-python/pull/562
        if write_api._write_options.write_type is WriteType.batching:
            # Complete the Subject created in __init__ so the original pipeline shuts down
            write_api._subject.on_completed()
            # Define a new Subject that listens for incoming data and produces writes into InfluxDB
            write_api._subject = Subject()
            write_api._disposable = write_api._subject.pipe(
                # Split incoming data into windows by batch_size or flush_interval,
                # scheduled on a single pooled thread instead of the default TimeoutScheduler
                ops.window_with_time_or_count(count=write_api._write_options.batch_size,
                                              timespan=timedelta(milliseconds=write_api._write_options.flush_interval),
                                              scheduler=ThreadPoolScheduler(1)),
                # Map each window into groups defined by 'organization', 'bucket' and 'precision'
                ops.flat_map(lambda window: window.pipe(
                    # Group the window by 'organization', 'bucket' and 'precision'
                    ops.group_by(lambda batch_item: batch_item.key),
                    # Create a batch (concatenate line protocol records with \n)
                    ops.map(lambda group: group.pipe(
                        ops.to_iterable(),
                        ops.map(lambda xs: _BatchItem(key=group.key, data=_body_reduce(xs), size=len(xs)))
                    )),
                    ops.merge_all()
                )),
                # Write data into InfluxDB (with retries if the write fails)
                ops.filter(lambda batch: batch.size > 0),
                ops.map(mapper=lambda batch: write_api._to_response(data=batch, delay=write_api._jitter_delay())),
                ops.merge_all()
            ).subscribe(write_api._on_next, write_api._on_error, write_api._on_complete)


    # Usage: patch the batching WriteApi right after creating it
    # (`client` is an existing InfluxDBClient)
    write_api = client.write_api(write_options=WriteOptions(...))
    monkey_patch_no_timeout_scheduler(write_api)
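The grouping part of the pipeline in the quick fix (group_by on the batch key, to_iterable, then _body_reduce) can be sketched without reactivex as below. BatchKey, BatchItem and group_window are hypothetical stand-ins for the library's private _BatchItem and _body_reduce, and each record is assumed to be one line of line protocol as bytes; the real internals may differ.

```python
from typing import Dict, Iterable, List, NamedTuple, Tuple


class BatchKey(NamedTuple):
    """Hypothetical stand-in for the key the pipeline groups by."""
    org: str
    bucket: str
    precision: str


class BatchItem(NamedTuple):
    """Hypothetical stand-in for one queued write: its key plus one line-protocol record."""
    key: BatchKey
    data: bytes


def group_window(window: Iterable[BatchItem]) -> List[Tuple[BatchKey, bytes, int]]:
    """Group one window by key and join each group's records with b"\\n".

    Returns (key, body, size) per group, in the order each key was first seen.
    Every group is non-empty here, so the pipeline's `size > 0` filter always passes.
    """
    groups: Dict[BatchKey, List[bytes]] = {}
    for item in window:
        groups.setdefault(item.key, []).append(item.data)
    return [(key, b"\n".join(records), len(records)) for key, records in groups.items()]
```

Each returned tuple corresponds to one HTTP write for a single organization, bucket and precision.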
added a commit that references this issue on Aug 5, 2023
64a3df2
added a commit that references this issue on Jan 4, 2024
88098ae
added this to the 1.40.0 milestone on Jan 4, 2024
added a commit that references this issue on Jan 4, 2024
8286f45

      WriteApi should use ThreadPoolScheduler for batching · Issue #561 · influxdata/influxdb-client-python