8
8
from __future__ import unicode_literals
9
9
10
10
import math
11
+ from collections import defaultdict
11
12
12
13
import pandas as pd
13
14
@@ -131,25 +132,26 @@ def write_points(self,
131
132
protocol = protocol )
132
133
return True
133
134
134
- def query (self , query , chunked = False , database = None ):
135
+ def query (self , query , dropna = True , ** kwargs ):
135
136
"""
136
137
Quering data into a DataFrame.
137
138
138
- :param chunked: [Optional, default=False] True if the data shall be
139
- retrieved in chunks, False otherwise.
139
+ :param query: the actual query string
140
+ :param dropna: drop columns where all values are missing
141
+ :param **kwargs: additional parameters for ``InfluxDBClient.query``
140
142
141
143
"""
142
- results = super (DataFrameClient , self ).query (query , database = database )
144
+ results = super (DataFrameClient , self ).query (query , ** kwargs )
143
145
if query .strip ().upper ().startswith ("SELECT" ):
144
146
if len (results ) > 0 :
145
- return self ._to_dataframe (results )
147
+ return self ._to_dataframe (results , dropna )
146
148
else :
147
149
return {}
148
150
else :
149
151
return results
150
152
151
- def _to_dataframe (self , rs ):
152
- result = {}
153
+ def _to_dataframe (self , rs , dropna = True ):
154
+ result = defaultdict ( list )
153
155
if isinstance (rs , list ):
154
156
return map (self ._to_dataframe , rs )
155
157
for key , data in rs .items ():
@@ -163,6 +165,11 @@ def _to_dataframe(self, rs):
163
165
df .set_index ('time' , inplace = True )
164
166
df .index = df .index .tz_localize ('UTC' )
165
167
df .index .name = None
168
+ result [key ].append (df )
169
+ for key , data in result .items ():
170
+ df = pd .concat (data ).sort_index ()
171
+ if dropna :
172
+ df .dropna (how = 'all' , axis = 1 , inplace = True )
166
173
result [key ] = df
167
174
return result
168
175
0 commit comments