@@ -167,12 +167,14 @@ def request(self, url, method='GET', params=None, data=None,
167
167
# by doing a POST to /db/foo_production/series?u=some_user&p=some_password
168
168
# with a JSON body of points.
169
169
170
- def write_points (self , * args , ** kwargs ):
170
+ def write_points (self , data , * args , ** kwargs ):
171
171
"""
172
172
write_points()
173
173
174
174
Write to multiple time series names.
175
175
176
+ :param data: A list of dicts, or a dictionary mapping series names to
177
+ pandas DataFrames
176
178
:param batch_size: [Optional] Value to write the points in batches
177
179
instead of all at one time. Useful for when doing data dumps from
178
180
one database to another or when doing a massive write operation
@@ -185,27 +187,30 @@ def list_chunks(l, n):
185
187
for i in xrange (0 , len (l ), n ):
186
188
yield l [i :i + n ]
187
189
190
+ # check for pandas dataframe
191
+ if isinstance (data , dict ):
192
+ data = [self ._convert_dataframe_to_json (name = key , dataframe = value ) for key , value in data .items ()]
188
193
batch_size = kwargs .get ('batch_size' )
189
194
if batch_size :
190
- for data in kwargs . get ( ' data' ) :
191
- name = data .get ('name' )
192
- columns = data .get ('columns' )
193
- point_list = data .get ('points' )
195
+ for item in data :
196
+ name = item .get ('name' )
197
+ columns = item .get ('columns' )
198
+ point_list = item .get ('points' )
194
199
195
200
for batch in list_chunks (point_list , batch_size ):
196
- data = [{
201
+ item = [{
197
202
"points" : batch ,
198
203
"name" : name ,
199
204
"columns" : columns
200
205
}]
201
206
time_precision = kwargs .get ('time_precision' , 's' )
202
207
self .write_points_with_precision (
203
- data = data ,
208
+ data = item ,
204
209
time_precision = time_precision )
205
210
206
211
return True
207
212
208
- return self .write_points_with_precision (* args , ** kwargs )
213
+ return self .write_points_with_precision (data , * args , ** kwargs )
209
214
210
215
def write_points_with_precision (self , data , time_precision = 's' ):
211
216
"""
@@ -220,6 +225,10 @@ def write_points_with_precision(self, data, time_precision='s'):
220
225
"InfluxDB only supports seconds precision for udp writes"
221
226
)
222
227
228
+ # check for pandas dataframe
229
+ if isinstance (data , dict ):
230
+ data = [self ._convert_dataframe_to_json (name = key , dataframe = value ) for key , value in data .items ()]
231
+
223
232
url = "db/{0}/series" .format (self ._database )
224
233
225
234
params = {
@@ -239,6 +248,23 @@ def write_points_with_precision(self, data, time_precision='s'):
239
248
240
249
return True
241
250
251
+ def _convert_dataframe_to_json (self , dataframe , name ):
252
+ try :
253
+ import pandas as pd
254
+ except ImportError :
255
+ raise ImportError ('pandas required for writing as dataframe.' )
256
+ if not isinstance (dataframe , pd .DataFrame ):
257
+ raise TypeError ('Must be DataFrame, but type was: {}.' .format (type (dataframe )))
258
+ if not (isinstance (dataframe .index , pd .tseries .period .PeriodIndex ) or
259
+ isinstance (dataframe .index , pd .tseries .index .DatetimeIndex )):
260
+ raise TypeError ('Must be DataFrame with DatetimeIndex or PeriodIndex.' )
261
+ dataframe .index = dataframe .index .to_datetime ()
262
+ dataframe ['time' ] = [time .mktime (dt .timetuple ()) for dt in dataframe .index ]
263
+ data = {'name' :name ,
264
+ 'columns' :list (dataframe .columns ),
265
+ 'points' :list ([list (x ) for x in dataframe .values ])}
266
+ return data
267
+
242
268
# One Time Deletes
243
269
244
270
def delete_points (self , name ):
@@ -299,6 +325,13 @@ def remove_scheduled_delete(self, delete_id):
299
325
def query (self , query , time_precision = 's' , chunked = False , output_format = 'json' ):
300
326
"""
301
327
Quering data
328
+
329
+ :param time_precision: [Optional, default 's'] Either 's', 'm', 'ms' or 'u'.
330
+ :param chunked: [Optional, default=False] True if the data shall be retrieved
331
+ in chunks, False otherwise.
332
+ :param output_format: [Optional, default 'json'] Format of the resulting
333
+ output. Can be 'json' or 'dataframe' for a pandas DataFrame.
334
+
302
335
"""
303
336
if time_precision not in ['s' , 'm' , 'ms' , 'u' ]:
304
337
raise Exception (
@@ -740,22 +773,3 @@ def send_packet(self, packet):
740
773
data = json .dumps (packet )
741
774
byte = data .encode ('utf-8' )
742
775
self .udp_socket .sendto (byte , (self ._host , self .udp_port ))
743
-
744
- def write_points_from_dataframe (self , dataframe , name ):
745
- try :
746
- import pandas as pd
747
- except ImportError :
748
- raise ImportError ('pandas required for writing as dataframe.' )
749
- if not isinstance (dataframe , pd .DataFrame ):
750
- raise TypeError ('Must be DataFrame, but type was: {}.' .format (type (dataframe )))
751
- if not (isinstance (dataframe .index , pd .tseries .period .PeriodIndex ) or
752
- isinstance (dataframe .index , pd .tseries .index .DatetimeIndex )):
753
- raise TypeError ('Must be DataFrame with DatetimeIndex or PeriodIndex.' )
754
- dataframe .index = dataframe .index .to_datetime ()
755
- dataframe ['time' ] = [time .mktime (dt .timetuple ()) for dt in dataframe .index ]
756
- data = dict ()
757
- data ['name' ] = name
758
- data ['columns' ] = list (dataframe .columns )
759
- data ['points' ] = list ([list (x ) for x in dataframe .values ])
760
- print (data )
761
- self .write_points (data = [data ], time_precision = 's' )
0 commit comments