Skip to content

Commit de44289

Browse files
authored
Make batched writing support all iterables (influxdata#746)
* Make batched writing support all iterables
* Also test batching generator against real server
* Fix PEP257 error
* Import itertools functions directly
1 parent d590119 commit de44289

File tree

3 files changed

+69
-3
lines changed

3 files changed

+69
-3
lines changed

influxdb/client.py

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,11 @@
1515
import socket
1616
import struct
1717
import time
18+
from itertools import chain, islice
1819

1920
import msgpack
2021
import requests
2122
import requests.exceptions
22-
from six.moves import xrange
2323
from six.moves.urllib.parse import urlparse
2424

2525
from influxdb.line_protocol import make_lines, quote_ident, quote_literal
@@ -597,8 +597,17 @@ def ping(self):
597597

598598
@staticmethod
599599
def _batches(iterable, size):
    """Yield successive batches of at most ``size`` items from ``iterable``.

    Accepts any iterable (lists, generators, ...), not only sequences that
    support ``len()`` and slicing.

    Each batch is materialized as a list before it is yielded. Yielding
    lazy slices of one shared iterator would mix items between batches
    whenever the caller requested the next batch before fully consuming
    the current one (for example ``list(_batches(points, 2))``).

    :param iterable: the items to split into batches
    :param size: maximum number of items per batch; must be at least 1
    :returns: a generator of non-empty lists; only the last list may hold
        fewer than ``size`` items
    :raises ValueError: if ``size`` is smaller than 1 (raised when the
        generator is first advanced)
    """
    if size < 1:
        raise ValueError("batch size must be at least 1")
    iterator = iter(iterable)
    while True:
        # Pull one whole batch eagerly so it is independent of later ones.
        batch = list(islice(iterator, size))
        if not batch:
            return  # input exhausted; never emit an empty batch
        yield batch
602611

603612
def _write_points(self,
604613
points,

influxdb/tests/client_test.py

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -332,6 +332,36 @@ def test_write_points_batch(self):
332332
self.assertEqual(expected_last_body,
333333
m.last_request.body.decode('utf-8'))
334334

335+
def test_write_points_batch_generator(self):
    """Verify batched writes work when the points come from a generator.

    Three points are fed through a generator with ``batch_size=2``, so two
    POST requests are expected and the last one must carry only the third
    point.
    """
    stamp = "2009-11-10T23:00:00Z"
    source_points = [
        {"measurement": "cpu_usage", "tags": {"unit": "percent"},
         "time": stamp, "fields": {"value": 12.34}},
        {"measurement": "network", "tags": {"direction": "in"},
         "time": stamp, "fields": {"value": 123.00}},
        {"measurement": "network", "tags": {"direction": "out"},
         "time": stamp, "fields": {"value": 12.00}},
    ]
    # A generator has no len() and cannot be sliced.
    point_stream = (entry for entry in source_points)
    extra_tags = {"host": "server01", "region": "us-west"}
    final_body = (
        "network,direction=out,host=server01,region=us-west "
        "value=12.0 1257894000000000000\n"
    )

    with requests_mock.Mocker() as mocker:
        mocker.register_uri(requests_mock.POST,
                            "http://localhost:8086/write",
                            status_code=204)
        client = InfluxDBClient(database='db')
        client.write_points(points=point_stream,
                            database='db',
                            tags=extra_tags,
                            batch_size=2)
        # Three points in batches of two -> two requests.
        self.assertEqual(mocker.call_count, 2)
        sent_body = mocker.last_request.body.decode('utf-8')
        self.assertEqual(final_body, sent_body)
364+
335365
def test_write_points_udp(self):
336366
"""Test write points UDP for TestInfluxDBClient object."""
337367
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)

influxdb/tests/server_tests/client_test_with_server.py

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -452,6 +452,33 @@ def test_write_points_batch(self):
452452
self.assertIn(12, net_out['series'][0]['values'][0])
453453
self.assertIn(12.34, cpu['series'][0]['values'][0])
454454

455+
def test_write_points_batch_generator(self):
    """Write generator-supplied points in batches and read them back."""
    stamp = "2009-11-10T23:00:00Z"
    source_points = [
        {"measurement": "cpu_usage", "tags": {"unit": "percent"},
         "time": stamp, "fields": {"value": 12.34}},
        {"measurement": "network", "tags": {"direction": "in"},
         "time": stamp, "fields": {"value": 123.00}},
        {"measurement": "network", "tags": {"direction": "out"},
         "time": stamp, "fields": {"value": 12.00}},
    ]
    # A generator has no len() and cannot be sliced.
    point_stream = (entry for entry in source_points)
    extra_tags = {"host": "server01", "region": "us-west"}
    self.cli.write_points(points=point_stream,
                          tags=extra_tags,
                          batch_size=2)
    # Presumably gives the server time to make the points queryable.
    time.sleep(5)

    inbound_query = "SELECT value FROM network " "WHERE direction=$dir"
    net_in = self.cli.query(inbound_query, bind_params={'dir': 'in'}).raw
    outbound_query = "SELECT value FROM network " "WHERE direction='out'"
    net_out = self.cli.query(outbound_query).raw
    cpu = self.cli.query("SELECT value FROM cpu_usage").raw

    # Each written value must appear in the first row of its series.
    self.assertIn(123, net_in['series'][0]['values'][0])
    self.assertIn(12, net_out['series'][0]['values'][0])
    self.assertIn(12.34, cpu['series'][0]['values'][0])
481+
455482
def test_query(self):
456483
"""Test querying data back from server."""
457484
self.assertIs(True, self.cli.write_points(dummy_point))

0 commit comments

Comments
 (0)