@@ -35,9 +35,17 @@ class DataFrameClient(InfluxDBClient):

    EPOCH = pd.Timestamp('1970-01-01 00:00:00.000+00:00')

-    def write_points(self, dataframe, measurement, tags=None,
-                     time_precision=None, database=None, retention_policy=None,
-                     batch_size=None):
+    def write_points(self,
+                     dataframe,
+                     measurement,
+                     tags=None,
+                     tag_columns=[],
+                     field_columns=[],
+                     time_precision=None,
+                     database=None,
+                     retention_policy=None,
+                     batch_size=None,
+                     protocol='line'):
        """
        Write to multiple time series names.

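As a quick illustration of the expanded signature, a call might look like the sketch below. The DataFrame contents, credentials, and database name ('example') are invented for the example and are not part of this change:

    import pandas as pd
    from influxdb import DataFrameClient

    df = pd.DataFrame({'value': [0.64, 0.72], 'host': ['server01', 'server02']},
                      index=pd.to_datetime(['2016-01-01 00:00:00',
                                            '2016-01-01 00:00:10']))

    client = DataFrameClient('localhost', 8086, 'root', 'root', 'example')
    client.write_points(df, 'cpu_load',
                        tags={'region': 'us-west'},   # applied to every point
                        tag_columns=['host'],         # per-row tag values
                        field_columns=['value'],      # remaining columns default to fields
                        protocol='line')
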
@@ -50,27 +58,60 @@ def write_points(self, dataframe, measurement, tags=None,
            instead of all at one time. Useful for when doing data dumps from
            one database to another or when doing a massive write operation
        :type batch_size: int
+        :param protocol: Protocol for writing data. Either 'line' or 'json'.

        """
        if batch_size:
-            number_batches = int(math.ceil(
-                len(dataframe) / float(batch_size)))
+            number_batches = int(math.ceil(len(dataframe) / float(batch_size)))
            for batch in range(number_batches):
                start_index = batch * batch_size
                end_index = (batch + 1) * batch_size
-                points = self._convert_dataframe_to_json(
-                    dataframe.ix[start_index:end_index].copy(),
-                    measurement, tags, time_precision
-                )
+                if protocol == 'line':
+                    points = self._convert_dataframe_to_lines(
+                        dataframe.ix[start_index:end_index].copy(),
+                        measurement=measurement,
+                        global_tags=tags,
+                        time_precision=time_precision,
+                        tag_columns=tag_columns,
+                        field_columns=field_columns)
+                else:
+                    points = self._convert_dataframe_to_json(
+                        dataframe.ix[start_index:end_index].copy(),
+                        measurement=measurement,
+                        tags=tags,
+                        time_precision=time_precision,
+                        tag_columns=tag_columns,
+                        field_columns=field_columns)
                super(DataFrameClient, self).write_points(
-                    points, time_precision, database, retention_policy)
+                    points,
+                    time_precision,
+                    database,
+                    retention_policy,
+                    protocol=protocol)
            return True
        else:
-            points = self._convert_dataframe_to_json(
-                dataframe, measurement, tags, time_precision
-            )
+            if protocol == 'line':
+                points = self._convert_dataframe_to_lines(
+                    dataframe,
+                    measurement=measurement,
+                    global_tags=tags,
+                    tag_columns=tag_columns,
+                    field_columns=field_columns,
+                    time_precision=time_precision)
+            else:
+                points = self._convert_dataframe_to_json(
+                    dataframe,
+                    measurement=measurement,
+                    tags=tags,
+                    time_precision=time_precision,
+                    tag_columns=tag_columns,
+                    field_columns=field_columns)
            super(DataFrameClient, self).write_points(
-                points, time_precision, database, retention_policy)
+                points,
+                time_precision,
+                database,
+                retention_policy,
+                protocol=protocol)
            return True

    def query(self, query, chunked=False, database=None):
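The batching branch above slices the frame into fixed-size row windows before writing. A standalone sketch of the same arithmetic, using .iloc purely for illustration:

    import math
    import pandas as pd

    def iter_batches(dataframe, batch_size):
        # ceil(len / batch_size) windows of batch_size rows; the last may be short
        number_batches = int(math.ceil(len(dataframe) / float(batch_size)))
        for batch in range(number_batches):
            start_index = batch * batch_size
            end_index = (batch + 1) * batch_size
            yield dataframe.iloc[start_index:end_index]

    df = pd.DataFrame({'value': range(10)},
                      index=pd.date_range('2016-01-01', periods=10, freq='s'))
    print([len(chunk) for chunk in iter_batches(df, 4)])  # [4, 4, 2]
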
@@ -108,7 +149,12 @@ def _to_dataframe(self, rs):
            result[key] = df
        return result

-    def _convert_dataframe_to_json(self, dataframe, measurement, tags=None,
+    def _convert_dataframe_to_json(self,
+                                   dataframe,
+                                   measurement,
+                                   tags=None,
+                                   tag_columns=[],
+                                   field_columns=[],
                                    time_precision=None):

        if not isinstance(dataframe, pd.DataFrame):
@@ -119,6 +165,15 @@ def _convert_dataframe_to_json(self, dataframe, measurement, tags=None,
            raise TypeError('Must be DataFrame with DatetimeIndex or \
                PeriodIndex.')

+        # Make sure tags and tag columns are correctly typed
+        tag_columns = tag_columns if tag_columns else []
+        field_columns = field_columns if field_columns else []
+        tags = tags if tags else {}
+        # Assume field columns are all columns not included in tag columns
+        if not field_columns:
+            field_columns = list(
+                set(dataframe.columns).difference(set(tag_columns)))
+
        dataframe.index = dataframe.index.to_datetime()
        if dataframe.index.tzinfo is None:
            dataframe.index = dataframe.index.tz_localize('UTC')
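The defaulting rule above means callers only have to name their tag columns; every remaining column is treated as a field. A small illustration with made-up column names:

    import pandas as pd

    df = pd.DataFrame(columns=['host', 'region', 'value', 'count'])
    tag_columns = ['host', 'region']
    field_columns = list(set(df.columns).difference(set(tag_columns)))
    # field_columns now contains 'value' and 'count' (set order is not guaranteed)
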
@@ -140,13 +195,127 @@ def _convert_dataframe_to_json(self, dataframe, measurement, tags=None,

        points = [
            {'measurement': measurement,
-             'tags': tags if tags else {},
+             'tags': dict(list(tag.items()) + list(tags.items())),
             'fields': rec,
-             'time': int(ts.value / precision_factor)
-             }
-            for ts, rec in zip(dataframe.index, dataframe.to_dict('record'))]
+             'time': int(ts.value / precision_factor)}
+            for ts, tag, rec in zip(dataframe.index,
+                                    dataframe[tag_columns].to_dict('record'),
+                                    dataframe[field_columns].to_dict('record'))
+        ]
+
        return points

+    def _convert_dataframe_to_lines(self,
+                                    dataframe,
+                                    measurement,
+                                    field_columns=[],
+                                    tag_columns=[],
+                                    global_tags={},
+                                    time_precision=None,
+                                    numeric_precision=None):
+
+        if not isinstance(dataframe, pd.DataFrame):
+            raise TypeError('Must be DataFrame, but type was: {0}.'
+                            .format(type(dataframe)))
+        if not (isinstance(dataframe.index, pd.tseries.period.PeriodIndex) or
+                isinstance(dataframe.index, pd.tseries.index.DatetimeIndex)):
+            raise TypeError('Must be DataFrame with DatetimeIndex or \
+                PeriodIndex.')
+
+        column_series = pd.Series(dataframe.columns)
+
+        if field_columns is None:
+            field_columns = []
+        if tag_columns is None:
+            tag_columns = []
+
+        field_columns = list(field_columns) if list(field_columns) else []
+        tag_columns = list(tag_columns) if list(tag_columns) else []
+
+        # Assume that all columns not listed as tag columns are field columns
+        if not field_columns:
+            field_columns = list(column_series[~column_series.isin(
+                tag_columns)])
+
+        precision_factor = {
+            "n": 1,
+            "u": 1e3,
+            "ms": 1e6,
+            "s": 1e9,
+            "m": 1e9 * 60,
+            "h": 1e9 * 3600,
+        }.get(time_precision, 1)
+
+        # Make array of timestamp ints
+        time = ((dataframe.index.to_datetime().values.astype(int) /
+                 precision_factor).astype(int).astype(str))
+
+        # If tag columns exist, make an array of formatted tag keys and values
+        if tag_columns:
+            tag_df = dataframe[tag_columns]
+            tag_df = self._stringify_dataframe(
+                tag_df, numeric_precision, datatype='tag')
+            tags = (',' + (
+                (tag_df.columns.values + '=').tolist() + tag_df)).sum(axis=1)
+            del tag_df
+
+        else:
+            tags = ''
+
+        # Make an array of formatted field keys and values
+        field_df = dataframe[field_columns]
+        field_df = self._stringify_dataframe(
+            field_df, numeric_precision, datatype='field')
+        field_df = (field_df.columns.values + '=').tolist() + field_df
+        field_df[field_df.columns[1:]] = ',' + field_df[field_df.columns[1:]]
+        fields = field_df.sum(axis=1)
+        del field_df
+
+        # Add any global tags to formatted tag strings
+        if global_tags:
+            global_tags = ','.join(['='.join([tag, global_tags[tag]])
+                                    for tag in global_tags])
+            if tag_columns:
+                tags = tags + ',' + global_tags
+            else:
+                tags = ',' + global_tags
+
+        # Generate line protocol string
+        points = (measurement + tags + ' ' + fields + ' ' + time).tolist()
+        return points
+
+    def _stringify_dataframe(self,
+                             dataframe,
+                             numeric_precision,
+                             datatype='field'):
+        int_columns = dataframe.select_dtypes(include=['int']).columns
+        float_columns = dataframe.select_dtypes(include=['floating']).columns
+        nonfloat_columns = dataframe.columns[~dataframe.columns.isin(
+            float_columns)]
+        numeric_columns = dataframe.select_dtypes(include=['number']).columns
+        string_columns = dataframe.select_dtypes(include=['object']).columns
+
+        # Convert dataframe to string
+        if numeric_precision is None:
+            dataframe = dataframe.astype(str)
+        elif numeric_precision == 'full':
+            dataframe[float_columns] = dataframe[float_columns].applymap(repr)
+            dataframe[nonfloat_columns] = (dataframe[nonfloat_columns]
+                                           .astype(str))
+        elif isinstance(numeric_precision, int):
+            dataframe[numeric_columns] = (dataframe[numeric_columns]
+                                          .round(numeric_precision))
+            dataframe = dataframe.astype(str)
+        else:
+            raise ValueError('Invalid numeric precision')
+
+        if datatype == 'field':
+            dataframe[int_columns] = dataframe[int_columns] + 'i'
+            dataframe[string_columns] = '"' + dataframe[string_columns] + '"'
+
+        dataframe.columns = dataframe.columns.astype(str)
+        return dataframe
+
    def _datetime_to_epoch(self, datetime, time_precision='s'):
        seconds = (datetime - self.EPOCH).total_seconds()
        if time_precision == 'h':
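Putting the pieces together, the line-protocol path turns each DataFrame row into one line of text, with integer fields suffixed 'i', string fields quoted, and a nanosecond timestamp by default. A rough sketch with an invented one-row frame (the exact output depends on numeric_precision and column order):

    import pandas as pd
    from influxdb import DataFrameClient

    df = pd.DataFrame({'value': [0.64], 'count': [3], 'status': ['ok'],
                       'host': ['server01']},
                      index=pd.to_datetime(['2016-01-01']))
    client = DataFrameClient('localhost', 8086, 'root', 'root', 'example')
    lines = client._convert_dataframe_to_lines(
        df, 'cpu', tag_columns=['host'],
        field_columns=['value', 'count', 'status'])
    # Roughly: ['cpu,host=server01 value=0.64,count=3i,status="ok" 1451606400000000000']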