From 30e4e87a0f8fba92b8f9c3d6ec4e02d93671362c Mon Sep 17 00:00:00 2001 From: Matt Bartos Date: Mon, 29 Aug 2016 04:04:11 -0400 Subject: [PATCH 1/8] Addressed issues 362 and 363 --- influxdb/_dataframe_client.py | 207 ++++++++++++++++-- influxdb/client.py | 44 ++-- influxdb/tests/dataframe_client_test.py | 33 +-- influxdb/tests/misc.py | 30 +++ .../server_tests/client_test_with_server.py | 27 ++- 5 files changed, 284 insertions(+), 57 deletions(-) diff --git a/influxdb/_dataframe_client.py b/influxdb/_dataframe_client.py index 2980be90..855c8233 100644 --- a/influxdb/_dataframe_client.py +++ b/influxdb/_dataframe_client.py @@ -35,9 +35,17 @@ class DataFrameClient(InfluxDBClient): EPOCH = pd.Timestamp('1970-01-01 00:00:00.000+00:00') - def write_points(self, dataframe, measurement, tags=None, - time_precision=None, database=None, retention_policy=None, - batch_size=None): + def write_points(self, + dataframe, + measurement, + tags=None, + tag_columns=[], + field_columns=[], + time_precision=None, + database=None, + retention_policy=None, + batch_size=None, + protocol='line'): """ Write to multiple time series names. @@ -50,27 +58,60 @@ def write_points(self, dataframe, measurement, tags=None, instead of all at one time. Useful for when doing data dumps from one database to another or when doing a massive write operation :type batch_size: int + :param protocol: Protocol for writing data. Either 'line' or 'json'. """ if batch_size: - number_batches = int(math.ceil( - len(dataframe) / float(batch_size))) + number_batches = int(math.ceil(len(dataframe) / float(batch_size))) for batch in range(number_batches): start_index = batch * batch_size end_index = (batch + 1) * batch_size - points = self._convert_dataframe_to_json( - dataframe.ix[start_index:end_index].copy(), - measurement, tags, time_precision - ) + if protocol == 'line': + points = self._convert_dataframe_to_lines( + dataframe.ix[start_index:end_index].copy(), + measurement=measurement, + global_tags=tags, + time_precision=time_precision, + tag_columns=tag_columns, + field_columns=field_columns) + else: + points = self._convert_dataframe_to_json( + dataframe.ix[start_index:end_index].copy(), + measurement=measurement, + tags=tags, + time_precision=time_precision, + tag_columns=tag_columns, + field_columns=field_columns) super(DataFrameClient, self).write_points( - points, time_precision, database, retention_policy) + points, + time_precision, + database, + retention_policy, + protocol='line') return True else: - points = self._convert_dataframe_to_json( - dataframe, measurement, tags, time_precision - ) + if protocol == 'line': + points = self._convert_dataframe_to_lines( + dataframe, + measurement=measurement, + global_tags=tags, + tag_columns=tag_columns, + field_columns=field_columns, + time_precision=time_precision) + else: + points = self._convert_dataframe_to_json( + dataframe, + measurement=measurement, + tags=tags, + time_precision=time_precision, + tag_columns=tag_columns, + field_columns=field_columns) super(DataFrameClient, self).write_points( - points, time_precision, database, retention_policy) + points, + time_precision, + database, + retention_policy, + protocol='line') return True def query(self, query, chunked=False, database=None): @@ -108,7 +149,12 @@ def _to_dataframe(self, rs): result[key] = df return result - def _convert_dataframe_to_json(self, dataframe, measurement, tags=None, + def _convert_dataframe_to_json(self, + dataframe, + measurement, + tags=None, + tag_columns=[], + field_columns=[], 
time_precision=None): if not isinstance(dataframe, pd.DataFrame): @@ -119,6 +165,15 @@ def _convert_dataframe_to_json(self, dataframe, measurement, tags=None, raise TypeError('Must be DataFrame with DatetimeIndex or \ PeriodIndex.') + # Make sure tags and tag columns are correctly typed + tag_columns = tag_columns if tag_columns else [] + field_columns = field_columns if field_columns else [] + tags = tags if tags else {} + # Assume field columns are all columns not included in tag columns + if not field_columns: + field_columns = list( + set(dataframe.columns).difference(set(tag_columns))) + dataframe.index = dataframe.index.to_datetime() if dataframe.index.tzinfo is None: dataframe.index = dataframe.index.tz_localize('UTC') @@ -140,13 +195,127 @@ def _convert_dataframe_to_json(self, dataframe, measurement, tags=None, points = [ {'measurement': measurement, - 'tags': tags if tags else {}, + 'tags': dict(list(tag.items()) + list(tags.items())), 'fields': rec, - 'time': int(ts.value / precision_factor) - } - for ts, rec in zip(dataframe.index, dataframe.to_dict('record'))] + 'time': int(ts.value / precision_factor)} + for ts, tag, rec in zip(dataframe.index, + dataframe[tag_columns].to_dict('record'), + dataframe[field_columns].to_dict('record')) + ] + return points + def _convert_dataframe_to_lines(self, + dataframe, + measurement, + field_columns=[], + tag_columns=[], + global_tags={}, + time_precision=None, + numeric_precision=None): + + if not isinstance(dataframe, pd.DataFrame): + raise TypeError('Must be DataFrame, but type was: {0}.' + .format(type(dataframe))) + if not (isinstance(dataframe.index, pd.tseries.period.PeriodIndex) or + isinstance(dataframe.index, pd.tseries.index.DatetimeIndex)): + raise TypeError('Must be DataFrame with DatetimeIndex or \ + PeriodIndex.') + + column_series = pd.Series(dataframe.columns) + + if field_columns is None: + field_columns = [] + if tag_columns is None: + tag_columns = [] + + field_columns = list(field_columns) if list(field_columns) else [] + tag_columns = list(tag_columns) if list(tag_columns) else [] + + # Assume that all columns not listed as tag columns are field columns + if not field_columns: + field_columns = list(column_series[~column_series.isin( + tag_columns)]) + + precision_factor = { + "n": 1, + "u": 1e3, + "ms": 1e6, + "s": 1e9, + "m": 1e9 * 60, + "h": 1e9 * 3600, + }.get(time_precision, 1) + + # Make array of timestamp ints + time = ((dataframe.index.to_datetime().values.astype(int) / + precision_factor).astype(int).astype(str)) + + # If tag columns exist, make an array of formatted tag keys and values + if tag_columns: + tag_df = dataframe[tag_columns] + tag_df = self._stringify_dataframe( + tag_df, numeric_precision, datatype='tag') + tags = (',' + ( + (tag_df.columns.values + '=').tolist() + tag_df)).sum(axis=1) + del tag_df + + else: + tags = '' + + # Make an array of formatted field keys and values + field_df = dataframe[field_columns] + field_df = self._stringify_dataframe( + field_df, numeric_precision, datatype='field') + field_df = (field_df.columns.values + '=').tolist() + field_df + field_df[field_df.columns[1:]] = ',' + field_df[field_df.columns[1:]] + fields = field_df.sum(axis=1) + del field_df + + # Add any global tags to formatted tag strings + if global_tags: + global_tags = ','.join(['='.join([tag, global_tags[tag]]) + for tag in global_tags]) + if tag_columns: + tags = tags + ',' + global_tags + else: + tags = ',' + global_tags + + # Generate line protocol string + points = (measurement + tags + ' ' + fields 
+ ' ' + time).tolist() + return points + + def _stringify_dataframe(self, + dataframe, + numeric_precision, + datatype='field'): + int_columns = dataframe.select_dtypes(include=['int']).columns + float_columns = dataframe.select_dtypes(include=['floating']).columns + nonfloat_columns = dataframe.columns[~dataframe.columns.isin( + float_columns)] + numeric_columns = dataframe.select_dtypes(include=['number']).columns + string_columns = dataframe.select_dtypes(include=['object']).columns + + # Convert dataframe to string + if numeric_precision is None: + dataframe = dataframe.astype(str) + elif numeric_precision == 'full': + dataframe[float_columns] = dataframe[float_columns].applymap(repr) + dataframe[nonfloat_columns] = (dataframe[nonfloat_columns] + .astype(str)) + elif isinstance(numeric_precision, int): + dataframe[numeric_columns] = (dataframe[numeric_columns] + .round(numeric_precision)) + dataframe = dataframe.astype(str) + else: + raise ValueError('Invalid numeric precision') + + if datatype == 'field': + dataframe[int_columns] = dataframe[int_columns] + 'i' + dataframe[string_columns] = '"' + dataframe[string_columns] + '"' + + dataframe.columns = dataframe.columns.astype(str) + return dataframe + def _datetime_to_epoch(self, datetime, time_precision='s'): seconds = (datetime - self.EPOCH).total_seconds() if time_precision == 'h': diff --git a/influxdb/client.py b/influxdb/client.py index b6bbc72f..aae44158 100644 --- a/influxdb/client.py +++ b/influxdb/client.py @@ -252,7 +252,8 @@ def request(self, url, method='GET', params=None, data=None, else: raise InfluxDBClientError(response.content, response.status_code) - def write(self, data, params=None, expected_response_code=204): + def write(self, data, params=None, expected_response_code=204, + protocol='json'): """Write data to InfluxDB. :param data: the data to be written @@ -274,11 +275,16 @@ def write(self, data, params=None, expected_response_code=204): else: precision = None + if protocol == 'json': + data = make_lines(data, precision).encode('utf-8') + elif protocol == 'line': + data = ('\n'.join(data) + '\n').encode('utf-8') + self.request( url="write", method='POST', params=params, - data=make_lines(data, precision).encode('utf-8'), + data=data, expected_response_code=expected_response_code, headers=headers ) @@ -351,6 +357,7 @@ def write_points(self, retention_policy=None, tags=None, batch_size=None, + protocol='json' ): """Write to multiple time series names. @@ -375,6 +382,7 @@ def write_points(self, :type batch_size: int :returns: True, if the operation is successful :rtype: bool + :param protocol: Protocol for writing data. Either 'line' or 'json'. .. note:: if no retention policy is specified, the default retention policy for the database is used @@ -386,14 +394,14 @@ def write_points(self, time_precision=time_precision, database=database, retention_policy=retention_policy, - tags=tags) + tags=tags, protocol=protocol) return True else: return self._write_points(points=points, time_precision=time_precision, database=database, retention_policy=retention_policy, - tags=tags) + tags=tags, protocol=protocol) def _batches(self, iterable, size): for i in xrange(0, len(iterable), size): @@ -404,7 +412,8 @@ def _write_points(self, time_precision, database, retention_policy, - tags): + tags, + protocol='json'): if time_precision not in ['n', 'u', 'ms', 's', 'm', 'h', None]: raise ValueError( "Invalid time precision is given. 
" @@ -415,12 +424,15 @@ def _write_points(self, "InfluxDB only supports seconds precision for udp writes" ) - data = { - 'points': points - } + if protocol == 'json': + data = { + 'points': points + } - if tags is not None: - data['tags'] = tags + if tags is not None: + data['tags'] = tags + else: + data = points params = { 'db': database or self._database @@ -433,12 +445,13 @@ def _write_points(self, params['rp'] = retention_policy if self.use_udp: - self.send_packet(data) + self.send_packet(data, protocol=protocol) else: self.write( data=data, params=params, - expected_response_code=204 + expected_response_code=204, + protocol=protocol ) return True @@ -737,13 +750,16 @@ def get_list_privileges(self, username): text = "SHOW GRANTS FOR {0}".format(username) return list(self.query(text).get_points()) - def send_packet(self, packet): + def send_packet(self, packet, protocol='json'): """Send an UDP packet. :param packet: the packet to be sent :type packet: dict """ - data = make_lines(packet).encode('utf-8') + if protocol == 'json': + data = make_lines(packet).encode('utf-8') + elif protocol == 'line': + data = ('\n'.join(data) + '\n').encode('utf-8') self.udp_socket.sendto(data, (self._host, self.udp_port)) diff --git a/influxdb/tests/dataframe_client_test.py b/influxdb/tests/dataframe_client_test.py index 5b868d14..780b9419 100644 --- a/influxdb/tests/dataframe_client_test.py +++ b/influxdb/tests/dataframe_client_test.py @@ -37,8 +37,8 @@ def test_write_points_from_dataframe(self): columns=["column_one", "column_two", "column_three"]) expected = ( - b"foo column_one=\"1\",column_three=1.0,column_two=1i 0\n" - b"foo column_one=\"2\",column_three=2.0,column_two=2i " + b"foo column_one=\"1\",column_two=1i,column_three=1.0 0\n" + b"foo column_one=\"2\",column_two=2i,column_three=2.0 " b"3600000000000\n" ) @@ -96,9 +96,10 @@ def test_write_points_from_dataframe_with_period_index(self): pd.Period('1970-01-02')], columns=["column_one", "column_two", "column_three"]) + expected = ( - b"foo column_one=\"1\",column_three=1.0,column_two=1i 0\n" - b"foo column_one=\"2\",column_three=2.0,column_two=2i " + b"foo column_one=\"1\",column_two=1i,column_three=1.0 0\n" + b"foo column_one=\"2\",column_two=2i,column_three=2.0 " b"86400000000000\n" ) @@ -130,48 +131,48 @@ def test_write_points_from_dataframe_with_time_precision(self): cli.write_points(dataframe, measurement, time_precision='h') self.assertEqual(m.last_request.qs['precision'], ['h']) self.assertEqual( - b'foo column_one="1",column_three=1.0,column_two=1i 0\nfoo ' - b'column_one="2",column_three=2.0,column_two=2i 1\n', + b'foo column_one="1",column_two=1i,column_three=1.0 0\nfoo ' + b'column_one="2",column_two=2i,column_three=2.0 1\n', m.last_request.body, ) cli.write_points(dataframe, measurement, time_precision='m') self.assertEqual(m.last_request.qs['precision'], ['m']) self.assertEqual( - b'foo column_one="1",column_three=1.0,column_two=1i 0\nfoo ' - b'column_one="2",column_three=2.0,column_two=2i 60\n', + b'foo column_one="1",column_two=1i,column_three=1.0 0\nfoo ' + b'column_one="2",column_two=2i,column_three=2.0 60\n', m.last_request.body, ) cli.write_points(dataframe, measurement, time_precision='s') self.assertEqual(m.last_request.qs['precision'], ['s']) self.assertEqual( - b'foo column_one="1",column_three=1.0,column_two=1i 0\nfoo ' - b'column_one="2",column_three=2.0,column_two=2i 3600\n', + b'foo column_one="1",column_two=1i,column_three=1.0 0\nfoo ' + b'column_one="2",column_two=2i,column_three=2.0 3600\n', m.last_request.body, ) 
cli.write_points(dataframe, measurement, time_precision='ms') self.assertEqual(m.last_request.qs['precision'], ['ms']) self.assertEqual( - b'foo column_one="1",column_three=1.0,column_two=1i 0\nfoo ' - b'column_one="2",column_three=2.0,column_two=2i 3600000\n', + b'foo column_one="1",column_two=1i,column_three=1.0 0\nfoo ' + b'column_one="2",column_two=2i,column_three=2.0 3600000\n', m.last_request.body, ) cli.write_points(dataframe, measurement, time_precision='u') self.assertEqual(m.last_request.qs['precision'], ['u']) self.assertEqual( - b'foo column_one="1",column_three=1.0,column_two=1i 0\nfoo ' - b'column_one="2",column_three=2.0,column_two=2i 3600000000\n', + b'foo column_one="1",column_two=1i,column_three=1.0 0\nfoo ' + b'column_one="2",column_two=2i,column_three=2.0 3600000000\n', m.last_request.body, ) cli.write_points(dataframe, measurement, time_precision='n') self.assertEqual(m.last_request.qs['precision'], ['n']) self.assertEqual( - b'foo column_one="1",column_three=1.0,column_two=1i 0\n' - b'foo column_one="2",column_three=2.0,column_two=2i ' + b'foo column_one="1",column_two=1i,column_three=1.0 0\n' + b'foo column_one="2",column_two=2i,column_three=2.0 ' b'3600000000000\n', m.last_request.body, ) diff --git a/influxdb/tests/misc.py b/influxdb/tests/misc.py index 7dffc219..36b5a253 100644 --- a/influxdb/tests/misc.py +++ b/influxdb/tests/misc.py @@ -5,7 +5,11 @@ from __future__ import print_function from __future__ import unicode_literals +import os +import sys +import subprocess import socket +import distutils def get_free_ports(num_ports, ip='127.0.0.1'): @@ -44,3 +48,29 @@ def is_port_open(port, ip='127.0.0.1'): return result == 0 finally: sock.close() + + +def find_influxd_path(): + influxdb_bin_path = os.environ.get( + 'INFLUXDB_PYTHON_INFLUXD_PATH', + None + ) + + if influxdb_bin_path is None: + influxdb_bin_path = distutils.spawn.find_executable('influxd') + if not influxdb_bin_path: + try: + influxdb_bin_path = subprocess.check_output( + ['which', 'influxd'] + ).strip() + except subprocess.CalledProcessError: + # fallback on : + influxdb_bin_path = '/opt/influxdb/influxd' + + if not os.path.isfile(influxdb_bin_path): + raise unittest.SkipTest("Could not find influxd binary") + + version = subprocess.check_output([influxdb_bin_path, 'version']) + print("InfluxDB version: %s" % version, file=sys.stderr) + + return influxdb_bin_path diff --git a/influxdb/tests/server_tests/client_test_with_server.py b/influxdb/tests/server_tests/client_test_with_server.py index 560c506a..6efa5300 100644 --- a/influxdb/tests/server_tests/client_test_with_server.py +++ b/influxdb/tests/server_tests/client_test_with_server.py @@ -19,6 +19,7 @@ import time import unittest import warnings +import subprocess # By default, raise exceptions on warnings warnings.simplefilter('error', FutureWarning) @@ -29,6 +30,7 @@ from influxdb.tests import skipIfPYpy, using_pypy, skipServerTests from influxdb.tests.server_tests.base import ManyTestCasesWithServerMixin from influxdb.tests.server_tests.base import SingleTestCaseWithServerMixin +from influxdb.tests.misc import find_influxd_path if not using_pypy: import pandas as pd @@ -36,6 +38,15 @@ THIS_DIR = os.path.abspath(os.path.dirname(__file__)) +INFLUXDB_BIN_PATH = find_influxd_path() +INFLUXDB_VERSION = (subprocess.check_output([INFLUXDB_BIN_PATH, 'version']) + .split()[1]) + +# Version-specific differences between 0.9 and 0.13 +if (INFLUXDB_VERSION.split(b'.')[:2] == [b'v0', b'13']): + rp_duration = '0s' +elif (INFLUXDB_VERSION.split(b'.')[:2] == 
[b'v0', b'9']): + rp_duration = '0' def point(serie_name, timestamp=None, tags=None, **fields): @@ -447,7 +458,7 @@ def test_default_retention_policy(self): self.assertEqual( [ {'name': 'default', - 'duration': '0', + 'duration': rp_duration, 'replicaN': 1, 'shardGroupDuration': u'168h0m0s', 'default': True} @@ -462,7 +473,7 @@ def test_create_retention_policy_default(self): self.assertEqual( [ - {'duration': '0', + {'duration': rp_duration, 'default': False, 'replicaN': 1, 'shardGroupDuration': u'168h0m0s', @@ -486,7 +497,7 @@ def test_create_retention_policy(self): rsp = self.cli.get_list_retention_policies() self.assertEqual( [ - {'duration': '0', + {'duration': rp_duration, 'default': True, 'replicaN': 1, 'shardGroupDuration': u'168h0m0s', @@ -509,7 +520,7 @@ def test_alter_retention_policy(self): rsp = self.cli.get_list_retention_policies() self.assertEqual( [ - {'duration': '0', + {'duration': rp_duration, 'default': True, 'replicaN': 1, 'shardGroupDuration': u'168h0m0s', @@ -529,7 +540,7 @@ def test_alter_retention_policy(self): rsp = self.cli.get_list_retention_policies() self.assertEqual( [ - {'duration': '0', + {'duration': rp_duration, 'default': True, 'replicaN': 1, 'shardGroupDuration': u'168h0m0s', @@ -549,7 +560,7 @@ def test_alter_retention_policy(self): rsp = self.cli.get_list_retention_policies() self.assertEqual( [ - {'duration': '0', + {'duration': rp_duration, 'default': False, 'replicaN': 1, 'shardGroupDuration': u'168h0m0s', @@ -573,7 +584,7 @@ def test_alter_retention_policy_invalid(self): rsp = self.cli.get_list_retention_policies() self.assertEqual( [ - {'duration': '0', + {'duration': rp_duration, 'default': True, 'replicaN': 1, 'shardGroupDuration': u'168h0m0s', @@ -595,7 +606,7 @@ def test_drop_retention_policy(self): rsp = self.cli.get_list_retention_policies() self.assertEqual( [ - {'duration': '0', + {'duration': rp_duration, 'default': True, 'replicaN': 1, 'shardGroupDuration': u'168h0m0s', From 7e092ad2be8ea2f78d8879059427a0d3b5f170d0 Mon Sep 17 00:00:00 2001 From: Matt Bartos Date: Mon, 29 Aug 2016 15:18:43 -0400 Subject: [PATCH 2/8] Added unit tests for tag columns. All tests working. 
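
For reference, a minimal sketch of the usage these tests exercise (the
database name, measurement, and column names are illustrative, and a
reachable InfluxDB instance is assumed):

    import pandas as pd
    from influxdb import DataFrameClient

    df = pd.DataFrame(
        {'tag_one': ['blue', 'red'], 'value': [1.0, 2.0]},
        index=pd.date_range('1970-01-01', periods=2, freq='H', tz='UTC'))

    client = DataFrameClient(database='db')
    # 'tag_one' is written as a tag; columns not listed in tag_columns
    # default to fields
    client.write_points(df, 'foo', tag_columns=['tag_one'])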
--- influxdb/_dataframe_client.py | 5 + influxdb/tests/dataframe_client_test.py | 129 ++++++++++++++++++++++++ 2 files changed, 134 insertions(+) diff --git a/influxdb/_dataframe_client.py b/influxdb/_dataframe_client.py index 855c8233..1e29444b 100644 --- a/influxdb/_dataframe_client.py +++ b/influxdb/_dataframe_client.py @@ -232,6 +232,11 @@ def _convert_dataframe_to_lines(self, field_columns = list(field_columns) if list(field_columns) else [] tag_columns = list(tag_columns) if list(tag_columns) else [] + # If field columns but no tag columns, assume rest of columns are tags + if field_columns and (not tag_columns): + tag_columns = list(column_series[~column_series.isin( + field_columns)]) + # Assume that all columns not listed as tag columns are field columns if not field_columns: field_columns = list(column_series[~column_series.isin( diff --git a/influxdb/tests/dataframe_client_test.py b/influxdb/tests/dataframe_client_test.py index 780b9419..9aa96f68 100644 --- a/influxdb/tests/dataframe_client_test.py +++ b/influxdb/tests/dataframe_client_test.py @@ -69,6 +69,135 @@ def test_write_points_from_dataframe_in_batches(self): cli = DataFrameClient(database='db') self.assertTrue(cli.write_points(dataframe, "foo", batch_size=1)) + def test_write_points_with_tag_columns(self): + now = pd.Timestamp('1970-01-01 00:00+00:00') + dataframe = pd.DataFrame(data=[['blue', 1, "1", 1, 1.0], + ['red', 0, "2", 2, 2.0]], + index=[now, now + timedelta(hours=1)], + columns=["tag_one", "tag_two", "column_one", + "column_two", "column_three"]) + expected = ( + b"foo,tag_one=blue,tag_two=1 " + b"column_one=\"1\",column_two=1i,column_three=1.0 " + b"0\n" + b"foo,tag_one=red,tag_two=0 " + b"column_one=\"2\",column_two=2i,column_three=2.0 " + b"3600000000000\n" + ) + + with requests_mock.Mocker() as m: + m.register_uri(requests_mock.POST, + "http://localhost:8086/write", + status_code=204) + + cli = DataFrameClient(database='db') + + cli.write_points(dataframe, 'foo', + tag_columns=['tag_one', 'tag_two']) + self.assertEqual(m.last_request.body, expected) + + cli.write_points(dataframe, 'foo', + tag_columns=['tag_one', 'tag_two'], tags=None) + self.assertEqual(m.last_request.body, expected) + + def test_write_points_with_tag_columns_and_global_tags(self): + now = pd.Timestamp('1970-01-01 00:00+00:00') + dataframe = pd.DataFrame(data=[['blue', 1, "1", 1, 1.0], + ['red', 0, "2", 2, 2.0]], + index=[now, now + timedelta(hours=1)], + columns=["tag_one", "tag_two", "column_one", + "column_two", "column_three"]) + expected = ( + b"foo,tag_one=blue,tag_two=1,global_tag=value " + b"column_one=\"1\",column_two=1i,column_three=1.0 " + b"0\n" + b"foo,tag_one=red,tag_two=0,global_tag=value " + b"column_one=\"2\",column_two=2i,column_three=2.0 " + b"3600000000000\n" + ) + + with requests_mock.Mocker() as m: + m.register_uri(requests_mock.POST, + "http://localhost:8086/write", + status_code=204) + + cli = DataFrameClient(database='db') + + cli.write_points(dataframe, 'foo', + tag_columns=['tag_one', 'tag_two'], + tags={'global_tag': 'value'}) + self.assertEqual(m.last_request.body, expected) + + def test_write_points_with_tag_columns_and_defaults(self): + now = pd.Timestamp('1970-01-01 00:00+00:00') + dataframe = pd.DataFrame(data=[['blue', 1, "1", 1, 1.0, 'hot'], + ['red', 0, "2", 2, 2.0, 'cold']], + index=[now, now + timedelta(hours=1)], + columns=["tag_one", "tag_two", "column_one", + "column_two", "column_three", + "tag_three"]) + expected_tags_and_fields = ( + b"foo,tag_one=blue " + b"column_one=\"1\",column_two=1i " + 
b"0\n" + b"foo,tag_one=red " + b"column_one=\"2\",column_two=2i " + b"3600000000000\n" + ) + + expected_tags_no_fields = ( + b"foo,tag_one=blue,tag_two=1 " + b"column_one=\"1\",column_two=1i,column_three=1.0," + b"tag_three=\"hot\" 0\n" + b"foo,tag_one=red,tag_two=0 " + b"column_one=\"2\",column_two=2i,column_three=2.0," + b"tag_three=\"cold\" 3600000000000\n" + ) + + expected_fields_no_tags = ( + b"foo,tag_one=blue,tag_two=1,tag_three=hot " + b"column_one=\"1\",column_two=1i,column_three=1.0 " + b"0\n" + b"foo,tag_one=red,tag_two=0,tag_three=cold " + b"column_one=\"2\",column_two=2i,column_three=2.0 " + b"3600000000000\n" + ) + + expected_no_tags_no_fields = ( + b"foo " + b"tag_one=\"blue\",tag_two=1i,column_one=\"1\"," + b"column_two=1i,column_three=1.0,tag_three=\"hot\" " + b"0\n" + b"foo " + b"tag_one=\"red\",tag_two=0i,column_one=\"2\"," + b"column_two=2i,column_three=2.0,tag_three=\"cold\" " + b"3600000000000\n" + ) + + with requests_mock.Mocker() as m: + m.register_uri(requests_mock.POST, + "http://localhost:8086/write", + status_code=204) + + cli = DataFrameClient(database='db') + + cli.write_points(dataframe, 'foo', + field_columns=['column_one', 'column_two'], + tag_columns=['tag_one']) + self.assertEqual(m.last_request.body, expected_tags_and_fields) + + cli.write_points(dataframe, 'foo', + tag_columns=['tag_one', 'tag_two']) + self.assertEqual(m.last_request.body, expected_tags_no_fields) + + cli.write_points(dataframe, 'foo', + field_columns=['column_one', 'column_two', + 'column_three']) + self.assertEqual(m.last_request.body, expected_fields_no_tags) + + cli.write_points(dataframe, 'foo') + self.assertEqual(m.last_request.body, expected_no_tags_no_fields) + def test_write_points_from_dataframe_with_numeric_column_names(self): now = pd.Timestamp('1970-01-01 00:00+00:00') # df with numeric column names From 1af9d4e8eb6b9294e85c91196c107be3d5045f71 Mon Sep 17 00:00:00 2001 From: Matt Bartos Date: Mon, 29 Aug 2016 16:01:31 -0400 Subject: [PATCH 3/8] Added more comments and docstrings --- influxdb/_dataframe_client.py | 32 ++++++++++++++++++++++++-------- 1 file changed, 24 insertions(+), 8 deletions(-) diff --git a/influxdb/_dataframe_client.py b/influxdb/_dataframe_client.py index 1e29444b..40f0f111 100644 --- a/influxdb/_dataframe_client.py +++ b/influxdb/_dataframe_client.py @@ -45,7 +45,8 @@ def write_points(self, database=None, retention_policy=None, batch_size=None, - protocol='line'): + protocol='line', + numeric_precision=None): """ Write to multiple time series names. @@ -59,6 +60,11 @@ def write_points(self, one database to another or when doing a massive write operation :type batch_size: int :param protocol: Protocol for writing data. Either 'line' or 'json'. + :param numeric_precision: Precision for float or int datatypes. + Either None, 'full' or some int, where int is the desired decimal + precision. 'full' preserves full precision for int and float + datatypes. Defaults to None, which preserves 14-15 significant + figures for float and all significant figures for int datatypes. 
""" if batch_size: @@ -73,7 +79,8 @@ def write_points(self, global_tags=tags, time_precision=time_precision, tag_columns=tag_columns, - field_columns=field_columns) + field_columns=field_columns, + numeric_precision=numeric_precision) else: points = self._convert_dataframe_to_json( dataframe.ix[start_index:end_index].copy(), @@ -97,7 +104,8 @@ def write_points(self, global_tags=tags, tag_columns=tag_columns, field_columns=field_columns, - time_precision=time_precision) + time_precision=time_precision, + numeric_precision=numeric_precision) else: points = self._convert_dataframe_to_json( dataframe, @@ -229,6 +237,7 @@ def _convert_dataframe_to_lines(self, if tag_columns is None: tag_columns = [] + # Make sure field_columns and tag_columns are lists field_columns = list(field_columns) if list(field_columns) else [] tag_columns = list(tag_columns) if list(tag_columns) else [] @@ -237,7 +246,7 @@ def _convert_dataframe_to_lines(self, tag_columns = list(column_series[~column_series.isin( field_columns)]) - # Assume that all columns not listed as tag columns are field columns + # If no field columns, assume non-tag columns are fields if not field_columns: field_columns = list(column_series[~column_series.isin( tag_columns)]) @@ -293,21 +302,27 @@ def _stringify_dataframe(self, dataframe, numeric_precision, datatype='field'): + int_columns = dataframe.select_dtypes(include=['int']).columns - float_columns = dataframe.select_dtypes(include=['floating']).columns - nonfloat_columns = dataframe.columns[~dataframe.columns.isin( - float_columns)] - numeric_columns = dataframe.select_dtypes(include=['number']).columns string_columns = dataframe.select_dtypes(include=['object']).columns # Convert dataframe to string if numeric_precision is None: + # If no precision specified, convert directly to string (fast) dataframe = dataframe.astype(str) elif numeric_precision == 'full': + # If full precision, use repr to get full float precision + float_columns = (dataframe.select_dtypes(include=['floating']) + .columns) + nonfloat_columns = dataframe.columns[~dataframe.columns.isin( + float_columns)] dataframe[float_columns] = dataframe[float_columns].applymap(repr) dataframe[nonfloat_columns] = (dataframe[nonfloat_columns] .astype(str)) elif isinstance(numeric_precision, int): + # If precision is specified, round to appropriate precision + numeric_columns = (dataframe.select_dtypes(include=['number']) + .columns) dataframe[numeric_columns] = (dataframe[numeric_columns] .round(numeric_precision)) dataframe = dataframe.astype(str) @@ -315,6 +330,7 @@ def _stringify_dataframe(self, raise ValueError('Invalid numeric precision') if datatype == 'field': + # If dealing with fields, format ints and strings correctly dataframe[int_columns] = dataframe[int_columns] + 'i' dataframe[string_columns] = '"' + dataframe[string_columns] + '"' From c3832a1d6141188c247f969ee3de0b1f6c02b14b Mon Sep 17 00:00:00 2001 From: Matt Bartos Date: Mon, 29 Aug 2016 16:57:52 -0400 Subject: [PATCH 4/8] Rolled back changes to retention policy duration. 
--- influxdb/tests/misc.py | 30 ------------------- .../server_tests/client_test_with_server.py | 27 +++++------------ 2 files changed, 8 insertions(+), 49 deletions(-) diff --git a/influxdb/tests/misc.py b/influxdb/tests/misc.py index 36b5a253..7dffc219 100644 --- a/influxdb/tests/misc.py +++ b/influxdb/tests/misc.py @@ -5,11 +5,7 @@ from __future__ import print_function from __future__ import unicode_literals -import os -import sys -import subprocess import socket -import distutils def get_free_ports(num_ports, ip='127.0.0.1'): @@ -48,29 +44,3 @@ def is_port_open(port, ip='127.0.0.1'): return result == 0 finally: sock.close() - - -def find_influxd_path(): - influxdb_bin_path = os.environ.get( - 'INFLUXDB_PYTHON_INFLUXD_PATH', - None - ) - - if influxdb_bin_path is None: - influxdb_bin_path = distutils.spawn.find_executable('influxd') - if not influxdb_bin_path: - try: - influxdb_bin_path = subprocess.check_output( - ['which', 'influxd'] - ).strip() - except subprocess.CalledProcessError: - # fallback on : - influxdb_bin_path = '/opt/influxdb/influxd' - - if not os.path.isfile(influxdb_bin_path): - raise unittest.SkipTest("Could not find influxd binary") - - version = subprocess.check_output([influxdb_bin_path, 'version']) - print("InfluxDB version: %s" % version, file=sys.stderr) - - return influxdb_bin_path diff --git a/influxdb/tests/server_tests/client_test_with_server.py b/influxdb/tests/server_tests/client_test_with_server.py index 6efa5300..560c506a 100644 --- a/influxdb/tests/server_tests/client_test_with_server.py +++ b/influxdb/tests/server_tests/client_test_with_server.py @@ -19,7 +19,6 @@ import time import unittest import warnings -import subprocess # By default, raise exceptions on warnings warnings.simplefilter('error', FutureWarning) @@ -30,7 +29,6 @@ from influxdb.tests import skipIfPYpy, using_pypy, skipServerTests from influxdb.tests.server_tests.base import ManyTestCasesWithServerMixin from influxdb.tests.server_tests.base import SingleTestCaseWithServerMixin -from influxdb.tests.misc import find_influxd_path if not using_pypy: import pandas as pd @@ -38,15 +36,6 @@ THIS_DIR = os.path.abspath(os.path.dirname(__file__)) -INFLUXDB_BIN_PATH = find_influxd_path() -INFLUXDB_VERSION = (subprocess.check_output([INFLUXDB_BIN_PATH, 'version']) - .split()[1]) - -# Version-specific differences between 0.9 and 0.13 -if (INFLUXDB_VERSION.split(b'.')[:2] == [b'v0', b'13']): - rp_duration = '0s' -elif (INFLUXDB_VERSION.split(b'.')[:2] == [b'v0', b'9']): - rp_duration = '0' def point(serie_name, timestamp=None, tags=None, **fields): @@ -458,7 +447,7 @@ def test_default_retention_policy(self): self.assertEqual( [ {'name': 'default', - 'duration': rp_duration, + 'duration': '0', 'replicaN': 1, 'shardGroupDuration': u'168h0m0s', 'default': True} @@ -473,7 +462,7 @@ def test_create_retention_policy_default(self): self.assertEqual( [ - {'duration': rp_duration, + {'duration': '0', 'default': False, 'replicaN': 1, 'shardGroupDuration': u'168h0m0s', @@ -497,7 +486,7 @@ def test_create_retention_policy(self): rsp = self.cli.get_list_retention_policies() self.assertEqual( [ - {'duration': rp_duration, + {'duration': '0', 'default': True, 'replicaN': 1, 'shardGroupDuration': u'168h0m0s', @@ -520,7 +509,7 @@ def test_alter_retention_policy(self): rsp = self.cli.get_list_retention_policies() self.assertEqual( [ - {'duration': rp_duration, + {'duration': '0', 'default': True, 'replicaN': 1, 'shardGroupDuration': u'168h0m0s', @@ -540,7 +529,7 @@ def test_alter_retention_policy(self): rsp = 
self.cli.get_list_retention_policies() self.assertEqual( [ - {'duration': rp_duration, + {'duration': '0', 'default': True, 'replicaN': 1, 'shardGroupDuration': u'168h0m0s', @@ -560,7 +549,7 @@ def test_alter_retention_policy(self): rsp = self.cli.get_list_retention_policies() self.assertEqual( [ - {'duration': rp_duration, + {'duration': '0', 'default': False, 'replicaN': 1, 'shardGroupDuration': u'168h0m0s', @@ -584,7 +573,7 @@ def test_alter_retention_policy_invalid(self): rsp = self.cli.get_list_retention_policies() self.assertEqual( [ - {'duration': rp_duration, + {'duration': '0', 'default': True, 'replicaN': 1, 'shardGroupDuration': u'168h0m0s', @@ -606,7 +595,7 @@ def test_drop_retention_policy(self): rsp = self.cli.get_list_retention_policies() self.assertEqual( [ - {'duration': rp_duration, + {'duration': '0', 'default': True, 'replicaN': 1, 'shardGroupDuration': u'168h0m0s', From c8e3e9948f932c7172b27a0a79034ee2667d5799 Mon Sep 17 00:00:00 2001 From: Matt Bartos Date: Mon, 29 Aug 2016 17:26:33 -0400 Subject: [PATCH 5/8] Added comments to _dataframe_client. Re-pushing to try and fix travis build. --- influxdb/_dataframe_client.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/influxdb/_dataframe_client.py b/influxdb/_dataframe_client.py index 40f0f111..fa9423a2 100644 --- a/influxdb/_dataframe_client.py +++ b/influxdb/_dataframe_client.py @@ -230,6 +230,7 @@ def _convert_dataframe_to_lines(self, raise TypeError('Must be DataFrame with DatetimeIndex or \ PeriodIndex.') + # Create a Series of columns for easier indexing column_series = pd.Series(dataframe.columns) if field_columns is None: @@ -303,6 +304,7 @@ def _stringify_dataframe(self, numeric_precision, datatype='field'): + # Find int and string columns for field-type data int_columns = dataframe.select_dtypes(include=['int']).columns string_columns = dataframe.select_dtypes(include=['object']).columns From 423b8c97231409d87354f6bd9e07b418bd429d16 Mon Sep 17 00:00:00 2001 From: Matt Bartos Date: Mon, 29 Aug 2016 17:40:40 -0400 Subject: [PATCH 6/8] Try rebuilding without cache --- .travis.yml | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/.travis.yml b/.travis.yml index 1aeb0949..868fe5cc 100644 --- a/.travis.yml +++ b/.travis.yml @@ -33,8 +33,8 @@ notifications: sudo: false # Travis caching -cache: - directories: - - $HOME/.cache/pip -before_cache: - - rm -f $HOME/.cache/pip/log/debug.log +cache: false +# directories: +# - $HOME/.cache/pip +#before_cache: +# - rm -f $HOME/.cache/pip/log/debug.log From 56062c5160bcc119ea420c1ed8a6ac5e7788e6c9 Mon Sep 17 00:00:00 2001 From: Matt Bartos Date: Tue, 30 Aug 2016 03:00:45 -0400 Subject: [PATCH 7/8] Minor changes to _stringify_dataframe. Added test for numeric precision. 
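
For reference, a sketch of how the three numeric_precision modes are
expected to format a float column, per the new test (the client and
dataframe here are illustrative):

    # None: plain str() conversion (12 significant digits on Python 2)
    client.write_points(df, 'foo', numeric_precision=None)
    # An int n: floats rounded to n decimal places before conversion
    client.write_points(df, 'foo', numeric_precision=4)
    # 'full': repr() conversion, preserving full float precision
    client.write_points(df, 'foo', numeric_precision='full')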
--- influxdb/_dataframe_client.py | 21 ++++++++--- influxdb/tests/dataframe_client_test.py | 50 +++++++++++++++++++++++-- 2 files changed, 62 insertions(+), 9 deletions(-) diff --git a/influxdb/_dataframe_client.py b/influxdb/_dataframe_client.py index fa9423a2..9794e954 100644 --- a/influxdb/_dataframe_client.py +++ b/influxdb/_dataframe_client.py @@ -305,7 +305,7 @@ def _stringify_dataframe(self, datatype='field'): # Find int and string columns for field-type data - int_columns = dataframe.select_dtypes(include=['int']).columns + int_columns = dataframe.select_dtypes(include=['integer']).columns string_columns = dataframe.select_dtypes(include=['object']).columns # Convert dataframe to string @@ -323,11 +323,20 @@ def _stringify_dataframe(self, .astype(str)) elif isinstance(numeric_precision, int): # If precision is specified, round to appropriate precision - numeric_columns = (dataframe.select_dtypes(include=['number']) - .columns) - dataframe[numeric_columns] = (dataframe[numeric_columns] - .round(numeric_precision)) - dataframe = dataframe.astype(str) + float_columns = (dataframe.select_dtypes(include=['floating']) + .columns) + nonfloat_columns = dataframe.columns[~dataframe.columns.isin( + float_columns)] + dataframe[float_columns] = (dataframe[float_columns] + .round(numeric_precision)) + # If desired precision is > 10 decimal places, need to use repr + if numeric_precision > 10: + dataframe[float_columns] = (dataframe[float_columns] + .applymap(repr)) + dataframe[nonfloat_columns] = (dataframe[nonfloat_columns] + .astype(str)) + else: + dataframe = dataframe.astype(str) else: raise ValueError('Invalid numeric precision') diff --git a/influxdb/tests/dataframe_client_test.py b/influxdb/tests/dataframe_client_test.py index 9aa96f68..0b3b9b90 100644 --- a/influxdb/tests/dataframe_client_test.py +++ b/influxdb/tests/dataframe_client_test.py @@ -69,7 +69,7 @@ def test_write_points_from_dataframe_in_batches(self): cli = DataFrameClient(database='db') self.assertTrue(cli.write_points(dataframe, "foo", batch_size=1)) - def test_write_points_with_tag_columns(self): + def test_write_points_from_dataframe_with_tag_columns(self): now = pd.Timestamp('1970-01-01 00:00+00:00') dataframe = pd.DataFrame(data=[['blue', 1, "1", 1, 1.0], ['red', 0, "2", 2, 2.0]], @@ -100,7 +100,7 @@ def test_write_points_with_tag_columns(self): tag_columns=['tag_one', 'tag_two'], tags=None) self.assertEqual(m.last_request.body, expected) - def test_write_points_with_tag_columns_and_global_tags(self): + def test_write_points_from_dataframe_with_tag_cols_and_global_tags(self): now = pd.Timestamp('1970-01-01 00:00+00:00') dataframe = pd.DataFrame(data=[['blue', 1, "1", 1, 1.0], ['red', 0, "2", 2, 2.0]], @@ -128,7 +128,7 @@ def test_write_points_with_tag_columns_and_global_tags(self): tags={'global_tag': 'value'}) self.assertEqual(m.last_request.body, expected) - def test_write_points_with_tag_columns_and_defaults(self): + def test_write_points_from_dataframe_with_tag_cols_and_defaults(self): now = pd.Timestamp('1970-01-01 00:00+00:00') dataframe = pd.DataFrame(data=[['blue', 1, "1", 1, 1.0, 'hot'], ['red', 0, "2", 2, 2.0, 'cold']], @@ -219,6 +219,50 @@ def test_write_points_from_dataframe_with_numeric_column_names(self): self.assertEqual(m.last_request.body, expected) + def test_write_points_from_dataframe_with_numeric_precision(self): + now = pd.Timestamp('1970-01-01 00:00+00:00') + # df with numeric column names + dataframe = pd.DataFrame(data=[["1", 1, 1.1111111111111], + ["2", 2, 2.2222222222222]], + index=[now, now 
+ timedelta(hours=1)]) + + expected_default_precision = ( + b'foo,hello=there 0=\"1\",1=1i,2=1.11111111111 0\n' + b'foo,hello=there 0=\"2\",1=2i,2=2.22222222222 3600000000000\n' + ) + + expected_specified_precision = ( + b'foo,hello=there 0=\"1\",1=1i,2=1.1111 0\n' + b'foo,hello=there 0=\"2\",1=2i,2=2.2222 3600000000000\n' + ) + + expected_full_precision = ( + b'foo,hello=there 0=\"1\",1=1i,2=1.1111111111111 0\n' + b'foo,hello=there 0=\"2\",1=2i,2=2.2222222222222 3600000000000\n' + ) + + with requests_mock.Mocker() as m: + m.register_uri(requests_mock.POST, + "http://localhost:8086/write", + status_code=204) + + cli = DataFrameClient(database='db') + cli.write_points(dataframe, "foo", {"hello": "there"}) + + self.assertEqual(m.last_request.body, expected_default_precision) + + cli = DataFrameClient(database='db') + cli.write_points(dataframe, "foo", {"hello": "there"}, + numeric_precision=4) + + self.assertEqual(m.last_request.body, expected_specified_precision) + + cli = DataFrameClient(database='db') + cli.write_points(dataframe, "foo", {"hello": "there"}, + numeric_precision='full') + + self.assertEqual(m.last_request.body, expected_full_precision) + def test_write_points_from_dataframe_with_period_index(self): dataframe = pd.DataFrame(data=[["1", 1, 1.0], ["2", 2, 2.0]], index=[pd.Period('1970-01-01'), From 15a3d33747645c91fbb49a3fe3c80cdd6b2b5400 Mon Sep 17 00:00:00 2001 From: Matt Bartos Date: Fri, 2 Sep 2016 01:58:13 -0400 Subject: [PATCH 8/8] Incorporated fixes from @tzonghao. Fixed docstrings. --- influxdb/_dataframe_client.py | 8 ++++---- influxdb/client.py | 16 +++++++++++++--- 2 files changed, 17 insertions(+), 7 deletions(-) diff --git a/influxdb/_dataframe_client.py b/influxdb/_dataframe_client.py index 9794e954..ddae0862 100644 --- a/influxdb/_dataframe_client.py +++ b/influxdb/_dataframe_client.py @@ -60,7 +60,7 @@ def write_points(self, one database to another or when doing a massive write operation :type batch_size: int :param protocol: Protocol for writing data. Either 'line' or 'json'. - :param numeric_precision: Precision for float or int datatypes. + :param numeric_precision: Precision for floating point values. Either None, 'full' or some int, where int is the desired decimal precision. 'full' preserves full precision for int and float datatypes. Defaults to None, which preserves 14-15 significant @@ -94,7 +94,7 @@ def write_points(self, time_precision, database, retention_policy, - protocol='line') + protocol=protocol) return True else: if protocol == 'line': @@ -119,7 +119,7 @@ def write_points(self, time_precision, database, retention_policy, - protocol='line') + protocol=protocol) return True def query(self, query, chunked=False, database=None): @@ -338,7 +338,7 @@ def _stringify_dataframe(self, else: dataframe = dataframe.astype(str) else: - raise ValueError('Invalid numeric precision') + raise ValueError('Invalid numeric precision.') if datatype == 'field': # If dealing with fields, format ints and strings correctly diff --git a/influxdb/client.py b/influxdb/client.py index aae44158..40978dd3 100644 --- a/influxdb/client.py +++ b/influxdb/client.py @@ -257,12 +257,15 @@ def write(self, data, params=None, expected_response_code=204, """Write data to InfluxDB. 
 
         :param data: the data to be written
-        :type data: dict
+        :type data: (if protocol is 'json') dict
+            (if protocol is 'line') sequence of line protocol strings
         :param params: additional parameters for the request, defaults to None
         :type params: dict
         :param expected_response_code: the expected response code of
             the write operation, defaults to 204
         :type expected_response_code: int
+        :param protocol: protocol of input data, either 'json' or 'line'
+        :type protocol: str
         :returns: True, if the write operation is successful
         :rtype: bool
         """
@@ -363,6 +366,7 @@ def write_points(self,
         :param points: the list of points to be written in the database
-        :type points: list of dictionaries, each dictionary represents
-            a point
+        :type points: (if protocol is 'json') list of dicts, where each
+            dict represents a point.
+            (if protocol is 'line') sequence of line protocol strings.
         :param time_precision: Either 's', 'm', 'ms' or 'u', defaults to
             None
         :type time_precision: str
@@ -380,9 +384,10 @@ def write_points(self,
             one database to another or when doing a massive write operation,
             defaults to None
         :type batch_size: int
+        :param protocol: Protocol for writing data. Either 'line' or 'json'.
+        :type protocol: str
         :returns: True, if the operation is successful
         :rtype: bool
-        :param protocol: Protocol for writing data. Either 'line' or 'json'.
 
         .. note:: if no retention policy is specified, the default retention
             policy for the database is used
@@ -754,7 +759,10 @@ def send_packet(self, packet, protocol='json'):
         """Send an UDP packet.
 
         :param packet: the packet to be sent
-        :type packet: dict
+        :type packet: (if protocol is 'json') dict
+            (if protocol is 'line') sequence of line protocol strings
+        :param protocol: protocol of input data, either 'json' or 'line'
+        :type protocol: str
         """
         if protocol == 'json':
             data = make_lines(packet).encode('utf-8')
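
With the protocol parameter documented end to end, pre-formatted line
protocol can be passed straight through the base client; a sketch (the
host, port, and sample point are illustrative):

    from influxdb import InfluxDBClient

    client = InfluxDBClient(host='localhost', port=8086, database='db')
    points = ['cpu,host=server01 value=0.64 1434055562000000000']
    # strings bypass make_lines() and are joined and POSTed as-is
    client.write_points(points, protocol='line')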