Skip to content

Commit 1343ae9

Browse files
mdbartosaviau
authored and committed
Dataframe client support for (i) tag columns and (ii) direct conversion to line protocol (influxdata#364)
* Addressed issues 362 and 363 * Added unit tests for tag columns. All tests working. * Added more comments and docstrings * Rolled back changes to retention policy duration. * Added comments to _dataframe_client. Re-pushing to try and fix travis build. * Try rebuilding without cache * Minor changes to _stringify_dataframe. Added test for numeric precision. * Incorporated fixes from @tzonghao. Fixed docstrings.
1 parent 7debaca commit 1343ae9

File tree

4 files changed

+457
-56
lines changed

4 files changed

+457
-56
lines changed

.travis.yml

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -33,8 +33,8 @@ notifications:
3333
sudo: false
3434

3535
# Travis caching
36-
cache:
37-
directories:
38-
- $HOME/.cache/pip
39-
before_cache:
40-
- rm -f $HOME/.cache/pip/log/debug.log
36+
cache: false
37+
# directories:
38+
# - $HOME/.cache/pip
39+
#before_cache:
40+
# - rm -f $HOME/.cache/pip/log/debug.log

influxdb/_dataframe_client.py

Lines changed: 220 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -35,9 +35,18 @@ class DataFrameClient(InfluxDBClient):
3535

3636
EPOCH = pd.Timestamp('1970-01-01 00:00:00.000+00:00')
3737

38-
def write_points(self, dataframe, measurement, tags=None,
39-
time_precision=None, database=None, retention_policy=None,
40-
batch_size=None):
38+
def write_points(self,
39+
dataframe,
40+
measurement,
41+
tags=None,
42+
tag_columns=[],
43+
field_columns=[],
44+
time_precision=None,
45+
database=None,
46+
retention_policy=None,
47+
batch_size=None,
48+
protocol='line',
49+
numeric_precision=None):
4150
"""
4251
Write to multiple time series names.
4352
@@ -50,27 +59,67 @@ def write_points(self, dataframe, measurement, tags=None,
5059
instead of all at one time. Useful for when doing data dumps from
5160
one database to another or when doing a massive write operation
5261
:type batch_size: int
62+
:param protocol: Protocol for writing data. Either 'line' or 'json'.
63+
:param numeric_precision: Precision for floating point values.
64+
Either None, 'full' or some int, where int is the desired decimal
65+
precision. 'full' preserves full precision for int and float
66+
datatypes. Defaults to None, which preserves 14-15 significant
67+
figures for float and all significant figures for int datatypes.
5368
5469
"""
5570
if batch_size:
56-
number_batches = int(math.ceil(
57-
len(dataframe) / float(batch_size)))
71+
number_batches = int(math.ceil(len(dataframe) / float(batch_size)))
5872
for batch in range(number_batches):
5973
start_index = batch * batch_size
6074
end_index = (batch + 1) * batch_size
61-
points = self._convert_dataframe_to_json(
62-
dataframe.ix[start_index:end_index].copy(),
63-
measurement, tags, time_precision
64-
)
75+
if protocol == 'line':
76+
points = self._convert_dataframe_to_lines(
77+
dataframe.ix[start_index:end_index].copy(),
78+
measurement=measurement,
79+
global_tags=tags,
80+
time_precision=time_precision,
81+
tag_columns=tag_columns,
82+
field_columns=field_columns,
83+
numeric_precision=numeric_precision)
84+
else:
85+
points = self._convert_dataframe_to_json(
86+
dataframe.ix[start_index:end_index].copy(),
87+
measurement=measurement,
88+
tags=tags,
89+
time_precision=time_precision,
90+
tag_columns=tag_columns,
91+
field_columns=field_columns)
6592
super(DataFrameClient, self).write_points(
66-
points, time_precision, database, retention_policy)
93+
points,
94+
time_precision,
95+
database,
96+
retention_policy,
97+
protocol=protocol)
6798
return True
6899
else:
69-
points = self._convert_dataframe_to_json(
70-
dataframe, measurement, tags, time_precision
71-
)
100+
if protocol == 'line':
101+
points = self._convert_dataframe_to_lines(
102+
dataframe,
103+
measurement=measurement,
104+
global_tags=tags,
105+
tag_columns=tag_columns,
106+
field_columns=field_columns,
107+
time_precision=time_precision,
108+
numeric_precision=numeric_precision)
109+
else:
110+
points = self._convert_dataframe_to_json(
111+
dataframe,
112+
measurement=measurement,
113+
tags=tags,
114+
time_precision=time_precision,
115+
tag_columns=tag_columns,
116+
field_columns=field_columns)
72117
super(DataFrameClient, self).write_points(
73-
points, time_precision, database, retention_policy)
118+
points,
119+
time_precision,
120+
database,
121+
retention_policy,
122+
protocol=protocol)
74123
return True
75124

76125
def query(self, query, chunked=False, database=None):
@@ -108,7 +157,12 @@ def _to_dataframe(self, rs):
108157
result[key] = df
109158
return result
110159

111-
def _convert_dataframe_to_json(self, dataframe, measurement, tags=None,
160+
def _convert_dataframe_to_json(self,
161+
dataframe,
162+
measurement,
163+
tags=None,
164+
tag_columns=[],
165+
field_columns=[],
112166
time_precision=None):
113167

114168
if not isinstance(dataframe, pd.DataFrame):
@@ -119,6 +173,15 @@ def _convert_dataframe_to_json(self, dataframe, measurement, tags=None,
119173
raise TypeError('Must be DataFrame with DatetimeIndex or \
120174
PeriodIndex.')
121175

176+
# Make sure tags and tag columns are correctly typed
177+
tag_columns = tag_columns if tag_columns else []
178+
field_columns = field_columns if field_columns else []
179+
tags = tags if tags else {}
180+
# Assume field columns are all columns not included in tag columns
181+
if not field_columns:
182+
field_columns = list(
183+
set(dataframe.columns).difference(set(tag_columns)))
184+
122185
dataframe.index = dataframe.index.to_datetime()
123186
if dataframe.index.tzinfo is None:
124187
dataframe.index = dataframe.index.tz_localize('UTC')
@@ -140,13 +203,151 @@ def _convert_dataframe_to_json(self, dataframe, measurement, tags=None,
140203

141204
points = [
142205
{'measurement': measurement,
143-
'tags': tags if tags else {},
206+
'tags': dict(list(tag.items()) + list(tags.items())),
144207
'fields': rec,
145-
'time': int(ts.value / precision_factor)
146-
}
147-
for ts, rec in zip(dataframe.index, dataframe.to_dict('record'))]
208+
'time': int(ts.value / precision_factor)}
209+
for ts, tag, rec in zip(dataframe.index,
210+
dataframe[tag_columns].to_dict('record'),
211+
dataframe[field_columns].to_dict('record'))
212+
]
213+
214+
return points
215+
216+
def _convert_dataframe_to_lines(self,
217+
dataframe,
218+
measurement,
219+
field_columns=[],
220+
tag_columns=[],
221+
global_tags={},
222+
time_precision=None,
223+
numeric_precision=None):
224+
225+
if not isinstance(dataframe, pd.DataFrame):
226+
raise TypeError('Must be DataFrame, but type was: {0}.'
227+
.format(type(dataframe)))
228+
if not (isinstance(dataframe.index, pd.tseries.period.PeriodIndex) or
229+
isinstance(dataframe.index, pd.tseries.index.DatetimeIndex)):
230+
raise TypeError('Must be DataFrame with DatetimeIndex or \
231+
PeriodIndex.')
232+
233+
# Create a Series of columns for easier indexing
234+
column_series = pd.Series(dataframe.columns)
235+
236+
if field_columns is None:
237+
field_columns = []
238+
if tag_columns is None:
239+
tag_columns = []
240+
241+
# Make sure field_columns and tag_columns are lists
242+
field_columns = list(field_columns) if list(field_columns) else []
243+
tag_columns = list(tag_columns) if list(tag_columns) else []
244+
245+
# If field columns but no tag columns, assume rest of columns are tags
246+
if field_columns and (not tag_columns):
247+
tag_columns = list(column_series[~column_series.isin(
248+
field_columns)])
249+
250+
# If no field columns, assume non-tag columns are fields
251+
if not field_columns:
252+
field_columns = list(column_series[~column_series.isin(
253+
tag_columns)])
254+
255+
precision_factor = {
256+
"n": 1,
257+
"u": 1e3,
258+
"ms": 1e6,
259+
"s": 1e9,
260+
"m": 1e9 * 60,
261+
"h": 1e9 * 3600,
262+
}.get(time_precision, 1)
263+
264+
# Make array of timestamp ints
265+
time = ((dataframe.index.to_datetime().values.astype(int) /
266+
precision_factor).astype(int).astype(str))
267+
268+
# If tag columns exist, make an array of formatted tag keys and values
269+
if tag_columns:
270+
tag_df = dataframe[tag_columns]
271+
tag_df = self._stringify_dataframe(
272+
tag_df, numeric_precision, datatype='tag')
273+
tags = (',' + (
274+
(tag_df.columns.values + '=').tolist() + tag_df)).sum(axis=1)
275+
del tag_df
276+
277+
else:
278+
tags = ''
279+
280+
# Make an array of formatted field keys and values
281+
field_df = dataframe[field_columns]
282+
field_df = self._stringify_dataframe(
283+
field_df, numeric_precision, datatype='field')
284+
field_df = (field_df.columns.values + '=').tolist() + field_df
285+
field_df[field_df.columns[1:]] = ',' + field_df[field_df.columns[1:]]
286+
fields = field_df.sum(axis=1)
287+
del field_df
288+
289+
# Add any global tags to formatted tag strings
290+
if global_tags:
291+
global_tags = ','.join(['='.join([tag, global_tags[tag]])
292+
for tag in global_tags])
293+
if tag_columns:
294+
tags = tags + ',' + global_tags
295+
else:
296+
tags = ',' + global_tags
297+
298+
# Generate line protocol string
299+
points = (measurement + tags + ' ' + fields + ' ' + time).tolist()
148300
return points
149301

302+
def _stringify_dataframe(self,
303+
dataframe,
304+
numeric_precision,
305+
datatype='field'):
306+
307+
# Find int and string columns for field-type data
308+
int_columns = dataframe.select_dtypes(include=['integer']).columns
309+
string_columns = dataframe.select_dtypes(include=['object']).columns
310+
311+
# Convert dataframe to string
312+
if numeric_precision is None:
313+
# If no precision specified, convert directly to string (fast)
314+
dataframe = dataframe.astype(str)
315+
elif numeric_precision == 'full':
316+
# If full precision, use repr to get full float precision
317+
float_columns = (dataframe.select_dtypes(include=['floating'])
318+
.columns)
319+
nonfloat_columns = dataframe.columns[~dataframe.columns.isin(
320+
float_columns)]
321+
dataframe[float_columns] = dataframe[float_columns].applymap(repr)
322+
dataframe[nonfloat_columns] = (dataframe[nonfloat_columns]
323+
.astype(str))
324+
elif isinstance(numeric_precision, int):
325+
# If precision is specified, round to appropriate precision
326+
float_columns = (dataframe.select_dtypes(include=['floating'])
327+
.columns)
328+
nonfloat_columns = dataframe.columns[~dataframe.columns.isin(
329+
float_columns)]
330+
dataframe[float_columns] = (dataframe[float_columns]
331+
.round(numeric_precision))
332+
# If desired precision is > 10 decimal places, need to use repr
333+
if numeric_precision > 10:
334+
dataframe[float_columns] = (dataframe[float_columns]
335+
.applymap(repr))
336+
dataframe[nonfloat_columns] = (dataframe[nonfloat_columns]
337+
.astype(str))
338+
else:
339+
dataframe = dataframe.astype(str)
340+
else:
341+
raise ValueError('Invalid numeric precision.')
342+
343+
if datatype == 'field':
344+
# If dealing with fields, format ints and strings correctly
345+
dataframe[int_columns] = dataframe[int_columns] + 'i'
346+
dataframe[string_columns] = '"' + dataframe[string_columns] + '"'
347+
348+
dataframe.columns = dataframe.columns.astype(str)
349+
return dataframe
350+
150351
def _datetime_to_epoch(self, datetime, time_precision='s'):
151352
seconds = (datetime - self.EPOCH).total_seconds()
152353
if time_precision == 'h':

0 commit comments

Comments (0)