Skip to content

Commit ae45809

Browse files
committed
Merge pull request influxdata#163 from tzonghao/master
DataFrameClient for InfluxDB 0.9 (Thanks @tzonghao !)
2 parents 4934fd1 + aa9aace commit ae45809

File tree

5 files changed

+409
-188
lines changed

5 files changed

+409
-188
lines changed

examples/tutorial_pandas.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,10 @@ def main(host='localhost', port=8086):
2020
client.create_database(dbname)
2121

2222
print("Write DataFrame")
23-
client.write_points({'demo': df})
23+
client.write_points(df, 'demo')
24+
25+
print("Write DataFrame with Tags")
26+
client.write_points(df, 'demo', {'k1': 'v1', 'k2': 'v2'})
2427

2528
print("Read DataFrame")
2629
client.query("select * from demo")

influxdb/_dataframe_client.py

Lines changed: 98 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,22 @@
33
DataFrame client for InfluxDB
44
"""
55
import math
6-
import warnings
6+
7+
import pandas as pd
78

89
from .client import InfluxDBClient
910

10-
import pandas as pd
11+
12+
def _pandas_time_unit(time_precision):
13+
unit = time_precision
14+
if time_precision == 'm':
15+
unit = 'ms'
16+
elif time_precision == 'u':
17+
unit = 'us'
18+
elif time_precision == 'n':
19+
unit = 'ns'
20+
assert unit in ('s', 'ms', 'us', 'ns')
21+
return unit
1122

1223

1324
class DataFrameClient(InfluxDBClient):
@@ -19,112 +30,128 @@ class DataFrameClient(InfluxDBClient):
1930

2031
EPOCH = pd.Timestamp('1970-01-01 00:00:00.000+00:00')
2132

22-
def write_points(self, data, *args, **kwargs):
33+
def write_points(self, dataframe, measurement, tags=None,
34+
time_precision=None, database=None, retention_policy=None,
35+
batch_size=None):
2336
"""
2437
Write to multiple time series names.
2538
26-
:param data: A dictionary mapping series names to pandas DataFrames
27-
:param time_precision: [Optional, default 's'] Either 's', 'm', 'ms'
28-
or 'u'.
39+
:param dataframe: data points in a DataFrame
40+
:param measurement: name of measurement
41+
:param tags: dictionary of tags, with string key-values
42+
:param time_precision: [Optional, default 's'] Either 's', 'ms', 'u'
43+
or 'n'.
2944
:param batch_size: [Optional] Value to write the points in batches
3045
instead of all at one time. Useful for when doing data dumps from
3146
one database to another or when doing a massive write operation
3247
:type batch_size: int
33-
"""
3448
35-
batch_size = kwargs.get('batch_size')
36-
time_precision = kwargs.get('time_precision', 's')
49+
"""
3750
if batch_size:
38-
kwargs.pop('batch_size') # don't hand over to InfluxDBClient
39-
for key, data_frame in data.items():
40-
number_batches = int(math.ceil(
41-
len(data_frame) / float(batch_size)))
42-
for batch in range(number_batches):
43-
start_index = batch * batch_size
44-
end_index = (batch + 1) * batch_size
45-
data = [self._convert_dataframe_to_json(
46-
name=key,
47-
dataframe=data_frame.ix[start_index:end_index].copy(),
48-
time_precision=time_precision)]
49-
super(DataFrameClient, self).write_points(data,
50-
*args, **kwargs)
51+
number_batches = int(math.ceil(
52+
len(dataframe) / float(batch_size)))
53+
for batch in range(number_batches):
54+
start_index = batch * batch_size
55+
end_index = (batch + 1) * batch_size
56+
points = self._convert_dataframe_to_json(
57+
dataframe.ix[start_index:end_index].copy(),
58+
measurement,
59+
tags
60+
)
61+
super(DataFrameClient, self).write_points(
62+
points, time_precision, database, retention_policy)
5163
return True
5264
else:
53-
data = [self._convert_dataframe_to_json(
54-
name=key, dataframe=dataframe, time_precision=time_precision)
55-
for key, dataframe in data.items()]
56-
return super(DataFrameClient, self).write_points(data,
57-
*args, **kwargs)
58-
59-
def write_points_with_precision(self, data, time_precision='s'):
60-
"""
61-
DEPRECATED. Write to multiple time series names
65+
points = self._convert_dataframe_to_json(
66+
dataframe, measurement, tags
67+
)
68+
super(DataFrameClient, self).write_points(
69+
points, time_precision, database, retention_policy)
70+
return True
6271

63-
"""
64-
warnings.warn(
65-
"write_points_with_precision is deprecated, and will be removed "
66-
"in future versions. Please use "
67-
"``DataFrameClient.write_points(time_precision='..')`` instead.",
68-
FutureWarning)
69-
return self.write_points(data, time_precision='s')
70-
71-
def query(self, query, time_precision='s', chunked=False, database=None):
72+
def query(self, query, chunked=False, database=None):
7273
"""
7374
Quering data into a DataFrame.
7475
75-
:param time_precision: [Optional, default 's'] Either 's', 'm', 'ms'
76-
or 'u'.
7776
:param chunked: [Optional, default=False] True if the data shall be
7877
retrieved in chunks, False otherwise.
7978
8079
"""
8180
results = super(DataFrameClient, self).query(query, database=database)
82-
if len(results) > 0:
83-
return self._to_dataframe(results, time_precision)
81+
if query.upper().startswith("SELECT"):
82+
if len(results) > 0:
83+
return self._to_dataframe(results)
84+
else:
85+
return {}
8486
else:
8587
return results
8688

87-
def _to_dataframe(self, json_result, time_precision):
88-
dataframe = pd.DataFrame(data=json_result['points'],
89-
columns=json_result['columns'])
90-
if 'sequence_number' in dataframe:
91-
dataframe.sort(['time', 'sequence_number'], inplace=True)
89+
def get_list_series(self, database=None):
90+
"""
91+
Get the list of series, in DataFrame
92+
93+
"""
94+
results = super(DataFrameClient, self)\
95+
.query("SHOW SERIES", database=database)
96+
if len(results):
97+
return dict(
98+
(key[0], pd.DataFrame(data)) for key, data in results.items()
99+
)
92100
else:
93-
dataframe.sort(['time'], inplace=True)
94-
pandas_time_unit = time_precision
95-
if time_precision == 'm':
96-
pandas_time_unit = 'ms'
97-
elif time_precision == 'u':
98-
pandas_time_unit = 'us'
99-
dataframe.index = pd.to_datetime(list(dataframe['time']),
100-
unit=pandas_time_unit,
101-
utc=True)
102-
del dataframe['time']
103-
return dataframe
104-
105-
def _convert_dataframe_to_json(self, dataframe, name, time_precision='s'):
101+
return {}
102+
103+
def _to_dataframe(self, rs):
104+
result = {}
105+
for key, data in rs.items():
106+
name, tags = key
107+
if tags is None:
108+
key = name
109+
else:
110+
key = (name, tuple(sorted(tags.items())))
111+
df = pd.DataFrame(data)
112+
df.time = pd.to_datetime(df.time)
113+
df.set_index('time', inplace=True)
114+
df.index = df.index.tz_localize('UTC')
115+
df.index.name = None
116+
result[key] = df
117+
return result
118+
119+
def _convert_dataframe_to_json(self, dataframe, measurement, tags=None):
120+
106121
if not isinstance(dataframe, pd.DataFrame):
107122
raise TypeError('Must be DataFrame, but type was: {}.'
108123
.format(type(dataframe)))
109124
if not (isinstance(dataframe.index, pd.tseries.period.PeriodIndex) or
110125
isinstance(dataframe.index, pd.tseries.index.DatetimeIndex)):
111126
raise TypeError('Must be DataFrame with DatetimeIndex or \
112127
PeriodIndex.')
128+
113129
dataframe.index = dataframe.index.to_datetime()
114130
if dataframe.index.tzinfo is None:
115131
dataframe.index = dataframe.index.tz_localize('UTC')
116-
dataframe['time'] = [self._datetime_to_epoch(dt, time_precision)
117-
for dt in dataframe.index]
118-
data = {'name': name,
119-
'columns': [str(column) for column in dataframe.columns],
120-
'points': list([list(x) for x in dataframe.values])}
121-
return data
132+
133+
# Convert column to strings
134+
dataframe.columns = dataframe.columns.astype('str')
135+
136+
# Convert dtype for json serialization
137+
dataframe = dataframe.astype('object')
138+
139+
points = [
140+
{'name': measurement,
141+
'tags': tags if tags else {},
142+
'fields': rec,
143+
'timestamp': ts.isoformat()
144+
}
145+
for ts, rec in zip(dataframe.index, dataframe.to_dict('record'))]
146+
return points
122147

123148
def _datetime_to_epoch(self, datetime, time_precision='s'):
124149
seconds = (datetime - self.EPOCH).total_seconds()
125150
if time_precision == 's':
126151
return seconds
127-
elif time_precision == 'm' or time_precision == 'ms':
128-
return seconds * 1000
152+
elif time_precision == 'ms':
153+
return seconds * 10 ** 3
129154
elif time_precision == 'u':
130-
return seconds * 1000000
155+
return seconds * 10 ** 6
156+
elif time_precision == 'n':
157+
return seconds * 10 ** 9

0 commit comments

Comments
 (0)