diff --git a/influxdb/influxdb08/dataframe_client.py b/influxdb/influxdb08/dataframe_client.py index f73fc269..197422a6 100644 --- a/influxdb/influxdb08/dataframe_client.py +++ b/influxdb/influxdb08/dataframe_client.py @@ -15,18 +15,17 @@ class DataFrameClient(InfluxDBClient): The client reads and writes from pandas DataFrames. """ - def __init__(self, *args, **kwargs): + def __init__(self, ignore_nan=True, *args, **kwargs): super(DataFrameClient, self).__init__(*args, **kwargs) + try: global pd import pandas as pd except ImportError as ex: - raise ImportError( - 'DataFrameClient requires Pandas, "{ex}" problem importing' - .format(ex=str(ex)) - ) - + raise ImportError('DataFrameClient requires Pandas, ' + '"{ex}" problem importing'.format(ex=str(ex))) self.EPOCH = pd.Timestamp('1970-01-01 00:00:00.000+00:00') + self.ignore_nan = ignore_nan def write_points(self, data, *args, **kwargs): """ @@ -135,9 +134,24 @@ def _convert_dataframe_to_json(self, dataframe, name, time_precision='s'): for dt in dataframe.index] data = {'name': name, 'columns': [str(column) for column in dataframe.columns], - 'points': list([list(x) for x in dataframe.values])} + 'points': [self._convert_array(x) for x in dataframe.values]} return data + def _convert_array(self, array): + try: + global np + import numpy as np + except ImportError as ex: + raise ImportError('DataFrameClient requires Numpy, ' + '"{ex}" problem importing'.format(ex=str(ex))) + if self.ignore_nan: + number_types = (int, float, np.number) + condition = (all(isinstance(el, number_types) for el in array) and + np.isnan(array)) + return list(np.where(condition, None, array)) + else: + return list(array) + def _datetime_to_epoch(self, datetime, time_precision='s'): seconds = (datetime - self.EPOCH).total_seconds() if time_precision == 's': diff --git a/tests/influxdb/influxdb08/dataframe_client_test.py b/tests/influxdb/influxdb08/dataframe_client_test.py index ebea3616..dd8955db 100644 --- a/tests/influxdb/influxdb08/dataframe_client_test.py +++ b/tests/influxdb/influxdb08/dataframe_client_test.py @@ -52,6 +52,32 @@ def test_write_points_from_dataframe(self): self.assertListEqual(json.loads(m.last_request.body), points) + def test_write_points_from_dataframe_with_float_nan(self): + now = pd.Timestamp('1970-01-01 00:00+00:00') + dataframe = pd.DataFrame(data=[[1, float("NaN"), 1.0], [2, 2, 2.0]], + index=[now, now + timedelta(hours=1)], + columns=["column_one", "column_two", + "column_three"]) + points = [ + { + "points": [ + [1, None, 1.0, 0], + [2, 2, 2.0, 3600] + ], + "name": "foo", + "columns": ["column_one", "column_two", "column_three", "time"] + } + ] + + with requests_mock.Mocker() as m: + m.register_uri(requests_mock.POST, + "http://localhost:8086/db/db/series") + + cli = DataFrameClient(database='db') + cli.write_points({"foo": dataframe}) + + self.assertListEqual(json.loads(m.last_request.body), points) + def test_write_points_from_dataframe_in_batches(self): now = pd.Timestamp('1970-01-01 00:00+00:00') dataframe = pd.DataFrame(data=[["1", 1, 1.0], ["2", 2, 2.0]],