Skip to content
This repository was archived by the owner on Oct 29, 2024. It is now read-only.

Fix for DataFrameClient issue - it does not seem to process DatetimeIndex dates correctly (issue #479) #495

Merged
merged 11 commits into from
Nov 25, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 26 additions & 8 deletions influxdb/_dataframe_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from collections import defaultdict

import pandas as pd
import numpy as np

from .client import InfluxDBClient
from .line_protocol import _escape_tag
Expand Down Expand Up @@ -257,7 +258,7 @@ def _convert_dataframe_to_json(dataframe,
{'measurement': measurement,
'tags': dict(list(tag.items()) + list(tags.items())),
'fields': rec,
'time': int(ts.value / precision_factor)}
'time': np.int64(ts.value / precision_factor)}
for ts, tag, rec in zip(dataframe.index,
dataframe[tag_columns].to_dict('record'),
dataframe[field_columns].to_dict('record'))
Expand All @@ -274,6 +275,10 @@ def _convert_dataframe_to_lines(self,
time_precision=None,
numeric_precision=None):

dataframe = dataframe.dropna(how='all').copy()
if len(dataframe) == 0:
return []

if not isinstance(dataframe, pd.DataFrame):
raise TypeError('Must be DataFrame, but type was: {0}.'
.format(type(dataframe)))
Expand Down Expand Up @@ -319,11 +324,11 @@ def _convert_dataframe_to_lines(self,

# Make array of timestamp ints
if isinstance(dataframe.index, pd.PeriodIndex):
time = ((dataframe.index.to_timestamp().values.astype(int) /
precision_factor).astype(int).astype(str))
time = ((dataframe.index.to_timestamp().values.astype(np.int64) /
precision_factor).astype(np.int64).astype(str))
else:
time = ((pd.to_datetime(dataframe.index).values.astype(int) /
precision_factor).astype(int).astype(str))
time = ((pd.to_datetime(dataframe.index).values.astype(np.int64) /
precision_factor).astype(np.int64).astype(str))

# If tag columns exist, make an array of formatted tag keys and values
if tag_columns:
Expand Down Expand Up @@ -357,12 +362,16 @@ def _convert_dataframe_to_lines(self,

# Make an array of formatted field keys and values
field_df = dataframe[field_columns]

field_df = self._stringify_dataframe(field_df,
numeric_precision,
datatype='field')
field_df = (field_df.columns.values + '=').tolist() + field_df
field_df[field_df.columns[1:]] = ',' + field_df[field_df.columns[1:]]
fields = field_df.sum(axis=1)

def format_line(line):
    """Join the non-null entries of one row into ``key=value,key=value``.

    ``line`` is a single row handed over by ``DataFrame.apply(axis=1)``;
    its index supplies the keys and its values the already-stringified
    values. Null entries are left out of the result.
    """
    # Keep only the positions that actually carry a value.
    present = line[line.notnull()]
    pairs = present.index + '=' + present.values
    return ",".join(pairs)

fields = field_df.apply(format_line, axis=1)
del field_df

# Generate line protocol string
Expand All @@ -371,6 +380,13 @@ def _convert_dataframe_to_lines(self,

@staticmethod
def _stringify_dataframe(dframe, numeric_precision, datatype='field'):

# Prevent modification of input dataframe
dframe = dframe.copy()

# Keep the positions where Null values are found
mask_null = dframe.isnull().values

# Find int and string columns for field-type data
int_columns = dframe.select_dtypes(include=['integer']).columns
string_columns = dframe.select_dtypes(include=['object']).columns
Expand Down Expand Up @@ -414,6 +430,8 @@ def _stringify_dataframe(dframe, numeric_precision, datatype='field'):
dframe = dframe.apply(_escape_pandas_series)

dframe.columns = dframe.columns.astype(str)

dframe = dframe.where(~mask_null, None)
return dframe

def _datetime_to_epoch(self, datetime, time_precision='s'):
Expand Down
75 changes: 75 additions & 0 deletions influxdb/tests/dataframe_client_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,81 @@ def test_write_points_from_dataframe(self):
cli.write_points(dataframe, 'foo', tags=None)
self.assertEqual(m.last_request.body, expected)

def test_write_points_from_dataframe_with_none(self):
    """Check that a single None cell is left out of the written line.

    The first row has no value for ``column_two``, so the expected
    request body carries only ``column_one`` and ``column_three`` for
    that point, while the second row is written in full.
    """
    epoch = pd.Timestamp('1970-01-01 00:00+00:00')
    timestamps = [epoch, epoch + timedelta(hours=1)]
    column_names = ["column_one", "column_two", "column_three"]
    rows = [["1", None, 1.0], ["2", 2.0, 2.0]]
    dataframe = pd.DataFrame(data=rows,
                             index=timestamps,
                             columns=column_names)
    expected = (
        b"foo column_one=\"1\",column_three=1.0 0\n"
        b"foo column_one=\"2\",column_two=2.0,column_three=2.0 "
        b"3600000000000\n"
    )

    with requests_mock.Mocker() as mocker:
        mocker.register_uri(requests_mock.POST,
                            "http://localhost:8086/write",
                            status_code=204)
        client = DataFrameClient(database='db')

        # Exercise the default call first, then an explicit ``tags=None``.
        for extra_kwargs in ({}, {'tags': None}):
            client.write_points(dataframe, 'foo', **extra_kwargs)
            self.assertEqual(mocker.last_request.body, expected)

def test_write_points_from_dataframe_with_line_of_none(self):
    """Check that a row made only of None values produces no line.

    The first row is entirely None, so the expected request body holds
    just the point built from the second row.
    """
    epoch = pd.Timestamp('1970-01-01 00:00+00:00')
    timestamps = [epoch, epoch + timedelta(hours=1)]
    column_names = ["column_one", "column_two", "column_three"]
    rows = [[None, None, None], ["2", 2.0, 2.0]]
    dataframe = pd.DataFrame(data=rows,
                             index=timestamps,
                             columns=column_names)
    expected = (
        b"foo column_one=\"2\",column_two=2.0,column_three=2.0 "
        b"3600000000000\n"
    )

    with requests_mock.Mocker() as mocker:
        mocker.register_uri(requests_mock.POST,
                            "http://localhost:8086/write",
                            status_code=204)
        client = DataFrameClient(database='db')

        # Exercise the default call first, then an explicit ``tags=None``.
        for extra_kwargs in ({}, {'tags': None}):
            client.write_points(dataframe, 'foo', **extra_kwargs)
            self.assertEqual(mocker.last_request.body, expected)

def test_write_points_from_dataframe_with_all_none(self):
    """Check the request body when every cell of the frame is None.

    Both rows are entirely None; the expected request body is a lone
    newline with no points in it.
    """
    epoch = pd.Timestamp('1970-01-01 00:00+00:00')
    timestamps = [epoch, epoch + timedelta(hours=1)]
    column_names = ["column_one", "column_two", "column_three"]
    rows = [[None, None, None], [None, None, None]]
    dataframe = pd.DataFrame(data=rows,
                             index=timestamps,
                             columns=column_names)
    expected = (
        b"\n"
    )

    with requests_mock.Mocker() as mocker:
        mocker.register_uri(requests_mock.POST,
                            "http://localhost:8086/write",
                            status_code=204)
        client = DataFrameClient(database='db')

        # Exercise the default call first, then an explicit ``tags=None``.
        for extra_kwargs in ({}, {'tags': None}):
            client.write_points(dataframe, 'foo', **extra_kwargs)
            self.assertEqual(mocker.last_request.body, expected)

def test_write_points_from_dataframe_in_batches(self):
"""Test write points in batch from df in TestDataFrameClient object."""
now = pd.Timestamp('1970-01-01 00:00+00:00')
Expand Down