Skip to content
This repository was archived by the owner on Oct 29, 2024. It is now read-only.

Fix DataFrameClient tag processing #366

Merged
merged 2 commits into from
Sep 9, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 14 additions & 9 deletions influxdb/_dataframe_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import pandas as pd

from .client import InfluxDBClient
from .line_protocol import _escape_tag


def _pandas_time_unit(time_precision):
Expand All @@ -26,6 +27,10 @@ def _pandas_time_unit(time_precision):
return unit


def _escape_pandas_series(s):
    """Return ``s`` with ``_escape_tag`` applied to each of its elements.

    Used as the per-column callback of ``DataFrame.apply`` when tag
    columns are stringified, so ``s`` is presumably a ``pandas.Series``
    holding the values of one tag column.
    """
    # _escape_tag already takes a single value, so hand it to apply directly.
    return s.apply(_escape_tag)


class DataFrameClient(InfluxDBClient):
"""
The ``DataFrameClient`` object holds information necessary to connect
Expand Down Expand Up @@ -242,6 +247,12 @@ def _convert_dataframe_to_lines(self,
field_columns = list(field_columns) if list(field_columns) else []
tag_columns = list(tag_columns) if list(tag_columns) else []

# Treat global_tags as additional tag columns
if global_tags:
for tag in global_tags:
dataframe[tag] = global_tags[tag]
tag_columns.append(tag)

# If field columns but no tag columns, assume rest of columns are tags
if field_columns and (not tag_columns):
tag_columns = list(column_series[~column_series.isin(
Expand All @@ -268,6 +279,7 @@ def _convert_dataframe_to_lines(self,
# If tag columns exist, make an array of formatted tag keys and values
if tag_columns:
tag_df = dataframe[tag_columns]
tag_df = tag_df.sort_index(axis=1)
tag_df = self._stringify_dataframe(
tag_df, numeric_precision, datatype='tag')
tags = (',' + (
Expand All @@ -286,15 +298,6 @@ def _convert_dataframe_to_lines(self,
fields = field_df.sum(axis=1)
del field_df

# Add any global tags to formatted tag strings
if global_tags:
global_tags = ','.join(['='.join([tag, global_tags[tag]])
for tag in global_tags])
if tag_columns:
tags = tags + ',' + global_tags
else:
tags = ',' + global_tags

# Generate line protocol string
points = (measurement + tags + ' ' + fields + ' ' + time).tolist()
return points
Expand Down Expand Up @@ -344,6 +347,8 @@ def _stringify_dataframe(self,
# If dealing with fields, format ints and strings correctly
dataframe[int_columns] = dataframe[int_columns] + 'i'
dataframe[string_columns] = '"' + dataframe[string_columns] + '"'
elif datatype == 'tag':
dataframe = dataframe.apply(_escape_pandas_series)

dataframe.columns = dataframe.columns.astype(str)
return dataframe
Expand Down
37 changes: 33 additions & 4 deletions influxdb/tests/dataframe_client_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,10 +108,10 @@ def test_write_points_from_dataframe_with_tag_cols_and_global_tags(self):
columns=["tag_one", "tag_two", "column_one",
"column_two", "column_three"])
expected = (
b"foo,tag_one=blue,tag_two=1,global_tag=value "
b"foo,global_tag=value,tag_one=blue,tag_two=1 "
b"column_one=\"1\",column_two=1i,column_three=1.0 "
b"0\n"
b"foo,tag_one=red,tag_two=0,global_tag=value "
b"foo,global_tag=value,tag_one=red,tag_two=0 "
b"column_one=\"2\",column_two=2i,column_three=2.0 "
b"3600000000000\n"
)
Expand Down Expand Up @@ -155,10 +155,10 @@ def test_write_points_from_dataframe_with_tag_cols_and_defaults(self):
)

expected_fields_no_tags = (
b"foo,tag_one=blue,tag_two=1,tag_three=hot "
b"foo,tag_one=blue,tag_three=hot,tag_two=1 "
b"column_one=\"1\",column_two=1i,column_three=1.0 "
b"0\n"
b"foo,tag_one=red,tag_two=0,tag_three=cold "
b"foo,tag_one=red,tag_three=cold,tag_two=0 "
b"column_one=\"2\",column_two=2i,column_three=2.0 "
b"3600000000000\n"
)
Expand Down Expand Up @@ -198,6 +198,35 @@ def test_write_points_from_dataframe_with_tag_cols_and_defaults(self):
cli.write_points(dataframe, 'foo')
self.assertEqual(m.last_request.body, expected_no_tags_no_fields)

def test_write_points_from_dataframe_with_tag_escaped(self):
    """Check that ',' and '=' inside a tag value are backslash-escaped."""
    epoch = pd.Timestamp('1970-01-01 00:00+00:00')
    rows = [
        ['blue', 1, "1", 1, 1.0, 'hot'],
        ['red,green=orange', 0, "2", 2, 2.0, 'cold'],
    ]
    column_names = ["tag_one", "tag_two", "column_one",
                    "column_two", "column_three",
                    "tag_three"]
    dataframe = pd.DataFrame(
        data=rows,
        index=[epoch, epoch + timedelta(hours=1)],
        columns=column_names)

    # Only the columns named in field_columns / tag_columns below are
    # expected in the request body; the second row's tag value carries
    # the characters that must come out escaped.
    expected_escaped_tags = (
        b"foo,tag_one=blue "
        b"column_one=\"1\",column_two=1i "
        b"0\n"
        b"foo,tag_one=red\\,green\\=orange "
        b"column_one=\"2\",column_two=2i "
        b"3600000000000\n"
    )

    with requests_mock.Mocker() as mocker:
        # Stub the write endpoint so no real server is contacted.
        mocker.register_uri(requests_mock.POST,
                            "http://localhost:8086/write",
                            status_code=204)
        client = DataFrameClient(database='db')
        client.write_points(dataframe, 'foo',
                            field_columns=['column_one', 'column_two'],
                            tag_columns=['tag_one'])
        self.assertEqual(mocker.last_request.body, expected_escaped_tags)

def test_write_points_from_dataframe_with_numeric_column_names(self):
now = pd.Timestamp('1970-01-01 00:00+00:00')
# df with numeric column names
Expand Down