7
7
from __future__ import unicode_literals
8
8
9
9
import math
10
+ from collections import defaultdict
10
11
11
12
import pandas as pd
12
13
@@ -137,23 +138,54 @@ def write_points(self,
137
138
138
139
return True
139
140
140
- def query (self , query , chunked = False , database = None ):
141
- """Quering data into a DataFrame.
142
-
143
- :param chunked: [Optional, default=False] True if the data shall be
144
- retrieved in chunks, False otherwise.
141
+ def query (self ,
142
+ query ,
143
+ params = None ,
144
+ epoch = None ,
145
+ expected_response_code = 200 ,
146
+ database = None ,
147
+ raise_errors = True ,
148
+ chunked = False ,
149
+ chunk_size = 0 ,
150
+ dropna = True ):
151
+ """
152
+ Quering data into a DataFrame.
153
+
154
+ :param query: the actual query string
155
+ :param params: additional parameters for the request, defaults to {}
156
+ :param epoch: response timestamps to be in epoch format either 'h',
157
+ 'm', 's', 'ms', 'u', or 'ns',defaults to `None` which is
158
+ RFC3339 UTC format with nanosecond precision
159
+ :param expected_response_code: the expected status code of response,
160
+ defaults to 200
161
+ :param database: database to query, defaults to None
162
+ :param raise_errors: Whether or not to raise exceptions when InfluxDB
163
+ returns errors, defaults to True
164
+ :param chunked: Enable to use chunked responses from InfluxDB.
165
+ With ``chunked`` enabled, one ResultSet is returned per chunk
166
+ containing all results within that chunk
167
+ :param chunk_size: Size of each chunk to tell InfluxDB to use.
168
+ :param dropna: drop columns where all values are missing
169
+ :returns: the queried data
170
+ :rtype: :class:`~.ResultSet`
145
171
"""
146
- results = super (DataFrameClient , self ).query (query , database = database )
172
+ query_args = dict (params = params ,
173
+ epoch = epoch ,
174
+ expected_response_code = expected_response_code ,
175
+ raise_errors = raise_errors ,
176
+ chunked = chunked ,
177
+ chunk_size = chunk_size )
178
+ results = super (DataFrameClient , self ).query (query , ** query_args )
147
179
if query .strip ().upper ().startswith ("SELECT" ):
148
180
if len (results ) > 0 :
149
- return self ._to_dataframe (results )
181
+ return self ._to_dataframe (results , dropna )
150
182
else :
151
183
return {}
152
184
else :
153
185
return results
154
186
155
- def _to_dataframe (self , rs ):
156
- result = {}
187
+ def _to_dataframe (self , rs , dropna = True ):
188
+ result = defaultdict ( list )
157
189
if isinstance (rs , list ):
158
190
return map (self ._to_dataframe , rs )
159
191
@@ -168,6 +200,11 @@ def _to_dataframe(self, rs):
168
200
df .set_index ('time' , inplace = True )
169
201
df .index = df .index .tz_localize ('UTC' )
170
202
df .index .name = None
203
+ result [key ].append (df )
204
+ for key , data in result .items ():
205
+ df = pd .concat (data ).sort_index ()
206
+ if dropna :
207
+ df .dropna (how = 'all' , axis = 1 , inplace = True )
171
208
result [key ] = df
172
209
173
210
return result
0 commit comments