
Dataframe client support for (i) tag columns and (ii) direct conversion to line protocol #364

Merged
8 commits merged on Sep 6, 2016
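
For context, a minimal usage sketch of the API this PR adds. The DataFrame contents, the `cpu_load` measurement, and the connection parameters are hypothetical, and it assumes a pandas/influxdb-python install contemporary with this PR:

```python
import pandas as pd

from influxdb import DataFrameClient

# Hypothetical data: 'host' will be sent as a tag, 'value' as a field
df = pd.DataFrame({'host': ['server01', 'server02'],
                   'value': [0.5, 1.2]},
                  index=pd.to_datetime(['2016-09-01', '2016-09-02']))

client = DataFrameClient('localhost', 8086, 'root', 'root', 'example_db')

# tag_columns, field_columns, protocol and numeric_precision are the
# parameters introduced by this PR; protocol defaults to 'line'
client.write_points(df, 'cpu_load',
                    tag_columns=['host'],
                    field_columns=['value'],
                    protocol='line')
```

With `protocol='json'` the client falls back to the previous JSON body format.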
.travis.yml: 10 changes (5 additions, 5 deletions)
@@ -33,8 +33,8 @@ notifications:
 sudo: false
 
 # Travis caching
-cache:
-  directories:
-    - $HOME/.cache/pip
-before_cache:
-  - rm -f $HOME/.cache/pip/log/debug.log
+cache: false
+#  directories:
+#    - $HOME/.cache/pip
+#before_cache:
+#  - rm -f $HOME/.cache/pip/log/debug.log
influxdb/_dataframe_client.py: 239 changes (220 additions, 19 deletions)
@@ -35,9 +35,18 @@ class DataFrameClient(InfluxDBClient):
 
     EPOCH = pd.Timestamp('1970-01-01 00:00:00.000+00:00')
 
-    def write_points(self, dataframe, measurement, tags=None,
-                     time_precision=None, database=None, retention_policy=None,
-                     batch_size=None):
+    def write_points(self,
+                     dataframe,
+                     measurement,
+                     tags=None,
+                     tag_columns=[],
+                     field_columns=[],
+                     time_precision=None,
+                     database=None,
+                     retention_policy=None,
+                     batch_size=None,
+                     protocol='line',
+                     numeric_precision=None):
         """
         Write to multiple time series names.
 
@@ -50,27 +59,67 @@ def write_points(self, dataframe, measurement, tags=None,
         instead of all at one time. Useful for when doing data dumps from
         one database to another or when doing a massive write operation
         :type batch_size: int
+        :param protocol: Protocol for writing data. Either 'line' or 'json'.
+        :param numeric_precision: Precision for floating point values.
+            Either None, 'full' or some int, where int is the desired decimal
+            precision. 'full' preserves full precision for int and float
+            datatypes. Defaults to None, which preserves 14-15 significant
+            figures for float and all significant figures for int datatypes.
 
         """
         if batch_size:
-            number_batches = int(math.ceil(
-                len(dataframe) / float(batch_size)))
+            number_batches = int(math.ceil(len(dataframe) / float(batch_size)))
             for batch in range(number_batches):
                 start_index = batch * batch_size
                 end_index = (batch + 1) * batch_size
-                points = self._convert_dataframe_to_json(
-                    dataframe.ix[start_index:end_index].copy(),
-                    measurement, tags, time_precision
-                )
+                if protocol == 'line':
+                    points = self._convert_dataframe_to_lines(
+                        dataframe.ix[start_index:end_index].copy(),
+                        measurement=measurement,
+                        global_tags=tags,
+                        time_precision=time_precision,
+                        tag_columns=tag_columns,
+                        field_columns=field_columns,
+                        numeric_precision=numeric_precision)
+                else:
+                    points = self._convert_dataframe_to_json(
+                        dataframe.ix[start_index:end_index].copy(),
+                        measurement=measurement,
+                        tags=tags,
+                        time_precision=time_precision,
+                        tag_columns=tag_columns,
+                        field_columns=field_columns)
                 super(DataFrameClient, self).write_points(
-                    points, time_precision, database, retention_policy)
+                    points,
+                    time_precision,
+                    database,
+                    retention_policy,
+                    protocol=protocol)
             return True
         else:
-            points = self._convert_dataframe_to_json(
-                dataframe, measurement, tags, time_precision
-            )
+            if protocol == 'line':
+                points = self._convert_dataframe_to_lines(
+                    dataframe,
+                    measurement=measurement,
+                    global_tags=tags,
+                    tag_columns=tag_columns,
+                    field_columns=field_columns,
+                    time_precision=time_precision,
+                    numeric_precision=numeric_precision)
+            else:
+                points = self._convert_dataframe_to_json(
+                    dataframe,
+                    measurement=measurement,
+                    tags=tags,
+                    time_precision=time_precision,
+                    tag_columns=tag_columns,
+                    field_columns=field_columns)
             super(DataFrameClient, self).write_points(
-                points, time_precision, database, retention_policy)
+                points,
+                time_precision,
+                database,
+                retention_policy,
+                protocol=protocol)
             return True
 
     def query(self, query, chunked=False, database=None):
@@ -108,7 +157,12 @@ def _to_dataframe(self, rs):
             result[key] = df
         return result
 
-    def _convert_dataframe_to_json(self, dataframe, measurement, tags=None,
+    def _convert_dataframe_to_json(self,
+                                   dataframe,
+                                   measurement,
+                                   tags=None,
+                                   tag_columns=[],
+                                   field_columns=[],
                                    time_precision=None):
 
         if not isinstance(dataframe, pd.DataFrame):
@@ -119,6 +173,15 @@ def _convert_dataframe_to_json(self, dataframe, measurement, tags=None,
             raise TypeError('Must be DataFrame with DatetimeIndex or \
                             PeriodIndex.')
 
+        # Make sure tags and tag columns are correctly typed
+        tag_columns = tag_columns if tag_columns else []
+        field_columns = field_columns if field_columns else []
+        tags = tags if tags else {}
+        # Assume field columns are all columns not included in tag columns
+        if not field_columns:
+            field_columns = list(
+                set(dataframe.columns).difference(set(tag_columns)))
+
         dataframe.index = dataframe.index.to_datetime()
         if dataframe.index.tzinfo is None:
             dataframe.index = dataframe.index.tz_localize('UTC')
@@ -140,13 +203,151 @@ def _convert_dataframe_to_json(self, dataframe, measurement, tags=None,

         points = [
             {'measurement': measurement,
-             'tags': tags if tags else {},
+             'tags': dict(list(tag.items()) + list(tags.items())),
              'fields': rec,
-             'time': int(ts.value / precision_factor)
-             }
-            for ts, rec in zip(dataframe.index, dataframe.to_dict('record'))]
+             'time': int(ts.value / precision_factor)}
+            for ts, tag, rec in zip(dataframe.index,
+                                    dataframe[tag_columns].to_dict('record'),
+                                    dataframe[field_columns].to_dict('record'))
+        ]
 
         return points

+    def _convert_dataframe_to_lines(self,
+                                    dataframe,
+                                    measurement,
+                                    field_columns=[],
+                                    tag_columns=[],
+                                    global_tags={},
+                                    time_precision=None,
+                                    numeric_precision=None):
+
+        if not isinstance(dataframe, pd.DataFrame):
+            raise TypeError('Must be DataFrame, but type was: {0}.'
+                            .format(type(dataframe)))
+        if not (isinstance(dataframe.index, pd.tseries.period.PeriodIndex) or
+                isinstance(dataframe.index, pd.tseries.index.DatetimeIndex)):
+            raise TypeError('Must be DataFrame with DatetimeIndex or \
+                            PeriodIndex.')
+
+        # Create a Series of columns for easier indexing
+        column_series = pd.Series(dataframe.columns)
+
+        if field_columns is None:
+            field_columns = []
+        if tag_columns is None:
+            tag_columns = []
+
+        # Make sure field_columns and tag_columns are lists
+        field_columns = list(field_columns) if list(field_columns) else []
+        tag_columns = list(tag_columns) if list(tag_columns) else []
+
+        # If field columns but no tag columns, assume rest of columns are tags
+        if field_columns and (not tag_columns):
+            tag_columns = list(column_series[~column_series.isin(
+                field_columns)])
+
+        # If no field columns, assume non-tag columns are fields
+        if not field_columns:
+            field_columns = list(column_series[~column_series.isin(
+                tag_columns)])
+
+        precision_factor = {
+            "n": 1,
+            "u": 1e3,
+            "ms": 1e6,
+            "s": 1e9,
+            "m": 1e9 * 60,
+            "h": 1e9 * 3600,
+        }.get(time_precision, 1)
+
+        # Make array of timestamp ints
+        time = ((dataframe.index.to_datetime().values.astype(int) /
+                 precision_factor).astype(int).astype(str))
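+        # (DatetimeIndex values are int64 nanoseconds since the epoch, so
+        # dividing by precision_factor converts them to the requested unit)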

+        # If tag columns exist, make an array of formatted tag keys and values
+        if tag_columns:
+            tag_df = dataframe[tag_columns]
+            tag_df = self._stringify_dataframe(
+                tag_df, numeric_precision, datatype='tag')
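+            # Each cell becomes ',key=value'; summing across columns yields
+            # one combined tag string per row, e.g. ',host=a,region=b'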
+            tags = (',' + (
+                (tag_df.columns.values + '=').tolist() + tag_df)).sum(axis=1)
+            del tag_df
+
+        else:
+            tags = ''
+
+        # Make an array of formatted field keys and values
+        field_df = dataframe[field_columns]
+        field_df = self._stringify_dataframe(
+            field_df, numeric_precision, datatype='field')
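+        # Prefix each column's values with 'key='; columns after the first
+        # also get a leading comma, so each row sums to 'k1=v1,k2=v2,...'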
+        field_df = (field_df.columns.values + '=').tolist() + field_df
+        field_df[field_df.columns[1:]] = ',' + field_df[field_df.columns[1:]]
+        fields = field_df.sum(axis=1)
+        del field_df
+
+        # Add any global tags to formatted tag strings
+        if global_tags:
+            global_tags = ','.join(['='.join([tag, global_tags[tag]])
+                                    for tag in global_tags])
+            if tag_columns:
+                tags = tags + ',' + global_tags
+            else:
+                tags = ',' + global_tags
+
+        # Generate line protocol string
+        points = (measurement + tags + ' ' + fields + ' ' + time).tolist()
+        return points

+    def _stringify_dataframe(self,
+                             dataframe,
+                             numeric_precision,
+                             datatype='field'):
+
+        # Find int and string columns for field-type data
+        int_columns = dataframe.select_dtypes(include=['integer']).columns
+        string_columns = dataframe.select_dtypes(include=['object']).columns
+
+        # Convert dataframe to string
+        if numeric_precision is None:
+            # If no precision specified, convert directly to string (fast)
+            dataframe = dataframe.astype(str)
+        elif numeric_precision == 'full':
+            # If full precision, use repr to get full float precision
+            float_columns = (dataframe.select_dtypes(include=['floating'])
+                             .columns)
+            nonfloat_columns = dataframe.columns[~dataframe.columns.isin(
+                float_columns)]
+            dataframe[float_columns] = dataframe[float_columns].applymap(repr)
+            dataframe[nonfloat_columns] = (dataframe[nonfloat_columns]
+                                           .astype(str))
+        elif isinstance(numeric_precision, int):
+            # If precision is specified, round to appropriate precision
+            float_columns = (dataframe.select_dtypes(include=['floating'])
+                             .columns)
+            nonfloat_columns = dataframe.columns[~dataframe.columns.isin(
+                float_columns)]
+            dataframe[float_columns] = (dataframe[float_columns]
+                                        .round(numeric_precision))
+            # If desired precision is > 10 decimal places, need to use repr
+            if numeric_precision > 10:
+                dataframe[float_columns] = (dataframe[float_columns]
+                                            .applymap(repr))
+                dataframe[nonfloat_columns] = (dataframe[nonfloat_columns]
+                                               .astype(str))
+            else:
+                dataframe = dataframe.astype(str)
+        else:
+            raise ValueError('Invalid numeric precision.')
+
+        if datatype == 'field':
+            # If dealing with fields, format ints and strings correctly
+            dataframe[int_columns] = dataframe[int_columns] + 'i'
+            dataframe[string_columns] = '"' + dataframe[string_columns] + '"'
+
+        dataframe.columns = dataframe.columns.astype(str)
+        return dataframe

     def _datetime_to_epoch(self, datetime, time_precision='s'):
         seconds = (datetime - self.EPOCH).total_seconds()
         if time_precision == 'h':
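
To close, a hedged sketch of the output the new line-protocol path should produce. The frame and column names are hypothetical, the connection parameters are placeholders, and the call assumes a 2016-era pandas (the code relies on `DatetimeIndex.to_datetime`, which later pandas versions removed):

```python
import pandas as pd

from influxdb import DataFrameClient

# Hypothetical frame: 'host' as the tag column, 'value' and 'count' as fields
df = pd.DataFrame({'host': ['a', 'b'],
                   'value': [0.5, 1.2],
                   'count': [3, 7]},
                  index=pd.to_datetime(['2016-09-01', '2016-09-02']))

client = DataFrameClient('localhost', 8086, 'root', 'root', 'example_db')
lines = client._convert_dataframe_to_lines(df, 'cpu',
                                           field_columns=['value', 'count'],
                                           tag_columns=['host'])

# Expected result (timestamps default to nanoseconds since the epoch):
#   cpu,host=a value=0.5,count=3i 1472688000000000000
#   cpu,host=b value=1.2,count=7i 1472774400000000000
# Integer fields carry the 'i' suffix and string fields would be
# double-quoted, per _stringify_dataframe.
```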