@@ -35,9 +35,18 @@ class DataFrameClient(InfluxDBClient):
35
35
36
36
EPOCH = pd .Timestamp ('1970-01-01 00:00:00.000+00:00' )
37
37
38
- def write_points (self , dataframe , measurement , tags = None ,
39
- time_precision = None , database = None , retention_policy = None ,
40
- batch_size = None ):
38
+ def write_points (self ,
39
+ dataframe ,
40
+ measurement ,
41
+ tags = None ,
42
+ tag_columns = [],
43
+ field_columns = [],
44
+ time_precision = None ,
45
+ database = None ,
46
+ retention_policy = None ,
47
+ batch_size = None ,
48
+ protocol = 'line' ,
49
+ numeric_precision = None ):
41
50
"""
42
51
Write to multiple time series names.
43
52
@@ -50,27 +59,67 @@ def write_points(self, dataframe, measurement, tags=None,
50
59
instead of all at one time. Useful for when doing data dumps from
51
60
one database to another or when doing a massive write operation
52
61
:type batch_size: int
62
+ :param protocol: Protocol for writing data. Either 'line' or 'json'.
63
+ :param numeric_precision: Precision for floating point values.
64
+ Either None, 'full' or some int, where int is the desired decimal
65
+ precision. 'full' preserves full precision for int and float
66
+ datatypes. Defaults to None, which preserves 14-15 significant
67
+ figures for float and all significant figures for int datatypes.
53
68
54
69
"""
55
70
if batch_size :
56
- number_batches = int (math .ceil (
57
- len (dataframe ) / float (batch_size )))
71
+ number_batches = int (math .ceil (len (dataframe ) / float (batch_size )))
58
72
for batch in range (number_batches ):
59
73
start_index = batch * batch_size
60
74
end_index = (batch + 1 ) * batch_size
61
- points = self ._convert_dataframe_to_json (
62
- dataframe .ix [start_index :end_index ].copy (),
63
- measurement , tags , time_precision
64
- )
75
+ if protocol == 'line' :
76
+ points = self ._convert_dataframe_to_lines (
77
+ dataframe .ix [start_index :end_index ].copy (),
78
+ measurement = measurement ,
79
+ global_tags = tags ,
80
+ time_precision = time_precision ,
81
+ tag_columns = tag_columns ,
82
+ field_columns = field_columns ,
83
+ numeric_precision = numeric_precision )
84
+ else :
85
+ points = self ._convert_dataframe_to_json (
86
+ dataframe .ix [start_index :end_index ].copy (),
87
+ measurement = measurement ,
88
+ tags = tags ,
89
+ time_precision = time_precision ,
90
+ tag_columns = tag_columns ,
91
+ field_columns = field_columns )
65
92
super (DataFrameClient , self ).write_points (
66
- points , time_precision , database , retention_policy )
93
+ points ,
94
+ time_precision ,
95
+ database ,
96
+ retention_policy ,
97
+ protocol = protocol )
67
98
return True
68
99
else :
69
- points = self ._convert_dataframe_to_json (
70
- dataframe , measurement , tags , time_precision
71
- )
100
+ if protocol == 'line' :
101
+ points = self ._convert_dataframe_to_lines (
102
+ dataframe ,
103
+ measurement = measurement ,
104
+ global_tags = tags ,
105
+ tag_columns = tag_columns ,
106
+ field_columns = field_columns ,
107
+ time_precision = time_precision ,
108
+ numeric_precision = numeric_precision )
109
+ else :
110
+ points = self ._convert_dataframe_to_json (
111
+ dataframe ,
112
+ measurement = measurement ,
113
+ tags = tags ,
114
+ time_precision = time_precision ,
115
+ tag_columns = tag_columns ,
116
+ field_columns = field_columns )
72
117
super (DataFrameClient , self ).write_points (
73
- points , time_precision , database , retention_policy )
118
+ points ,
119
+ time_precision ,
120
+ database ,
121
+ retention_policy ,
122
+ protocol = protocol )
74
123
return True
75
124
76
125
def query (self , query , chunked = False , database = None ):
@@ -108,7 +157,12 @@ def _to_dataframe(self, rs):
108
157
result [key ] = df
109
158
return result
110
159
111
- def _convert_dataframe_to_json (self , dataframe , measurement , tags = None ,
160
+ def _convert_dataframe_to_json (self ,
161
+ dataframe ,
162
+ measurement ,
163
+ tags = None ,
164
+ tag_columns = [],
165
+ field_columns = [],
112
166
time_precision = None ):
113
167
114
168
if not isinstance (dataframe , pd .DataFrame ):
@@ -119,6 +173,15 @@ def _convert_dataframe_to_json(self, dataframe, measurement, tags=None,
119
173
raise TypeError ('Must be DataFrame with DatetimeIndex or \
120
174
PeriodIndex.' )
121
175
176
+ # Make sure tags and tag columns are correctly typed
177
+ tag_columns = tag_columns if tag_columns else []
178
+ field_columns = field_columns if field_columns else []
179
+ tags = tags if tags else {}
180
+ # Assume field columns are all columns not included in tag columns
181
+ if not field_columns :
182
+ field_columns = list (
183
+ set (dataframe .columns ).difference (set (tag_columns )))
184
+
122
185
dataframe .index = dataframe .index .to_datetime ()
123
186
if dataframe .index .tzinfo is None :
124
187
dataframe .index = dataframe .index .tz_localize ('UTC' )
@@ -140,13 +203,151 @@ def _convert_dataframe_to_json(self, dataframe, measurement, tags=None,
140
203
141
204
points = [
142
205
{'measurement' : measurement ,
143
- 'tags' : tags if tags else {} ,
206
+ 'tags' : dict ( list ( tag . items ()) + list ( tags . items ())) ,
144
207
'fields' : rec ,
145
- 'time' : int (ts .value / precision_factor )
146
- }
147
- for ts , rec in zip (dataframe .index , dataframe .to_dict ('record' ))]
208
+ 'time' : int (ts .value / precision_factor )}
209
+ for ts , tag , rec in zip (dataframe .index ,
210
+ dataframe [tag_columns ].to_dict ('record' ),
211
+ dataframe [field_columns ].to_dict ('record' ))
212
+ ]
213
+
214
+ return points
215
+
216
+ def _convert_dataframe_to_lines (self ,
217
+ dataframe ,
218
+ measurement ,
219
+ field_columns = [],
220
+ tag_columns = [],
221
+ global_tags = {},
222
+ time_precision = None ,
223
+ numeric_precision = None ):
224
+
225
+ if not isinstance (dataframe , pd .DataFrame ):
226
+ raise TypeError ('Must be DataFrame, but type was: {0}.'
227
+ .format (type (dataframe )))
228
+ if not (isinstance (dataframe .index , pd .tseries .period .PeriodIndex ) or
229
+ isinstance (dataframe .index , pd .tseries .index .DatetimeIndex )):
230
+ raise TypeError ('Must be DataFrame with DatetimeIndex or \
231
+ PeriodIndex.' )
232
+
233
+ # Create a Series of columns for easier indexing
234
+ column_series = pd .Series (dataframe .columns )
235
+
236
+ if field_columns is None :
237
+ field_columns = []
238
+ if tag_columns is None :
239
+ tag_columns = []
240
+
241
+ # Make sure field_columns and tag_columns are lists
242
+ field_columns = list (field_columns ) if list (field_columns ) else []
243
+ tag_columns = list (tag_columns ) if list (tag_columns ) else []
244
+
245
+ # If field columns but no tag columns, assume rest of columns are tags
246
+ if field_columns and (not tag_columns ):
247
+ tag_columns = list (column_series [~ column_series .isin (
248
+ field_columns )])
249
+
250
+ # If no field columns, assume non-tag columns are fields
251
+ if not field_columns :
252
+ field_columns = list (column_series [~ column_series .isin (
253
+ tag_columns )])
254
+
255
+ precision_factor = {
256
+ "n" : 1 ,
257
+ "u" : 1e3 ,
258
+ "ms" : 1e6 ,
259
+ "s" : 1e9 ,
260
+ "m" : 1e9 * 60 ,
261
+ "h" : 1e9 * 3600 ,
262
+ }.get (time_precision , 1 )
263
+
264
+ # Make array of timestamp ints
265
+ time = ((dataframe .index .to_datetime ().values .astype (int ) /
266
+ precision_factor ).astype (int ).astype (str ))
267
+
268
+ # If tag columns exist, make an array of formatted tag keys and values
269
+ if tag_columns :
270
+ tag_df = dataframe [tag_columns ]
271
+ tag_df = self ._stringify_dataframe (
272
+ tag_df , numeric_precision , datatype = 'tag' )
273
+ tags = (',' + (
274
+ (tag_df .columns .values + '=' ).tolist () + tag_df )).sum (axis = 1 )
275
+ del tag_df
276
+
277
+ else :
278
+ tags = ''
279
+
280
+ # Make an array of formatted field keys and values
281
+ field_df = dataframe [field_columns ]
282
+ field_df = self ._stringify_dataframe (
283
+ field_df , numeric_precision , datatype = 'field' )
284
+ field_df = (field_df .columns .values + '=' ).tolist () + field_df
285
+ field_df [field_df .columns [1 :]] = ',' + field_df [field_df .columns [1 :]]
286
+ fields = field_df .sum (axis = 1 )
287
+ del field_df
288
+
289
+ # Add any global tags to formatted tag strings
290
+ if global_tags :
291
+ global_tags = ',' .join (['=' .join ([tag , global_tags [tag ]])
292
+ for tag in global_tags ])
293
+ if tag_columns :
294
+ tags = tags + ',' + global_tags
295
+ else :
296
+ tags = ',' + global_tags
297
+
298
+ # Generate line protocol string
299
+ points = (measurement + tags + ' ' + fields + ' ' + time ).tolist ()
148
300
return points
149
301
302
+ def _stringify_dataframe (self ,
303
+ dataframe ,
304
+ numeric_precision ,
305
+ datatype = 'field' ):
306
+
307
+ # Find int and string columns for field-type data
308
+ int_columns = dataframe .select_dtypes (include = ['integer' ]).columns
309
+ string_columns = dataframe .select_dtypes (include = ['object' ]).columns
310
+
311
+ # Convert dataframe to string
312
+ if numeric_precision is None :
313
+ # If no precision specified, convert directly to string (fast)
314
+ dataframe = dataframe .astype (str )
315
+ elif numeric_precision == 'full' :
316
+ # If full precision, use repr to get full float precision
317
+ float_columns = (dataframe .select_dtypes (include = ['floating' ])
318
+ .columns )
319
+ nonfloat_columns = dataframe .columns [~ dataframe .columns .isin (
320
+ float_columns )]
321
+ dataframe [float_columns ] = dataframe [float_columns ].applymap (repr )
322
+ dataframe [nonfloat_columns ] = (dataframe [nonfloat_columns ]
323
+ .astype (str ))
324
+ elif isinstance (numeric_precision , int ):
325
+ # If precision is specified, round to appropriate precision
326
+ float_columns = (dataframe .select_dtypes (include = ['floating' ])
327
+ .columns )
328
+ nonfloat_columns = dataframe .columns [~ dataframe .columns .isin (
329
+ float_columns )]
330
+ dataframe [float_columns ] = (dataframe [float_columns ]
331
+ .round (numeric_precision ))
332
+ # If desired precision is > 10 decimal places, need to use repr
333
+ if numeric_precision > 10 :
334
+ dataframe [float_columns ] = (dataframe [float_columns ]
335
+ .applymap (repr ))
336
+ dataframe [nonfloat_columns ] = (dataframe [nonfloat_columns ]
337
+ .astype (str ))
338
+ else :
339
+ dataframe = dataframe .astype (str )
340
+ else :
341
+ raise ValueError ('Invalid numeric precision.' )
342
+
343
+ if datatype == 'field' :
344
+ # If dealing with fields, format ints and strings correctly
345
+ dataframe [int_columns ] = dataframe [int_columns ] + 'i'
346
+ dataframe [string_columns ] = '"' + dataframe [string_columns ] + '"'
347
+
348
+ dataframe .columns = dataframe .columns .astype (str )
349
+ return dataframe
350
+
150
351
def _datetime_to_epoch (self , datetime , time_precision = 's' ):
151
352
seconds = (datetime - self .EPOCH ).total_seconds ()
152
353
if time_precision == 'h' :
0 commit comments