Skip to content

Commit e07dafc

Browse files
committed
Merge branch 'gusutabopb-dataframeclient_chunked_queries'
2 parents d8907d3 + 678e5ed commit e07dafc

File tree

1 file changed

+46
-9
lines changed

1 file changed

+46
-9
lines changed

influxdb/_dataframe_client.py

Lines changed: 46 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
from __future__ import unicode_literals
88

99
import math
10+
from collections import defaultdict
1011

1112
import pandas as pd
1213

@@ -137,23 +138,54 @@ def write_points(self,
137138

138139
return True
139140

140-
def query(self, query, chunked=False, database=None):
141-
"""Quering data into a DataFrame.
142-
143-
:param chunked: [Optional, default=False] True if the data shall be
144-
retrieved in chunks, False otherwise.
141+
def query(self,
142+
query,
143+
params=None,
144+
epoch=None,
145+
expected_response_code=200,
146+
database=None,
147+
raise_errors=True,
148+
chunked=False,
149+
chunk_size=0,
150+
dropna=True):
151+
"""
152+
Quering data into a DataFrame.
153+
154+
:param query: the actual query string
155+
:param params: additional parameters for the request, defaults to {}
156+
:param epoch: response timestamps to be in epoch format either 'h',
157+
'm', 's', 'ms', 'u', or 'ns',defaults to `None` which is
158+
RFC3339 UTC format with nanosecond precision
159+
:param expected_response_code: the expected status code of response,
160+
defaults to 200
161+
:param database: database to query, defaults to None
162+
:param raise_errors: Whether or not to raise exceptions when InfluxDB
163+
returns errors, defaults to True
164+
:param chunked: Enable to use chunked responses from InfluxDB.
165+
With ``chunked`` enabled, one ResultSet is returned per chunk
166+
containing all results within that chunk
167+
:param chunk_size: Size of each chunk to tell InfluxDB to use.
168+
:param dropna: drop columns where all values are missing
169+
:returns: the queried data
170+
:rtype: :class:`~.ResultSet`
145171
"""
146-
results = super(DataFrameClient, self).query(query, database=database)
172+
query_args = dict(params=params,
173+
epoch=epoch,
174+
expected_response_code=expected_response_code,
175+
raise_errors=raise_errors,
176+
chunked=chunked,
177+
chunk_size=chunk_size)
178+
results = super(DataFrameClient, self).query(query, **query_args)
147179
if query.strip().upper().startswith("SELECT"):
148180
if len(results) > 0:
149-
return self._to_dataframe(results)
181+
return self._to_dataframe(results, dropna)
150182
else:
151183
return {}
152184
else:
153185
return results
154186

155-
def _to_dataframe(self, rs):
156-
result = {}
187+
def _to_dataframe(self, rs, dropna=True):
188+
result = defaultdict(list)
157189
if isinstance(rs, list):
158190
return map(self._to_dataframe, rs)
159191

@@ -168,6 +200,11 @@ def _to_dataframe(self, rs):
168200
df.set_index('time', inplace=True)
169201
df.index = df.index.tz_localize('UTC')
170202
df.index.name = None
203+
result[key].append(df)
204+
for key, data in result.items():
205+
df = pd.concat(data).sort_index()
206+
if dropna:
207+
df.dropna(how='all', axis=1, inplace=True)
171208
result[key] = df
172209

173210
return result

0 commit comments

Comments
 (0)