Skip to content

Commit bdc8875

Browse files
authored
Merge pull request influxdata#366 from tzonghao/master
Fix DataFrameClient tag processing
2 parents 1343ae9 + 978b080 commit bdc8875

File tree

2 files changed

+47
-13
lines changed

2 files changed

+47
-13
lines changed

influxdb/_dataframe_client.py

Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import pandas as pd
1313

1414
from .client import InfluxDBClient
15+
from .line_protocol import _escape_tag
1516

1617

1718
def _pandas_time_unit(time_precision):
@@ -26,6 +27,10 @@ def _pandas_time_unit(time_precision):
2627
return unit
2728

2829

30+
def _escape_pandas_series(s):
31+
return s.apply(lambda v: _escape_tag(v))
32+
33+
2934
class DataFrameClient(InfluxDBClient):
3035
"""
3136
The ``DataFrameClient`` object holds information necessary to connect
@@ -242,6 +247,12 @@ def _convert_dataframe_to_lines(self,
242247
field_columns = list(field_columns) if list(field_columns) else []
243248
tag_columns = list(tag_columns) if list(tag_columns) else []
244249

250+
# Make global_tags as tag_columns
251+
if global_tags:
252+
for tag in global_tags:
253+
dataframe[tag] = global_tags[tag]
254+
tag_columns.append(tag)
255+
245256
# If field columns but no tag columns, assume rest of columns are tags
246257
if field_columns and (not tag_columns):
247258
tag_columns = list(column_series[~column_series.isin(
@@ -268,6 +279,7 @@ def _convert_dataframe_to_lines(self,
268279
# If tag columns exist, make an array of formatted tag keys and values
269280
if tag_columns:
270281
tag_df = dataframe[tag_columns]
282+
tag_df = tag_df.sort_index(axis=1)
271283
tag_df = self._stringify_dataframe(
272284
tag_df, numeric_precision, datatype='tag')
273285
tags = (',' + (
@@ -286,15 +298,6 @@ def _convert_dataframe_to_lines(self,
286298
fields = field_df.sum(axis=1)
287299
del field_df
288300

289-
# Add any global tags to formatted tag strings
290-
if global_tags:
291-
global_tags = ','.join(['='.join([tag, global_tags[tag]])
292-
for tag in global_tags])
293-
if tag_columns:
294-
tags = tags + ',' + global_tags
295-
else:
296-
tags = ',' + global_tags
297-
298301
# Generate line protocol string
299302
points = (measurement + tags + ' ' + fields + ' ' + time).tolist()
300303
return points
@@ -344,6 +347,8 @@ def _stringify_dataframe(self,
344347
# If dealing with fields, format ints and strings correctly
345348
dataframe[int_columns] = dataframe[int_columns] + 'i'
346349
dataframe[string_columns] = '"' + dataframe[string_columns] + '"'
350+
elif datatype == 'tag':
351+
dataframe = dataframe.apply(_escape_pandas_series)
347352

348353
dataframe.columns = dataframe.columns.astype(str)
349354
return dataframe

influxdb/tests/dataframe_client_test.py

Lines changed: 33 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -108,10 +108,10 @@ def test_write_points_from_dataframe_with_tag_cols_and_global_tags(self):
108108
columns=["tag_one", "tag_two", "column_one",
109109
"column_two", "column_three"])
110110
expected = (
111-
b"foo,tag_one=blue,tag_two=1,global_tag=value "
111+
b"foo,global_tag=value,tag_one=blue,tag_two=1 "
112112
b"column_one=\"1\",column_two=1i,column_three=1.0 "
113113
b"0\n"
114-
b"foo,tag_one=red,tag_two=0,global_tag=value "
114+
b"foo,global_tag=value,tag_one=red,tag_two=0 "
115115
b"column_one=\"2\",column_two=2i,column_three=2.0 "
116116
b"3600000000000\n"
117117
)
@@ -155,10 +155,10 @@ def test_write_points_from_dataframe_with_tag_cols_and_defaults(self):
155155
)
156156

157157
expected_fields_no_tags = (
158-
b"foo,tag_one=blue,tag_two=1,tag_three=hot "
158+
b"foo,tag_one=blue,tag_three=hot,tag_two=1 "
159159
b"column_one=\"1\",column_two=1i,column_three=1.0 "
160160
b"0\n"
161-
b"foo,tag_one=red,tag_two=0,tag_three=cold "
161+
b"foo,tag_one=red,tag_three=cold,tag_two=0 "
162162
b"column_one=\"2\",column_two=2i,column_three=2.0 "
163163
b"3600000000000\n"
164164
)
@@ -198,6 +198,35 @@ def test_write_points_from_dataframe_with_tag_cols_and_defaults(self):
198198
cli.write_points(dataframe, 'foo')
199199
self.assertEqual(m.last_request.body, expected_no_tags_no_fields)
200200

201+
def test_write_points_from_dataframe_with_tag_escaped(self):
202+
now = pd.Timestamp('1970-01-01 00:00+00:00')
203+
dataframe = pd.DataFrame(
204+
data=[['blue', 1, "1", 1, 1.0, 'hot'],
205+
['red,green=orange', 0, "2", 2, 2.0, 'cold']],
206+
index=[now, now + timedelta(hours=1)],
207+
columns=["tag_one", "tag_two", "column_one",
208+
"column_two", "column_three",
209+
"tag_three"])
210+
211+
expected_escaped_tags = (
212+
b"foo,tag_one=blue "
213+
b"column_one=\"1\",column_two=1i "
214+
b"0\n"
215+
b"foo,tag_one=red\\,green\\=orange "
216+
b"column_one=\"2\",column_two=2i "
217+
b"3600000000000\n"
218+
)
219+
220+
with requests_mock.Mocker() as m:
221+
m.register_uri(requests_mock.POST,
222+
"http://localhost:8086/write",
223+
status_code=204)
224+
cli = DataFrameClient(database='db')
225+
cli.write_points(dataframe, 'foo',
226+
field_columns=['column_one', 'column_two'],
227+
tag_columns=['tag_one'])
228+
self.assertEqual(m.last_request.body, expected_escaped_tags)
229+
201230
def test_write_points_from_dataframe_with_numeric_column_names(self):
202231
now = pd.Timestamp('1970-01-01 00:00+00:00')
203232
# df with numeric column names

0 commit comments

Comments
 (0)