Skip to content

Commit b8206ed

Browse files
author
aviau
committed
Merge PR#86 (Thanks @timtroendle!)
Added support for Pandas DataFrames
2 parents 2610cb1 + 3a00f29 commit b8206ed

File tree

6 files changed

+299
-9
lines changed

6 files changed

+299
-9
lines changed

dev-requirements.txt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
requests
22
nose
33
mock
4+
pandas
45
Sphinx==1.2.3
5-
sphinx_rtd_theme
6+
sphinx_rtd_theme

examples/tutorial_pandas.py

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
import argparse
2+
import pandas as pd
3+
4+
from influxdb.misc import DataFrameClient
5+
6+
7+
def main(host='localhost', port=8086):
    """Round-trip a pandas DataFrame through InfluxDB.

    Creates a demo database, writes a 30-row hourly DataFrame to the
    ``demo`` series, reads it back, then drops the database again.

    :param host: hostname of the InfluxDB HTTP API.
    :param port: port of the InfluxDB HTTP API.
    """
    user = 'root'
    password = 'root'
    dbname = 'example'

    client = DataFrameClient(host, port, user, password, dbname)

    print("Create pandas DataFrame")
    hourly_index = pd.date_range(start='2014-11-16', periods=30, freq='H')
    frame = pd.DataFrame(data=list(range(30)), index=hourly_index)

    print("Create database: " + dbname)
    client.create_database(dbname)

    print("Write DataFrame")
    client.write_points({'demo': frame})

    print("Read DataFrame")
    client.query("select * from demo")

    print("Delete database: " + dbname)
    client.delete_database(dbname)
30+
31+
32+
def parse_args():
    """Return the parsed command-line options for this example."""
    parser = argparse.ArgumentParser(
        description='example code to play with InfluxDB')
    parser.add_argument('--host', required=False, type=str,
                        help='hostname of InfluxDB http API',
                        default='localhost')
    parser.add_argument('--port', required=False, type=int,
                        help='port of InfluxDB http API',
                        default=8086)
    return parser.parse_args()
41+
42+
43+
if __name__ == '__main__':
    # Entry point: forward the parsed CLI options into the demo.
    options = parse_args()
    main(host=options.host, port=options.port)

influxdb/client.py

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -166,12 +166,13 @@ def request(self, url, method='GET', params=None, data=None,
166166
# by doing a POST to /db/foo_production/series?u=some_user&p=some_password
167167
# with a JSON body of points.
168168

169-
def write_points(self, *args, **kwargs):
169+
def write_points(self, data, *args, **kwargs):
170170
"""
171171
write_points()
172172
173173
Write to multiple time series names.
174174
175+
:param data: A list of dicts.
175176
:param batch_size: [Optional] Value to write the points in batches
176177
instead of all at one time. Useful for when doing data dumps from
177178
one database to another or when doing a massive write operation
@@ -186,25 +187,25 @@ def list_chunks(l, n):
186187

187188
batch_size = kwargs.get('batch_size')
188189
if batch_size:
189-
for data in kwargs.get('data'):
190-
name = data.get('name')
191-
columns = data.get('columns')
192-
point_list = data.get('points')
190+
for item in data:
191+
name = item.get('name')
192+
columns = item.get('columns')
193+
point_list = item.get('points')
193194

194195
for batch in list_chunks(point_list, batch_size):
195-
data = [{
196+
item = [{
196197
"points": batch,
197198
"name": name,
198199
"columns": columns
199200
}]
200201
time_precision = kwargs.get('time_precision', 's')
201202
self.write_points_with_precision(
202-
data=data,
203+
data=item,
203204
time_precision=time_precision)
204205

205206
return True
206207

207-
return self.write_points_with_precision(*args, **kwargs)
208+
return self.write_points_with_precision(data, *args, **kwargs)
208209

209210
def write_points_with_precision(self, data, time_precision='s'):
210211
"""
@@ -298,6 +299,11 @@ def remove_scheduled_delete(self, delete_id):
298299
def query(self, query, time_precision='s', chunked=False):
299300
"""
300301
Querying data
302+
303+
:param time_precision: [Optional, default 's'] Either 's', 'm', 'ms'
304+
or 'u'.
305+
:param chunked: [Optional, default=False] True if the data shall be
306+
retrieved in chunks, False otherwise.
301307
"""
302308
if time_precision not in ['s', 'm', 'ms', 'u']:
303309
raise Exception(

influxdb/misc.py

Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
# -*- coding: utf-8 -*-
2+
"""
3+
Miscellaneous
4+
"""
5+
from time import mktime
6+
7+
from .client import InfluxDBClient
8+
9+
10+
class DataFrameClient(InfluxDBClient):
    """
    The ``DataFrameClient`` object holds information necessary to connect
    to InfluxDB. Requests can be made to InfluxDB directly through the client.
    The client reads and writes from pandas DataFrames.
    """

    def write_points(self, data, *args, **kwargs):
        """
        Write to multiple time series names.

        :param data: A dictionary mapping series names to pandas DataFrames
        :param time_precision: [Optional, default 's'] Either 's', 'm', 'ms'
            or 'u'; point timestamps are emitted in this unit.
        :param batch_size: [Optional] Value to write the points in batches
            instead of all at one time. Useful for when doing data dumps from
            one database to another or when doing a massive write operation
        :type batch_size: int
        """
        # Thread the precision into the JSON conversion so timestamps are
        # scaled to the unit the server is told to expect (the original
        # always emitted epoch seconds, whatever the precision).
        time_precision = kwargs.get('time_precision', 's')
        payload = [self._convert_dataframe_to_json(
                       name=key, dataframe=value,
                       time_precision=time_precision)
                   for key, value in data.items()]
        return InfluxDBClient.write_points_with_precision(self, payload,
                                                          *args, **kwargs)

    def write_points_with_precision(self, data, time_precision='s'):
        """
        Write to multiple time series names.

        :param data: A dictionary mapping series names to pandas DataFrames
        :param time_precision: [Optional, default 's'] Either 's', 'm', 'ms'
            or 'u'.
        """
        # Bug fix: the original hard-coded time_precision='s' here, silently
        # discarding the caller's argument.
        return self.write_points(data, time_precision=time_precision)

    def query(self, query, time_precision='s', chunked=False):
        """
        Query data into a DataFrame.

        :param time_precision: [Optional, default 's'] Either 's', 'm', 'ms'
            or 'u'.
        :param chunked: [Optional, default=False] True if the data shall be
            retrieved in chunks, False otherwise.
        """
        result = InfluxDBClient.query(self, query=query,
                                      time_precision=time_precision,
                                      chunked=chunked)
        # InfluxDB returns a list of series; only the first one is converted.
        return self._to_dataframe(result[0], time_precision)

    def _to_dataframe(self, json_result, time_precision):
        """Convert one InfluxDB JSON series into a DataFrame indexed by
        its 'time' column (as a UTC DatetimeIndex)."""
        try:
            import pandas as pd
        except ImportError:
            raise ImportError('pandas required for retrieving as dataframe.')
        dataframe = pd.DataFrame(data=json_result['points'],
                                 columns=json_result['columns'])
        # Map InfluxDB precision codes onto pandas unit codes: InfluxDB
        # uses 'm' for milliseconds and 'u' for microseconds.
        pandas_time_unit = time_precision
        if time_precision == 'm':
            pandas_time_unit = 'ms'
        elif time_precision == 'u':
            pandas_time_unit = 'us'
        dataframe.index = pd.to_datetime(list(dataframe['time']),
                                         unit=pandas_time_unit,
                                         utc=True)
        del dataframe['time']
        return dataframe

    def _convert_dataframe_to_json(self, dataframe, name, time_precision='s'):
        """Convert a DataFrame into the InfluxDB JSON series format.

        :param dataframe: DataFrame with a DatetimeIndex or PeriodIndex.
        :param name: series name to write the points under.
        :param time_precision: [Optional, default 's'] Either 's', 'm', 'ms'
            or 'u'; timestamps are scaled to this unit.
        :raises TypeError: if ``dataframe`` is not a DataFrame or its index
            is not time-based.
        """
        try:
            import pandas as pd
        except ImportError:
            raise ImportError('pandas required for writing as dataframe.')
        if not isinstance(dataframe, pd.DataFrame):
            raise TypeError('Must be DataFrame, but type was: {}.'
                            .format(type(dataframe)))
        if not (isinstance(dataframe.index, pd.PeriodIndex) or
                isinstance(dataframe.index, pd.DatetimeIndex)):
            raise TypeError('Must be DataFrame with DatetimeIndex or '
                            'PeriodIndex.')
        # Work on a copy so the caller's DataFrame is not mutated by the
        # index conversion and the added 'time' column (the original
        # modified its argument in place).
        dataframe = dataframe.copy()
        if isinstance(dataframe.index, pd.PeriodIndex):
            # PeriodIndex.to_timestamp() replaces the deprecated
            # Index.to_datetime() used by the original code.
            dataframe.index = dataframe.index.to_timestamp()
        # 'm' and 'ms' are both milliseconds in the InfluxDB 0.8 API.
        scale = {'s': 1, 'm': 1000, 'ms': 1000, 'u': 1000000}[time_precision]
        dataframe['time'] = [mktime(dt.timetuple()) * scale
                             for dt in dataframe.index]
        data = {'name': name,
                'columns': [str(column) for column in dataframe.columns],
                'points': list([list(x) for x in dataframe.values])}
        return data

test-requirements.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
11
nose
22
mock
33
requests-mock
4+
pandas

tests/influxdb/misc_test.py

Lines changed: 146 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,146 @@
1+
# -*- coding: utf-8 -*-
2+
"""
3+
unit tests for misc module
4+
"""
5+
import unittest
6+
import json
7+
import requests_mock
8+
from nose.tools import raises
9+
from datetime import datetime, timedelta
10+
import time
11+
import pandas as pd
12+
from pandas.util.testing import assert_frame_equal
13+
14+
from influxdb.misc import DataFrameClient
15+
from .client_test import _mocked_session
16+
17+
18+
class TestDataFrameClient(unittest.TestCase):
    """Unit tests for the pandas-backed DataFrameClient."""

    def test_write_points_from_dataframe(self):
        start = datetime(2014, 11, 15, 15, 42, 44, 543)
        frame = pd.DataFrame(data=[["1", 1, 1.0], ["2", 2, 2.0]],
                             index=[start, start + timedelta(hours=1)],
                             columns=["column_one", "column_two",
                                      "column_three"])
        expected = [
            {
                "points": [
                    ["1", 1, 1.0, time.mktime(start.timetuple())],
                    ["2", 2, 2.0,
                     time.mktime((start + timedelta(hours=1)).timetuple())]
                ],
                "name": "foo",
                "columns": ["column_one", "column_two", "column_three",
                            "time"]
            }
        ]

        with requests_mock.Mocker() as mocker:
            mocker.register_uri(requests_mock.POST,
                                "http://localhost:8086/db/db/series")

            cli = DataFrameClient(database='db')
            cli.write_points({"foo": frame})

            self.assertListEqual(json.loads(mocker.last_request.body),
                                 expected)

    def test_write_points_from_dataframe_with_numeric_column_names(self):
        start = datetime(2014, 11, 15, 15, 42, 44, 543)
        # No explicit column labels, so pandas assigns the integers 0, 1, 2;
        # the client must stringify them for the JSON payload.
        frame = pd.DataFrame(data=[["1", 1, 1.0], ["2", 2, 2.0]],
                             index=[start, start + timedelta(hours=1)])
        expected = [
            {
                "points": [
                    ["1", 1, 1.0, time.mktime(start.timetuple())],
                    ["2", 2, 2.0,
                     time.mktime((start + timedelta(hours=1)).timetuple())]
                ],
                "name": "foo",
                "columns": ['0', '1', '2', "time"]
            }
        ]

        with requests_mock.Mocker() as mocker:
            mocker.register_uri(requests_mock.POST,
                                "http://localhost:8086/db/db/series")

            cli = DataFrameClient(database='db')
            cli.write_points({"foo": frame})

            self.assertListEqual(json.loads(mocker.last_request.body),
                                 expected)

    def test_write_points_from_dataframe_with_period_index(self):
        first_day = datetime(2014, 11, 16)
        frame = pd.DataFrame(data=[["1", 1, 1.0], ["2", 2, 2.0]],
                             index=[pd.Period('2014-11-16'),
                                    pd.Period('2014-11-17')],
                             columns=["column_one", "column_two",
                                      "column_three"])
        expected = [
            {
                "points": [
                    ["1", 1, 1.0, time.mktime(first_day.timetuple())],
                    ["2", 2, 2.0,
                     time.mktime((first_day + timedelta(hours=24))
                                 .timetuple())]
                ],
                "name": "foo",
                "columns": ["column_one", "column_two", "column_three",
                            "time"]
            }
        ]

        with requests_mock.Mocker() as mocker:
            mocker.register_uri(requests_mock.POST,
                                "http://localhost:8086/db/db/series")

            cli = DataFrameClient(database='db')
            cli.write_points({"foo": frame})

            self.assertListEqual(json.loads(mocker.last_request.body),
                                 expected)

    @raises(TypeError)
    def test_write_points_from_dataframe_fails_without_time_index(self):
        # A default RangeIndex is not time-based, so conversion must fail.
        frame = pd.DataFrame(data=[["1", 1, 1.0], ["2", 2, 2.0]],
                             columns=["column_one", "column_two",
                                      "column_three"])

        with requests_mock.Mocker() as mocker:
            mocker.register_uri(requests_mock.POST,
                                "http://localhost:8086/db/db/series")

            cli = DataFrameClient(database='db')
            cli.write_points({"foo": frame})

    @raises(TypeError)
    def test_write_points_from_dataframe_fails_with_series(self):
        # Only DataFrames are accepted; a Series must be rejected.
        start = datetime(2014, 11, 16)
        series = pd.Series(data=[1.0, 2.0],
                           index=[start, start + timedelta(hours=1)])

        with requests_mock.Mocker() as mocker:
            mocker.register_uri(requests_mock.POST,
                                "http://localhost:8086/db/db/series")

            cli = DataFrameClient(database='db')
            cli.write_points({"foo": series})

    def test_query_into_dataframe(self):
        response = [
            {
                "name": "foo",
                "columns": ["time", "sequence_number", "column_one"],
                "points": [
                    [1383876043, 16, 2], [1383876043, 15, 1],
                    [1383876035, 14, 2], [1383876035, 13, 1]
                ]
            }
        ]
        expected = pd.DataFrame(data=[[16, 2], [15, 1], [14, 2], [13, 1]],
                                index=pd.to_datetime([1383876043, 1383876043,
                                                      1383876035, 1383876035],
                                                     unit='s', utc=True),
                                columns=['sequence_number', 'column_one'])
        with _mocked_session('get', 200, response):
            cli = DataFrameClient('host', 8086, 'username', 'password', 'db')
            result = cli.query('select column_one from foo;')
            assert_frame_equal(expected, result)

0 commit comments

Comments
 (0)