Skip to content
This repository was archived by the owner on Oct 29, 2024. It is now read-only.

Fix: add support for custom indexes for query in the DataFrameClient (#785) #827

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.

### Added
- Add support for custom headers in the InfluxDBClient (#710 thx @nathanielatom)
- Add support for custom indexes for query in the DataFrameClient (#785)

### Changed
- Amend retry to avoid sleep after last retry before raising exception (#790 thx @krzysbaranski)
Expand Down
23 changes: 16 additions & 7 deletions influxdb/_dataframe_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,8 @@ def query(self,
chunked=False,
chunk_size=0,
method="GET",
dropna=True):
dropna=True,
data_frame_index=None):
"""
Query data into a DataFrame.

Expand Down Expand Up @@ -181,6 +182,8 @@ def query(self,
containing all results within that chunk
:param chunk_size: Size of each chunk to tell InfluxDB to use.
:param dropna: drop columns where all values are missing
:param data_frame_index: the list of columns that
are used as DataFrame index
:returns: the queried data
:rtype: :class:`~.ResultSet`
"""
Expand All @@ -196,13 +199,14 @@ def query(self,
results = super(DataFrameClient, self).query(query, **query_args)
if query.strip().upper().startswith("SELECT"):
if len(results) > 0:
return self._to_dataframe(results, dropna)
return self._to_dataframe(results, dropna,
data_frame_index=data_frame_index)
else:
return {}
else:
return results

def _to_dataframe(self, rs, dropna=True):
def _to_dataframe(self, rs, dropna=True, data_frame_index=None):
result = defaultdict(list)
if isinstance(rs, list):
return map(self._to_dataframe, rs,
Expand All @@ -216,10 +220,15 @@ def _to_dataframe(self, rs, dropna=True):
key = (name, tuple(sorted(tags.items())))
df = pd.DataFrame(data)
df.time = pd.to_datetime(df.time)
df.set_index('time', inplace=True)
if df.index.tzinfo is None:
df.index = df.index.tz_localize('UTC')
df.index.name = None

if data_frame_index:
df.set_index(data_frame_index, inplace=True)
else:
df.set_index('time', inplace=True)
if df.index.tzinfo is None:
df.index = df.index.tz_localize('UTC')
df.index.name = None

result[key].append(df)
for key, data in result.items():
df = pd.concat(data).sort_index()
Expand Down
4 changes: 2 additions & 2 deletions influxdb/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -860,7 +860,7 @@ def alter_retention_policy(self, name, database=None,
query_string = (
"ALTER RETENTION POLICY {0} ON {1}"
).format(quote_ident(name),
quote_ident(database or self._database), shard_duration)
quote_ident(database or self._database))
if duration:
query_string += " DURATION {0}".format(duration)
if shard_duration:
Expand Down Expand Up @@ -958,7 +958,7 @@ def drop_user(self, username):
:param username: the username to drop
:type username: str
"""
text = "DROP USER {0}".format(quote_ident(username), method="POST")
text = "DROP USER {0}".format(quote_ident(username))
self.query(text, method="POST")

def set_user_password(self, username, password):
Expand Down
2 changes: 1 addition & 1 deletion influxdb/helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ def __new__(cls, *args, **kwargs):
allowed_time_precisions = ['h', 'm', 's', 'ms', 'u', 'ns', None]
if cls._time_precision not in allowed_time_precisions:
raise AttributeError(
'In {0}, time_precision is set, but invalid use any of {}.'
'In {}, time_precision is set, but invalid use any of {}.'
.format(cls.__name__, ','.join(allowed_time_precisions)))

cls._retention_policy = getattr(_meta, 'retention_policy', None)
Expand Down
6 changes: 3 additions & 3 deletions influxdb/influxdb08/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -292,10 +292,10 @@ def write_points(self, data, time_precision='s', *args, **kwargs):
:type batch_size: int

"""
def list_chunks(l, n):
def list_chunks(data_list, n):
"""Yield successive n-sized chunks from data_list."""
for i in xrange(0, len(l), n):
yield l[i:i + n]
for i in xrange(0, len(data_list), n):
yield data_list[i:i + n]

batch_size = kwargs.get('batch_size')
if batch_size and batch_size > 0:
Expand Down
34 changes: 34 additions & 0 deletions influxdb/tests/dataframe_client_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -1240,3 +1240,37 @@ def test_write_points_from_dataframe_with_tags_and_nan_json(self):
cli.write_points(dataframe, 'foo', tags=None, protocol='json',
tag_columns=['tag_one', 'tag_two'])
self.assertEqual(m.last_request.body, expected)

def test_query_custom_index(self):
    """Test query with custom indexes.

    The mocked response contains a single ``cpu_load_short`` series with
    the columns ``time``, ``value`` and ``host``.  Querying with
    ``data_frame_index=["time", "host"]`` is expected to yield a frame
    whose index levels are named after those columns, in that order.
    """
    data = {
        "results": [
            {
                "series": [
                    {
                        "name": "cpu_load_short",
                        "columns": ["time", "value", "host"],
                        "values": [
                            [1, 0.55, "local"],
                            [2, 23422, "local"],
                            [3, 0.64, "local"]
                        ]
                    }
                ]
            }
        ]
    }

    cli = DataFrameClient('host', 8086, 'username', 'password', 'db')
    iql = "SELECT value FROM cpu_load_short WHERE region=$region;" \
          "SELECT count(value) FROM cpu_load_short WHERE region=$region"
    bind_params = {'region': 'us-west'}
    with _mocked_session(cli, 'GET', 200, data):
        result = cli.query(iql, bind_params=bind_params,
                           data_frame_index=["time", "host"])

    _data_frame = result['cpu_load_short']

    # No debugging output here: the assertion below is the only check,
    # and a stray print would just clutter the test runner's output.
    self.assertListEqual(["time", "host"],
                         list(_data_frame.index.names))