Skip to content

Commit 9006644

Browse files
committed
Merge pull request influxdata#38 from ReAzem/master
Added UDP support
2 parents 4aba3c1 + bc90e45 commit 9006644

File tree

2 files changed

+59
-15
lines changed

2 files changed

+59
-15
lines changed

influxdb/client.py

Lines changed: 35 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
python client for influxdb
44
"""
55
import json
6-
6+
import socket
77
import requests
88
session = requests.Session()
99

@@ -12,14 +12,23 @@ class InfluxDBClientError(Exception):
1212
def __init__(self, message, code):
1313
self.message = message
1414
self.code = code
15-
15+
1616
class InfluxDBClient(object):
1717
"""
1818
InfluxDB Client
1919
"""
2020

21-
def __init__(self, host='localhost', port=8086, username='root',
22-
password='root', database=None, ssl=False, verify_ssl=False, timeout=0):
21+
def __init__(self,
22+
host='localhost',
23+
port=8086,
24+
username='root',
25+
password='root',
26+
database=None,
27+
ssl=False,
28+
verify_ssl=False,
29+
timeout=0,
30+
use_udp=False,
31+
udp_port=4444):
2332
"""
2433
Initialize client
2534
"""
@@ -32,6 +41,11 @@ def __init__(self, host='localhost', port=8086, username='root',
3241

3342
self._verify_ssl = verify_ssl
3443

44+
self.use_udp = use_udp
45+
self.udp_port = udp_port
46+
if use_udp:
47+
self.udp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
48+
3549
self._scheme = "http"
3650

3751
if ssl is True:
@@ -102,7 +116,7 @@ def request(self, url, method='GET', params=None, data=None,
102116

103117
if response.status_code == status_code:
104118
return response
105-
119+
106120
else:
107121
error = InfluxDBClientError("{0}: {1}".format(response.status_code, response.content), response.status_code)
108122
raise error
@@ -167,13 +181,16 @@ def write_points_with_precision(self, data, time_precision='s'):
167181
'time_precision': time_precision
168182
}
169183

170-
self.request(
171-
url=url,
172-
method='POST',
173-
params=params,
174-
data=data,
175-
status_code=200
176-
)
184+
if self.use_udp:
185+
self.send_packet(data)
186+
else:
187+
self.request(
188+
url=url,
189+
method='POST',
190+
params=params,
191+
data=data,
192+
status_code=200
193+
)
177194

178195
return True
179196

@@ -266,7 +283,7 @@ def query(self, query, time_precision='s', chunked=False):
266283
try:
267284
res = json.loads(response.content)
268285
except TypeError:
269-
# must decode in python 3
286+
# must decode in python 3
270287
res = json.loads(response.content.decode('utf8'))
271288

272289
return res
@@ -618,3 +635,8 @@ def update_permission(self, username, json_body):
618635
See also: src/api/http/api.go:l57
619636
"""
620637
raise NotImplementedError()
638+
639+
def send_packet(self, packet):
640+
data = json.dumps(packet)
641+
byte = data.encode('utf-8')
642+
self.udp_socket.sendto(byte, (self._host, self.udp_port))

tests/influxdb/client_test.py

Lines changed: 24 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
"""
55
import json
66
import requests
7+
import socket
78
from nose.tools import raises
89
from mock import patch
910

@@ -36,7 +37,7 @@ def request(*args, **kwargs):
3637
assert isinstance(data, str)
3738

3839
# Data must be a JSON string
39-
assert c == json.loads(data)
40+
assert c == json.loads(data, strict=True)
4041

4142
c = data
4243

@@ -91,6 +92,27 @@ def test_write_points(self):
9192
cli = InfluxDBClient('host', 8086, 'username', 'password', 'db')
9293
assert cli.write_points(data) is True
9394

95+
def test_write_points_udp(self):
96+
data = [
97+
{
98+
"points": [
99+
["1", 1, 1.0],
100+
["2", 2, 2.0]
101+
],
102+
"name": "foo",
103+
"columns": ["column_one", "column_two", "column_three"]
104+
}
105+
]
106+
107+
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
108+
s.bind(('0.0.0.0', 4444))
109+
110+
cli = InfluxDBClient('localhost', 8086, 'root', 'root', 'test', use_udp=True, udp_port=4444)
111+
cli.write_points(data)
112+
113+
received_data, addr = s.recvfrom(1024)
114+
assert data == json.loads(received_data.decode(), strict=True)
115+
94116
@raises(Exception)
95117
def test_write_points_fails(self):
96118
with _mocked_session('post', 500) as mocked:
@@ -273,4 +295,4 @@ def test_delete_database_user(self):
273295
@raises(NotImplementedError)
274296
def test_update_permission(self):
275297
cli = InfluxDBClient('host', 8086, 'username', 'password', 'db')
276-
cli.update_permission('admin', [])
298+
cli.update_permission('admin', [])

0 commit comments

Comments
 (0)