Skip to content

Commit 62f7426

Browse files
committed
Changed DataFrameClient.write_points parameters.
1 parent 6969f75 commit 62f7426

File tree

3 files changed

+69
-66
lines changed

3 files changed

+69
-66
lines changed

influxdb/_dataframe_client.py

Lines changed: 27 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -30,41 +30,43 @@ class DataFrameClient(InfluxDBClient):
3030

3131
EPOCH = pd.Timestamp('1970-01-01 00:00:00.000+00:00')
3232

33-
def write_points(self, data, time_precision=None, database=None,
34-
retention_policy=None, tags=None, batch_size=None):
33+
def write_points(self, dataframe, measurement, tags=None,
34+
time_precision=None, database=None, retention_policy=None,
35+
batch_size=None):
3536
"""
3637
Write to multiple time series names.
3738
38-
:param data: A dictionary mapping series to pandas DataFrames
39+
:param dataframe: data points in a DataFrame
40+
:param measurement: name of measurement
41+
:param tags: dictionary of tags, with string key-values
3942
:param time_precision: [Optional, default 's'] Either 's', 'ms', 'u'
4043
or 'n'.
4144
:param batch_size: [Optional] Value to write the points in batches
4245
instead of all at one time. Useful for when doing data dumps from
4346
one database to another or when doing a massive write operation
4447
:type batch_size: int
45-
"""
4648
49+
"""
4750
if batch_size:
48-
for key, data_frame in data.items():
49-
number_batches = int(math.ceil(
50-
len(data_frame) / float(batch_size)))
51-
for batch in range(number_batches):
52-
start_index = batch * batch_size
53-
end_index = (batch + 1) * batch_size
54-
data = self._convert_dataframe_to_json(
55-
key=key,
56-
dataframe=data_frame.ix[start_index:end_index].copy(),
57-
)
58-
super(DataFrameClient, self).write_points(
59-
data, time_precision, database, retention_policy, tags)
60-
return True
61-
else:
62-
for key, data_frame in data.items():
63-
data = self._convert_dataframe_to_json(
64-
key=key, dataframe=data_frame,
51+
number_batches = int(math.ceil(
52+
len(dataframe) / float(batch_size)))
53+
for batch in range(number_batches):
54+
start_index = batch * batch_size
55+
end_index = (batch + 1) * batch_size
56+
points = self._convert_dataframe_to_json(
57+
dataframe.ix[start_index:end_index].copy(),
58+
measurement,
59+
tags
6560
)
6661
super(DataFrameClient, self).write_points(
67-
data, time_precision, database, retention_policy, tags)
62+
points, time_precision, database, retention_policy)
63+
return True
64+
else:
65+
points = self._convert_dataframe_to_json(
66+
dataframe, measurement, tags
67+
)
68+
super(DataFrameClient, self).write_points(
69+
points, time_precision, database, retention_policy)
6870
return True
6971

7072
def query(self, query, chunked=False, database=None):
@@ -114,7 +116,7 @@ def _to_dataframe(self, rs):
114116
result[key] = df
115117
return result
116118

117-
def _convert_dataframe_to_json(self, key, dataframe):
119+
def _convert_dataframe_to_json(self, dataframe, measurement, tags=None):
118120

119121
if not isinstance(dataframe, pd.DataFrame):
120122
raise TypeError('Must be DataFrame, but type was: {}.'
@@ -134,14 +136,9 @@ def _convert_dataframe_to_json(self, key, dataframe):
134136
# Convert dtype for json serialization
135137
dataframe = dataframe.astype('object')
136138

137-
if isinstance(key, str):
138-
name = key
139-
tags = None
140-
else:
141-
name, tags = key
142139
points = [
143-
{'name': name,
144-
'tags': dict(tags) if tags else {},
140+
{'name': measurement,
141+
'tags': tags if tags else {},
145142
'fields': rec,
146143
'timestamp': ts.isoformat()
147144
}

tests/influxdb/client_test_with_server.py

Lines changed: 31 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -117,22 +117,27 @@ def point(serie_name, timestamp=None, tags=None, **fields):
117117

118118
if not using_pypy:
119119
dummy_pointDF = {
120-
("cpu_load_short", (("host", "server01"), ("region", "us-west"))):
121-
pd.DataFrame(
120+
"measurement": "cpu_load_short",
121+
"tags": {"host": "server01",
122+
"region": "us-west"},
123+
"dataframe": pd.DataFrame(
122124
[[0.64]], columns=['value'],
123125
index=pd.to_datetime(["2009-11-10T23:00:00Z"]))
124126
}
125-
dummy_pointsDF = {
126-
("cpu_load_short", (("host", "server01"), ("region", "us-west"))):
127-
pd.DataFrame(
127+
dummy_pointsDF = [{
128+
"measurement": "cpu_load_short",
129+
"tags": {"host": "server01", "region": "us-west"},
130+
"dataframe": pd.DataFrame(
128131
[[0.64]], columns=['value'],
129132
index=pd.to_datetime(["2009-11-10T23:00:00Z"])),
130-
("memory", (("host", "server01"), ("region", "us-west"))):
131-
pd.DataFrame(
133+
}, {
134+
"measurement": "memory",
135+
"tags": {"host": "server01", "region": "us-west"},
136+
"dataframe": pd.DataFrame(
132137
[[33]], columns=['value'],
133138
index=pd.to_datetime(["2009-11-10T23:01:35Z"])
134139
)
135-
}
140+
}]
136141

137142

138143
dummy_point_without_timestamp = [
@@ -507,7 +512,14 @@ def test_write_points(self):
507512
@skipIfPYpy
508513
def test_write_points_DF(self):
509514
""" same as test_write() but with write_points \o/ """
510-
self.assertIs(True, self.cliDF.write_points(dummy_pointDF))
515+
self.assertIs(
516+
True,
517+
self.cliDF.write_points(
518+
dummy_pointDF['dataframe'],
519+
dummy_pointDF['measurement'],
520+
dummy_pointDF['tags']
521+
)
522+
)
511523

512524
def test_write_points_check_read(self):
513525
""" same as test_write_check_read() but with write_points \o/ """
@@ -538,9 +550,7 @@ def test_write_points_check_read_DF(self):
538550
rsp = self.cliDF.query('SELECT * FROM cpu_load_short')
539551
assert_frame_equal(
540552
rsp['cpu_load_short'],
541-
dummy_pointDF[
542-
('cpu_load_short',
543-
(('host', 'server01'), ('region', 'us-west')))]
553+
dummy_pointDF['dataframe']
544554
)
545555

546556
# Query with Tags
@@ -549,9 +559,7 @@ def test_write_points_check_read_DF(self):
549559
assert_frame_equal(
550560
rsp[('cpu_load_short',
551561
(('host', 'server01'), ('region', 'us-west')))],
552-
dummy_pointDF[
553-
('cpu_load_short',
554-
(('host', 'server01'), ('region', 'us-west')))]
562+
dummy_pointDF['dataframe']
555563
)
556564

557565
def test_write_multiple_points_different_series(self):
@@ -574,25 +582,24 @@ def test_write_multiple_points_different_series(self):
574582

575583
@skipIfPYpy
576584
def test_write_multiple_points_different_series_DF(self):
577-
self.assertIs(True, self.cliDF.write_points(dummy_pointsDF))
585+
for i in range(2):
586+
self.assertIs(
587+
True, self.cliDF.write_points(
588+
dummy_pointsDF[i]['dataframe'],
589+
dummy_pointsDF[i]['measurement'],
590+
dummy_pointsDF[i]['tags']))
578591
time.sleep(1)
579592
rsp = self.cliDF.query('SELECT * FROM cpu_load_short')
580593

581594
assert_frame_equal(
582595
rsp['cpu_load_short'],
583-
dummy_pointsDF[
584-
('cpu_load_short', (('host', 'server01'),
585-
('region', 'us-west')))
586-
]
596+
dummy_pointsDF[0]['dataframe']
587597
)
588598

589599
rsp = self.cliDF.query('SELECT * FROM memory')
590600
assert_frame_equal(
591601
rsp['memory'],
592-
dummy_pointsDF[
593-
('memory', (('host', 'server01'),
594-
('region', 'us-west')))
595-
]
602+
dummy_pointsDF[1]['dataframe']
596603
)
597604

598605
def test_write_points_batch(self):

tests/influxdb/dataframe_client_test.py

Lines changed: 11 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -56,10 +56,10 @@ def test_write_points_from_dataframe(self):
5656

5757
cli = DataFrameClient(database='db')
5858

59-
cli.write_points({"foo": dataframe})
59+
cli.write_points(dataframe, 'foo')
6060
self.assertEqual(json.loads(m.last_request.body), expected)
6161

62-
cli.write_points({("foo", None): dataframe})
62+
cli.write_points(dataframe, 'foo', tags=None)
6363
self.assertEqual(json.loads(m.last_request.body), expected)
6464

6565
def test_write_points_from_dataframe_in_batches(self):
@@ -73,8 +73,7 @@ def test_write_points_from_dataframe_in_batches(self):
7373
"http://localhost:8086/write")
7474

7575
cli = DataFrameClient(database='db')
76-
assert cli.write_points({("foo", None): dataframe},
77-
batch_size=1) is True
76+
assert cli.write_points(dataframe, "foo", batch_size=1) is True
7877

7978
def test_write_points_from_dataframe_with_numeric_column_names(self):
8079
now = pd.Timestamp('1970-01-01 00:00+00:00')
@@ -106,7 +105,7 @@ def test_write_points_from_dataframe_with_numeric_column_names(self):
106105
"http://localhost:8086/write")
107106

108107
cli = DataFrameClient(database='db')
109-
cli.write_points({("foo", (('hello', 'there'),)): dataframe})
108+
cli.write_points(dataframe, "foo", {"hello": "there"})
110109

111110
self.assertEqual(json.loads(m.last_request.body), expected)
112111

@@ -140,7 +139,7 @@ def test_write_points_from_dataframe_with_period_index(self):
140139
"http://localhost:8086/write")
141140

142141
cli = DataFrameClient(database='db')
143-
cli.write_points({"foo": dataframe})
142+
cli.write_points(dataframe, "foo")
144143

145144
self.assertEqual(json.loads(m.last_request.body), expected)
146145

@@ -175,17 +174,17 @@ def test_write_points_from_dataframe_with_time_precision(self):
175174
}
176175

177176
cli = DataFrameClient(database='db')
178-
key = "foo"
177+
measurement = "foo"
179178

180-
cli.write_points({key: dataframe}, time_precision='s')
179+
cli.write_points(dataframe, measurement, time_precision='s')
181180
points.update(precision='s')
182181
self.assertEqual(json.loads(m.last_request.body), points)
183182

184-
cli.write_points({key: dataframe}, time_precision='m')
183+
cli.write_points(dataframe, measurement, time_precision='m')
185184
points.update(precision='m')
186185
self.assertEqual(json.loads(m.last_request.body), points)
187186

188-
cli.write_points({key: dataframe}, time_precision='u')
187+
cli.write_points(dataframe, measurement, time_precision='u')
189188
points.update(precision='u')
190189
self.assertEqual(json.loads(m.last_request.body), points)
191190

@@ -200,7 +199,7 @@ def test_write_points_from_dataframe_fails_without_time_index(self):
200199
"http://localhost:8086/db/db/series")
201200

202201
cli = DataFrameClient(database='db')
203-
cli.write_points({"foo": dataframe})
202+
cli.write_points(dataframe, "foo")
204203

205204
@raises(TypeError)
206205
def test_write_points_from_dataframe_fails_with_series(self):
@@ -213,7 +212,7 @@ def test_write_points_from_dataframe_fails_with_series(self):
213212
"http://localhost:8086/db/db/series")
214213

215214
cli = DataFrameClient(database='db')
216-
cli.write_points({"foo": dataframe})
215+
cli.write_points(dataframe, "foo")
217216

218217
def test_query_into_dataframe(self):
219218
data = {

0 commit comments

Comments
 (0)