Skip to content

Commit de75e73

Browse files
committed
Add support for custom indexes for query in the DataFrameClient (influxdata#785)
1 parent 95e0efb commit de75e73

File tree

3 files changed

+49
-7
lines changed

3 files changed

+49
-7
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.
88

99
### Added
1010
- Add support for custom headers in the InfluxDBClient (#710 thx @nathanielatom)
11+
- Add support for custom indexes for query in the DataFrameClient (#785)
1112

1213
### Changed
1314
- Amend retry to avoid sleep after last retry before raising exception (#790 thx @krzysbaranski)

influxdb/_dataframe_client.py

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88

99
import math
1010
from collections import defaultdict
11+
from typing import List
1112

1213
import pandas as pd
1314
import numpy as np
@@ -152,7 +153,8 @@ def query(self,
152153
chunked=False,
153154
chunk_size=0,
154155
method="GET",
155-
dropna=True):
156+
dropna=True,
157+
data_frame_index: List[str] = None):
156158
"""
157159
Query data into a DataFrame.
158160
@@ -181,6 +183,7 @@ def query(self,
181183
containing all results within that chunk
182184
:param chunk_size: Size of each chunk to tell InfluxDB to use.
183185
:param dropna: drop columns where all values are missing
186+
:param data_frame_index: the list of columns that are used as DataFrame index
184187
:returns: the queried data
185188
:rtype: :class:`~.ResultSet`
186189
"""
@@ -196,13 +199,13 @@ def query(self,
196199
results = super(DataFrameClient, self).query(query, **query_args)
197200
if query.strip().upper().startswith("SELECT"):
198201
if len(results) > 0:
199-
return self._to_dataframe(results, dropna)
202+
return self._to_dataframe(results, dropna, data_frame_index=data_frame_index)
200203
else:
201204
return {}
202205
else:
203206
return results
204207

205-
def _to_dataframe(self, rs, dropna=True):
208+
def _to_dataframe(self, rs, dropna=True, data_frame_index: List[str] = None):
206209
result = defaultdict(list)
207210
if isinstance(rs, list):
208211
return map(self._to_dataframe, rs,
@@ -216,10 +219,15 @@ def _to_dataframe(self, rs, dropna=True):
216219
key = (name, tuple(sorted(tags.items())))
217220
df = pd.DataFrame(data)
218221
df.time = pd.to_datetime(df.time)
219-
df.set_index('time', inplace=True)
220-
if df.index.tzinfo is None:
221-
df.index = df.index.tz_localize('UTC')
222-
df.index.name = None
222+
223+
if data_frame_index:
224+
df.set_index(data_frame_index, inplace=True)
225+
else:
226+
df.set_index('time', inplace=True)
227+
if df.index.tzinfo is None:
228+
df.index = df.index.tz_localize('UTC')
229+
df.index.name = 'time'
230+
223231
result[key].append(df)
224232
for key, data in result.items():
225233
df = pd.concat(data).sort_index()

influxdb/tests/dataframe_client_test.py

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1240,3 +1240,36 @@ def test_write_points_from_dataframe_with_tags_and_nan_json(self):
12401240
cli.write_points(dataframe, 'foo', tags=None, protocol='json',
12411241
tag_columns=['tag_one', 'tag_two'])
12421242
self.assertEqual(m.last_request.body, expected)
1243+
1244+
def test_query_custom_index(self):
    """Test that ``data_frame_index`` selects the DataFrame index columns.

    A single ``cpu_load_short`` series with ``time``, ``value`` and
    ``host`` columns is served through ``_mocked_session``; the query is
    issued with ``data_frame_index=["time", "host"]`` and the resulting
    frame is expected to carry exactly those two names as its index
    levels.
    """
    data = {
        "results": [
            {
                "series": [
                    {
                        "name": "cpu_load_short",
                        "columns": ["time", "value", "host"],
                        "values": [
                            [1, 0.55, "local"],
                            [2, 23422, "local"],
                            [3, 0.64, "local"]
                        ]
                    }
                ]
            }
        ]
    }

    cli = DataFrameClient('host', 8086, 'username', 'password', 'db')
    iql = "SELECT value FROM cpu_load_short WHERE region=$region;" \
          "SELECT count(value) FROM cpu_load_short WHERE region=$region"
    bind_params = {'region': 'us-west'}
    with _mocked_session(cli, 'GET', 200, data):
        # Wrapped to stay within the 79-column limit.
        result = cli.query(iql, bind_params=bind_params,
                           data_frame_index=["time", "host"])

    _data_frame = result['cpu_load_short']

    # Both requested columns must become index levels, in the given order.
    self.assertListEqual(["time", "host"],
                         list(_data_frame.index.names))
1274+
1275+

0 commit comments

Comments
 (0)