From 1e2f774b1be90577cf9188acf81507b63fb7cfb7 Mon Sep 17 00:00:00 2001
From: Jakub Bednar
Date: Tue, 2 Nov 2021 14:32:30 +0100
Subject: [PATCH 1/3] feat: add MultiprocessingWriter to help user write data in independent OS process

---
 .gitignore                                        |   1 +
 docs/api.rst                                      |  12 +
 .../client/util/multiprocessing_helper.py         | 205 ++++++++++++++++++
 influxdb_client/client/write_api.py               |   7 +-
 tests/test_MultiprocessingWriter.py               |  89 ++++++++
 5 files changed, 313 insertions(+), 1 deletion(-)
 create mode 100644 influxdb_client/client/util/multiprocessing_helper.py
 create mode 100644 tests/test_MultiprocessingWriter.py

diff --git a/.gitignore b/.gitignore
index 3be131de..7634342b 100644
--- a/.gitignore
+++ b/.gitignore
@@ -47,6 +47,7 @@ coverage.xml
 .hypothesis/
 .pytest_cache/
 influxdb2_test/*.csv
+tests/test_MultiprocessingWriter.txt
 
 # Translations
 *.mo
diff --git a/docs/api.rst b/docs/api.rst
index 92a161a5..4d17efec 100644
--- a/docs/api.rst
+++ b/docs/api.rst
@@ -69,3 +69,15 @@ DeleteApi
 
 .. autoclass:: influxdb_client.domain.DeletePredicateRequest
    :members:
+
+Helpers
+"""""""
+.. autoclass:: influxdb_client.client.util.date_utils.DateHelper
+   :members:
+
+.. autoclass:: influxdb_client.client.util.date_utils_pandas.PandasDateTimeHelper
+   :members:
+
+.. autoclass:: influxdb_client.client.util.multiprocessing_helper.MultiprocessingWriter
+   :members:
+
diff --git a/influxdb_client/client/util/multiprocessing_helper.py b/influxdb_client/client/util/multiprocessing_helper.py
new file mode 100644
index 00000000..025a9368
--- /dev/null
+++ b/influxdb_client/client/util/multiprocessing_helper.py
@@ -0,0 +1,205 @@
+"""
+Helper classes to make it easier to use the client in a multiprocessing environment.
+
+For more information on how multiprocessing works, see Python's
+`reference docs `_.
+"""
+import logging
+import multiprocessing
+
+from influxdb_client import InfluxDBClient, WriteOptions
+from influxdb_client.client.exceptions import InfluxDBError
+
+logger = logging.getLogger(__name__)
+
+
+def _success_callback(conf: (str, str, str), data: str):
+    """Successfully written batch."""
+    logger.debug(f"Written batch: {conf}, data: {data}")
+
+
+def _error_callback(conf: (str, str, str), data: str, exception: InfluxDBError):
+    """Unsuccessfully written batch."""
+    logger.debug(f"Cannot write batch: {conf}, data: {data} due: {exception}")
+
+
+def _retry_callback(conf: (str, str, str), data: str, exception: InfluxDBError):
+    """Retryable error."""
+    logger.debug(f"Retryable error occurs for batch: {conf}, data: {data} retry: {exception}")
+
+
+class _PoisonPill:
+    """To notify the writer process to terminate."""
+
+    pass
+
+
+class MultiprocessingWriter(multiprocessing.Process):
+    """
+    The helper class to write data into InfluxDB in an independent OS process.
+
+    Example:
+        .. code-block:: python
+
+            from influxdb_client import WriteOptions
+            from influxdb_client.client.util.multiprocessing_helper import MultiprocessingWriter
+
+
+            def main():
+                writer = MultiprocessingWriter(url="http://localhost:8086", token="my-token", org="my-org",
+                                               write_options=WriteOptions(batch_size=100))
+                writer.start()
+
+                for x in range(1, 1000):
+                    writer.write(bucket="my-bucket", record=f"mem,tag=a value={x}i {x}")
+
+                writer.__del__()
+
+
+            if __name__ == '__main__':
+                main()
+
+
+    How to use with a context manager:
+        .. code-block:: python
+
+            from influxdb_client import WriteOptions
+            from influxdb_client.client.util.multiprocessing_helper import MultiprocessingWriter
+
+
+            def main():
+                with MultiprocessingWriter(url="http://localhost:8086", token="my-token", org="my-org",
+                                           write_options=WriteOptions(batch_size=100)) as writer:
+                    for x in range(1, 1000):
+                        writer.write(bucket="my-bucket", record=f"mem,tag=a value={x}i {x}")
+
+
+            if __name__ == '__main__':
+                main()
+
+
+    How to handle batch events:
+        .. code-block:: python
+
+            from influxdb_client import WriteOptions
+            from influxdb_client.client.exceptions import InfluxDBError
+            from influxdb_client.client.util.multiprocessing_helper import MultiprocessingWriter
+
+
+            class BatchingCallback(object):
+
+                def success(self, conf: (str, str, str), data: str):
+                    print(f"Written batch: {conf}, data: {data}")
+
+                def error(self, conf: (str, str, str), data: str, exception: InfluxDBError):
+                    print(f"Cannot write batch: {conf}, data: {data} due: {exception}")
+
+                def retry(self, conf: (str, str, str), data: str, exception: InfluxDBError):
+                    print(f"Retryable error occurs for batch: {conf}, data: {data} retry: {exception}")
+
+
+            def main():
+                callback = BatchingCallback()
+                with MultiprocessingWriter(url="http://localhost:8086", token="my-token", org="my-org",
+                                           success_callback=callback.success,
+                                           error_callback=callback.error,
+                                           retry_callback=callback.retry) as writer:
+
+                    for x in range(1, 1000):
+                        writer.write(bucket="my-bucket", record=f"mem,tag=a value={x}i {x}")
+
+
+            if __name__ == '__main__':
+                main()
+
+
+    """
+
+    __started__ = False
+    __disposed__ = False
+
+    def __init__(self, **kwargs) -> None:
+        """
+        Initialize defaults.
+
+        For more information on how to initialize the writer, see the examples above.
+
+        :param kwargs: arguments are passed into ``__init__`` function of ``InfluxDBClient`` and ``write_api``.
+        """
+        multiprocessing.Process.__init__(self)
+        self.kwargs = kwargs
+        self.client = None
+        self.write_api = None
+        self.queue_ = multiprocessing.Manager().Queue()
+
+    def write(self, **kwargs) -> None:
+        """
+        Append time-series data into the underlying queue.
+
+        For more information on how to pass arguments, see the examples above.
+
+        :param kwargs: arguments are passed into ``write`` function of ``WriteApi``
+        :return: None
+        """
+        assert self.__disposed__ is False, 'Cannot write data: the writer is closed.'
+        assert self.__started__ is True, 'Cannot write data: the writer is not started.'
+        self.queue_.put(kwargs)
+
+    def run(self):
+        """Initialize ``InfluxDBClient`` and wait for data to write into InfluxDB."""
+        # Initialize Client and Write API
+        self.client = InfluxDBClient(**self.kwargs)
+        self.write_api = self.client.write_api(write_options=self.kwargs.get('write_options', WriteOptions()),
+                                               success_callback=self.kwargs.get('success_callback', _success_callback),
+                                               error_callback=self.kwargs.get('error_callback', _error_callback),
+                                               retry_callback=self.kwargs.get('retry_callback', _retry_callback))
+        # Infinite loop - until poison pill
+        while True:
+            next_record = self.queue_.get()
+            if type(next_record) is _PoisonPill:
+                # Poison pill means break the loop
+                self.terminate()
+                self.queue_.task_done()
+                break
+            self.write_api.write(**next_record)
+            self.queue_.task_done()
+
+    def start(self) -> None:
+        """Start an independent process for writing data into InfluxDB."""
+        super().start()
+        self.__started__ = True
+
+    def terminate(self) -> None:
+        """
+        Clean up resources in the independent process.
+ + This function **cannot be used** to terminate the ``MultiprocessingWriter``. + If you want to finish your writes please call: ``__del__``. + """ + if self.write_api: + logger.info("flushing data...") + self.write_api.__del__() + self.write_api = None + if self.client: + self.client.__del__() + self.client = None + logger.info("closed") + + def __enter__(self): + """Enter the runtime context related to this object.""" + self.start() + return self + + def __exit__(self, exc_type, exc_value, traceback): + """Exit the runtime context related to this object.""" + self.__del__() + + def __del__(self): + """Dispose the client and write_api.""" + if self.__started__: + self.queue_.put(_PoisonPill()) + self.queue_.join() + self.join() + self.queue_ = None + self.__started__ = False + self.__disposed__ = True diff --git a/influxdb_client/client/write_api.py b/influxdb_client/client/write_api.py index af7c5b03..86245924 100644 --- a/influxdb_client/client/write_api.py +++ b/influxdb_client/client/write_api.py @@ -565,4 +565,9 @@ def __setstate__(self, state): """Set your object with the provided dict.""" self.__dict__.update(state) # Init Rx - self.__init__(self._influxdb_client, self._write_options, self._point_settings) + self.__init__(self._influxdb_client, + self._write_options, + self._point_settings, + success_callback=self._success_callback, + error_callback=self._error_callback, + retry_callback=self._retry_callback) diff --git a/tests/test_MultiprocessingWriter.py b/tests/test_MultiprocessingWriter.py new file mode 100644 index 00000000..98f9b44d --- /dev/null +++ b/tests/test_MultiprocessingWriter.py @@ -0,0 +1,89 @@ +import os +import unittest +from datetime import datetime + +from influxdb_client import WritePrecision, InfluxDBClient +from influxdb_client.client.exceptions import InfluxDBError +from influxdb_client.client.util.date_utils import get_date_helper +from influxdb_client.client.util.multiprocessing_helper import MultiprocessingWriter +from influxdb_client.client.write_api import SYNCHRONOUS + + +# noinspection PyUnusedLocal +def _error_callback(conf: (str, str, str), data: str, error: InfluxDBError): + with open("test_MultiprocessingWriter.txt", "w+") as file: + file.write(error.message) + + +# noinspection PyMethodMayBeStatic +class MultiprocessingWriterTest(unittest.TestCase): + + def setUp(self) -> None: + self.url = os.getenv('INFLUXDB_V2_URL', "http://localhost:8086") + self.token = os.getenv('INFLUXDB_V2_TOKEN', "my-token") + self.org = os.getenv('INFLUXDB_V2_ORG', "my-org") + self.writer = None + if os.path.exists("test_MultiprocessingWriter.txt"): + os.remove("test_MultiprocessingWriter.txt") + + def tearDown(self) -> None: + if self.writer: + self.writer.__del__() + + def test_write_without_start(self): + self.writer = MultiprocessingWriter(url=self.url, token=self.token, org=self.org, + write_options=SYNCHRONOUS) + + with self.assertRaises(AssertionError) as ve: + self.writer.write(bucket="my-bucket", record=f"mem,tag=a value=5") + + self.assertEqual('Cannot write data: the writer is not started.', f'{ve.exception}') + + def test_write_after_terminate(self): + self.writer = MultiprocessingWriter(url=self.url, token=self.token, org=self.org, + write_options=SYNCHRONOUS) + self.writer.start() + self.writer.__del__() + + with self.assertRaises(AssertionError) as ve: + self.writer.write(bucket="my-bucket", record=f"mem,tag=a value=5") + + self.assertEqual('Cannot write data: the writer is closed.', f'{ve.exception}') + + def test_terminate_twice(self): + with 
MultiprocessingWriter(url=self.url, token=self.token, org=self.org, write_options=SYNCHRONOUS) as writer: + writer.__del__() + writer.terminate() + writer.terminate() + writer.__del__() + + def test_use_context_manager(self): + with MultiprocessingWriter(url=self.url, token=self.token, org=self.org, write_options=SYNCHRONOUS) as writer: + self.assertIsNotNone(writer) + + def test_pass_parameters(self): + unique = get_date_helper().to_nanoseconds(datetime.utcnow() - datetime.utcfromtimestamp(0)) + + # write data + with MultiprocessingWriter(url=self.url, token=self.token, org=self.org, write_options=SYNCHRONOUS) as writer: + print(f"write: {os.getpid()}") + writer.write(bucket="my-bucket", record=f"mem_{unique},tag=a value=5i 10", write_precision=WritePrecision.S) + + # query data + with InfluxDBClient(url=self.url, token=self.token, org=self.org) as client: + query_api = client.query_api() + tables = query_api.query( + f'from(bucket: "my-bucket") |> range(start: 0) |> filter(fn: (r) => r._measurement == "mem_{unique}")', + self.org) + record = tables[0].records[0] + self.assertIsNotNone(record) + self.assertEqual("a", record["tag"]) + self.assertEqual(5, record["_value"]) + self.assertEqual(get_date_helper().to_utc(datetime.utcfromtimestamp(10)), record["_time"]) + + def test_wrong_configuration(self): + + with MultiprocessingWriter(url=self.url, token="ddd", org=self.org, error_callback=_error_callback) as writer: + writer.write(bucket="my-bucket", record=f"mem,tag=a value=5i 10", write_precision=WritePrecision.S) + + self.assertTrue(os.path.exists("test_MultiprocessingWriter.txt")) From 9d3bc45907c56aea83ee3b60afb532ef0147c1c8 Mon Sep 17 00:00:00 2001 From: Jakub Bednar Date: Tue, 2 Nov 2021 15:28:36 +0100 Subject: [PATCH 2/3] docs: update CHANGELOG.md --- CHANGELOG.md | 1 + tests/test_MultiprocessingWriter.py | 1 - 2 files changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index b558a9b1..16da2ffa 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,7 @@ - `BucketsApi` - add possibility to: `update` - `OrganizationsApi` - add possibility to: `update` - `UsersApi` - add possibility to: `update`, `delete`, `find` +1. [#356](https://github.com/influxdata/influxdb-client-python/pull/356): Add `MultiprocessingWriter` to write data in independent OS process ### Bug Fixes 1. 
[#359](https://github.com/influxdata/influxdb-client-python/pull/359): Correct serialization empty columns into LineProtocol [DataFrame] diff --git a/tests/test_MultiprocessingWriter.py b/tests/test_MultiprocessingWriter.py index 98f9b44d..bf74df5e 100644 --- a/tests/test_MultiprocessingWriter.py +++ b/tests/test_MultiprocessingWriter.py @@ -66,7 +66,6 @@ def test_pass_parameters(self): # write data with MultiprocessingWriter(url=self.url, token=self.token, org=self.org, write_options=SYNCHRONOUS) as writer: - print(f"write: {os.getpid()}") writer.write(bucket="my-bucket", record=f"mem_{unique},tag=a value=5i 10", write_precision=WritePrecision.S) # query data From b91b292cafdafa5b54f9c109569bbbc8a0a0e1bc Mon Sep 17 00:00:00 2001 From: Jakub Bednar Date: Wed, 3 Nov 2021 10:04:22 +0100 Subject: [PATCH 3/3] fix: tests --- .gitignore | 1 - tests/test_MultiprocessingWriter.py | 16 ---------------- 2 files changed, 17 deletions(-) diff --git a/.gitignore b/.gitignore index 7634342b..3be131de 100644 --- a/.gitignore +++ b/.gitignore @@ -47,7 +47,6 @@ coverage.xml .hypothesis/ .pytest_cache/ influxdb2_test/*.csv -tests/test_MultiprocessingWriter.txt # Translations *.mo diff --git a/tests/test_MultiprocessingWriter.py b/tests/test_MultiprocessingWriter.py index bf74df5e..940ae6ec 100644 --- a/tests/test_MultiprocessingWriter.py +++ b/tests/test_MultiprocessingWriter.py @@ -3,18 +3,11 @@ from datetime import datetime from influxdb_client import WritePrecision, InfluxDBClient -from influxdb_client.client.exceptions import InfluxDBError from influxdb_client.client.util.date_utils import get_date_helper from influxdb_client.client.util.multiprocessing_helper import MultiprocessingWriter from influxdb_client.client.write_api import SYNCHRONOUS -# noinspection PyUnusedLocal -def _error_callback(conf: (str, str, str), data: str, error: InfluxDBError): - with open("test_MultiprocessingWriter.txt", "w+") as file: - file.write(error.message) - - # noinspection PyMethodMayBeStatic class MultiprocessingWriterTest(unittest.TestCase): @@ -23,8 +16,6 @@ def setUp(self) -> None: self.token = os.getenv('INFLUXDB_V2_TOKEN', "my-token") self.org = os.getenv('INFLUXDB_V2_ORG', "my-org") self.writer = None - if os.path.exists("test_MultiprocessingWriter.txt"): - os.remove("test_MultiprocessingWriter.txt") def tearDown(self) -> None: if self.writer: @@ -79,10 +70,3 @@ def test_pass_parameters(self): self.assertEqual("a", record["tag"]) self.assertEqual(5, record["_value"]) self.assertEqual(get_date_helper().to_utc(datetime.utcfromtimestamp(10)), record["_time"]) - - def test_wrong_configuration(self): - - with MultiprocessingWriter(url=self.url, token="ddd", org=self.org, error_callback=_error_callback) as writer: - writer.write(bucket="my-bucket", record=f"mem,tag=a value=5i 10", write_precision=WritePrecision.S) - - self.assertTrue(os.path.exists("test_MultiprocessingWriter.txt"))