Skip to content

Commit 171536c

Browse files
author
aviau
committed
Improved and fixed DataFrame client
2 parents d1adcce + f8a72cd commit 171536c

File tree

3 files changed

+100
-34
lines changed

3 files changed

+100
-34
lines changed

influxdb/client.py

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -293,9 +293,6 @@ def remove_scheduled_delete(self, delete_id):
293293
"""
294294
raise NotImplementedError()
295295

296-
# Querying Data
297-
#
298-
# GET db/:name/series. It takes five parameters
299296
def query(self, query, time_precision='s', chunked=False):
300297
"""
301298
Quering data
@@ -305,6 +302,13 @@ def query(self, query, time_precision='s', chunked=False):
305302
:param chunked: [Optional, default=False] True if the data shall be
306303
retrieved in chunks, False otherwise.
307304
"""
305+
return self._query(query, time_precision=time_precision,
306+
chunked=chunked)
307+
308+
# Querying Data
309+
#
310+
# GET db/:name/series. It takes five parameters
311+
def _query(self, query, time_precision='s', chunked=False):
308312
if time_precision not in ['s', 'm', 'ms', 'u']:
309313
raise Exception(
310314
"Invalid time precision is given. (use 's', 'm', 'ms' or 'u')")
@@ -431,7 +435,7 @@ def get_list_series(self):
431435
Get a list of all time series in a database
432436
"""
433437

434-
response = self.query('list series')
438+
response = self._query('list series')
435439

436440
series_list = []
437441
for series in response[0]['points']:
@@ -444,7 +448,7 @@ def get_list_continuous_queries(self):
444448
Get a list of continuous queries
445449
"""
446450

447-
response = self.query('list continuous queries')
451+
response = self._query('list continuous queries')
448452
queries_list = []
449453
for query in response[0]['points']:
450454
queries_list.append(query[2])

influxdb/misc.py

Lines changed: 40 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
"""
33
Miscellaneous
44
"""
5-
from time import mktime
5+
import math
66

77
from .client import InfluxDBClient
88

@@ -14,6 +14,9 @@ class DataFrameClient(InfluxDBClient):
1414
The client reads and writes from pandas DataFrames.
1515
"""
1616

17+
import pandas as pd
18+
EPOCH = pd.Timestamp('1970-01-01 00:00:00.000+00:00')
19+
1720
def write_points(self, data, *args, **kwargs):
1821
"""
1922
write_points()
@@ -27,10 +30,27 @@ def write_points(self, data, *args, **kwargs):
2730
:type batch_size: int
2831
"""
2932

30-
data = [self._convert_dataframe_to_json(name=key, dataframe=value)
31-
for key, value in data.items()]
32-
return InfluxDBClient.write_points_with_precision(self, data,
33-
*args, **kwargs)
33+
batch_size = kwargs.get('batch_size')
34+
if batch_size:
35+
kwargs.pop('batch_size') # don't hand over to InfluxDBClient
36+
for key, data_frame in data.items():
37+
number_batches = int(math.ceil(len(data_frame)
38+
/ float(batch_size)))
39+
for batch in range(number_batches):
40+
start_index = batch * batch_size
41+
end_index = (batch + 1) * batch_size
42+
data = [self._convert_dataframe_to_json(
43+
name=key,
44+
dataframe=data_frame.ix[start_index:end_index].copy())]
45+
InfluxDBClient.write_points_with_precision(self, data,
46+
*args, **kwargs)
47+
return True
48+
else:
49+
data = [self._convert_dataframe_to_json(name=key,
50+
dataframe=dataframe)
51+
for key, dataframe in data.items()]
52+
return InfluxDBClient.write_points_with_precision(self, data,
53+
*args, **kwargs)
3454

3555
def write_points_with_precision(self, data, time_precision='s'):
3656
"""
@@ -51,7 +71,10 @@ def query(self, query, time_precision='s', chunked=False):
5171
result = InfluxDBClient.query(self, query=query,
5272
time_precision=time_precision,
5373
chunked=chunked)
54-
return self._to_dataframe(result[0], time_precision)
74+
if len(result) > 0:
75+
return self._to_dataframe(result[0], time_precision)
76+
else:
77+
return result
5578

5679
def _to_dataframe(self, json_result, time_precision):
5780
try:
@@ -60,6 +83,10 @@ def _to_dataframe(self, json_result, time_precision):
6083
raise ImportError('pandas required for retrieving as dataframe.')
6184
dataframe = pd.DataFrame(data=json_result['points'],
6285
columns=json_result['columns'])
86+
if 'sequence_number' in dataframe.keys():
87+
dataframe.sort(['time', 'sequence_number'], inplace=True)
88+
else:
89+
dataframe.sort(['time'], inplace=True)
6390
pandas_time_unit = time_precision
6491
if time_precision == 'm':
6592
pandas_time_unit = 'ms'
@@ -84,8 +111,14 @@ def _convert_dataframe_to_json(self, dataframe, name):
84111
raise TypeError('Must be DataFrame with DatetimeIndex or \
85112
PeriodIndex.')
86113
dataframe.index = dataframe.index.to_datetime()
87-
dataframe['time'] = [mktime(dt.timetuple()) for dt in dataframe.index]
114+
if dataframe.index.tzinfo is None:
115+
dataframe.index = dataframe.index.tz_localize('UTC')
116+
dataframe['time'] = [self._datetime_to_epoch(dt)
117+
for dt in dataframe.index]
88118
data = {'name': name,
89119
'columns': [str(column) for column in dataframe.columns],
90120
'points': list([list(x) for x in dataframe.values])}
91121
return data
122+
123+
def _datetime_to_epoch(self, datetime):
124+
return (datetime - DataFrameClient.EPOCH).total_seconds()

tests/influxdb/misc_test.py

Lines changed: 51 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,7 @@
66
import json
77
import requests_mock
88
from nose.tools import raises
9-
from datetime import datetime, timedelta
10-
import time
9+
from datetime import timedelta
1110
from tests import skipIfPYpy, using_pypy
1211

1312
if not using_pypy:
@@ -22,17 +21,16 @@
2221
class TestDataFrameClient(unittest.TestCase):
2322

2423
def test_write_points_from_dataframe(self):
25-
now = datetime(2014, 11, 15, 15, 42, 44, 543)
24+
now = pd.Timestamp('1970-01-01 00:00+00:00')
2625
dataframe = pd.DataFrame(data=[["1", 1, 1.0], ["2", 2, 2.0]],
2726
index=[now, now + timedelta(hours=1)],
2827
columns=["column_one", "column_two",
2928
"column_three"])
3029
points = [
3130
{
3231
"points": [
33-
["1", 1, 1.0, time.mktime(now.timetuple())],
34-
["2", 2, 2.0, time.mktime((now + timedelta(hours=1))
35-
.timetuple())]
32+
["1", 1, 1.0, 0],
33+
["2", 2, 2.0, 3600]
3634
],
3735
"name": "foo",
3836
"columns": ["column_one", "column_two", "column_three", "time"]
@@ -48,17 +46,30 @@ def test_write_points_from_dataframe(self):
4846

4947
self.assertListEqual(json.loads(m.last_request.body), points)
5048

49+
def test_write_points_from_dataframe_in_batches(self):
50+
now = pd.Timestamp('1970-01-01 00:00+00:00')
51+
dataframe = pd.DataFrame(data=[["1", 1, 1.0], ["2", 2, 2.0]],
52+
index=[now, now + timedelta(hours=1)],
53+
columns=["column_one", "column_two",
54+
"column_three"])
55+
with requests_mock.Mocker() as m:
56+
m.register_uri(requests_mock.POST,
57+
"http://localhost:8086/db/db/series")
58+
59+
cli = DataFrameClient(database='db')
60+
assert cli.write_points({"foo": dataframe},
61+
batch_size=1) is True
62+
5163
def test_write_points_from_dataframe_with_numeric_column_names(self):
52-
now = datetime(2014, 11, 15, 15, 42, 44, 543)
64+
now = pd.Timestamp('1970-01-01 00:00+00:00')
5365
# df with numeric column names
5466
dataframe = pd.DataFrame(data=[["1", 1, 1.0], ["2", 2, 2.0]],
5567
index=[now, now + timedelta(hours=1)])
5668
points = [
5769
{
5870
"points": [
59-
["1", 1, 1.0, time.mktime(now.timetuple())],
60-
["2", 2, 2.0, time.mktime((now + timedelta(hours=1))
61-
.timetuple())]
71+
["1", 1, 1.0, 0],
72+
["2", 2, 2.0, 3600]
6273
],
6374
"name": "foo",
6475
"columns": ['0', '1', '2', "time"]
@@ -75,18 +86,16 @@ def test_write_points_from_dataframe_with_numeric_column_names(self):
7586
self.assertListEqual(json.loads(m.last_request.body), points)
7687

7788
def test_write_points_from_dataframe_with_period_index(self):
78-
now = datetime(2014, 11, 16)
7989
dataframe = pd.DataFrame(data=[["1", 1, 1.0], ["2", 2, 2.0]],
80-
index=[pd.Period('2014-11-16'),
81-
pd.Period('2014-11-17')],
90+
index=[pd.Period('1970-01-01'),
91+
pd.Period('1970-01-02')],
8292
columns=["column_one", "column_two",
8393
"column_three"])
8494
points = [
8595
{
8696
"points": [
87-
["1", 1, 1.0, time.mktime(now.timetuple())],
88-
["2", 2, 2.0, time.mktime((now + timedelta(hours=24))
89-
.timetuple())]
97+
["1", 1, 1.0, 0],
98+
["2", 2, 2.0, 86400]
9099
],
91100
"name": "foo",
92101
"columns": ["column_one", "column_two", "column_three", "time"]
@@ -117,7 +126,7 @@ def test_write_points_from_dataframe_fails_without_time_index(self):
117126

118127
@raises(TypeError)
119128
def test_write_points_from_dataframe_fails_with_series(self):
120-
now = datetime(2014, 11, 16)
129+
now = pd.Timestamp('1970-01-01 00:00+00:00')
121130
dataframe = pd.Series(data=[1.0, 2.0],
122131
index=[now, now + timedelta(hours=1)])
123132

@@ -134,17 +143,37 @@ def test_query_into_dataframe(self):
134143
"name": "foo",
135144
"columns": ["time", "sequence_number", "column_one"],
136145
"points": [
137-
[1383876043, 16, 2], [1383876043, 15, 1],
138-
[1383876035, 14, 2], [1383876035, 13, 1]
146+
[3600, 16, 2], [3600, 15, 1],
147+
[0, 14, 2], [0, 13, 1]
139148
]
140149
}
141150
]
142-
dataframe = pd.DataFrame(data=[[16, 2], [15, 1], [14, 2], [13, 1]],
143-
index=pd.to_datetime([1383876043, 1383876043,
144-
1383876035, 1383876035],
151+
# dataframe sorted ascending by time first, then sequence_number
152+
dataframe = pd.DataFrame(data=[[13, 1], [14, 2], [15, 1], [16, 2]],
153+
index=pd.to_datetime([0, 0,
154+
3600, 3600],
145155
unit='s', utc=True),
146156
columns=['sequence_number', 'column_one'])
147157
with _mocked_session('get', 200, data):
148158
cli = DataFrameClient('host', 8086, 'username', 'password', 'db')
149159
result = cli.query('select column_one from foo;')
150160
assert_frame_equal(dataframe, result)
161+
162+
def test_query_with_empty_result(self):
163+
with _mocked_session('get', 200, []):
164+
cli = DataFrameClient('host', 8086, 'username', 'password', 'db')
165+
result = cli.query('select column_one from foo;')
166+
assert result == []
167+
168+
def test_list_series(self):
169+
response = [
170+
{
171+
'columns': ['time', 'name'],
172+
'name': 'list_series_result',
173+
'points': [[0, 'seriesA'], [0, 'seriesB']]
174+
}
175+
]
176+
with _mocked_session('get', 200, response):
177+
cli = DataFrameClient('host', 8086, 'username', 'password', 'db')
178+
series_list = cli.get_list_series()
179+
assert series_list == ['seriesA', 'seriesB']

0 commit comments

Comments
 (0)