Skip to content
This repository was archived by the owner on Oct 29, 2024. It is now read-only.

Commit f7865a7

Browse files
authored
Merge branch 'master' into master
2 parents 275d33c + 35732cd commit f7865a7

File tree

5 files changed

+136
-11
lines changed

5 files changed

+136
-11
lines changed

influxdb/_dataframe_client.py

Lines changed: 28 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
from collections import defaultdict
1111

1212
import pandas as pd
13+
import numpy as np
1314

1415
from .client import InfluxDBClient
1516
from .line_protocol import _escape_tag
@@ -174,6 +175,7 @@ def query(self,
174175
expected_response_code=expected_response_code,
175176
raise_errors=raise_errors,
176177
chunked=chunked,
178+
database=database,
177179
chunk_size=chunk_size)
178180
results = super(DataFrameClient, self).query(query, **query_args)
179181
if query.strip().upper().startswith("SELECT"):
@@ -257,7 +259,7 @@ def _convert_dataframe_to_json(dataframe,
257259
{'measurement': measurement,
258260
'tags': dict(list(tag.items()) + list(tags.items())),
259261
'fields': rec,
260-
'time': int(ts.value / precision_factor)}
262+
'time': np.int64(ts.value / precision_factor)}
261263
for ts, tag, rec in zip(dataframe.index,
262264
dataframe[tag_columns].to_dict('record'),
263265
dataframe[field_columns].to_dict('record'))
@@ -274,6 +276,10 @@ def _convert_dataframe_to_lines(self,
274276
time_precision=None,
275277
numeric_precision=None):
276278

279+
dataframe = dataframe.dropna(how='all').copy()
280+
if len(dataframe) == 0:
281+
return []
282+
277283
if not isinstance(dataframe, pd.DataFrame):
278284
raise TypeError('Must be DataFrame, but type was: {0}.'
279285
.format(type(dataframe)))
@@ -319,11 +325,11 @@ def _convert_dataframe_to_lines(self,
319325

320326
# Make array of timestamp ints
321327
if isinstance(dataframe.index, pd.PeriodIndex):
322-
time = ((dataframe.index.to_timestamp().values.astype(int) /
323-
precision_factor).astype(int).astype(str))
328+
time = ((dataframe.index.to_timestamp().values.astype(np.int64) /
329+
precision_factor).astype(np.int64).astype(str))
324330
else:
325-
time = ((pd.to_datetime(dataframe.index).values.astype(int) /
326-
precision_factor).astype(int).astype(str))
331+
time = ((pd.to_datetime(dataframe.index).values.astype(np.int64) /
332+
precision_factor).astype(np.int64).astype(str))
327333

328334
# If tag columns exist, make an array of formatted tag keys and values
329335
if tag_columns:
@@ -357,20 +363,32 @@ def _convert_dataframe_to_lines(self,
357363

358364
# Make an array of formatted field keys and values
359365
field_df = dataframe[field_columns]
366+
360367
field_df = self._stringify_dataframe(field_df,
361368
numeric_precision,
362369
datatype='field')
363-
field_df = (field_df.columns.values + '=').tolist() + field_df
364-
field_df[field_df.columns[1:]] = ',' + field_df[field_df.columns[1:]]
365-
fields = field_df.sum(axis=1)
370+
371+
def format_line(line):
    """Join the non-null entries of one row as ``key=value`` pairs.

    :param line: a row of already-stringified field values, indexed by
        field name
    :type line: pandas.Series
    :return: the remaining ``name=value`` items joined by commas
    :rtype: str
    """
    # Null entries are left out entirely rather than written as fields.
    present = line[line.notnull()]
    pairs = present.index + '=' + present.values
    return ",".join(pairs)
374+
375+
fields = field_df.apply(format_line, axis=1)
366376
del field_df
367377

368378
# Generate line protocol string
379+
measurement = _escape_tag(measurement)
369380
points = (measurement + tags + ' ' + fields + ' ' + time).tolist()
370381
return points
371382

372383
@staticmethod
373384
def _stringify_dataframe(dframe, numeric_precision, datatype='field'):
385+
386+
# Prevent modification of input dataframe
387+
dframe = dframe.copy()
388+
389+
# Keep the positions where Null values are found
390+
mask_null = dframe.isnull().values
391+
374392
# Find int and string columns for field-type data
375393
int_columns = dframe.select_dtypes(include=['integer']).columns
376394
string_columns = dframe.select_dtypes(include=['object']).columns
@@ -414,6 +432,8 @@ def _stringify_dataframe(dframe, numeric_precision, datatype='field'):
414432
dframe = dframe.apply(_escape_pandas_series)
415433

416434
dframe.columns = dframe.columns.astype(str)
435+
436+
dframe = dframe.where(~mask_null, None)
417437
return dframe
418438

419439
def _datetime_to_epoch(self, datetime, time_precision='s'):

influxdb/client.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -869,7 +869,7 @@ def send_packet(self, packet, protocol='json'):
869869
870870
:param packet: the packet to be sent
871871
:type packet: (if protocol is 'json') dict
872-
(if protocol is 'line') sequence of line protocol strings
872+
(if protocol is 'line') list of line protocol strings
873873
:param protocol: protocol of input data, either 'json' or 'line'
874874
:type protocol: str
875875
"""

influxdb/line_protocol.py

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,13 @@ def _escape_tag(tag):
5757
)
5858

5959

60+
def _escape_tag_value(value):
    """Escape a tag value, padding a trailing backslash with a space.

    The value is first escaped with ``_escape_tag``. If the escaped text
    ends with a backslash, a single space is appended so that the backslash
    is no longer the last character.
    NOTE(review): presumably this keeps the trailing backslash from escaping
    the separator that follows the value in the line protocol -- confirm.

    :param value: the raw tag value
    :return: the escaped tag value
    """
    escaped = _escape_tag(value)
    return escaped + ' ' if escaped.endswith('\\') else escaped
65+
66+
6067
def quote_ident(value):
6168
"""Indent the quotes."""
6269
return "\"{}\"".format(value
@@ -135,7 +142,7 @@ def make_lines(data, precision=None):
135142
# tags should be sorted client-side to take load off server
136143
for tag_key, tag_value in sorted(iteritems(tags)):
137144
key = _escape_tag(tag_key)
138-
value = _escape_tag(tag_value)
145+
value = _escape_tag_value(tag_value)
139146

140147
if key != '' and value != '':
141148
key_values.append(key + "=" + value)

influxdb/tests/dataframe_client_test.py

Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,103 @@ def test_write_points_from_dataframe(self):
5959
cli.write_points(dataframe, 'foo', tags=None)
6060
self.assertEqual(m.last_request.body, expected)
6161

62+
def test_dataframe_write_points_with_whitespace_measurement(self):
    """Check that spaces in a measurement name are backslash-escaped.

    Writes a two-row frame under the measurement ``'meas with space'`` and
    compares the body of the posted request with the expected lines.
    """
    epoch = pd.Timestamp('1970-01-01 00:00+00:00')
    timestamps = [epoch, epoch + timedelta(hours=1)]
    column_names = ["column_one", "column_two", "column_three"]
    rows = [["1", 1, 1.0], ["2", 2, 2.0]]
    dataframe = pd.DataFrame(data=rows,
                             index=timestamps,
                             columns=column_names)

    expected = (
        b"meas\\ with\\ space "
        b"column_one=\"1\",column_two=1i,column_three=1.0 0\n"
        b"meas\\ with\\ space "
        b"column_one=\"2\",column_two=2i,column_three=2.0 "
        b"3600000000000\n"
    )

    with requests_mock.Mocker() as m:
        m.register_uri(requests_mock.POST,
                       "http://localhost:8086/write",
                       status_code=204)
        cli = DataFrameClient(database='db')
        cli.write_points(dataframe, 'meas with space')
        self.assertEqual(m.last_request.body, expected)
83+
84+
def test_write_points_from_dataframe_with_none(self):
    """Check that a single None field is left out of its line.

    The first row has None in ``column_two``; the expected body contains
    that row without a ``column_two`` field, and the second row in full.
    The write is checked both without ``tags`` and with ``tags=None``.
    """
    epoch = pd.Timestamp('1970-01-01 00:00+00:00')
    timestamps = [epoch, epoch + timedelta(hours=1)]
    column_names = ["column_one", "column_two", "column_three"]
    rows = [["1", None, 1.0], ["2", 2.0, 2.0]]
    dataframe = pd.DataFrame(data=rows,
                             index=timestamps,
                             columns=column_names)

    expected = (
        b"foo column_one=\"1\",column_three=1.0 0\n"
        b"foo column_one=\"2\",column_two=2.0,column_three=2.0 "
        b"3600000000000\n"
    )

    with requests_mock.Mocker() as m:
        m.register_uri(requests_mock.POST,
                       "http://localhost:8086/write",
                       status_code=204)

        cli = DataFrameClient(database='db')

        # Same write twice: first with no tags argument, then tags=None.
        for extra_kwargs in ({}, {'tags': None}):
            cli.write_points(dataframe, 'foo', **extra_kwargs)
            self.assertEqual(m.last_request.body, expected)
109+
110+
def test_write_points_from_dataframe_with_line_of_none(self):
    """Check that a row made only of None values produces no line.

    The first row is entirely None; the expected body contains only the
    line for the second row. The write is checked both without ``tags``
    and with ``tags=None``.
    """
    epoch = pd.Timestamp('1970-01-01 00:00+00:00')
    timestamps = [epoch, epoch + timedelta(hours=1)]
    column_names = ["column_one", "column_two", "column_three"]
    rows = [[None, None, None], ["2", 2.0, 2.0]]
    dataframe = pd.DataFrame(data=rows,
                             index=timestamps,
                             columns=column_names)

    expected = (
        b"foo column_one=\"2\",column_two=2.0,column_three=2.0 "
        b"3600000000000\n"
    )

    with requests_mock.Mocker() as m:
        m.register_uri(requests_mock.POST,
                       "http://localhost:8086/write",
                       status_code=204)

        cli = DataFrameClient(database='db')

        # Same write twice: first with no tags argument, then tags=None.
        for extra_kwargs in ({}, {'tags': None}):
            cli.write_points(dataframe, 'foo', **extra_kwargs)
            self.assertEqual(m.last_request.body, expected)
134+
135+
def test_write_points_from_dataframe_with_all_none(self):
    """Check the request body when every value in the frame is None.

    Both rows are entirely None; the expected body is a lone newline.
    The write is checked both without ``tags`` and with ``tags=None``.
    """
    epoch = pd.Timestamp('1970-01-01 00:00+00:00')
    timestamps = [epoch, epoch + timedelta(hours=1)]
    column_names = ["column_one", "column_two", "column_three"]
    rows = [[None, None, None], [None, None, None]]
    dataframe = pd.DataFrame(data=rows,
                             index=timestamps,
                             columns=column_names)

    expected = (
        b"\n"
    )

    with requests_mock.Mocker() as m:
        m.register_uri(requests_mock.POST,
                       "http://localhost:8086/write",
                       status_code=204)

        cli = DataFrameClient(database='db')

        # Same write twice: first with no tags argument, then tags=None.
        for extra_kwargs in ({}, {'tags': None}):
            cli.write_points(dataframe, 'foo', **extra_kwargs)
            self.assertEqual(m.last_request.body, expected)
158+
62159
def test_write_points_from_dataframe_in_batches(self):
63160
"""Test write points in batch from df in TestDataFrameClient object."""
64161
now = pd.Timestamp('1970-01-01 00:00+00:00')

influxdb/tests/test_line_protocol.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ def test_make_lines(self):
2222
"tags": {
2323
"empty_tag": "",
2424
"none_tag": None,
25+
"backslash_tag": "C:\\",
2526
"integer_tag": 2,
2627
"string_tag": "hello"
2728
},
@@ -41,7 +42,7 @@ def test_make_lines(self):
4142

4243
self.assertEqual(
4344
line_protocol.make_lines(data),
44-
'test,integer_tag=2,string_tag=hello '
45+
'test,backslash_tag=C:\\\\ ,integer_tag=2,string_tag=hello '
4546
'bool_val=True,float_val=1.1,int_val=1i,string_val="hello!"\n'
4647
)
4748

0 commit comments

Comments
 (0)