Skip to content

Commit 30e4e87

Browse files
committed
Addressed issues 362 and 363
1 parent 7debaca commit 30e4e87

File tree

5 files changed

+284
-57
lines changed

5 files changed

+284
-57
lines changed

influxdb/_dataframe_client.py

Lines changed: 188 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -35,9 +35,17 @@ class DataFrameClient(InfluxDBClient):
3535

3636
EPOCH = pd.Timestamp('1970-01-01 00:00:00.000+00:00')
3737

38-
def write_points(self, dataframe, measurement, tags=None,
39-
time_precision=None, database=None, retention_policy=None,
40-
batch_size=None):
38+
def write_points(self,
39+
dataframe,
40+
measurement,
41+
tags=None,
42+
tag_columns=[],
43+
field_columns=[],
44+
time_precision=None,
45+
database=None,
46+
retention_policy=None,
47+
batch_size=None,
48+
protocol='line'):
4149
"""
4250
Write to multiple time series names.
4351
@@ -50,27 +58,60 @@ def write_points(self, dataframe, measurement, tags=None,
5058
instead of all at one time. Useful for when doing data dumps from
5159
one database to another or when doing a massive write operation
5260
:type batch_size: int
61+
:param protocol: Protocol for writing data. Either 'line' or 'json'.
5362
5463
"""
5564
if batch_size:
56-
number_batches = int(math.ceil(
57-
len(dataframe) / float(batch_size)))
65+
number_batches = int(math.ceil(len(dataframe) / float(batch_size)))
5866
for batch in range(number_batches):
5967
start_index = batch * batch_size
6068
end_index = (batch + 1) * batch_size
61-
points = self._convert_dataframe_to_json(
62-
dataframe.ix[start_index:end_index].copy(),
63-
measurement, tags, time_precision
64-
)
69+
if protocol == 'line':
70+
points = self._convert_dataframe_to_lines(
71+
dataframe.ix[start_index:end_index].copy(),
72+
measurement=measurement,
73+
global_tags=tags,
74+
time_precision=time_precision,
75+
tag_columns=tag_columns,
76+
field_columns=field_columns)
77+
else:
78+
points = self._convert_dataframe_to_json(
79+
dataframe.ix[start_index:end_index].copy(),
80+
measurement=measurement,
81+
tags=tags,
82+
time_precision=time_precision,
83+
tag_columns=tag_columns,
84+
field_columns=field_columns)
6585
super(DataFrameClient, self).write_points(
66-
points, time_precision, database, retention_policy)
86+
points,
87+
time_precision,
88+
database,
89+
retention_policy,
90+
protocol='line')
6791
return True
6892
else:
69-
points = self._convert_dataframe_to_json(
70-
dataframe, measurement, tags, time_precision
71-
)
93+
if protocol == 'line':
94+
points = self._convert_dataframe_to_lines(
95+
dataframe,
96+
measurement=measurement,
97+
global_tags=tags,
98+
tag_columns=tag_columns,
99+
field_columns=field_columns,
100+
time_precision=time_precision)
101+
else:
102+
points = self._convert_dataframe_to_json(
103+
dataframe,
104+
measurement=measurement,
105+
tags=tags,
106+
time_precision=time_precision,
107+
tag_columns=tag_columns,
108+
field_columns=field_columns)
72109
super(DataFrameClient, self).write_points(
73-
points, time_precision, database, retention_policy)
110+
points,
111+
time_precision,
112+
database,
113+
retention_policy,
114+
protocol='line')
74115
return True
75116

76117
def query(self, query, chunked=False, database=None):
@@ -108,7 +149,12 @@ def _to_dataframe(self, rs):
108149
result[key] = df
109150
return result
110151

111-
def _convert_dataframe_to_json(self, dataframe, measurement, tags=None,
152+
def _convert_dataframe_to_json(self,
153+
dataframe,
154+
measurement,
155+
tags=None,
156+
tag_columns=[],
157+
field_columns=[],
112158
time_precision=None):
113159

114160
if not isinstance(dataframe, pd.DataFrame):
@@ -119,6 +165,15 @@ def _convert_dataframe_to_json(self, dataframe, measurement, tags=None,
119165
raise TypeError('Must be DataFrame with DatetimeIndex or \
120166
PeriodIndex.')
121167

168+
# Make sure tags and tag columns are correctly typed
169+
tag_columns = tag_columns if tag_columns else []
170+
field_columns = field_columns if field_columns else []
171+
tags = tags if tags else {}
172+
# Assume field columns are all columns not included in tag columns
173+
if not field_columns:
174+
field_columns = list(
175+
set(dataframe.columns).difference(set(tag_columns)))
176+
122177
dataframe.index = dataframe.index.to_datetime()
123178
if dataframe.index.tzinfo is None:
124179
dataframe.index = dataframe.index.tz_localize('UTC')
@@ -140,13 +195,127 @@ def _convert_dataframe_to_json(self, dataframe, measurement, tags=None,
140195

141196
points = [
142197
{'measurement': measurement,
143-
'tags': tags if tags else {},
198+
'tags': dict(list(tag.items()) + list(tags.items())),
144199
'fields': rec,
145-
'time': int(ts.value / precision_factor)
146-
}
147-
for ts, rec in zip(dataframe.index, dataframe.to_dict('record'))]
200+
'time': int(ts.value / precision_factor)}
201+
for ts, tag, rec in zip(dataframe.index,
202+
dataframe[tag_columns].to_dict('record'),
203+
dataframe[field_columns].to_dict('record'))
204+
]
205+
148206
return points
149207

208+
def _convert_dataframe_to_lines(self,
209+
dataframe,
210+
measurement,
211+
field_columns=[],
212+
tag_columns=[],
213+
global_tags={},
214+
time_precision=None,
215+
numeric_precision=None):
216+
217+
if not isinstance(dataframe, pd.DataFrame):
218+
raise TypeError('Must be DataFrame, but type was: {0}.'
219+
.format(type(dataframe)))
220+
if not (isinstance(dataframe.index, pd.tseries.period.PeriodIndex) or
221+
isinstance(dataframe.index, pd.tseries.index.DatetimeIndex)):
222+
raise TypeError('Must be DataFrame with DatetimeIndex or \
223+
PeriodIndex.')
224+
225+
column_series = pd.Series(dataframe.columns)
226+
227+
if field_columns is None:
228+
field_columns = []
229+
if tag_columns is None:
230+
tag_columns = []
231+
232+
field_columns = list(field_columns) if list(field_columns) else []
233+
tag_columns = list(tag_columns) if list(tag_columns) else []
234+
235+
# Assume that all columns not listed as tag columns are field columns
236+
if not field_columns:
237+
field_columns = list(column_series[~column_series.isin(
238+
tag_columns)])
239+
240+
precision_factor = {
241+
"n": 1,
242+
"u": 1e3,
243+
"ms": 1e6,
244+
"s": 1e9,
245+
"m": 1e9 * 60,
246+
"h": 1e9 * 3600,
247+
}.get(time_precision, 1)
248+
249+
# Make array of timestamp ints
250+
time = ((dataframe.index.to_datetime().values.astype(int) /
251+
precision_factor).astype(int).astype(str))
252+
253+
# If tag columns exist, make an array of formatted tag keys and values
254+
if tag_columns:
255+
tag_df = dataframe[tag_columns]
256+
tag_df = self._stringify_dataframe(
257+
tag_df, numeric_precision, datatype='tag')
258+
tags = (',' + (
259+
(tag_df.columns.values + '=').tolist() + tag_df)).sum(axis=1)
260+
del tag_df
261+
262+
else:
263+
tags = ''
264+
265+
# Make an array of formatted field keys and values
266+
field_df = dataframe[field_columns]
267+
field_df = self._stringify_dataframe(
268+
field_df, numeric_precision, datatype='field')
269+
field_df = (field_df.columns.values + '=').tolist() + field_df
270+
field_df[field_df.columns[1:]] = ',' + field_df[field_df.columns[1:]]
271+
fields = field_df.sum(axis=1)
272+
del field_df
273+
274+
# Add any global tags to formatted tag strings
275+
if global_tags:
276+
global_tags = ','.join(['='.join([tag, global_tags[tag]])
277+
for tag in global_tags])
278+
if tag_columns:
279+
tags = tags + ',' + global_tags
280+
else:
281+
tags = ',' + global_tags
282+
283+
# Generate line protocol string
284+
points = (measurement + tags + ' ' + fields + ' ' + time).tolist()
285+
return points
286+
287+
def _stringify_dataframe(self,
288+
dataframe,
289+
numeric_precision,
290+
datatype='field'):
291+
int_columns = dataframe.select_dtypes(include=['int']).columns
292+
float_columns = dataframe.select_dtypes(include=['floating']).columns
293+
nonfloat_columns = dataframe.columns[~dataframe.columns.isin(
294+
float_columns)]
295+
numeric_columns = dataframe.select_dtypes(include=['number']).columns
296+
string_columns = dataframe.select_dtypes(include=['object']).columns
297+
298+
# Convert dataframe to string
299+
if numeric_precision is None:
300+
dataframe = dataframe.astype(str)
301+
elif numeric_precision == 'full':
302+
dataframe[float_columns] = dataframe[float_columns].applymap(repr)
303+
dataframe[nonfloat_columns] = (dataframe[nonfloat_columns]
304+
.astype(str))
305+
elif isinstance(numeric_precision, int):
306+
dataframe[numeric_columns] = (dataframe[numeric_columns]
307+
.round(numeric_precision))
308+
dataframe = dataframe.astype(str)
309+
else:
310+
raise ValueError('Invalid numeric precision')
311+
312+
if datatype == 'field':
313+
dataframe[int_columns] = dataframe[int_columns] + 'i'
314+
dataframe[string_columns] = '"' + dataframe[string_columns] + '"'
315+
316+
dataframe.columns = dataframe.columns.astype(str)
317+
return dataframe
318+
150319
def _datetime_to_epoch(self, datetime, time_precision='s'):
151320
seconds = (datetime - self.EPOCH).total_seconds()
152321
if time_precision == 'h':

influxdb/client.py

Lines changed: 30 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -252,7 +252,8 @@ def request(self, url, method='GET', params=None, data=None,
252252
else:
253253
raise InfluxDBClientError(response.content, response.status_code)
254254

255-
def write(self, data, params=None, expected_response_code=204):
255+
def write(self, data, params=None, expected_response_code=204,
256+
protocol='json'):
256257
"""Write data to InfluxDB.
257258
258259
:param data: the data to be written
@@ -274,11 +275,16 @@ def write(self, data, params=None, expected_response_code=204):
274275
else:
275276
precision = None
276277

278+
if protocol == 'json':
279+
data = make_lines(data, precision).encode('utf-8')
280+
elif protocol == 'line':
281+
data = ('\n'.join(data) + '\n').encode('utf-8')
282+
277283
self.request(
278284
url="write",
279285
method='POST',
280286
params=params,
281-
data=make_lines(data, precision).encode('utf-8'),
287+
data=data,
282288
expected_response_code=expected_response_code,
283289
headers=headers
284290
)
@@ -351,6 +357,7 @@ def write_points(self,
351357
retention_policy=None,
352358
tags=None,
353359
batch_size=None,
360+
protocol='json'
354361
):
355362
"""Write to multiple time series names.
356363
@@ -375,6 +382,7 @@ def write_points(self,
375382
:type batch_size: int
376383
:returns: True, if the operation is successful
377384
:rtype: bool
385+
:param protocol: Protocol for writing data. Either 'line' or 'json'.
378386
379387
.. note:: if no retention policy is specified, the default retention
380388
policy for the database is used
@@ -386,14 +394,14 @@ def write_points(self,
386394
time_precision=time_precision,
387395
database=database,
388396
retention_policy=retention_policy,
389-
tags=tags)
397+
tags=tags, protocol=protocol)
390398
return True
391399
else:
392400
return self._write_points(points=points,
393401
time_precision=time_precision,
394402
database=database,
395403
retention_policy=retention_policy,
396-
tags=tags)
404+
tags=tags, protocol=protocol)
397405

398406
def _batches(self, iterable, size):
399407
for i in xrange(0, len(iterable), size):
@@ -404,7 +412,8 @@ def _write_points(self,
404412
time_precision,
405413
database,
406414
retention_policy,
407-
tags):
415+
tags,
416+
protocol='json'):
408417
if time_precision not in ['n', 'u', 'ms', 's', 'm', 'h', None]:
409418
raise ValueError(
410419
"Invalid time precision is given. "
@@ -415,12 +424,15 @@ def _write_points(self,
415424
"InfluxDB only supports seconds precision for udp writes"
416425
)
417426

418-
data = {
419-
'points': points
420-
}
427+
if protocol == 'json':
428+
data = {
429+
'points': points
430+
}
421431

422-
if tags is not None:
423-
data['tags'] = tags
432+
if tags is not None:
433+
data['tags'] = tags
434+
else:
435+
data = points
424436

425437
params = {
426438
'db': database or self._database
@@ -433,12 +445,13 @@ def _write_points(self,
433445
params['rp'] = retention_policy
434446

435447
if self.use_udp:
436-
self.send_packet(data)
448+
self.send_packet(data, protocol=protocol)
437449
else:
438450
self.write(
439451
data=data,
440452
params=params,
441-
expected_response_code=204
453+
expected_response_code=204,
454+
protocol=protocol
442455
)
443456

444457
return True
@@ -737,13 +750,16 @@ def get_list_privileges(self, username):
737750
text = "SHOW GRANTS FOR {0}".format(username)
738751
return list(self.query(text).get_points())
739752

740-
def send_packet(self, packet):
753+
def send_packet(self, packet, protocol='json'):
741754
"""Send an UDP packet.
742755
743756
:param packet: the packet to be sent
744757
:type packet: dict
745758
"""
746-
data = make_lines(packet).encode('utf-8')
759+
if protocol == 'json':
760+
data = make_lines(packet).encode('utf-8')
761+
elif protocol == 'line':
762+
data = ('\n'.join(data) + '\n').encode('utf-8')
747763
self.udp_socket.sendto(data, (self._host, self.udp_port))
748764

749765

0 commit comments

Comments
 (0)