Skip to content
This repository was archived by the owner on Oct 29, 2024. It is now read-only.

Commit bf232a7

Browse files
patrickhoebekexginn8
authored andcommitted
Fix for DataFrameClient issue - seems does not process correctly DateTimeIndex dates (issue #479) (#495)
* [FIX] : compatibility with new version of pandas pd.tseries.period.PeriodIndex has been moved to pd.PeriodIndex since at least pandas 0.18.1 pd.tseries.period.DatetimeIndex has been moved to pd.DatetimeIndex since at least pandas 0.18.1 * [FIX] : Fixes #479 : DateTimeIndex not correctly converted to Unix Epoch (e.g .on (some?) Windows machines) * [FIX] : new fix for #479 : DateTimeIndex not correctly converted to Unix Epoch (e.g .on (some?) Windows machines) * [ENH] : added feature : DataFrame.write_points : NaNs and None values allowed in input DataFrame (corresponding entries are removed from the list of points to push to Influx) * [FIX] : error in unittest dataframe_client test_write_points_from_dataframe_with_all_none
1 parent dda70e5 commit bf232a7

File tree

2 files changed

+101
-8
lines changed

2 files changed

+101
-8
lines changed

influxdb/_dataframe_client.py

Lines changed: 26 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
from collections import defaultdict
1111

1212
import pandas as pd
13+
import numpy as np
1314

1415
from .client import InfluxDBClient
1516
from .line_protocol import _escape_tag
@@ -257,7 +258,7 @@ def _convert_dataframe_to_json(dataframe,
257258
{'measurement': measurement,
258259
'tags': dict(list(tag.items()) + list(tags.items())),
259260
'fields': rec,
260-
'time': int(ts.value / precision_factor)}
261+
'time': np.int64(ts.value / precision_factor)}
261262
for ts, tag, rec in zip(dataframe.index,
262263
dataframe[tag_columns].to_dict('record'),
263264
dataframe[field_columns].to_dict('record'))
@@ -274,6 +275,10 @@ def _convert_dataframe_to_lines(self,
274275
time_precision=None,
275276
numeric_precision=None):
276277

278+
dataframe = dataframe.dropna(how='all').copy()
279+
if len(dataframe) == 0:
280+
return []
281+
277282
if not isinstance(dataframe, pd.DataFrame):
278283
raise TypeError('Must be DataFrame, but type was: {0}.'
279284
.format(type(dataframe)))
@@ -319,11 +324,11 @@ def _convert_dataframe_to_lines(self,
319324

320325
# Make array of timestamp ints
321326
if isinstance(dataframe.index, pd.PeriodIndex):
322-
time = ((dataframe.index.to_timestamp().values.astype(int) /
323-
precision_factor).astype(int).astype(str))
327+
time = ((dataframe.index.to_timestamp().values.astype(np.int64) /
328+
precision_factor).astype(np.int64).astype(str))
324329
else:
325-
time = ((pd.to_datetime(dataframe.index).values.astype(int) /
326-
precision_factor).astype(int).astype(str))
330+
time = ((pd.to_datetime(dataframe.index).values.astype(np.int64) /
331+
precision_factor).astype(np.int64).astype(str))
327332

328333
# If tag columns exist, make an array of formatted tag keys and values
329334
if tag_columns:
@@ -357,12 +362,16 @@ def _convert_dataframe_to_lines(self,
357362

358363
# Make an array of formatted field keys and values
359364
field_df = dataframe[field_columns]
365+
360366
field_df = self._stringify_dataframe(field_df,
361367
numeric_precision,
362368
datatype='field')
363-
field_df = (field_df.columns.values + '=').tolist() + field_df
364-
field_df[field_df.columns[1:]] = ',' + field_df[field_df.columns[1:]]
365-
fields = field_df.sum(axis=1)
369+
370+
def format_line(line):
371+
line = line[~line.isnull()] # drop None entries
372+
return ",".join((line.index + '=' + line.values))
373+
374+
fields = field_df.apply(format_line, axis=1)
366375
del field_df
367376

368377
# Generate line protocol string
@@ -371,6 +380,13 @@ def _convert_dataframe_to_lines(self,
371380

372381
@staticmethod
373382
def _stringify_dataframe(dframe, numeric_precision, datatype='field'):
383+
384+
# Prevent modification of input dataframe
385+
dframe = dframe.copy()
386+
387+
# Keep the positions where Null values are found
388+
mask_null = dframe.isnull().values
389+
374390
# Find int and string columns for field-type data
375391
int_columns = dframe.select_dtypes(include=['integer']).columns
376392
string_columns = dframe.select_dtypes(include=['object']).columns
@@ -414,6 +430,8 @@ def _stringify_dataframe(dframe, numeric_precision, datatype='field'):
414430
dframe = dframe.apply(_escape_pandas_series)
415431

416432
dframe.columns = dframe.columns.astype(str)
433+
434+
dframe = dframe.where(~mask_null, None)
417435
return dframe
418436

419437
def _datetime_to_epoch(self, datetime, time_precision='s'):

influxdb/tests/dataframe_client_test.py

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,81 @@ def test_write_points_from_dataframe(self):
5959
cli.write_points(dataframe, 'foo', tags=None)
6060
self.assertEqual(m.last_request.body, expected)
6161

62+
def test_write_points_from_dataframe_with_none(self):
63+
"""Test write points from df in TestDataFrameClient object."""
64+
now = pd.Timestamp('1970-01-01 00:00+00:00')
65+
dataframe = pd.DataFrame(data=[["1", None, 1.0], ["2", 2.0, 2.0]],
66+
index=[now, now + timedelta(hours=1)],
67+
columns=["column_one", "column_two",
68+
"column_three"])
69+
expected = (
70+
b"foo column_one=\"1\",column_three=1.0 0\n"
71+
b"foo column_one=\"2\",column_two=2.0,column_three=2.0 "
72+
b"3600000000000\n"
73+
)
74+
75+
with requests_mock.Mocker() as m:
76+
m.register_uri(requests_mock.POST,
77+
"http://localhost:8086/write",
78+
status_code=204)
79+
80+
cli = DataFrameClient(database='db')
81+
82+
cli.write_points(dataframe, 'foo')
83+
self.assertEqual(m.last_request.body, expected)
84+
85+
cli.write_points(dataframe, 'foo', tags=None)
86+
self.assertEqual(m.last_request.body, expected)
87+
88+
def test_write_points_from_dataframe_with_line_of_none(self):
89+
"""Test write points from df in TestDataFrameClient object."""
90+
now = pd.Timestamp('1970-01-01 00:00+00:00')
91+
dataframe = pd.DataFrame(data=[[None, None, None], ["2", 2.0, 2.0]],
92+
index=[now, now + timedelta(hours=1)],
93+
columns=["column_one", "column_two",
94+
"column_three"])
95+
expected = (
96+
b"foo column_one=\"2\",column_two=2.0,column_three=2.0 "
97+
b"3600000000000\n"
98+
)
99+
100+
with requests_mock.Mocker() as m:
101+
m.register_uri(requests_mock.POST,
102+
"http://localhost:8086/write",
103+
status_code=204)
104+
105+
cli = DataFrameClient(database='db')
106+
107+
cli.write_points(dataframe, 'foo')
108+
self.assertEqual(m.last_request.body, expected)
109+
110+
cli.write_points(dataframe, 'foo', tags=None)
111+
self.assertEqual(m.last_request.body, expected)
112+
113+
def test_write_points_from_dataframe_with_all_none(self):
114+
"""Test write points from df in TestDataFrameClient object."""
115+
now = pd.Timestamp('1970-01-01 00:00+00:00')
116+
dataframe = pd.DataFrame(data=[[None, None, None], [None, None, None]],
117+
index=[now, now + timedelta(hours=1)],
118+
columns=["column_one", "column_two",
119+
"column_three"])
120+
expected = (
121+
b"\n"
122+
)
123+
124+
with requests_mock.Mocker() as m:
125+
m.register_uri(requests_mock.POST,
126+
"http://localhost:8086/write",
127+
status_code=204)
128+
129+
cli = DataFrameClient(database='db')
130+
131+
cli.write_points(dataframe, 'foo')
132+
self.assertEqual(m.last_request.body, expected)
133+
134+
cli.write_points(dataframe, 'foo', tags=None)
135+
self.assertEqual(m.last_request.body, expected)
136+
62137
def test_write_points_from_dataframe_in_batches(self):
63138
"""Test write points in batch from df in TestDataFrameClient object."""
64139
now = pd.Timestamp('1970-01-01 00:00+00:00')

0 commit comments

Comments
 (0)