Convert dataframes directly to line protocol during 'write_points()' operation #363
Description
The current dataframe client's `write_points` method is probably not as efficient as it could be: the dataframe is first converted to a JSON-like dict with `_convert_dataframe_to_json`, and then written to line protocol with `make_lines`. Converting the dataframe to line protocol directly would avoid a lot of the overhead associated with iterating through dicts.
This could be done by implementing a simple `_convert_dataframe_to_lines` method in the dataframe client that takes advantage of pandas' vectorized string methods. The result could then be passed to the `write` method directly.
def _convert_dataframe_to_lines(self, dataframe, measurement,
                                field_columns=None, tag_columns=None,
                                global_tags=None, time_precision=None,
                                numeric_precision=None):
    """Convert a time-indexed DataFrame to a list of line protocol strings.

    Every row becomes ``<measurement>[,<tags>] <fields> <timestamp>``. The
    conversion is done with vectorized string operations, one pass per
    column, instead of building an intermediate dict per row.

    :param dataframe: data to convert; its index supplies the timestamps.
    :param measurement: measurement name (str) prepended to every line.
    :param field_columns: columns written as fields. When empty or None,
        every column not listed in ``tag_columns`` is used, in the
        dataframe's own column order.
    :param tag_columns: columns written as tags (may be empty or None).
    :param global_tags: mapping of tag key to value appended to every line.
    :param time_precision: one of 'n', 'u', 'ms', 's', 'm', 'h'. Any other
        value (including None) leaves the timestamps in nanoseconds.
    :param numeric_precision: forwarded to ``_stringify_dataframe``.
    :returns: list of str, one line per dataframe row.
    :raises TypeError: if ``dataframe`` is not a DataFrame, or its index is
        neither a DatetimeIndex nor a PeriodIndex.
    :raises ValueError: if no field column is left to write.

    NOTE: keys and values are not escaped, and integer fields are written
    without the ``i`` suffix, so callers must pass values that are already
    safe for line protocol.
    """
    if not isinstance(dataframe, pd.DataFrame):
        raise TypeError('Must be DataFrame, but type was: {0}.'
                        .format(type(dataframe)))
    if not isinstance(dataframe.index, (pd.PeriodIndex, pd.DatetimeIndex)):
        raise TypeError('Must be DataFrame with DatetimeIndex or '
                        'PeriodIndex.')

    tag_columns = [] if tag_columns is None else list(tag_columns)
    field_columns = [] if field_columns is None else list(field_columns)
    # Assume that all columns not listed as tag columns are field columns.
    # Keep the dataframe's column order so the output is deterministic.
    if not field_columns:
        field_columns = [column for column in dataframe.columns
                         if column not in tag_columns]
    if not field_columns:
        raise ValueError('At least one field column is required.')
    if len(dataframe.index) == 0:
        return []

    def join_pairs(frame, quoted=()):
        """Join an all-string frame into one 'k=v,k=v' string per row."""
        joined = None
        for column in frame.columns:
            values = frame[column]
            if column in quoted:
                values = '"' + values + '"'
            pair = column + '=' + values
            joined = pair if joined is None else joined + ',' + pair
        return joined

    # Integer factors and floor division keep every nanosecond digit; float
    # division would round timestamps to the nearest representable double.
    precision_factor = {
        "n": 1,
        "u": 10 ** 3,
        "ms": 10 ** 6,
        "s": 10 ** 9,
        "m": 60 * 10 ** 9,
        "h": 3600 * 10 ** 9,
    }.get(time_precision, 1)
    index = dataframe.index
    if isinstance(index, pd.PeriodIndex):
        # Period ordinals are not epoch offsets; use each period's start.
        index = index.to_timestamp()
    if hasattr(index, 'as_unit'):
        # Newer pandas may store coarser-than-nanosecond resolutions.
        index = index.as_unit('ns')
    epoch = index.astype('int64') // precision_factor
    timestamps = pd.Series(epoch.astype(str).tolist(), index=dataframe.index)

    # If tag columns exist, make a series of formatted tag keys and values.
    if tag_columns:
        tag_df = self._stringify_dataframe(dataframe[tag_columns],
                                           numeric_precision)
        tags = ',' + join_pairs(tag_df)
    else:
        tags = ''

    # Make a series of formatted field keys and values. Only string-like
    # *field* columns are quoted; tag values are never quoted.
    field_df = dataframe[field_columns]
    string_fields = set(
        str(column)
        for column, dtype in zip(field_df.columns, field_df.dtypes)
        if pd.api.types.is_string_dtype(dtype)
    )
    field_df = self._stringify_dataframe(field_df, numeric_precision)
    fields = join_pairs(field_df, quoted=string_fields)

    # Add any global tags to the formatted tag strings.
    if global_tags:
        global_text = ','.join('{0}={1}'.format(key, value)
                               for key, value in global_tags.items())
        if tag_columns:
            tags = tags + ',' + global_text
        else:
            tags = ',' + global_text

    # Generate the line protocol strings.
    return (measurement + tags + ' ' + fields + ' ' + timestamps).tolist()


def _stringify_dataframe(self, dataframe, numeric_precision):
    """Return a string-valued copy of ``dataframe``; the input is untouched.

    :param dataframe: DataFrame whose values are to be rendered as text.
    :param numeric_precision: ``None`` renders values with ``str``;
        ``'full'`` renders float columns with ``repr`` of the Python float;
        an int rounds numeric columns to that many decimals first.
    :returns: new DataFrame with str values and str column labels.
    :raises ValueError: for any other ``numeric_precision``.
    """
    if numeric_precision is None:
        result = dataframe.astype(str)
    elif numeric_precision == 'full':
        result = dataframe.astype(str)
        float_columns = dataframe.select_dtypes(include=['floating']).columns
        for column in float_columns:
            # float() first: repr of a NumPy scalar is not a bare number
            # on recent NumPy versions.
            result[column] = dataframe[column].map(
                lambda value: repr(float(value)))
    elif isinstance(numeric_precision, int):
        # DataFrame.round only touches numeric columns and returns a copy.
        result = dataframe.round(numeric_precision).astype(str)
    else:
        raise ValueError('Invalid numeric precision')
    result.columns = result.columns.astype(str)
    return result
Here's an example usage:
>>> print(df)
color value status
2016-08-12 02:17:28.191727141+00:00 red 9.1 1
2016-08-12 02:17:51.082619738+00:00 blue 8.3 0
2016-08-18 07:05:10.979251109+00:00 green 2.0 0
2016-08-18 07:11:11.434227816+00:00 green 1.0 0
2016-08-09 21:48:05.394423800+00:00 blue 1.0 0
2016-08-09 21:48:16.696269144+00:00 blue 1.3 1
2016-08-09 21:48:28.603980937+00:00 blue 0.9 0
2016-08-09 22:01:56.629486728+00:00 blue 0.8 1
2016-08-09 22:02:24.966032304+00:00 blue 0.7 0
2016-08-09 22:16:46.645324098+00:00 blue 1.9 0
>>> _convert_dataframe_to_lines(None, dataframe=df, measurement='var1', tag_columns=['color', 'status'], field_columns=['value'], global_tags={'global':'tag'})
b'var1,color=red,status=1,global=tag value=9.1 1470968248191727104\nvar1,color=blue,status=0,global=tag value=8.3 1470968271082619648\nvar1,color=green,status=0,global=tag value=2.0 1471503910979251200\nvar1,color=green,status=0,global=tag value=1.0 1471504271434227712\nvar1,color=blue,status=0,global=tag value=1.0 1470779285394423808\nvar1,color=blue,status=1,global=tag value=1.3 1470779296696269056\nvar1,color=blue,status=0,global=tag value=0.9 1470779308603981056\nvar1,color=blue,status=1,global=tag value=0.8 1470780116629486848\nvar1,color=blue,status=0,global=tag value=0.7 1470780144966032384\nvar1,color=blue,status=0,global=tag value=1.9 1470781006645324032\n'
This approach also uses the `tag_columns` functionality I proposed in #362.
This implementation could use some more thorough testing for edge cases, but I can start a PR if anyone is interested.