Skip to content

Commit 2e9fbf5

Browse files
committed
Moved writing DataFrames to write_points
1 parent b716461 commit 2e9fbf5

File tree

2 files changed

+60
-62
lines changed

2 files changed

+60
-62
lines changed

influxdb/client.py

Lines changed: 41 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -167,12 +167,14 @@ def request(self, url, method='GET', params=None, data=None,
167167
# by doing a POST to /db/foo_production/series?u=some_user&p=some_password
168168
# with a JSON body of points.
169169

170-
def write_points(self, *args, **kwargs):
170+
def write_points(self, data, *args, **kwargs):
171171
"""
172172
write_points()
173173
174174
Write to multiple time series names.
175175
176+
:param data: A list of dicts, or a dictionary mapping series names to
177+
pandas DataFrames
176178
:param batch_size: [Optional] Value to write the points in batches
177179
instead of all at one time. Useful for when doing data dumps from
178180
one database to another or when doing a massive write operation
@@ -185,27 +187,30 @@ def list_chunks(l, n):
185187
for i in xrange(0, len(l), n):
186188
yield l[i:i + n]
187189

190+
# check for pandas dataframe
191+
if isinstance(data, dict):
192+
data = [self._convert_dataframe_to_json(name=key, dataframe=value) for key, value in data.items()]
188193
batch_size = kwargs.get('batch_size')
189194
if batch_size:
190-
for data in kwargs.get('data'):
191-
name = data.get('name')
192-
columns = data.get('columns')
193-
point_list = data.get('points')
195+
for item in data:
196+
name = item.get('name')
197+
columns = item.get('columns')
198+
point_list = item.get('points')
194199

195200
for batch in list_chunks(point_list, batch_size):
196-
data = [{
201+
item = [{
197202
"points": batch,
198203
"name": name,
199204
"columns": columns
200205
}]
201206
time_precision = kwargs.get('time_precision', 's')
202207
self.write_points_with_precision(
203-
data=data,
208+
data=item,
204209
time_precision=time_precision)
205210

206211
return True
207212

208-
return self.write_points_with_precision(*args, **kwargs)
213+
return self.write_points_with_precision(data, *args, **kwargs)
209214

210215
def write_points_with_precision(self, data, time_precision='s'):
211216
"""
@@ -220,6 +225,10 @@ def write_points_with_precision(self, data, time_precision='s'):
220225
"InfluxDB only supports seconds precision for udp writes"
221226
)
222227

228+
# check for pandas dataframe
229+
if isinstance(data, dict):
230+
data = [self._convert_dataframe_to_json(name=key, dataframe=value) for key, value in data.items()]
231+
223232
url = "db/{0}/series".format(self._database)
224233

225234
params = {
@@ -239,6 +248,23 @@ def write_points_with_precision(self, data, time_precision='s'):
239248

240249
return True
241250

251+
def _convert_dataframe_to_json(self, dataframe, name):
252+
try:
253+
import pandas as pd
254+
except ImportError:
255+
raise ImportError('pandas required for writing as dataframe.')
256+
if not isinstance(dataframe, pd.DataFrame):
257+
raise TypeError('Must be DataFrame, but type was: {}.'.format(type(dataframe)))
258+
if not (isinstance(dataframe.index, pd.tseries.period.PeriodIndex) or
259+
isinstance(dataframe.index, pd.tseries.index.DatetimeIndex)):
260+
raise TypeError('Must be DataFrame with DatetimeIndex or PeriodIndex.')
261+
dataframe.index = dataframe.index.to_datetime()
262+
dataframe['time'] = [time.mktime(dt.timetuple()) for dt in dataframe.index]
263+
data = {'name':name,
264+
'columns':list(dataframe.columns),
265+
'points':list([list(x) for x in dataframe.values])}
266+
return data
267+
242268
# One Time Deletes
243269

244270
def delete_points(self, name):
@@ -299,6 +325,13 @@ def remove_scheduled_delete(self, delete_id):
299325
def query(self, query, time_precision='s', chunked=False, output_format='json'):
300326
"""
301327
Quering data
328+
329+
:param time_precision: [Optional, default 's'] Either 's', 'm', 'ms' or 'u'.
330+
:param chunked: [Optional, default=False] True if the data shall be retrieved
331+
in chunks, False otherwise.
332+
:param output_format: [Optional, default 'json'] Format of the resulting
333+
output. Can be 'json' or 'dataframe' for a pandas DataFrame.
334+
302335
"""
303336
if time_precision not in ['s', 'm', 'ms', 'u']:
304337
raise Exception(
@@ -740,22 +773,3 @@ def send_packet(self, packet):
740773
data = json.dumps(packet)
741774
byte = data.encode('utf-8')
742775
self.udp_socket.sendto(byte, (self._host, self.udp_port))
743-
744-
def write_points_from_dataframe(self, dataframe, name):
745-
try:
746-
import pandas as pd
747-
except ImportError:
748-
raise ImportError('pandas required for writing as dataframe.')
749-
if not isinstance(dataframe, pd.DataFrame):
750-
raise TypeError('Must be DataFrame, but type was: {}.'.format(type(dataframe)))
751-
if not (isinstance(dataframe.index, pd.tseries.period.PeriodIndex) or
752-
isinstance(dataframe.index, pd.tseries.index.DatetimeIndex)):
753-
raise TypeError('Must be DataFrame with DatetimeIndex or PeriodIndex.')
754-
dataframe.index = dataframe.index.to_datetime()
755-
dataframe['time'] = [time.mktime(dt.timetuple()) for dt in dataframe.index]
756-
data = dict()
757-
data['name'] = name
758-
data['columns'] = list(dataframe.columns)
759-
data['points'] = list([list(x) for x in dataframe.values])
760-
print(data)
761-
self.write_points(data=[data], time_precision='s')

tests/influxdb/client_test.py

Lines changed: 19 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -599,7 +599,7 @@ def test_update_permission(self):
599599

600600
def test_write_points_from_dataframe(self):
601601
now = datetime(2014, 11, 15, 15, 42, 44, 543)
602-
self.dummy_points = [
602+
points = [
603603
{
604604
"points": [
605605
["1", 1, 1.0, time.mktime(now.timetuple())],
@@ -609,29 +609,21 @@ def test_write_points_from_dataframe(self):
609609
"columns": ["column_one", "column_two", "column_three", "time"]
610610
}
611611
]
612-
self.dummy_dataframe = pd.DataFrame(data=[["1", 1, 1.0], ["2", 2, 2.0]],
613-
index = [now, now + timedelta(hours=1)],
614-
columns=["column_one", "column_two", "column_three"])
612+
dataframe = pd.DataFrame(data=[["1", 1, 1.0], ["2", 2, 2.0]],
613+
index = [now, now + timedelta(hours=1)],
614+
columns=["column_one", "column_two", "column_three"])
615615

616616
with requests_mock.Mocker() as m:
617-
m.register_uri(
618-
requests_mock.POST,
619-
"http://localhost:8086/db/db/series"
620-
)
617+
m.register_uri(requests_mock.POST, "http://localhost:8086/db/db/series")
621618

622619
cli = InfluxDBClient(database='db')
623-
cli.write_points_from_dataframe(
624-
self.dummy_dataframe, name="foo"
625-
)
620+
cli.write_points({"foo":dataframe})
626621

627-
self.assertListEqual(
628-
json.loads(m.last_request.body),
629-
self.dummy_points
630-
)
622+
self.assertListEqual(json.loads(m.last_request.body), points)
631623

632624
def test_write_points_from_dataframe_with_period_index(self):
633625
now = datetime(2014, 11, 16)
634-
self.dummy_points = [
626+
points = [
635627
{
636628
"points": [
637629
["1", 1, 1.0, time.mktime(now.timetuple())],
@@ -641,47 +633,39 @@ def test_write_points_from_dataframe_with_period_index(self):
641633
"columns": ["column_one", "column_two", "column_three", "time"]
642634
}
643635
]
644-
self.dummy_dataframe = pd.DataFrame(data=[["1", 1, 1.0], ["2", 2, 2.0]],
645-
index = [pd.Period('2014-11-16'), pd.Period('2014-11-17')],
646-
columns=["column_one", "column_two", "column_three"])
636+
dataframe = pd.DataFrame(data=[["1", 1, 1.0], ["2", 2, 2.0]],
637+
index = [pd.Period('2014-11-16'), pd.Period('2014-11-17')],
638+
columns=["column_one", "column_two", "column_three"])
647639

648640
with requests_mock.Mocker() as m:
649-
m.register_uri(
650-
requests_mock.POST,
651-
"http://localhost:8086/db/db/series"
652-
)
641+
m.register_uri(requests_mock.POST, "http://localhost:8086/db/db/series")
653642

654643
cli = InfluxDBClient(database='db')
655-
cli.write_points_from_dataframe(
656-
self.dummy_dataframe, name="foo"
657-
)
644+
cli.write_points({"foo":dataframe})
658645

659-
self.assertListEqual(
660-
json.loads(m.last_request.body),
661-
self.dummy_points
662-
)
646+
self.assertListEqual(json.loads(m.last_request.body), points)
663647

664648
@raises(TypeError)
665649
def test_write_points_from_dataframe_fails_without_time_index(self):
666-
self.dummy_dataframe = pd.DataFrame(data=[["1", 1, 1.0], ["2", 2, 2.0]],
667-
columns=["column_one", "column_two", "column_three"])
650+
dataframe = pd.DataFrame(data=[["1", 1, 1.0], ["2", 2, 2.0]],
651+
columns=["column_one", "column_two", "column_three"])
668652

669653
with requests_mock.Mocker() as m:
670654
m.register_uri(requests_mock.POST, "http://localhost:8086/db/db/series")
671655

672656
cli = InfluxDBClient(database='db')
673-
cli.write_points_from_dataframe(self.dummy_dataframe, name="foo")
657+
cli.write_points({"foo":dataframe})
674658

675659
@raises(TypeError)
676660
def test_write_points_from_dataframe_fails_with_anything_but_dataframe(self):
677661
now = datetime(2014, 11, 16)
678-
self.dummy_dataframe = pd.Series(data=[1.0, 2.0], index=[now, now+timedelta(hours=1)])
662+
dataframe = pd.Series(data=[1.0, 2.0], index=[now, now+timedelta(hours=1)])
679663

680664
with requests_mock.Mocker() as m:
681665
m.register_uri(requests_mock.POST, "http://localhost:8086/db/db/series")
682666

683667
cli = InfluxDBClient(database='db')
684-
cli.write_points_from_dataframe(self.dummy_dataframe, name="foo")
668+
cli.write_points({"foo":dataframe})
685669

686670
def test_query_into_dataframe(self):
687671
data = [

0 commit comments

Comments
 (0)