diff --git a/.github/CODEOWNERS b/.github/CODEOWNERS new file mode 100644 index 00000000..0248ade1 --- /dev/null +++ b/.github/CODEOWNERS @@ -0,0 +1 @@ +* @aviau @sebito91 @xginn8 diff --git a/.github/ISSUE_TEMPLATE.md b/.github/ISSUE_TEMPLATE.md new file mode 100644 index 00000000..7a7927c1 --- /dev/null +++ b/.github/ISSUE_TEMPLATE.md @@ -0,0 +1,4 @@ +- **InfluxDB version:** e.g. 1.7.7 (output of the `influx version` command) +- **InfluxDB-python version:** e.g. 5.2.2 (output of the `python -c "from __future__ import print_function; import influxdb; print(influxdb.__version__)"` command) +- **Python version:** e.g. 3.7.4 (output of the `python --version` command) +- **Operating system version:** e.g. Windows 10, Ubuntu 18.04, macOS 10.14.5 diff --git a/.github/PULL_REQUEST_TEMPLATE.md b/.github/PULL_REQUEST_TEMPLATE.md new file mode 100644 index 00000000..84729d17 --- /dev/null +++ b/.github/PULL_REQUEST_TEMPLATE.md @@ -0,0 +1,5 @@ +--- +##### Contributor checklist + +- [ ] Builds are passing +- [ ] New tests have been added (for feature additions) diff --git a/.gitignore b/.gitignore index 7720b658..d970c44c 100644 --- a/.gitignore +++ b/.gitignore @@ -7,6 +7,7 @@ build/ mock*/ nose*/ .pybuild/ +.mypy_cache/ debian/files debian/python-influxdb.debhelper.log debian/python-influxdb.postinst.debhelper diff --git a/.travis.yml b/.travis.yml index 7f3d4a5d..9d45f19b 100644 --- a/.travis.yml +++ b/.travis.yml @@ -4,14 +4,17 @@ python: - "2.7" - "3.5" - "3.6" - - "pypy-5.3.1" + - "3.7" + - "pypy" - "pypy3" env: - - INFLUXDB_VER=1.2.4 - - INFLUXDB_VER=1.3.9 - - INFLUXDB_VER=1.4.2 - - INFLUXDB_VER=1.5.4 + - INFLUXDB_VER=1.2.4 # 2017-05-08 + - INFLUXDB_VER=1.3.9 # 2018-01-19 + - INFLUXDB_VER=1.4.3 # 2018-01-30 + - INFLUXDB_VER=1.5.4 # 2018-06-22 + - INFLUXDB_VER=1.6.4 # 2018-10-24 + - INFLUXDB_VER=1.7.4 # 2019-02-14 addons: apt: @@ -20,18 +23,20 @@ addons: matrix: include: - - python: 2.7 + - python: 3.7 env: TOX_ENV=pep257 - - python: 3.6 + - python: 3.7 env: TOX_ENV=docs - - python: 3.6 + - python: 3.7 env: TOX_ENV=flake8 - - python: 3.6 + - python: 3.7 env: TOX_ENV=coverage + - python: 3.7 + env: TOX_ENV=mypy install: - pip install tox-travis - - pip install setuptools==20.6.6 + - pip install setuptools - pip install coveralls - mkdir -p "influxdb_install/${INFLUXDB_VER}" - if [ -n "${INFLUXDB_VER}" ] ; then wget "https://dl.influxdata.com/influxdb/releases/influxdb_${INFLUXDB_VER}_amd64.deb" ; fi diff --git a/CHANGELOG.md b/CHANGELOG.md index c156b678..bfd27d38 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,7 +4,85 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/) and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.html). -## [Unreleased] +## [v5.3.2] - 2024-04-17 + +### Changed +- Correctly serialize nanosecond dataframe timestamps (#926) + +## [v5.3.1] - 2022-11-14 + +### Added +- Add support for custom headers in the InfluxDBClient (#710 thx @nathanielatom) +- Add support for custom indexes for query in the DataFrameClient (#785) + +### Changed +- Amend retry to avoid sleep after last retry before raising exception (#790 thx @krzysbaranski) +- Remove msgpack pinning for requirements (#818 thx @prometheanfire) +- Update support for HTTP headers in the InfluxDBClient (#851 thx @bednar) + +### Removed + +## [v5.3.0] - 2020-04-10 + +### Added +- Add mypy testing framework (#756) +- Add support for messagepack (#734 thx @lovasoa) +- Add support for 'show series' (#357 thx @gaker) +- Add support for custom request session in InfluxDBClient (#360 thx @dschien) +- Add support for handling np.nan and np.inf values in DataFrameClient (#436 thx @nmerket) +- Add support for optional `time_precision` in the SeriesHelper (#502 && #719 thx @appunni-dishq && @klDen) +- Add ability to specify retention policy in SeriesHelper (#723 thx @csanz91) +- Add gzip compression for post and response data (#732 thx @KEClaytor) +- Add support for chunked responses in ResultSet (#753 and #538 thx @hrbonz && @psy0rz) +- Add support for empty string fields (#766 thx @gregschrock) +- Add support for context managers to InfluxDBClient (#721 thx @JustusAdam) + +### Changed +- Clean up stale CI config (#755) +- Add legacy client test (#752 & #318 thx @oldmantaiter & @sebito91) +- Update make_lines section in line_protocol.py to split out core function (#375 thx @aisbaa) +- Fix nanosecond time resolution for points (#407 thx @AndreCAndersen && @clslgrnc) +- Fix import of distutils.spawn (#805 thx @Hawk777) +- Update repr of float values including properly handling of boolean (#488 thx @ghost) +- Update DataFrameClient to fix faulty empty tags (#770 thx @michelfripiat) +- Update DataFrameClient to properly return `dropna` values (#778 thx @jgspiro) +- Update DataFrameClient to test for pd.DataTimeIndex before blind conversion (#623 thx @testforvin) +- Update client to type-set UDP port to int (#651 thx @yifeikong) +- Update batched writing support for all iterables (#746 thx @JayH5) +- Update SeriesHelper to enable class instantiation when not initialized (#772 thx @ocworld) +- Update UDP test case to add proper timestamp to datapoints (#808 thx @shantanoo-desai) + +### Removed + +## [v5.2.3] - 2019-08-19 + +### Added +- Add consistency param to InfluxDBClient.write_points (#643 thx @RonRothman) +- Add UDP example (#648 thx @shantanoo-desai) +- Add consistency paramter to `write_points` (#664 tx @RonRothman) +- The query() function now accepts a bind_params argument for parameter binding (#678 thx @clslgrnc) +- Add `get_list_continuous_queries`, `drop_continuous_query`, and `create_continuous_query` management methods for + continuous queries (#681 thx @lukaszdudek-silvair && @smolse) +- Mutual TLS authentication (#702 thx @LloydW93) + +### Changed +- Update test suite to add support for Python 3.7 and InfluxDB v1.6.4 and 1.7.4 (#692 thx @clslgrnc) +- Update supported versions of influxdb + python (#693 thx @clslgrnc) +- Fix for the line protocol issue with leading comma (#694 thx @d3banjan) +- Update classifiers tuple to list in setup.py (#697 thx @Hanaasagi) +- Update documentation for empty `delete_series` confusion (#699 thx @xginn8) +- Fix newline character issue in tag value (#716 thx @syhan) +- Update tests/tutorials_pandas.py to reference `line` protocol, bug in `json` (#737 thx @Aeium) + +### Removed + +## [v5.2.2] - 2019-03-14 +### Added + +### Changed +- Fix 'TypeError: Already tz-aware' introduced with recent versions of Panda (#671, #676, thx @f4bsch @clslgrnc) + +## [v5.2.1] - 2018-12-07 ### Added ### Changed diff --git a/LICENSE b/LICENSE index 38ee2491..a49a5410 100644 --- a/LICENSE +++ b/LICENSE @@ -1,6 +1,6 @@ The MIT License (MIT) -Copyright (c) 2013 InfluxDB +Copyright (c) 2020 InfluxDB Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in diff --git a/README.rst b/README.rst index d4f9611c..048db045 100644 --- a/README.rst +++ b/README.rst @@ -15,36 +15,45 @@ InfluxDB-Python :target: https://pypi.python.org/pypi/influxdb :alt: PyPI Status -InfluxDB-Python is a client for interacting with InfluxDB_. -Development of this library is maintained by: +.. important:: -+-----------+-------------------------------+ -| Github ID | URL | -+===========+===============================+ -| @aviau | (https://github.com/aviau) | -+-----------+-------------------------------+ -| @xginn8 | (https://github.com/xginn8) | -+-----------+-------------------------------+ -| @sebito91 | (https://github.com/sebito91) | -+-----------+-------------------------------+ + **This project is no longer in development** + + This v1 client library is for interacting with `InfluxDB 1.x `_ and 1.x-compatible endpoints in `InfluxDB 2.x `_. + Use it to: + + - Write data in line protocol. + - Query data with `InfluxQL `_. + + If you use `InfluxDB 2.x (TSM storage engine) `_ and `Flux `_, see the `v2 client library `_. + + If you use `InfluxDB 3.0 `_, see the `v3 client library `_. + + For new projects, consider using InfluxDB 3.0 and v3 client libraries. + +Description +=========== + +InfluxDB-python, the InfluxDB Python Client (1.x), is a client library for interacting with `InfluxDB 1.x `_ instances. .. _readme-about: -InfluxDB is an open-source distributed time series database, find more about InfluxDB_ at https://docs.influxdata.com/influxdb/latest +`InfluxDB`_ is the time series platform designed to handle high write and query loads. .. _installation: -InfluxDB pre v1.1.0 users -------------------------- -This module is tested with InfluxDB versions: v1.2.4, v1.3.9, v1.4.2, and v1.5.4. +For InfluxDB pre-v1.1.0 users +----------------------------- -Those users still on InfluxDB v0.8.x users may still use the legacy client by importing ``from influxdb.influxdb08 import InfluxDBClient``. +This module is tested with InfluxDB versions v1.2.4, v1.3.9, v1.4.3, v1.5.4, v1.6.4, and 1.7.4. -Installation ------------- +Users on InfluxDB v0.8.x may still use the legacy client by importing ``from influxdb.influxdb08 import InfluxDBClient``. + +For InfluxDB v1.1+ users +------------------------ Install, upgrade and uninstall influxdb-python with these commands:: @@ -59,7 +68,7 @@ On Debian/Ubuntu, you can install it with this command:: Dependencies ------------ -The influxdb-python distribution is supported and tested on Python 2.7, 3.5, 3.6, PyPy and PyPy3. +The influxdb-python distribution is supported and tested on Python 2.7, 3.5, 3.6, 3.7, PyPy and PyPy3. **Note:** Python <3.5 are currently untested. See ``.travis.yml``. @@ -152,21 +161,33 @@ We are also lurking on the following: Development ----------- +The v1 client libraries for InfluxDB 1.x were typically developed and maintained by InfluxDB community members. If you are an InfluxDB v1 user interested in maintaining this client library (at a minimum, keeping it updated with security patches) please contact the InfluxDB team at on the `Community Forums `_ or +`InfluxData Slack `_. + All development is done on Github_. Use Issues_ to report problems or submit contributions. .. _Github: https://github.com/influxdb/influxdb-python/ .. _Issues: https://github.com/influxdb/influxdb-python/issues -Please note that we WILL get to your questions/issues/concerns as quickly as possible. We maintain many -software repositories and sometimes things may get pushed to the backburner. Please don't take offense, -we will do our best to reply as soon as possible! +Please note that we will answer you question as quickly as possible. + +Maintainers: ++-----------+-------------------------------+ +| Github ID | URL | ++===========+===============================+ +| @aviau | (https://github.com/aviau) | ++-----------+-------------------------------+ +| @xginn8 | (https://github.com/xginn8) | ++-----------+-------------------------------+ +| @sebito91 | (https://github.com/sebito91) | ++-----------+-------------------------------+ Source code ----------- -The source code is currently available on Github: https://github.com/influxdata/influxdb-python +The source code for the InfluxDB Python Client (1.x) is currently available on Github: https://github.com/influxdata/influxdb-python TODO @@ -175,6 +196,6 @@ TODO The TODO/Roadmap can be found in Github bug tracker: https://github.com/influxdata/influxdb-python/issues -.. _InfluxDB: https://influxdata.com/time-series-platform/influxdb/ +.. _InfluxDB: https://influxdata.com/ .. _Sphinx: http://sphinx.pocoo.org/ .. _Tox: https://tox.readthedocs.org diff --git a/docs/source/api-documentation.rst b/docs/source/api-documentation.rst index d00600e6..35fdb291 100644 --- a/docs/source/api-documentation.rst +++ b/docs/source/api-documentation.rst @@ -30,7 +30,7 @@ These clients are initiated in the same way as the client = DataFrameClient(host='127.0.0.1', port=8086, username='root', password='root', database='dbname') -.. note:: Only when using UDP (use_udp=True) the connections is established. +.. note:: Only when using UDP (use_udp=True) the connection is established. .. _InfluxDBClient-api: diff --git a/docs/source/conf.py b/docs/source/conf.py index 231c776c..efc22f88 100644 --- a/docs/source/conf.py +++ b/docs/source/conf.py @@ -117,7 +117,8 @@ # Add any paths that contain custom themes here, relative to this directory. #html_theme_path = [] -html_theme_path = [sphinx_rtd_theme.get_html_theme_path()] +# Calling get_html_theme_path is deprecated. +# html_theme_path = [sphinx_rtd_theme.get_html_theme_path()] # The name for this set of Sphinx documents. If None, it defaults to # " v documentation". diff --git a/docs/source/examples.rst b/docs/source/examples.rst index 2c85fbda..841ad8b1 100644 --- a/docs/source/examples.rst +++ b/docs/source/examples.rst @@ -25,3 +25,15 @@ Tutorials - SeriesHelper .. literalinclude:: ../../examples/tutorial_serieshelper.py :language: python + +Tutorials - UDP +=============== + +.. literalinclude:: ../../examples/tutorial_udp.py + :language: python + +Tutorials - Authorization by Token +================================== + +.. literalinclude:: ../../examples/tutorial_authorization.py + :language: python diff --git a/examples/tutorial.py b/examples/tutorial.py index 4083bfc5..12cd49c1 100644 --- a/examples/tutorial.py +++ b/examples/tutorial.py @@ -13,7 +13,9 @@ def main(host='localhost', port=8086): dbname = 'example' dbuser = 'smly' dbuser_password = 'my_secret_password' - query = 'select value from cpu_load_short;' + query = 'select Float_value from cpu_load_short;' + query_where = 'select Int_value from cpu_load_short where host=$host;' + bind_params = {'host': 'server01'} json_body = [ { "measurement": "cpu_load_short", @@ -50,6 +52,11 @@ def main(host='localhost', port=8086): print("Result: {0}".format(result)) + print("Querying data: " + query_where) + result = client.query(query_where, bind_params=bind_params) + + print("Result: {0}".format(result)) + print("Switch user: " + user) client.switch_user(user, password) diff --git a/examples/tutorial_authorization.py b/examples/tutorial_authorization.py new file mode 100644 index 00000000..9d9a800f --- /dev/null +++ b/examples/tutorial_authorization.py @@ -0,0 +1,32 @@ +# -*- coding: utf-8 -*- +"""Tutorial how to authorize InfluxDB client by custom Authorization token.""" + +import argparse +from influxdb import InfluxDBClient + + +def main(token='my-token'): + """Instantiate a connection to the InfluxDB.""" + client = InfluxDBClient(username=None, password=None, + headers={"Authorization": token}) + + print("Use authorization token: " + token) + + version = client.ping() + print("Successfully connected to InfluxDB: " + version) + pass + + +def parse_args(): + """Parse the args from main.""" + parser = argparse.ArgumentParser( + description='example code to play with InfluxDB') + parser.add_argument('--token', type=str, required=False, + default='my-token', + help='Authorization token for the proxy that is ahead the InfluxDB.') + return parser.parse_args() + + +if __name__ == '__main__': + args = parse_args() + main(token=args.token) diff --git a/examples/tutorial_pandas.py b/examples/tutorial_pandas.py index 67a5457d..13e72f8c 100644 --- a/examples/tutorial_pandas.py +++ b/examples/tutorial_pandas.py @@ -12,7 +12,7 @@ def main(host='localhost', port=8086): user = 'root' password = 'root' dbname = 'demo' - protocol = 'json' + protocol = 'line' client = DataFrameClient(host, port, user, password, dbname) diff --git a/examples/tutorial_udp.py b/examples/tutorial_udp.py new file mode 100644 index 00000000..93b923d7 --- /dev/null +++ b/examples/tutorial_udp.py @@ -0,0 +1,67 @@ +# -*- coding: utf-8 -*- +"""Example for sending batch information to InfluxDB via UDP.""" + +""" +INFO: In order to use UDP, one should enable the UDP service from the +`influxdb.conf` under section + [[udp]] + enabled = true + bind-address = ":8089" # port number for sending data via UDP + database = "udp1" # name of database to be stored + [[udp]] + enabled = true + bind-address = ":8090" + database = "udp2" +""" + + +import argparse + +from influxdb import InfluxDBClient + + +def main(uport): + """Instantiate connection to the InfluxDB.""" + # NOTE: structure of the UDP packet is different than that of information + # sent via HTTP + json_body = { + "tags": { + "host": "server01", + "region": "us-west" + }, + "points": [{ + "measurement": "cpu_load_short", + "fields": { + "value": 0.64 + }, + "time": "2009-11-10T23:00:00Z", + }, + { + "measurement": "cpu_load_short", + "fields": { + "value": 0.67 + }, + "time": "2009-11-10T23:05:00Z" + }] + } + + # make `use_udp` True and add `udp_port` number from `influxdb.conf` file + # no need to mention the database name since it is already configured + client = InfluxDBClient(use_udp=True, udp_port=uport) + + # Instead of `write_points` use `send_packet` + client.send_packet(json_body) + + +def parse_args(): + """Parse the args.""" + parser = argparse.ArgumentParser( + description='example code to play with InfluxDB along with UDP Port') + parser.add_argument('--uport', type=int, required=True, + help=' UDP port of InfluxDB') + return parser.parse_args() + + +if __name__ == '__main__': + args = parse_args() + main(uport=args.uport) diff --git a/influxdb/__init__.py b/influxdb/__init__.py index 03f74581..e66f80ea 100644 --- a/influxdb/__init__.py +++ b/influxdb/__init__.py @@ -18,4 +18,4 @@ ] -__version__ = '5.2.0' +__version__ = '5.3.2' diff --git a/influxdb/_dataframe_client.py b/influxdb/_dataframe_client.py index 06da7ac4..907db2cb 100644 --- a/influxdb/_dataframe_client.py +++ b/influxdb/_dataframe_client.py @@ -59,6 +59,8 @@ def write_points(self, :param dataframe: data points in a DataFrame :param measurement: name of measurement :param tags: dictionary of tags, with string key-values + :param tag_columns: [Optional, default None] List of data tag names + :param field_columns: [Options, default None] List of data field names :param time_precision: [Optional, default None] Either 's', 'ms', 'u' or 'n'. :param batch_size: [Optional] Value to write the points in batches @@ -142,6 +144,7 @@ def write_points(self, def query(self, query, params=None, + bind_params=None, epoch=None, expected_response_code=200, database=None, @@ -149,12 +152,23 @@ def query(self, chunked=False, chunk_size=0, method="GET", - dropna=True): + dropna=True, + data_frame_index=None): """ Query data into a DataFrame. + .. danger:: + In order to avoid injection vulnerabilities (similar to `SQL + injection `_ + vulnerabilities), do not directly include untrusted data into the + ``query`` parameter, use ``bind_params`` instead. + :param query: the actual query string :param params: additional parameters for the request, defaults to {} + :param bind_params: bind parameters for the query: + any variable in the query written as ``'$var_name'`` will be + replaced with ``bind_params['var_name']``. Only works in the + ``WHERE`` clause and takes precedence over ``params['params']`` :param epoch: response timestamps to be in epoch format either 'h', 'm', 's', 'ms', 'u', or 'ns',defaults to `None` which is RFC3339 UTC format with nanosecond precision @@ -168,10 +182,13 @@ def query(self, containing all results within that chunk :param chunk_size: Size of each chunk to tell InfluxDB to use. :param dropna: drop columns where all values are missing + :param data_frame_index: the list of columns that + are used as DataFrame index :returns: the queried data :rtype: :class:`~.ResultSet` """ query_args = dict(params=params, + bind_params=bind_params, epoch=epoch, expected_response_code=expected_response_code, raise_errors=raise_errors, @@ -182,16 +199,18 @@ def query(self, results = super(DataFrameClient, self).query(query, **query_args) if query.strip().upper().startswith("SELECT"): if len(results) > 0: - return self._to_dataframe(results, dropna) + return self._to_dataframe(results, dropna, + data_frame_index=data_frame_index) else: return {} else: return results - def _to_dataframe(self, rs, dropna=True): + def _to_dataframe(self, rs, dropna=True, data_frame_index=None): result = defaultdict(list) if isinstance(rs, list): - return map(self._to_dataframe, rs) + return map(self._to_dataframe, rs, + [dropna for _ in range(len(rs))]) for key, data in rs.items(): name, tags = key @@ -201,9 +220,15 @@ def _to_dataframe(self, rs, dropna=True): key = (name, tuple(sorted(tags.items()))) df = pd.DataFrame(data) df.time = pd.to_datetime(df.time) - df.set_index('time', inplace=True) - df.index = df.index.tz_localize('UTC') - df.index.name = None + + if data_frame_index: + df.set_index(data_frame_index, inplace=True) + else: + df.set_index('time', inplace=True) + if df.index.tzinfo is None: + df.index = df.index.tz_localize('UTC') + df.index.name = None + result[key].append(df) for key, data in result.items(): df = pd.concat(data).sort_index() @@ -238,7 +263,8 @@ def _convert_dataframe_to_json(dataframe, field_columns = list( set(dataframe.columns).difference(set(tag_columns))) - dataframe.index = pd.to_datetime(dataframe.index) + if not isinstance(dataframe.index, pd.DatetimeIndex): + dataframe.index = pd.to_datetime(dataframe.index) if dataframe.index.tzinfo is None: dataframe.index = dataframe.index.tz_localize('UTC') @@ -257,14 +283,31 @@ def _convert_dataframe_to_json(dataframe, "h": 1e9 * 3600, }.get(time_precision, 1) + if not tag_columns: + points = [ + {'measurement': measurement, + 'fields': + rec.replace([np.inf, -np.inf], np.nan).dropna().to_dict(), + 'time': np.int64(ts.value / precision_factor)} + for ts, (_, rec) in zip( + dataframe.index, + dataframe[field_columns].iterrows() + ) + ] + + return points + points = [ {'measurement': measurement, 'tags': dict(list(tag.items()) + list(tags.items())), - 'fields': rec, + 'fields': + rec.replace([np.inf, -np.inf], np.nan).dropna().to_dict(), 'time': np.int64(ts.value / precision_factor)} - for ts, tag, rec in zip(dataframe.index, - dataframe[tag_columns].to_dict('record'), - dataframe[field_columns].to_dict('record')) + for ts, tag, (_, rec) in zip( + dataframe.index, + dataframe[tag_columns].to_dict('record'), + dataframe[field_columns].iterrows() + ) ] return points @@ -329,10 +372,10 @@ def _convert_dataframe_to_lines(self, # Make array of timestamp ints if isinstance(dataframe.index, pd.PeriodIndex): - time = ((dataframe.index.to_timestamp().values.astype(np.int64) / + time = ((dataframe.index.to_timestamp().values.astype(np.int64) // precision_factor).astype(np.int64).astype(str)) else: - time = ((pd.to_datetime(dataframe.index).values.astype(np.int64) / + time = ((pd.to_datetime(dataframe.index).values.astype(np.int64) // precision_factor).astype(np.int64).astype(str)) # If tag columns exist, make an array of formatted tag keys and values @@ -350,7 +393,7 @@ def _convert_dataframe_to_lines(self, tag_df = self._stringify_dataframe( tag_df, numeric_precision, datatype='tag') - # join preprendded tags, leaving None values out + # join prepended tags, leaving None values out tags = tag_df.apply( lambda s: [',' + s.name + '=' + v if v else '' for v in s]) tags = tags.sum(axis=1) @@ -358,7 +401,8 @@ def _convert_dataframe_to_lines(self, del tag_df elif global_tags: tag_string = ''.join( - [",{}={}".format(k, _escape_tag(v)) if v else '' + [",{}={}".format(k, _escape_tag(v)) + if v not in [None, ''] else "" for k, v in sorted(global_tags.items())] ) tags = pd.Series(tag_string, index=dataframe.index) @@ -366,19 +410,18 @@ def _convert_dataframe_to_lines(self, tags = '' # Make an array of formatted field keys and values - field_df = dataframe[field_columns] - # Keep the positions where Null values are found - mask_null = field_df.isnull().values + field_df = dataframe[field_columns].replace([np.inf, -np.inf], np.nan) + nans = pd.isnull(field_df) field_df = self._stringify_dataframe(field_df, numeric_precision, datatype='field') field_df = (field_df.columns.values + '=').tolist() + field_df - field_df[field_df.columns[1:]] = ',' + field_df[ - field_df.columns[1:]] - field_df = field_df.where(~mask_null, '') # drop Null entries - fields = field_df.sum(axis=1) + field_df[field_df.columns[1:]] = ',' + field_df[field_df.columns[1:]] + field_df[nans] = '' + + fields = field_df.sum(axis=1).map(lambda x: x.lstrip(',')) del field_df # Generate line protocol string diff --git a/influxdb/client.py b/influxdb/client.py index 8f8b14ae..c535a3f1 100644 --- a/influxdb/client.py +++ b/influxdb/client.py @@ -6,14 +6,21 @@ from __future__ import print_function from __future__ import unicode_literals -import time -import random - +import datetime +import gzip +import itertools +import io import json +import random import socket +import struct +import time +from itertools import chain, islice + +import msgpack import requests import requests.exceptions -from six.moves import xrange +from requests.adapters import HTTPAdapter from six.moves.urllib.parse import urlparse from influxdb.line_protocol import make_lines, quote_ident, quote_literal @@ -29,6 +36,9 @@ class InfluxDBClient(object): connect to InfluxDB. Requests can be made to InfluxDB directly through the client. + The client supports the use as a `context manager + `_. + :param host: hostname to connect to InfluxDB, defaults to 'localhost' :type host: str :param port: port to connect to InfluxDB, defaults to 8086 @@ -50,8 +60,12 @@ class InfluxDBClient(object): :param timeout: number of seconds Requests will wait for your client to establish a connection, defaults to None :type timeout: int - :param retries: number of retries your client will try before aborting, - defaults to 3. 0 indicates try until success + :param retries: number of attempts your client will make before aborting, + defaults to 3 + 0 - try until success + 1 - attempt only once (without retry) + 2 - maximum two attempts (including one retry) + 3 - maximum three attempts (default option) :type retries: int :param use_udp: use UDP to connect to InfluxDB, defaults to False :type use_udp: bool @@ -61,6 +75,25 @@ class InfluxDBClient(object): :type proxies: dict :param path: path of InfluxDB on the server to connect, defaults to '' :type path: str + :param cert: Path to client certificate information to use for mutual TLS + authentication. You can specify a local cert to use + as a single file containing the private key and the certificate, or as + a tuple of both files’ paths, defaults to None + :type cert: str + :param gzip: use gzip content encoding to compress requests + :type gzip: bool + :param session: allow for the new client request to use an existing + requests Session, defaults to None + :type session: requests.Session + :param headers: headers to add to Requests, will add 'Content-Type' + and 'Accept' unless these are already present, defaults to {} + :type headers: dict + :param socket_options: use custom tcp socket options, + If not specified, then defaults are loaded from + ``HTTPConnection.default_socket_options`` + :type socket_options: list + + :raises ValueError: if cert is provided but ssl is disabled (set to False) """ def __init__(self, @@ -78,6 +111,11 @@ def __init__(self, proxies=None, pool_size=10, path='', + cert=None, + gzip=False, + session=None, + headers=None, + socket_options=None, ): """Construct a new InfluxDBClient object.""" self.__host = host @@ -91,11 +129,16 @@ def __init__(self, self._verify_ssl = verify_ssl self.__use_udp = use_udp - self.__udp_port = udp_port - self._session = requests.Session() - adapter = requests.adapters.HTTPAdapter( + self.__udp_port = int(udp_port) + + if not session: + session = requests.Session() + + self._session = session + adapter = _SocketOptionsAdapter( pool_connections=int(pool_size), - pool_maxsize=int(pool_size) + pool_maxsize=int(pool_size), + socket_options=socket_options ) if use_udp: @@ -120,16 +163,35 @@ def __init__(self, else: self._proxies = proxies + if cert: + if not ssl: + raise ValueError( + "Client certificate provided but ssl is disabled." + ) + else: + self._session.cert = cert + self.__baseurl = "{0}://{1}:{2}{3}".format( self._scheme, self._host, self._port, self._path) - self._headers = { - 'Content-Type': 'application/json', - 'Accept': 'text/plain' - } + if headers is None: + headers = {} + headers.setdefault('Content-Type', 'application/json') + headers.setdefault('Accept', 'application/x-msgpack') + self._headers = headers + + self._gzip = gzip + + def __enter__(self): + """Enter function as used by context manager.""" + return self + + def __exit__(self, _exc_type, _exc_value, _traceback): + """Exit function as used by context manager.""" + self.close() @property def _baseurl(self): @@ -215,7 +277,7 @@ def switch_user(self, username, password): self._username = username self._password = password - def request(self, url, method='GET', params=None, data=None, + def request(self, url, method='GET', params=None, data=None, stream=False, expected_response_code=200, headers=None): """Make a HTTP request to the InfluxDB API. @@ -227,6 +289,8 @@ def request(self, url, method='GET', params=None, data=None, :type params: dict :param data: the data of the request, defaults to None :type data: str + :param stream: True if a query uses chunked responses + :type stream: bool :param expected_response_code: the expected response code of the request, defaults to 200 :type expected_response_code: int @@ -250,17 +314,39 @@ def request(self, url, method='GET', params=None, data=None, if isinstance(data, (dict, list)): data = json.dumps(data) + if self._gzip: + # Receive and send compressed data + headers.update({ + 'Accept-Encoding': 'gzip', + 'Content-Encoding': 'gzip', + }) + if data is not None: + # For Py 2.7 compatability use Gzipfile + compressed = io.BytesIO() + with gzip.GzipFile( + compresslevel=9, + fileobj=compressed, + mode='w' + ) as f: + f.write(data) + data = compressed.getvalue() + # Try to send the request more than once by default (see #103) retry = True _try = 0 while retry: try: + if "Authorization" in headers: + auth = (None, None) + else: + auth = (self._username, self._password) response = self._session.request( method=method, url=url, - auth=(self._username, self._password), + auth=auth if None not in auth else None, params=params, data=data, + stream=stream, headers=headers, proxies=self._proxies, verify=self._verify_ssl, @@ -273,17 +359,34 @@ def request(self, url, method='GET', params=None, data=None, _try += 1 if self._retries != 0: retry = _try < self._retries - if method == "POST": - time.sleep((2 ** _try) * random.random() / 100.0) if not retry: raise + if method == "POST": + time.sleep((2 ** _try) * random.random() / 100.0) + + type_header = response.headers and response.headers.get("Content-Type") + if type_header == "application/x-msgpack" and response.content: + response._msgpack = msgpack.unpackb( + packed=response.content, + ext_hook=_msgpack_parse_hook, + raw=False) + else: + response._msgpack = None + + def reformat_error(response): + if response._msgpack: + return json.dumps(response._msgpack, separators=(',', ':')) + else: + return response.content + # if there's not an error, there must have been a successful response if 500 <= response.status_code < 600: - raise InfluxDBServerError(response.content) + raise InfluxDBServerError(reformat_error(response)) elif response.status_code == expected_response_code: return response else: - raise InfluxDBClientError(response.content, response.status_code) + err_msg = reformat_error(response) + raise InfluxDBClientError(err_msg, response.status_code) def write(self, data, params=None, expected_response_code=204, protocol='json'): @@ -292,7 +395,7 @@ def write(self, data, params=None, expected_response_code=204, :param data: the data to be written :type data: (if protocol is 'json') dict (if protocol is 'line') sequence of line protocol strings - or single string + or single string :param params: additional parameters for the request, defaults to None :type params: dict :param expected_response_code: the expected response code of the write @@ -303,7 +406,7 @@ def write(self, data, params=None, expected_response_code=204, :returns: True, if the write operation is successful :rtype: bool """ - headers = self._headers + headers = self._headers.copy() headers['Content-Type'] = 'application/octet-stream' if params: @@ -330,21 +433,22 @@ def write(self, data, params=None, expected_response_code=204, @staticmethod def _read_chunked_response(response, raise_errors=True): - result_set = {} for line in response.iter_lines(): if isinstance(line, bytes): line = line.decode('utf-8') data = json.loads(line) + result_set = {} for result in data.get('results', []): for _key in result: if isinstance(result[_key], list): result_set.setdefault( _key, []).extend(result[_key]) - return ResultSet(result_set, raise_errors=raise_errors) + yield ResultSet(result_set, raise_errors=raise_errors) def query(self, query, params=None, + bind_params=None, epoch=None, expected_response_code=200, database=None, @@ -354,6 +458,12 @@ def query(self, method="GET"): """Send a query to InfluxDB. + .. danger:: + In order to avoid injection vulnerabilities (similar to `SQL + injection `_ + vulnerabilities), do not directly include untrusted data into the + ``query`` parameter, use ``bind_params`` instead. + :param query: the actual query string :type query: str @@ -361,6 +471,12 @@ def query(self, defaults to {} :type params: dict + :param bind_params: bind parameters for the query: + any variable in the query written as ``'$var_name'`` will be + replaced with ``bind_params['var_name']``. Only works in the + ``WHERE`` clause and takes precedence over ``params['params']`` + :type bind_params: dict + :param epoch: response timestamps to be in epoch format either 'h', 'm', 's', 'ms', 'u', or 'ns',defaults to `None` which is RFC3339 UTC format with nanosecond precision @@ -394,6 +510,11 @@ def query(self, if params is None: params = {} + if bind_params is not None: + params_dict = json.loads(params.get('params', '{}')) + params_dict.update(bind_params) + params['params'] = json.dumps(params_dict) + params['q'] = query params['db'] = database or self._database @@ -413,13 +534,15 @@ def query(self, method=method, params=params, data=None, + stream=chunked, expected_response_code=expected_response_code ) - if chunked: - return self._read_chunked_response(response) - - data = response.json() + data = response._msgpack + if not data: + if chunked: + return self._read_chunked_response(response) + data = response.json() results = [ ResultSet(result, raise_errors=raise_errors) @@ -440,15 +563,17 @@ def write_points(self, retention_policy=None, tags=None, batch_size=None, - protocol='json' + protocol='json', + consistency=None ): """Write to multiple time series names. :param points: the list of points to be written in the database :type points: list of dictionaries, each dictionary represents a point :type points: (if protocol is 'json') list of dicts, where each dict - represents a point. - (if protocol is 'line') sequence of line protocol strings. + represents a point. + (if protocol is 'line') sequence of line protocol strings. + :param time_precision: Either 's', 'm', 'ms' or 'u', defaults to None :type time_precision: str :param database: the database to write the points to. Defaults to @@ -468,6 +593,9 @@ def write_points(self, :type batch_size: int :param protocol: Protocol for writing data. Either 'line' or 'json'. :type protocol: str + :param consistency: Consistency for the points. + One of {'any','one','quorum','all'}. + :type consistency: str :returns: True, if the operation is successful :rtype: bool @@ -480,14 +608,16 @@ def write_points(self, time_precision=time_precision, database=database, retention_policy=retention_policy, - tags=tags, protocol=protocol) + tags=tags, protocol=protocol, + consistency=consistency) return True return self._write_points(points=points, time_precision=time_precision, database=database, retention_policy=retention_policy, - tags=tags, protocol=protocol) + tags=tags, protocol=protocol, + consistency=consistency) def ping(self): """Check connectivity to InfluxDB. @@ -504,8 +634,17 @@ def ping(self): @staticmethod def _batches(iterable, size): - for i in xrange(0, len(iterable), size): - yield iterable[i:i + size] + # Iterate over an iterable producing iterables of batches. Based on: + # http://code.activestate.com/recipes/303279-getting-items-in-batches/ + iterator = iter(iterable) + while True: + try: # Try get the first element in the iterator... + head = (next(iterator),) + except StopIteration: + return # ...so that we can stop if there isn't one + # Otherwise, lazily slice the rest of the batch + rest = islice(iterator, size - 1) + yield chain(head, rest) def _write_points(self, points, @@ -513,12 +652,16 @@ def _write_points(self, database, retention_policy, tags, - protocol='json'): + protocol='json', + consistency=None): if time_precision not in ['n', 'u', 'ms', 's', 'm', 'h', None]: raise ValueError( "Invalid time precision is given. " "(use 'n', 'u', 'ms', 's', 'm' or 'h')") + if consistency not in ['any', 'one', 'quorum', 'all', None]: + raise ValueError('Invalid consistency: {}'.format(consistency)) + if protocol == 'json': data = { 'points': points @@ -533,6 +676,9 @@ def _write_points(self, 'db': database or self._database } + if consistency is not None: + params['consistency'] = consistency + if time_precision is not None: params['precision'] = time_precision @@ -569,6 +715,40 @@ def get_list_database(self): """ return list(self.query("SHOW DATABASES").get_points()) + def get_list_series(self, database=None, measurement=None, tags=None): + """ + Query SHOW SERIES returns the distinct series in your database. + + FROM and WHERE clauses are optional. + + :param measurement: Show all series from a measurement + :type id: string + :param tags: Show all series that match given tags + :type id: dict + :param database: the database from which the series should be + shows, defaults to client's current database + :type database: str + """ + database = database or self._database + query_str = 'SHOW SERIES' + + if measurement: + query_str += ' FROM "{0}"'.format(measurement) + + if tags: + query_str += ' WHERE ' + ' and '.join(["{0}='{1}'".format(k, v) + for k, v in tags.items()]) + + return list( + itertools.chain.from_iterable( + [ + x.values() + for x in (self.query(query_str, database=database) + .get_points()) + ] + ) + ) + def create_database(self, dbname): """Create a new database in InfluxDB. @@ -693,7 +873,7 @@ def alter_retention_policy(self, name, database=None, query_string = ( "ALTER RETENTION POLICY {0} ON {1}" ).format(quote_ident(name), - quote_ident(database or self._database), shard_duration) + quote_ident(database or self._database)) if duration: query_string += " DURATION {0}".format(duration) if shard_duration: @@ -791,7 +971,7 @@ def drop_user(self, username): :param username: the username to drop :type username: str """ - text = "DROP USER {0}".format(quote_ident(username), method="POST") + text = "DROP USER {0}".format(quote_ident(username)) self.query(text, method="POST") def set_user_password(self, username, password): @@ -809,7 +989,9 @@ def set_user_password(self, username, password): def delete_series(self, database=None, measurement=None, tags=None): """Delete series from a database. - Series can be filtered by measurement and tags. + Series must be filtered by either measurement and tags. + This method cannot be used to delete all series, use + `drop_database` instead. :param database: the database from which the series should be deleted, defaults to client's current database @@ -908,6 +1090,98 @@ def get_list_privileges(self, username): text = "SHOW GRANTS FOR {0}".format(quote_ident(username)) return list(self.query(text).get_points()) + def get_list_continuous_queries(self): + """Get the list of continuous queries in InfluxDB. + + :return: all CQs in InfluxDB + :rtype: list of dictionaries + + :Example: + + :: + + >> cqs = client.get_list_cqs() + >> cqs + [ + { + u'db1': [] + }, + { + u'db2': [ + { + u'name': u'vampire', + u'query': u'CREATE CONTINUOUS QUERY vampire ON ' + 'mydb BEGIN SELECT count(dracula) INTO ' + 'mydb.autogen.all_of_them FROM ' + 'mydb.autogen.one GROUP BY time(5m) END' + } + ] + } + ] + """ + query_string = "SHOW CONTINUOUS QUERIES" + return [{sk[0]: list(p)} for sk, p in self.query(query_string).items()] + + def create_continuous_query(self, name, select, database=None, + resample_opts=None): + r"""Create a continuous query for a database. + + :param name: the name of continuous query to create + :type name: str + :param select: select statement for the continuous query + :type select: str + :param database: the database for which the continuous query is + created. Defaults to current client's database + :type database: str + :param resample_opts: resample options + :type resample_opts: str + + :Example: + + :: + + >> select_clause = 'SELECT mean("value") INTO "cpu_mean" ' \ + ... 'FROM "cpu" GROUP BY time(1m)' + >> client.create_continuous_query( + ... 'cpu_mean', select_clause, 'db_name', 'EVERY 10s FOR 2m' + ... ) + >> client.get_list_continuous_queries() + [ + { + 'db_name': [ + { + 'name': 'cpu_mean', + 'query': 'CREATE CONTINUOUS QUERY "cpu_mean" ' + 'ON "db_name" ' + 'RESAMPLE EVERY 10s FOR 2m ' + 'BEGIN SELECT mean("value") ' + 'INTO "cpu_mean" FROM "cpu" ' + 'GROUP BY time(1m) END' + } + ] + } + ] + """ + query_string = ( + "CREATE CONTINUOUS QUERY {0} ON {1}{2} BEGIN {3} END" + ).format(quote_ident(name), quote_ident(database or self._database), + ' RESAMPLE ' + resample_opts if resample_opts else '', select) + self.query(query_string) + + def drop_continuous_query(self, name, database=None): + """Drop an existing continuous query for a database. + + :param name: the name of continuous query to drop + :type name: str + :param database: the database for which the continuous query is + dropped. Defaults to current client's database + :type database: str + """ + query_string = ( + "DROP CONTINUOUS QUERY {0} ON {1}" + ).format(quote_ident(name), quote_ident(database or self._database)) + self.query(query_string) + def send_packet(self, packet, protocol='json', time_precision=None): """Send an UDP packet. @@ -978,3 +1252,25 @@ def _parse_netloc(netloc): 'password': info.password or None, 'host': info.hostname or 'localhost', 'port': info.port or 8086} + + +def _msgpack_parse_hook(code, data): + if code == 5: + (epoch_s, epoch_ns) = struct.unpack(">QI", data) + timestamp = datetime.datetime.utcfromtimestamp(epoch_s) + timestamp += datetime.timedelta(microseconds=(epoch_ns / 1000)) + return timestamp.isoformat() + 'Z' + return msgpack.ExtType(code, data) + + +class _SocketOptionsAdapter(HTTPAdapter): + """_SocketOptionsAdapter injects socket_options into HTTP Adapter.""" + + def __init__(self, *args, **kwargs): + self.socket_options = kwargs.pop("socket_options", None) + super(_SocketOptionsAdapter, self).__init__(*args, **kwargs) + + def init_poolmanager(self, *args, **kwargs): + if self.socket_options is not None: + kwargs["socket_options"] = self.socket_options + super(_SocketOptionsAdapter, self).init_poolmanager(*args, **kwargs) diff --git a/influxdb/dataframe_client.py b/influxdb/dataframe_client.py index 97258644..babfe0dd 100644 --- a/influxdb/dataframe_client.py +++ b/influxdb/dataframe_client.py @@ -25,4 +25,4 @@ def __init__(self, *a, **kw): raise ImportError("DataFrameClient requires Pandas " "which couldn't be imported: %s" % self.err) else: - from ._dataframe_client import DataFrameClient + from ._dataframe_client import DataFrameClient # type: ignore diff --git a/influxdb/helper.py b/influxdb/helper.py index e622526d..138cf6e8 100644 --- a/influxdb/helper.py +++ b/influxdb/helper.py @@ -41,6 +41,12 @@ class Meta: # Only applicable if autocommit is True. autocommit = True # If True and no bulk_size, then will set bulk_size to 1. + retention_policy = 'your_retention_policy' + # Specify the retention policy for the data points + time_precision = "h"|"m"|s"|"ms"|"u"|"ns" + # Default is ns (nanoseconds) + # Setting time precision while writing point + # You should also make sure time is set in the given precision """ @@ -71,6 +77,15 @@ def __new__(cls, *args, **kwargs): cls.__name__)) cls._autocommit = getattr(_meta, 'autocommit', False) + cls._time_precision = getattr(_meta, 'time_precision', None) + + allowed_time_precisions = ['h', 'm', 's', 'ms', 'u', 'ns', None] + if cls._time_precision not in allowed_time_precisions: + raise AttributeError( + 'In {}, time_precision is set, but invalid use any of {}.' + .format(cls.__name__, ','.join(allowed_time_precisions))) + + cls._retention_policy = getattr(_meta, 'retention_policy', None) cls._client = getattr(_meta, 'client', None) if cls._autocommit and not cls._client: @@ -116,11 +131,11 @@ def __init__(self, **kw): keys = set(kw.keys()) # all tags should be passed, and keys - tags should be a subset of keys - if not(tags <= keys): + if not (tags <= keys): raise NameError( 'Expected arguments to contain all tags {0}, instead got {1}.' .format(cls._tags, kw.keys())) - if not(keys - tags <= fields): + if not (keys - tags <= fields): raise NameError('Got arguments not in tags or fields: {0}' .format(keys - tags - fields)) @@ -143,7 +158,12 @@ def commit(cls, client=None): """ if not client: client = cls._client - rtn = client.write_points(cls._json_body_()) + + rtn = client.write_points( + cls._json_body_(), + time_precision=cls._time_precision, + retention_policy=cls._retention_policy) + # will be None if not set and will default to ns cls._reset_() return rtn @@ -154,6 +174,8 @@ def _json_body_(cls): :return: JSON body of these datapoints. """ json = [] + if not cls.__initialized__: + cls._reset_() for series_name, data in six.iteritems(cls._datapoints): for point in data: json_point = { diff --git a/influxdb/influxdb08/client.py b/influxdb/influxdb08/client.py index 965a91db..40c58145 100644 --- a/influxdb/influxdb08/client.py +++ b/influxdb/influxdb08/client.py @@ -292,10 +292,10 @@ def write_points(self, data, time_precision='s', *args, **kwargs): :type batch_size: int """ - def list_chunks(l, n): + def list_chunks(data_list, n): """Yield successive n-sized chunks from l.""" - for i in xrange(0, len(l), n): - yield l[i:i + n] + for i in xrange(0, len(data_list), n): + yield data_list[i:i + n] batch_size = kwargs.get('batch_size') if batch_size and batch_size > 0: diff --git a/influxdb/influxdb08/helper.py b/influxdb/influxdb08/helper.py index f3dec33c..5f2d4614 100644 --- a/influxdb/influxdb08/helper.py +++ b/influxdb/influxdb08/helper.py @@ -139,6 +139,8 @@ def _json_body_(cls): :return: JSON body of the datapoints. """ json = [] + if not cls.__initialized__: + cls._reset_() for series_name, data in six.iteritems(cls._datapoints): json.append({'name': series_name, 'columns': cls._fields, diff --git a/influxdb/line_protocol.py b/influxdb/line_protocol.py index e8816fc0..25dd2ad7 100644 --- a/influxdb/line_protocol.py +++ b/influxdb/line_protocol.py @@ -11,11 +11,19 @@ from pytz import UTC from dateutil.parser import parse -from six import iteritems, binary_type, text_type, integer_types, PY2 +from six import binary_type, text_type, integer_types, PY2 EPOCH = UTC.localize(datetime.utcfromtimestamp(0)) +def _to_nanos(timestamp): + delta = timestamp - EPOCH + nanos_in_days = delta.days * 86400 * 10 ** 9 + nanos_in_seconds = delta.seconds * 10 ** 9 + nanos_in_micros = delta.microseconds * 10 ** 3 + return nanos_in_days + nanos_in_seconds + nanos_in_micros + + def _convert_timestamp(timestamp, precision=None): if isinstance(timestamp, Integral): return timestamp # assume precision is correct if timestamp is int @@ -27,19 +35,24 @@ def _convert_timestamp(timestamp, precision=None): if not timestamp.tzinfo: timestamp = UTC.localize(timestamp) - ns = (timestamp - EPOCH).total_seconds() * 1e9 + ns = _to_nanos(timestamp) if precision is None or precision == 'n': return ns - elif precision == 'u': - return ns / 1e3 - elif precision == 'ms': - return ns / 1e6 - elif precision == 's': - return ns / 1e9 - elif precision == 'm': - return ns / 1e9 / 60 - elif precision == 'h': - return ns / 1e9 / 3600 + + if precision == 'u': + return ns / 10**3 + + if precision == 'ms': + return ns / 10**6 + + if precision == 's': + return ns / 10**9 + + if precision == 'm': + return ns / 10**9 / 60 + + if precision == 'h': + return ns / 10**9 / 3600 raise ValueError(timestamp) @@ -54,6 +67,8 @@ def _escape_tag(tag): ",", "\\," ).replace( "=", "\\=" + ).replace( + "\n", "\\n" ) @@ -89,14 +104,21 @@ def _is_float(value): def _escape_value(value): - value = _get_unicode(value) + if value is None: + return '' - if isinstance(value, text_type) and value != '': + value = _get_unicode(value) + if isinstance(value, text_type): return quote_ident(value) - elif isinstance(value, integer_types) and not isinstance(value, bool): + + if isinstance(value, integer_types) and not isinstance(value, bool): return str(value) + 'i' - elif _is_float(value): - return repr(value) + + if isinstance(value, bool): + return str(value) + + if _is_float(value): + return repr(float(value)) return str(value) @@ -105,15 +127,60 @@ def _get_unicode(data, force=False): """Try to return a text aka unicode object from the given data.""" if isinstance(data, binary_type): return data.decode('utf-8') - elif data is None: + + if data is None: return '' - elif force: + + if force: if PY2: return unicode(data) - else: - return str(data) - else: - return data + return str(data) + + return data + + +def make_line(measurement, tags=None, fields=None, time=None, precision=None): + """Extract the actual point from a given measurement line.""" + tags = tags or {} + fields = fields or {} + + line = _escape_tag(_get_unicode(measurement)) + + # tags should be sorted client-side to take load off server + tag_list = [] + for tag_key in sorted(tags.keys()): + key = _escape_tag(tag_key) + value = _escape_tag(tags[tag_key]) + + if key != '' and value != '': + tag_list.append( + "{key}={value}".format(key=key, value=value) + ) + + if tag_list: + line += ',' + ','.join(tag_list) + + field_list = [] + for field_key in sorted(fields.keys()): + key = _escape_tag(field_key) + value = _escape_value(fields[field_key]) + + if key != '' and value != '': + field_list.append("{key}={value}".format( + key=key, + value=value + )) + + if field_list: + line += ' ' + ','.join(field_list) + + if time is not None: + timestamp = _get_unicode(str(int( + _convert_timestamp(time, precision) + ))) + line += ' ' + timestamp + + return line def make_lines(data, precision=None): @@ -125,48 +192,19 @@ def make_lines(data, precision=None): lines = [] static_tags = data.get('tags') for point in data['points']: - elements = [] - - # add measurement name - measurement = _escape_tag(_get_unicode( - point.get('measurement', data.get('measurement')))) - key_values = [measurement] - - # add tags if static_tags: tags = dict(static_tags) # make a copy, since we'll modify tags.update(point.get('tags') or {}) else: tags = point.get('tags') or {} - # tags should be sorted client-side to take load off server - for tag_key, tag_value in sorted(iteritems(tags)): - key = _escape_tag(tag_key) - value = _escape_tag_value(tag_value) - - if key != '' and value != '': - key_values.append(key + "=" + value) - - elements.append(','.join(key_values)) - - # add fields - field_values = [] - for field_key, field_value in sorted(iteritems(point['fields'])): - key = _escape_tag(field_key) - value = _escape_value(field_value) - - if key != '' and value != '': - field_values.append(key + "=" + value) - - elements.append(','.join(field_values)) - - # add timestamp - if 'time' in point: - timestamp = _get_unicode(str(int( - _convert_timestamp(point['time'], precision)))) - elements.append(timestamp) - - line = ' '.join(elements) + line = make_line( + point.get('measurement', data.get('measurement')), + tags=tags, + fields=point.get('fields'), + precision=precision, + time=point.get('time') + ) lines.append(line) return '\n'.join(lines) + '\n' diff --git a/influxdb/tests/__init__.py b/influxdb/tests/__init__.py index adf2f20c..f7c5dfb9 100644 --- a/influxdb/tests/__init__.py +++ b/influxdb/tests/__init__.py @@ -12,10 +12,10 @@ import unittest using_pypy = hasattr(sys, "pypy_version_info") -skipIfPYpy = unittest.skipIf(using_pypy, "Skipping this test on pypy.") +skip_if_pypy = unittest.skipIf(using_pypy, "Skipping this test on pypy.") _skip_server_tests = os.environ.get( 'INFLUXDB_PYTHON_SKIP_SERVER_TESTS', None) == 'True' -skipServerTests = unittest.skipIf(_skip_server_tests, - "Skipping server tests...") +skip_server_tests = unittest.skipIf(_skip_server_tests, + "Skipping server tests...") diff --git a/influxdb/tests/client_test.py b/influxdb/tests/client_test.py index e27eef17..115fbc48 100644 --- a/influxdb/tests/client_test.py +++ b/influxdb/tests/client_test.py @@ -24,6 +24,8 @@ import unittest import warnings +import io +import gzip import json import mock import requests @@ -31,6 +33,7 @@ import requests_mock from nose.tools import raises +from urllib3.connection import HTTPConnection from influxdb import InfluxDBClient from influxdb.resultset import ResultSet @@ -149,6 +152,14 @@ def test_dsn(self): **{'ssl': False}) self.assertEqual('http://my.host.fr:1886', cli._baseurl) + def test_cert(self): + """Test mutual TLS authentication for TestInfluxDBClient object.""" + cli = InfluxDBClient(ssl=True, cert='/etc/pki/tls/private/dummy.crt') + self.assertEqual(cli._session.cert, '/etc/pki/tls/private/dummy.crt') + + with self.assertRaises(ValueError): + cli = InfluxDBClient(cert='/etc/pki/tls/private/dummy.crt') + def test_switch_database(self): """Test switch database in TestInfluxDBClient object.""" cli = InfluxDBClient('host', 8086, 'username', 'password', 'database') @@ -206,6 +217,71 @@ def test_write_points(self): m.last_request.body.decode('utf-8'), ) + def test_write_gzip(self): + """Test write in TestInfluxDBClient object.""" + with requests_mock.Mocker() as m: + m.register_uri( + requests_mock.POST, + "http://localhost:8086/write", + status_code=204 + ) + + cli = InfluxDBClient(database='db', gzip=True) + cli.write( + {"database": "mydb", + "retentionPolicy": "mypolicy", + "points": [{"measurement": "cpu_load_short", + "tags": {"host": "server01", + "region": "us-west"}, + "time": "2009-11-10T23:00:00Z", + "fields": {"value": 0.64}}]} + ) + + compressed = io.BytesIO() + with gzip.GzipFile( + compresslevel=9, + fileobj=compressed, + mode='w' + ) as f: + f.write( + b"cpu_load_short,host=server01,region=us-west " + b"value=0.64 1257894000000000000\n" + ) + + self.assertEqual( + m.last_request.body, + compressed.getvalue(), + ) + + def test_write_points_gzip(self): + """Test write points for TestInfluxDBClient object.""" + with requests_mock.Mocker() as m: + m.register_uri( + requests_mock.POST, + "http://localhost:8086/write", + status_code=204 + ) + + cli = InfluxDBClient(database='db', gzip=True) + cli.write_points( + self.dummy_points, + ) + + compressed = io.BytesIO() + with gzip.GzipFile( + compresslevel=9, + fileobj=compressed, + mode='w' + ) as f: + f.write( + b'cpu_load_short,host=server01,region=us-west ' + b'value=0.64 1257894000123456000\n' + ) + self.assertEqual( + m.last_request.body, + compressed.getvalue(), + ) + def test_write_points_toplevel_attributes(self): """Test write points attrs for TestInfluxDBClient object.""" with requests_mock.Mocker() as m: @@ -257,6 +333,36 @@ def test_write_points_batch(self): self.assertEqual(expected_last_body, m.last_request.body.decode('utf-8')) + def test_write_points_batch_generator(self): + """Test write points batch from a generator for TestInfluxDBClient.""" + dummy_points = [ + {"measurement": "cpu_usage", "tags": {"unit": "percent"}, + "time": "2009-11-10T23:00:00Z", "fields": {"value": 12.34}}, + {"measurement": "network", "tags": {"direction": "in"}, + "time": "2009-11-10T23:00:00Z", "fields": {"value": 123.00}}, + {"measurement": "network", "tags": {"direction": "out"}, + "time": "2009-11-10T23:00:00Z", "fields": {"value": 12.00}} + ] + dummy_points_generator = (point for point in dummy_points) + expected_last_body = ( + "network,direction=out,host=server01,region=us-west " + "value=12.0 1257894000000000000\n" + ) + + with requests_mock.Mocker() as m: + m.register_uri(requests_mock.POST, + "http://localhost:8086/write", + status_code=204) + cli = InfluxDBClient(database='db') + cli.write_points(points=dummy_points_generator, + database='db', + tags={"host": "server01", + "region": "us-west"}, + batch_size=2) + self.assertEqual(m.call_count, 2) + self.assertEqual(expected_last_body, + m.last_request.body.decode('utf-8')) + def test_write_points_udp(self): """Test write points UDP for TestInfluxDBClient object.""" s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) @@ -337,6 +443,23 @@ def test_write_points_with_precision(self): m.last_request.body, ) + def test_write_points_with_consistency(self): + """Test write points with consistency for TestInfluxDBClient object.""" + with requests_mock.Mocker() as m: + m.register_uri( + requests_mock.POST, + 'http://localhost:8086/write', + status_code=204 + ) + + cli = InfluxDBClient(database='db') + + cli.write_points(self.dummy_points, consistency='any') + self.assertEqual( + m.last_request.qs, + {'db': ['db'], 'consistency': ['any']} + ) + def test_write_points_with_precision_udp(self): """Test write points with precision for TestInfluxDBClient object.""" s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) @@ -409,6 +532,15 @@ def test_write_points_bad_precision(self): time_precision='g' ) + def test_write_points_bad_consistency(self): + """Test write points w/bad consistency value.""" + cli = InfluxDBClient() + with self.assertRaises(ValueError): + cli.write_points( + self.dummy_points, + consistency='boo' + ) + @raises(Exception) def test_write_points_with_precision_fails(self): """Test write points w/precision fail for TestInfluxDBClient object.""" @@ -439,6 +571,29 @@ def test_query(self): [{'value': 0.64, 'time': '2009-11-10T23:00:00Z'}] ) + def test_query_msgpack(self): + """Test query method with a messagepack response.""" + example_response = bytes(bytearray.fromhex( + "81a7726573756c74739182ac73746174656d656e745f696400a673657269" + "65739183a46e616d65a161a7636f6c756d6e7392a474696d65a176a67661" + "6c7565739192c70c05000000005d26178a019096c8cb3ff0000000000000" + )) + + with requests_mock.Mocker() as m: + m.register_uri( + requests_mock.GET, + "http://localhost:8086/query", + request_headers={"Accept": "application/x-msgpack"}, + headers={"Content-Type": "application/x-msgpack"}, + content=example_response + ) + rs = self.cli.query('select * from a') + + self.assertListEqual( + list(rs.get_points()), + [{'v': 1.0, 'time': '2019-07-10T16:51:22.026253Z'}] + ) + def test_select_into_post(self): """Test SELECT.*INTO is POSTed.""" example_response = ( @@ -632,6 +787,66 @@ def test_get_list_measurements(self): [{'name': 'cpu'}, {'name': 'disk'}] ) + def test_get_list_series(self): + """Test get a list of series from the database.""" + data = {'results': [ + {'series': [ + { + 'values': [ + ['cpu_load_short,host=server01,region=us-west'], + ['memory_usage,host=server02,region=us-east']], + 'columns': ['key'] + } + ]} + ]} + + with _mocked_session(self.cli, 'get', 200, json.dumps(data)): + self.assertListEqual( + self.cli.get_list_series(), + ['cpu_load_short,host=server01,region=us-west', + 'memory_usage,host=server02,region=us-east']) + + def test_get_list_series_with_measurement(self): + """Test get a list of series from the database by filter.""" + data = {'results': [ + {'series': [ + { + 'values': [ + ['cpu_load_short,host=server01,region=us-west']], + 'columns': ['key'] + } + ]} + ]} + + with _mocked_session(self.cli, 'get', 200, json.dumps(data)): + self.assertListEqual( + self.cli.get_list_series(measurement='cpu_load_short'), + ['cpu_load_short,host=server01,region=us-west']) + + def test_get_list_series_with_tags(self): + """Test get a list of series from the database by tags.""" + data = {'results': [ + {'series': [ + { + 'values': [ + ['cpu_load_short,host=server01,region=us-west']], + 'columns': ['key'] + } + ]} + ]} + + with _mocked_session(self.cli, 'get', 200, json.dumps(data)): + self.assertListEqual( + self.cli.get_list_series(tags={'region': 'us-west'}), + ['cpu_load_short,host=server01,region=us-west']) + + @raises(Exception) + def test_get_list_series_fails(self): + """Test get a list of series from the database but fail.""" + cli = InfluxDBClient('host', 8086, 'username', 'password') + with _mocked_session(cli, 'get', 401): + cli.get_list_series() + def test_create_retention_policy_default(self): """Test create default ret policy for TestInfluxDBClient object.""" example_response = '{"results":[{}]}' @@ -672,6 +887,49 @@ def test_create_retention_policy(self): '"db" duration 1d replication 4 shard duration 0s' ) + def test_create_retention_policy_shard_duration(self): + """Test create retention policy with a custom shard duration.""" + example_response = '{"results":[{}]}' + + with requests_mock.Mocker() as m: + m.register_uri( + requests_mock.POST, + "http://localhost:8086/query", + text=example_response + ) + self.cli.create_retention_policy( + 'somename2', '1d', 4, database='db', + shard_duration='1h' + ) + + self.assertEqual( + m.last_request.qs['q'][0], + 'create retention policy "somename2" on ' + '"db" duration 1d replication 4 shard duration 1h' + ) + + def test_create_retention_policy_shard_duration_default(self): + """Test create retention policy with a default shard duration.""" + example_response = '{"results":[{}]}' + + with requests_mock.Mocker() as m: + m.register_uri( + requests_mock.POST, + "http://localhost:8086/query", + text=example_response + ) + self.cli.create_retention_policy( + 'somename3', '1d', 4, database='db', + shard_duration='1h', default=True + ) + + self.assertEqual( + m.last_request.qs['q'][0], + 'create retention policy "somename3" on ' + '"db" duration 1d replication 4 shard duration 1h ' + 'default' + ) + def test_alter_retention_policy(self): """Test alter retention policy for TestInfluxDBClient object.""" example_response = '{"results":[{}]}' @@ -1001,7 +1259,7 @@ def test_revoke_privilege_invalid(self): self.cli.revoke_privilege('', 'testdb', 'test') def test_get_list_privileges(self): - """Tst get list of privs for TestInfluxDBClient object.""" + """Test get list of privs for TestInfluxDBClient object.""" data = {'results': [ {'series': [ {'columns': ['database', 'privilege'], @@ -1027,24 +1285,127 @@ def test_get_list_privileges_fails(self): with _mocked_session(cli, 'get', 401): cli.get_list_privileges('test') + def test_get_list_continuous_queries(self): + """Test getting a list of continuous queries.""" + data = { + "results": [ + { + "statement_id": 0, + "series": [ + { + "name": "testdb01", + "columns": ["name", "query"], + "values": [["testname01", "testquery01"], + ["testname02", "testquery02"]] + }, + { + "name": "testdb02", + "columns": ["name", "query"], + "values": [["testname03", "testquery03"]] + }, + { + "name": "testdb03", + "columns": ["name", "query"] + } + ] + } + ] + } + + with _mocked_session(self.cli, 'get', 200, json.dumps(data)): + self.assertListEqual( + self.cli.get_list_continuous_queries(), + [ + { + 'testdb01': [ + {'name': 'testname01', 'query': 'testquery01'}, + {'name': 'testname02', 'query': 'testquery02'} + ] + }, + { + 'testdb02': [ + {'name': 'testname03', 'query': 'testquery03'} + ] + }, + { + 'testdb03': [] + } + ] + ) + + @raises(Exception) + def test_get_list_continuous_queries_fails(self): + """Test failing to get a list of continuous queries.""" + with _mocked_session(self.cli, 'get', 400): + self.cli.get_list_continuous_queries() + + def test_create_continuous_query(self): + """Test continuous query creation.""" + data = {"results": [{}]} + with requests_mock.Mocker() as m: + m.register_uri( + requests_mock.GET, + "http://localhost:8086/query", + text=json.dumps(data) + ) + query = 'SELECT count("value") INTO "6_months"."events" FROM ' \ + '"events" GROUP BY time(10m)' + self.cli.create_continuous_query('cq_name', query, 'db_name') + self.assertEqual( + m.last_request.qs['q'][0], + 'create continuous query "cq_name" on "db_name" begin select ' + 'count("value") into "6_months"."events" from "events" group ' + 'by time(10m) end' + ) + self.cli.create_continuous_query('cq_name', query, 'db_name', + 'EVERY 10s FOR 2m') + self.assertEqual( + m.last_request.qs['q'][0], + 'create continuous query "cq_name" on "db_name" resample ' + 'every 10s for 2m begin select count("value") into ' + '"6_months"."events" from "events" group by time(10m) end' + ) + + @raises(Exception) + def test_create_continuous_query_fails(self): + """Test failing to create a continuous query.""" + with _mocked_session(self.cli, 'get', 400): + self.cli.create_continuous_query('cq_name', 'select', 'db_name') + + def test_drop_continuous_query(self): + """Test dropping a continuous query.""" + data = {"results": [{}]} + with requests_mock.Mocker() as m: + m.register_uri( + requests_mock.GET, + "http://localhost:8086/query", + text=json.dumps(data) + ) + self.cli.drop_continuous_query('cq_name', 'db_name') + self.assertEqual( + m.last_request.qs['q'][0], + 'drop continuous query "cq_name" on "db_name"' + ) + + @raises(Exception) + def test_drop_continuous_query_fails(self): + """Test failing to drop a continuous query.""" + with _mocked_session(self.cli, 'get', 400): + self.cli.drop_continuous_query('cq_name', 'db_name') + def test_invalid_port_fails(self): """Test invalid port fail for TestInfluxDBClient object.""" with self.assertRaises(ValueError): InfluxDBClient('host', '80/redir', 'username', 'password') def test_chunked_response(self): - """Test chunked reponse for TestInfluxDBClient object.""" + """Test chunked response for TestInfluxDBClient object.""" example_response = \ - u'{"results":[{"statement_id":0,"series":' \ - '[{"name":"cpu","columns":["fieldKey","fieldType"],"values":' \ - '[["value","integer"]]}],"partial":true}]}\n{"results":' \ - '[{"statement_id":0,"series":[{"name":"iops","columns":' \ - '["fieldKey","fieldType"],"values":[["value","integer"]]}],' \ - '"partial":true}]}\n{"results":[{"statement_id":0,"series":' \ - '[{"name":"load","columns":["fieldKey","fieldType"],"values":' \ - '[["value","integer"]]}],"partial":true}]}\n{"results":' \ - '[{"statement_id":0,"series":[{"name":"memory","columns":' \ - '["fieldKey","fieldType"],"values":[["value","integer"]]}]}]}\n' + u'{"results":[{"statement_id":0,"series":[{"columns":["key"],' \ + '"values":[["cpu"],["memory"],["iops"],["network"]],"partial":' \ + 'true}],"partial":true}]}\n{"results":[{"statement_id":0,' \ + '"series":[{"columns":["key"],"values":[["qps"],["uptime"],' \ + '["df"],["mount"]]}]}]}\n' with requests_mock.Mocker() as m: m.register_uri( @@ -1052,23 +1413,125 @@ def test_chunked_response(self): "http://localhost:8086/query", text=example_response ) - response = self.cli.query('show series limit 4 offset 0', + response = self.cli.query('show series', chunked=True, chunk_size=4) - self.assertTrue(len(response) == 4) - self.assertEqual(response.__repr__(), ResultSet( - {'series': [{'values': [['value', 'integer']], - 'name': 'cpu', - 'columns': ['fieldKey', 'fieldType']}, - {'values': [['value', 'integer']], - 'name': 'iops', - 'columns': ['fieldKey', 'fieldType']}, - {'values': [['value', 'integer']], - 'name': 'load', - 'columns': ['fieldKey', 'fieldType']}, - {'values': [['value', 'integer']], - 'name': 'memory', - 'columns': ['fieldKey', 'fieldType']}]} - ).__repr__()) + res = list(response) + self.assertTrue(len(res) == 2) + self.assertEqual(res[0].__repr__(), ResultSet( + {'series': [{ + 'columns': ['key'], + 'values': [['cpu'], ['memory'], ['iops'], ['network']] + }]}).__repr__()) + self.assertEqual(res[1].__repr__(), ResultSet( + {'series': [{ + 'columns': ['key'], + 'values': [['qps'], ['uptime'], ['df'], ['mount']] + }]}).__repr__()) + + def test_auth_default(self): + """Test auth with default settings.""" + with requests_mock.Mocker() as m: + m.register_uri( + requests_mock.GET, + "http://localhost:8086/ping", + status_code=204, + headers={'X-Influxdb-Version': '1.2.3'} + ) + + cli = InfluxDBClient() + cli.ping() + + self.assertEqual(m.last_request.headers["Authorization"], + "Basic cm9vdDpyb290") + + def test_auth_username_password(self): + """Test auth with custom username and password.""" + with requests_mock.Mocker() as m: + m.register_uri( + requests_mock.GET, + "http://localhost:8086/ping", + status_code=204, + headers={'X-Influxdb-Version': '1.2.3'} + ) + + cli = InfluxDBClient(username='my-username', + password='my-password') + cli.ping() + + self.assertEqual(m.last_request.headers["Authorization"], + "Basic bXktdXNlcm5hbWU6bXktcGFzc3dvcmQ=") + + def test_auth_username_password_none(self): + """Test auth with not defined username or password.""" + with requests_mock.Mocker() as m: + m.register_uri( + requests_mock.GET, + "http://localhost:8086/ping", + status_code=204, + headers={'X-Influxdb-Version': '1.2.3'} + ) + + cli = InfluxDBClient(username=None, password=None) + cli.ping() + self.assertFalse('Authorization' in m.last_request.headers) + + cli = InfluxDBClient(username=None) + cli.ping() + self.assertFalse('Authorization' in m.last_request.headers) + + cli = InfluxDBClient(password=None) + cli.ping() + self.assertFalse('Authorization' in m.last_request.headers) + + def test_auth_token(self): + """Test auth with custom authorization header.""" + with requests_mock.Mocker() as m: + m.register_uri( + requests_mock.GET, + "http://localhost:8086/ping", + status_code=204, + headers={'X-Influxdb-Version': '1.2.3'} + ) + + cli = InfluxDBClient(username=None, password=None, + headers={"Authorization": "my-token"}) + cli.ping() + self.assertEqual(m.last_request.headers["Authorization"], + "my-token") + + def test_custom_socket_options(self): + """Test custom socket options.""" + test_socket_options = HTTPConnection.default_socket_options + \ + [(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1), + (socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, 60), + (socket.IPPROTO_TCP, socket.TCP_KEEPCNT, 15)] + + cli = InfluxDBClient(username=None, password=None, + socket_options=test_socket_options) + + self.assertEquals(cli._session.adapters.get("http://").socket_options, + test_socket_options) + self.assertEquals(cli._session.adapters.get("http://").poolmanager. + connection_pool_kw.get("socket_options"), + test_socket_options) + + connection_pool = cli._session.adapters.get("http://").poolmanager \ + .connection_from_url( + url="http://localhost:8086") + new_connection = connection_pool._new_conn() + self.assertEquals(new_connection.socket_options, test_socket_options) + + def test_none_socket_options(self): + """Test default socket options.""" + cli = InfluxDBClient(username=None, password=None) + self.assertEquals(cli._session.adapters.get("http://").socket_options, + None) + connection_pool = cli._session.adapters.get("http://").poolmanager \ + .connection_from_url( + url="http://localhost:8086") + new_connection = connection_pool._new_conn() + self.assertEquals(new_connection.socket_options, + HTTPConnection.default_socket_options) class FakeClient(InfluxDBClient): diff --git a/influxdb/tests/dataframe_client_test.py b/influxdb/tests/dataframe_client_test.py index 72447c89..87b8e0d8 100644 --- a/influxdb/tests/dataframe_client_test.py +++ b/influxdb/tests/dataframe_client_test.py @@ -13,8 +13,8 @@ import warnings import requests_mock -from influxdb.tests import skipIfPYpy, using_pypy from nose.tools import raises +from influxdb.tests import skip_if_pypy, using_pypy from .client_test import _mocked_session @@ -22,9 +22,10 @@ import pandas as pd from pandas.util.testing import assert_frame_equal from influxdb import DataFrameClient + import numpy as np -@skipIfPYpy +@skip_if_pypy class TestDataFrameClient(unittest.TestCase): """Set up a test DataFrameClient object.""" @@ -388,6 +389,71 @@ def test_write_points_from_dataframe_with_numeric_column_names(self): self.assertEqual(m.last_request.body, expected) + def test_write_points_from_dataframe_with_leading_none_column(self): + """write_points detect erroneous leading comma for null first field.""" + dataframe = pd.DataFrame( + dict( + first=[1, None, None, 8, 9], + second=[2, None, None, None, 10], + third=[3, 4.1, None, None, 11], + first_tag=["one", None, None, "eight", None], + second_tag=["two", None, None, None, None], + third_tag=["three", "four", None, None, None], + comment=[ + "All columns filled", + "First two of three empty", + "All empty", + "Last two of three empty", + "Empty tags with values", + ] + ), + index=pd.date_range( + start=pd.to_datetime('2018-01-01'), + freq='1D', + periods=5, + ) + ) + expected = ( + b'foo,first_tag=one,second_tag=two,third_tag=three' + b' comment="All columns filled",first=1.0,second=2.0,third=3.0' + b' 1514764800000000000\n' + b'foo,third_tag=four' + b' comment="First two of three empty",third=4.1' + b' 1514851200000000000\n' + b'foo comment="All empty" 1514937600000000000\n' + b'foo,first_tag=eight' + b' comment="Last two of three empty",first=8.0' + b' 1515024000000000000\n' + b'foo' + b' comment="Empty tags with values",first=9.0,second=10.0' + b',third=11.0' + b' 1515110400000000000\n' + ) + + with requests_mock.Mocker() as m: + m.register_uri(requests_mock.POST, + "http://localhost:8086/write", + status_code=204) + + cli = DataFrameClient(database='db') + + colnames = [ + "first_tag", + "second_tag", + "third_tag", + "comment", + "first", + "second", + "third" + ] + cli.write_points(dataframe.loc[:, colnames], 'foo', + tag_columns=[ + "first_tag", + "second_tag", + "third_tag"]) + + self.assertEqual(m.last_request.body, expected) + def test_write_points_from_dataframe_with_numeric_precision(self): """Test write points from df with numeric precision.""" now = pd.Timestamp('1970-01-01 00:00+00:00') @@ -396,10 +462,16 @@ def test_write_points_from_dataframe_with_numeric_precision(self): ["2", 2, 2.2222222222222]], index=[now, now + timedelta(hours=1)]) - expected_default_precision = ( - b'foo,hello=there 0=\"1\",1=1i,2=1.11111111111 0\n' - b'foo,hello=there 0=\"2\",1=2i,2=2.22222222222 3600000000000\n' - ) + if np.lib.NumpyVersion(np.__version__) <= '1.13.3': + expected_default_precision = ( + b'foo,hello=there 0=\"1\",1=1i,2=1.11111111111 0\n' + b'foo,hello=there 0=\"2\",1=2i,2=2.22222222222 3600000000000\n' + ) + else: + expected_default_precision = ( + b'foo,hello=there 0=\"1\",1=1i,2=1.1111111111111 0\n' + b'foo,hello=there 0=\"2\",1=2i,2=2.2222222222222 3600000000000\n' # noqa E501 line too long + ) expected_specified_precision = ( b'foo,hello=there 0=\"1\",1=1i,2=1.1111 0\n' @@ -419,6 +491,9 @@ def test_write_points_from_dataframe_with_numeric_precision(self): cli = DataFrameClient(database='db') cli.write_points(dataframe, "foo", {"hello": "there"}) + print(expected_default_precision) + print(m.last_request.body) + self.assertEqual(m.last_request.body, expected_default_precision) cli = DataFrameClient(database='db') @@ -802,7 +877,7 @@ def test_query_into_dataframe(self): {"measurement": "network", "tags": {"direction": ""}, "columns": ["time", "value"], - "values":[["2009-11-10T23:00:00Z", 23422]] + "values": [["2009-11-10T23:00:00Z", 23422]] }, {"measurement": "network", "tags": {"direction": "in"}, @@ -818,13 +893,15 @@ def test_query_into_dataframe(self): pd1 = pd.DataFrame( [[23422]], columns=['value'], index=pd.to_datetime(["2009-11-10T23:00:00Z"])) - pd1.index = pd1.index.tz_localize('UTC') + if pd1.index.tzinfo is None: + pd1.index = pd1.index.tz_localize('UTC') pd2 = pd.DataFrame( [[23422], [23422], [23422]], columns=['value'], index=pd.to_datetime(["2009-11-10T23:00:00Z", "2009-11-10T23:00:00Z", "2009-11-10T23:00:00Z"])) - pd2.index = pd2.index.tz_localize('UTC') + if pd2.index.tzinfo is None: + pd2.index = pd2.index.tz_localize('UTC') expected = { ('network', (('direction', ''),)): pd1, ('network', (('direction', 'in'),)): pd2 @@ -837,7 +914,7 @@ def test_query_into_dataframe(self): assert_frame_equal(expected[k], result[k]) def test_multiquery_into_dataframe(self): - """Test multiquyer into df for TestDataFrameClient object.""" + """Test multiquery into df for TestDataFrameClient object.""" data = { "results": [ { @@ -871,22 +948,118 @@ def test_multiquery_into_dataframe(self): index=pd.to_datetime([ "2015-01-29 21:55:43.702900257+0000", "2015-01-29 21:55:43.702900257+0000", - "2015-06-11 20:46:02+0000"])).tz_localize('UTC') + "2015-06-11 20:46:02+0000"])) + if pd1.index.tzinfo is None: + pd1.index = pd1.index.tz_localize('UTC') pd2 = pd.DataFrame( [[3]], columns=['count'], - index=pd.to_datetime(["1970-01-01 00:00:00+00:00"]))\ - .tz_localize('UTC') + index=pd.to_datetime(["1970-01-01 00:00:00+00:00"])) + if pd2.index.tzinfo is None: + pd2.index = pd2.index.tz_localize('UTC') expected = [{'cpu_load_short': pd1}, {'cpu_load_short': pd2}] cli = DataFrameClient('host', 8086, 'username', 'password', 'db') - iql = "SELECT value FROM cpu_load_short WHERE region='us-west';"\ - "SELECT count(value) FROM cpu_load_short WHERE region='us-west'" + iql = "SELECT value FROM cpu_load_short WHERE region=$region;"\ + "SELECT count(value) FROM cpu_load_short WHERE region=$region" + bind_params = {'region': 'us-west'} with _mocked_session(cli, 'GET', 200, data): - result = cli.query(iql) + result = cli.query(iql, bind_params=bind_params) for r, e in zip(result, expected): for k in e: assert_frame_equal(e[k], r[k]) + def test_multiquery_into_dataframe_dropna(self): + """Test multiquery into df for TestDataFrameClient object.""" + data = { + "results": [ + { + "series": [ + { + "name": "cpu_load_short", + "columns": ["time", "value", "value2", "value3"], + "values": [ + ["2015-01-29T21:55:43.702900257Z", + 0.55, 0.254, np.NaN], + ["2015-01-29T21:55:43.702900257Z", + 23422, 122878, np.NaN], + ["2015-06-11T20:46:02Z", + 0.64, 0.5434, np.NaN] + ] + } + ] + }, { + "series": [ + { + "name": "cpu_load_short", + "columns": ["time", "count"], + "values": [ + ["1970-01-01T00:00:00Z", 3] + ] + } + ] + } + ] + } + + pd1 = pd.DataFrame( + [[0.55, 0.254, np.NaN], + [23422.0, 122878, np.NaN], + [0.64, 0.5434, np.NaN]], + columns=['value', 'value2', 'value3'], + index=pd.to_datetime([ + "2015-01-29 21:55:43.702900257+0000", + "2015-01-29 21:55:43.702900257+0000", + "2015-06-11 20:46:02+0000"])) + + if pd1.index.tzinfo is None: + pd1.index = pd1.index.tz_localize('UTC') + + pd1_dropna = pd.DataFrame( + [[0.55, 0.254], [23422.0, 122878], [0.64, 0.5434]], + columns=['value', 'value2'], + index=pd.to_datetime([ + "2015-01-29 21:55:43.702900257+0000", + "2015-01-29 21:55:43.702900257+0000", + "2015-06-11 20:46:02+0000"])) + + if pd1_dropna.index.tzinfo is None: + pd1_dropna.index = pd1_dropna.index.tz_localize('UTC') + + pd2 = pd.DataFrame( + [[3]], columns=['count'], + index=pd.to_datetime(["1970-01-01 00:00:00+00:00"])) + + if pd2.index.tzinfo is None: + pd2.index = pd2.index.tz_localize('UTC') + + expected_dropna_true = [ + {'cpu_load_short': pd1_dropna}, + {'cpu_load_short': pd2}] + expected_dropna_false = [ + {'cpu_load_short': pd1}, + {'cpu_load_short': pd2}] + + cli = DataFrameClient('host', 8086, 'username', 'password', 'db') + iql = "SELECT value FROM cpu_load_short WHERE region=$region;" \ + "SELECT count(value) FROM cpu_load_short WHERE region=$region" + bind_params = {'region': 'us-west'} + + for dropna in [True, False]: + with _mocked_session(cli, 'GET', 200, data): + result = cli.query(iql, bind_params=bind_params, dropna=dropna) + expected = \ + expected_dropna_true if dropna else expected_dropna_false + for r, e in zip(result, expected): + for k in e: + assert_frame_equal(e[k], r[k]) + + # test default value (dropna = True) + with _mocked_session(cli, 'GET', 200, data): + result = cli.query(iql, bind_params=bind_params) + for r, e in zip(result, expected_dropna_true): + for k in e: + assert_frame_equal(e[k], r[k]) + def test_query_with_empty_result(self): """Test query with empty results in TestDataFrameClient object.""" cli = DataFrameClient('host', 8086, 'username', 'password', 'db') @@ -951,3 +1124,225 @@ def test_dsn_constructor(self): client = DataFrameClient.from_dsn('influxdb://localhost:8086') self.assertIsInstance(client, DataFrameClient) self.assertEqual('http://localhost:8086', client._baseurl) + + def test_write_points_from_dataframe_with_nan_line(self): + """Test write points from dataframe with Nan lines.""" + now = pd.Timestamp('1970-01-01 00:00+00:00') + dataframe = pd.DataFrame(data=[["1", 1, np.inf], ["2", 2, np.nan]], + index=[now, now + timedelta(hours=1)], + columns=["column_one", "column_two", + "column_three"]) + expected = ( + b"foo column_one=\"1\",column_two=1i 0\n" + b"foo column_one=\"2\",column_two=2i " + b"3600000000000\n" + ) + + with requests_mock.Mocker() as m: + m.register_uri(requests_mock.POST, + "http://localhost:8086/write", + status_code=204) + + cli = DataFrameClient(database='db') + + cli.write_points(dataframe, 'foo', protocol='line') + self.assertEqual(m.last_request.body, expected) + + cli.write_points(dataframe, 'foo', tags=None, protocol='line') + self.assertEqual(m.last_request.body, expected) + + def test_write_points_from_dataframe_with_nan_json(self): + """Test write points from json with NaN lines.""" + now = pd.Timestamp('1970-01-01 00:00+00:00') + dataframe = pd.DataFrame(data=[["1", 1, np.inf], ["2", 2, np.nan]], + index=[now, now + timedelta(hours=1)], + columns=["column_one", "column_two", + "column_three"]) + expected = ( + b"foo column_one=\"1\",column_two=1i 0\n" + b"foo column_one=\"2\",column_two=2i " + b"3600000000000\n" + ) + + with requests_mock.Mocker() as m: + m.register_uri(requests_mock.POST, + "http://localhost:8086/write", + status_code=204) + + cli = DataFrameClient(database='db') + + cli.write_points(dataframe, 'foo', protocol='json') + self.assertEqual(m.last_request.body, expected) + + cli.write_points(dataframe, 'foo', tags=None, protocol='json') + self.assertEqual(m.last_request.body, expected) + + def test_write_points_from_dataframe_with_tags_and_nan_line(self): + """Test write points from dataframe with NaN lines and tags.""" + now = pd.Timestamp('1970-01-01 00:00+00:00') + dataframe = pd.DataFrame(data=[['blue', 1, "1", 1, np.inf], + ['red', 0, "2", 2, np.nan]], + index=[now, now + timedelta(hours=1)], + columns=["tag_one", "tag_two", "column_one", + "column_two", "column_three"]) + expected = ( + b"foo,tag_one=blue,tag_two=1 " + b"column_one=\"1\",column_two=1i " + b"0\n" + b"foo,tag_one=red,tag_two=0 " + b"column_one=\"2\",column_two=2i " + b"3600000000000\n" + ) + + with requests_mock.Mocker() as m: + m.register_uri(requests_mock.POST, + "http://localhost:8086/write", + status_code=204) + + cli = DataFrameClient(database='db') + + cli.write_points(dataframe, 'foo', protocol='line', + tag_columns=['tag_one', 'tag_two']) + self.assertEqual(m.last_request.body, expected) + + cli.write_points(dataframe, 'foo', tags=None, protocol='line', + tag_columns=['tag_one', 'tag_two']) + self.assertEqual(m.last_request.body, expected) + + def test_write_points_from_dataframe_with_tags_and_nan_json(self): + """Test write points from json with NaN lines and tags.""" + now = pd.Timestamp('1970-01-01 00:00+00:00') + dataframe = pd.DataFrame(data=[['blue', 1, "1", 1, np.inf], + ['red', 0, "2", 2, np.nan]], + index=[now, now + timedelta(hours=1)], + columns=["tag_one", "tag_two", "column_one", + "column_two", "column_three"]) + expected = ( + b"foo,tag_one=blue,tag_two=1 " + b"column_one=\"1\",column_two=1i " + b"0\n" + b"foo,tag_one=red,tag_two=0 " + b"column_one=\"2\",column_two=2i " + b"3600000000000\n" + ) + + with requests_mock.Mocker() as m: + m.register_uri(requests_mock.POST, + "http://localhost:8086/write", + status_code=204) + + cli = DataFrameClient(database='db') + + cli.write_points(dataframe, 'foo', protocol='json', + tag_columns=['tag_one', 'tag_two']) + self.assertEqual(m.last_request.body, expected) + + cli.write_points(dataframe, 'foo', tags=None, protocol='json', + tag_columns=['tag_one', 'tag_two']) + self.assertEqual(m.last_request.body, expected) + + def test_query_custom_index(self): + """Test query with custom indexes.""" + data = { + "results": [ + { + "series": [ + { + "name": "cpu_load_short", + "columns": ["time", "value", "host"], + "values": [ + [1, 0.55, "local"], + [2, 23422, "local"], + [3, 0.64, "local"] + ] + } + ] + } + ] + } + + cli = DataFrameClient('host', 8086, 'username', 'password', 'db') + iql = "SELECT value FROM cpu_load_short WHERE region=$region;" \ + "SELECT count(value) FROM cpu_load_short WHERE region=$region" + bind_params = {'region': 'us-west'} + with _mocked_session(cli, 'GET', 200, data): + result = cli.query(iql, bind_params=bind_params, + data_frame_index=["time", "host"]) + + _data_frame = result['cpu_load_short'] + print(_data_frame) + + self.assertListEqual(["time", "host"], + list(_data_frame.index.names)) + + def test_dataframe_nanosecond_precision(self): + """Test nanosecond precision.""" + for_df_dict = { + "nanFloats": [1.1, float('nan'), 3.3, 4.4], + "onlyFloats": [1.1, 2.2, 3.3, 4.4], + "strings": ['one_one', 'two_two', 'three_three', 'four_four'] + } + df = pd.DataFrame.from_dict(for_df_dict) + df['time'] = ['2019-10-04 06:27:19.850557111+00:00', + '2019-10-04 06:27:19.850557184+00:00', + '2019-10-04 06:27:42.251396864+00:00', + '2019-10-04 06:27:42.251396974+00:00'] + df['time'] = pd.to_datetime(df['time'], unit='ns') + df = df.set_index('time') + + expected = ( + b'foo nanFloats=1.1,onlyFloats=1.1,strings="one_one" 1570170439850557111\n' # noqa E501 line too long + b'foo onlyFloats=2.2,strings="two_two" 1570170439850557184\n' # noqa E501 line too long + b'foo nanFloats=3.3,onlyFloats=3.3,strings="three_three" 1570170462251396864\n' # noqa E501 line too long + b'foo nanFloats=4.4,onlyFloats=4.4,strings="four_four" 1570170462251396974\n' # noqa E501 line too long + ) + + with requests_mock.Mocker() as m: + m.register_uri( + requests_mock.POST, + "http://localhost:8086/write", + status_code=204 + ) + + cli = DataFrameClient(database='db') + cli.write_points(df, 'foo', time_precision='n') + + self.assertEqual(m.last_request.body, expected) + + def test_dataframe_nanosecond_precision_one_microsecond(self): + """Test nanosecond precision within one microsecond.""" + # 1 microsecond = 1000 nanoseconds + start = np.datetime64('2019-10-04T06:27:19.850557000') + end = np.datetime64('2019-10-04T06:27:19.850558000') + + # generate timestamps with nanosecond precision + timestamps = np.arange( + start, + end + np.timedelta64(1, 'ns'), + np.timedelta64(1, 'ns') + ) + # generate values + values = np.arange(0.0, len(timestamps)) + + df = pd.DataFrame({'value': values}, index=timestamps) + with requests_mock.Mocker() as m: + m.register_uri( + requests_mock.POST, + "http://localhost:8086/write", + status_code=204 + ) + + cli = DataFrameClient(database='db') + cli.write_points(df, 'foo', time_precision='n') + + lines = m.last_request.body.decode('utf-8').split('\n') + self.assertEqual(len(lines), 1002) + + for index, line in enumerate(lines): + if index == 1001: + self.assertEqual(line, '') + continue + self.assertEqual( + line, + f"foo value={index}.0 157017043985055{7000 + index:04}" + ) diff --git a/influxdb/tests/helper_test.py b/influxdb/tests/helper_test.py index 6f24e85d..6737f921 100644 --- a/influxdb/tests/helper_test.py +++ b/influxdb/tests/helper_test.py @@ -47,6 +47,14 @@ class Meta: TestSeriesHelper.MySeriesHelper = MySeriesHelper + def setUp(self): + """Check that MySeriesHelper has empty datapoints.""" + super(TestSeriesHelper, self).setUp() + self.assertEqual( + TestSeriesHelper.MySeriesHelper._json_body_(), + [], + 'Resetting helper in teardown did not empty datapoints.') + def tearDown(self): """Deconstruct the TestSeriesHelper object.""" super(TestSeriesHelper, self).tearDown() @@ -310,8 +318,19 @@ class Meta: series_name = 'events.stats.{server_name}' + class InvalidTimePrecision(SeriesHelper): + """Define instance of SeriesHelper for invalid time precision.""" + + class Meta: + """Define metadata for InvalidTimePrecision.""" + + series_name = 'events.stats.{server_name}' + time_precision = "ks" + fields = ['time', 'server_name'] + autocommit = True + for cls in [MissingMeta, MissingClient, MissingFields, - MissingSeriesName]: + MissingSeriesName, InvalidTimePrecision]: self.assertRaises( AttributeError, cls, **{'time': 159, 'server_name': 'us.east-1'}) @@ -365,3 +384,54 @@ class Meta: .format(WarnBulkSizeNoEffect)) self.assertIn('has no affect', str(w[-1].message), 'Warning message did not contain "has not affect".') + + def testSeriesWithRetentionPolicy(self): + """Test that the data is saved with the specified retention policy.""" + my_policy = 'my_policy' + + class RetentionPolicySeriesHelper(SeriesHelper): + + class Meta: + client = InfluxDBClient() + series_name = 'events.stats.{server_name}' + fields = ['some_stat', 'time'] + tags = ['server_name', 'other_tag'] + bulk_size = 2 + autocommit = True + retention_policy = my_policy + + fake_write_points = mock.MagicMock() + RetentionPolicySeriesHelper( + server_name='us.east-1', some_stat=159, other_tag='gg') + RetentionPolicySeriesHelper._client.write_points = fake_write_points + RetentionPolicySeriesHelper( + server_name='us.east-1', some_stat=158, other_tag='aa') + + kall = fake_write_points.call_args + args, kwargs = kall + self.assertTrue('retention_policy' in kwargs) + self.assertEqual(kwargs['retention_policy'], my_policy) + + def testSeriesWithoutRetentionPolicy(self): + """Test that the data is saved without any retention policy.""" + class NoRetentionPolicySeriesHelper(SeriesHelper): + + class Meta: + client = InfluxDBClient() + series_name = 'events.stats.{server_name}' + fields = ['some_stat', 'time'] + tags = ['server_name', 'other_tag'] + bulk_size = 2 + autocommit = True + + fake_write_points = mock.MagicMock() + NoRetentionPolicySeriesHelper( + server_name='us.east-1', some_stat=159, other_tag='gg') + NoRetentionPolicySeriesHelper._client.write_points = fake_write_points + NoRetentionPolicySeriesHelper( + server_name='us.east-1', some_stat=158, other_tag='aa') + + kall = fake_write_points.call_args + args, kwargs = kall + self.assertTrue('retention_policy' in kwargs) + self.assertEqual(kwargs['retention_policy'], None) diff --git a/influxdb/tests/influxdb08/dataframe_client_test.py b/influxdb/tests/influxdb08/dataframe_client_test.py index 6e6fa2cc..0a766af0 100644 --- a/influxdb/tests/influxdb08/dataframe_client_test.py +++ b/influxdb/tests/influxdb08/dataframe_client_test.py @@ -12,7 +12,7 @@ from nose.tools import raises -from influxdb.tests import skipIfPYpy, using_pypy +from influxdb.tests import skip_if_pypy, using_pypy from .client_test import _mocked_session @@ -22,7 +22,7 @@ from influxdb.influxdb08 import DataFrameClient -@skipIfPYpy +@skip_if_pypy class TestDataFrameClient(unittest.TestCase): """Define the DataFramClient test object.""" diff --git a/influxdb/tests/server_tests/base.py b/influxdb/tests/server_tests/base.py index f4bd3ff9..45a9ec80 100644 --- a/influxdb/tests/server_tests/base.py +++ b/influxdb/tests/server_tests/base.py @@ -36,6 +36,15 @@ def _setup_influxdb_server(inst): database='db') +def _setup_gzip_client(inst): + inst.cli = InfluxDBClient('localhost', + inst.influxd_inst.http_port, + 'root', + '', + database='db', + gzip=True) + + def _teardown_influxdb_server(inst): remove_tree = sys.exc_info() == (None, None, None) inst.influxd_inst.close(remove_tree=remove_tree) @@ -51,8 +60,15 @@ class SingleTestCaseWithServerMixin(object): # 'influxdb_template_conf' attribute must be set # on the TestCase class or instance. - setUp = _setup_influxdb_server - tearDown = _teardown_influxdb_server + @classmethod + def setUp(cls): + """Set up an instance of the SingleTestCaseWithServerMixin.""" + _setup_influxdb_server(cls) + + @classmethod + def tearDown(cls): + """Tear down an instance of the SingleTestCaseWithServerMixin.""" + _teardown_influxdb_server(cls) class ManyTestCasesWithServerMixin(object): @@ -82,3 +98,41 @@ def tearDownClass(cls): def tearDown(self): """Deconstruct an instance of ManyTestCasesWithServerMixin.""" self.cli.drop_database('db') + + +class SingleTestCaseWithServerGzipMixin(object): + """Define the single testcase with server with gzip client mixin. + + Same as the SingleTestCaseWithServerGzipMixin but the InfluxDBClient has + gzip=True + """ + + @classmethod + def setUp(cls): + """Set up an instance of the SingleTestCaseWithServerGzipMixin.""" + _setup_influxdb_server(cls) + _setup_gzip_client(cls) + + @classmethod + def tearDown(cls): + """Tear down an instance of the SingleTestCaseWithServerMixin.""" + _teardown_influxdb_server(cls) + + +class ManyTestCasesWithServerGzipMixin(object): + """Define the many testcase with server with gzip client mixin. + + Same as the ManyTestCasesWithServerMixin but the InfluxDBClient has + gzip=True. + """ + + @classmethod + def setUpClass(cls): + """Set up an instance of the ManyTestCasesWithServerGzipMixin.""" + _setup_influxdb_server(cls) + _setup_gzip_client(cls) + + @classmethod + def tearDown(cls): + """Tear down an instance of the SingleTestCaseWithServerMixin.""" + _teardown_influxdb_server(cls) diff --git a/influxdb/tests/server_tests/client_test_with_server.py b/influxdb/tests/server_tests/client_test_with_server.py index 2f8a2097..a0263243 100644 --- a/influxdb/tests/server_tests/client_test_with_server.py +++ b/influxdb/tests/server_tests/client_test_with_server.py @@ -23,9 +23,11 @@ from influxdb import InfluxDBClient from influxdb.exceptions import InfluxDBClientError -from influxdb.tests import skipIfPYpy, using_pypy, skipServerTests +from influxdb.tests import skip_if_pypy, using_pypy, skip_server_tests from influxdb.tests.server_tests.base import ManyTestCasesWithServerMixin from influxdb.tests.server_tests.base import SingleTestCaseWithServerMixin +from influxdb.tests.server_tests.base import ManyTestCasesWithServerGzipMixin +from influxdb.tests.server_tests.base import SingleTestCaseWithServerGzipMixin # By default, raise exceptions on warnings warnings.simplefilter('error', FutureWarning) @@ -82,7 +84,7 @@ def point(series_name, timestamp=None, tags=None, **fields): ] if not using_pypy: - dummy_pointDF = { + dummy_point_df = { "measurement": "cpu_load_short", "tags": {"host": "server01", "region": "us-west"}, @@ -90,7 +92,7 @@ def point(series_name, timestamp=None, tags=None, **fields): [[0.64]], columns=['value'], index=pd.to_datetime(["2009-11-10T23:00:00Z"])) } - dummy_pointsDF = [{ + dummy_points_df = [{ "measurement": "cpu_load_short", "tags": {"host": "server01", "region": "us-west"}, "dataframe": pd.DataFrame( @@ -120,7 +122,7 @@ def point(series_name, timestamp=None, tags=None, **fields): ] -@skipServerTests +@skip_server_tests class SimpleTests(SingleTestCaseWithServerMixin, unittest.TestCase): """Define the class of simple tests.""" @@ -267,7 +269,7 @@ def test_invalid_port_fails(self): InfluxDBClient('host', '80/redir', 'username', 'password') -@skipServerTests +@skip_server_tests class CommonTests(ManyTestCasesWithServerMixin, unittest.TestCase): """Define a class to handle common tests for the server.""" @@ -293,15 +295,15 @@ def test_write_points(self): """Test writing points to the server.""" self.assertIs(True, self.cli.write_points(dummy_point)) - @skipIfPYpy + @skip_if_pypy def test_write_points_DF(self): """Test writing points with dataframe.""" self.assertIs( True, self.cliDF.write_points( - dummy_pointDF['dataframe'], - dummy_pointDF['measurement'], - dummy_pointDF['tags'] + dummy_point_df['dataframe'], + dummy_point_df['measurement'], + dummy_point_df['tags'] ) ) @@ -342,7 +344,7 @@ def test_write_points_check_read_DF(self): rsp = self.cliDF.query('SELECT * FROM cpu_load_short') assert_frame_equal( rsp['cpu_load_short'], - dummy_pointDF['dataframe'] + dummy_point_df['dataframe'] ) # Query with Tags @@ -351,7 +353,7 @@ def test_write_points_check_read_DF(self): assert_frame_equal( rsp[('cpu_load_short', (('host', 'server01'), ('region', 'us-west')))], - dummy_pointDF['dataframe'] + dummy_point_df['dataframe'] ) def test_write_multiple_points_different_series(self): @@ -407,21 +409,21 @@ def test_write_multiple_points_different_series_DF(self): for i in range(2): self.assertIs( True, self.cliDF.write_points( - dummy_pointsDF[i]['dataframe'], - dummy_pointsDF[i]['measurement'], - dummy_pointsDF[i]['tags'])) + dummy_points_df[i]['dataframe'], + dummy_points_df[i]['measurement'], + dummy_points_df[i]['tags'])) time.sleep(1) rsp = self.cliDF.query('SELECT * FROM cpu_load_short') assert_frame_equal( rsp['cpu_load_short'], - dummy_pointsDF[0]['dataframe'] + dummy_points_df[0]['dataframe'] ) rsp = self.cliDF.query('SELECT * FROM memory') assert_frame_equal( rsp['memory'], - dummy_pointsDF[1]['dataframe'] + dummy_points_df[1]['dataframe'] ) def test_write_points_batch(self): @@ -440,7 +442,36 @@ def test_write_points_batch(self): batch_size=2) time.sleep(5) net_in = self.cli.query("SELECT value FROM network " - "WHERE direction='in'").raw + "WHERE direction=$dir", + bind_params={'dir': 'in'} + ).raw + net_out = self.cli.query("SELECT value FROM network " + "WHERE direction='out'").raw + cpu = self.cli.query("SELECT value FROM cpu_usage").raw + self.assertIn(123, net_in['series'][0]['values'][0]) + self.assertIn(12, net_out['series'][0]['values'][0]) + self.assertIn(12.34, cpu['series'][0]['values'][0]) + + def test_write_points_batch_generator(self): + """Test writing points in a batch from a generator.""" + dummy_points = [ + {"measurement": "cpu_usage", "tags": {"unit": "percent"}, + "time": "2009-11-10T23:00:00Z", "fields": {"value": 12.34}}, + {"measurement": "network", "tags": {"direction": "in"}, + "time": "2009-11-10T23:00:00Z", "fields": {"value": 123.00}}, + {"measurement": "network", "tags": {"direction": "out"}, + "time": "2009-11-10T23:00:00Z", "fields": {"value": 12.00}} + ] + dummy_points_generator = (point for point in dummy_points) + self.cli.write_points(points=dummy_points_generator, + tags={"host": "server01", + "region": "us-west"}, + batch_size=2) + time.sleep(5) + net_in = self.cli.query("SELECT value FROM network " + "WHERE direction=$dir", + bind_params={'dir': 'in'} + ).raw net_out = self.cli.query("SELECT value FROM network " "WHERE direction='out'").raw cpu = self.cli.query("SELECT value FROM cpu_usage").raw @@ -720,6 +751,36 @@ def test_drop_retention_policy(self): rsp ) + def test_create_continuous_query(self): + """Test continuous query creation.""" + self.cli.create_retention_policy('some_rp', '1d', 1) + query = 'select count("value") into "some_rp"."events" from ' \ + '"events" group by time(10m)' + self.cli.create_continuous_query('test_cq', query, 'db') + cqs = self.cli.get_list_continuous_queries() + expected_cqs = [ + { + 'db': [ + { + 'name': 'test_cq', + 'query': 'CREATE CONTINUOUS QUERY test_cq ON db ' + 'BEGIN SELECT count(value) INTO ' + 'db.some_rp.events FROM db.autogen.events ' + 'GROUP BY time(10m) END' + } + ] + } + ] + self.assertEqual(cqs, expected_cqs) + + def test_drop_continuous_query(self): + """Test continuous query drop.""" + self.test_create_continuous_query() + self.cli.drop_continuous_query('test_cq', 'db') + cqs = self.cli.get_list_continuous_queries() + expected_cqs = [{'db': []}] + self.assertEqual(cqs, expected_cqs) + def test_issue_143(self): """Test for PR#143 from repo.""" pt = partial(point, 'a_series_name', timestamp='2015-03-30T16:16:37Z') @@ -785,8 +846,66 @@ def test_query_multiple_series(self): ] self.cli.write_points(pts) + def test_get_list_series(self): + """Test get a list of series from the database.""" + dummy_points = [ + { + "measurement": "cpu_load_short", + "tags": { + "host": "server01", + "region": "us-west" + }, + "time": "2009-11-10T23:00:00.123456Z", + "fields": { + "value": 0.64 + } + } + ] -@skipServerTests + dummy_points_2 = [ + { + "measurement": "memory_usage", + "tags": { + "host": "server02", + "region": "us-east" + }, + "time": "2009-11-10T23:00:00.123456Z", + "fields": { + "value": 80 + } + } + ] + + self.cli.write_points(dummy_points) + self.cli.write_points(dummy_points_2) + + self.assertEquals( + self.cli.get_list_series(), + ['cpu_load_short,host=server01,region=us-west', + 'memory_usage,host=server02,region=us-east'] + ) + + self.assertEquals( + self.cli.get_list_series(measurement='memory_usage'), + ['memory_usage,host=server02,region=us-east'] + ) + + self.assertEquals( + self.cli.get_list_series(measurement='memory_usage'), + ['memory_usage,host=server02,region=us-east'] + ) + + self.assertEquals( + self.cli.get_list_series(tags={'host': 'server02'}), + ['memory_usage,host=server02,region=us-east']) + + self.assertEquals( + self.cli.get_list_series( + measurement='cpu_load_short', tags={'host': 'server02'}), + []) + + +@skip_server_tests class UdpTests(ManyTestCasesWithServerMixin, unittest.TestCase): """Define a class to test UDP series.""" @@ -823,3 +942,25 @@ def test_write_points_udp(self): ], list(rsp['cpu_load_short']) ) + + +# Run the tests again, but with gzip enabled this time +@skip_server_tests +class GzipSimpleTests(SimpleTests, SingleTestCaseWithServerGzipMixin): + """Repeat the simple tests with InfluxDBClient where gzip=True.""" + + pass + + +@skip_server_tests +class GzipCommonTests(CommonTests, ManyTestCasesWithServerGzipMixin): + """Repeat the common tests with InfluxDBClient where gzip=True.""" + + pass + + +@skip_server_tests +class GzipUdpTests(UdpTests, ManyTestCasesWithServerGzipMixin): + """Repeat the UDP tests with InfluxDBClient where gzip=True.""" + + pass diff --git a/influxdb/tests/server_tests/influxdb_instance.py b/influxdb/tests/server_tests/influxdb_instance.py index 1dcd7567..2dd823ff 100644 --- a/influxdb/tests/server_tests/influxdb_instance.py +++ b/influxdb/tests/server_tests/influxdb_instance.py @@ -7,7 +7,7 @@ from __future__ import unicode_literals import datetime -import distutils +import distutils.spawn import os import tempfile import shutil diff --git a/influxdb/tests/test_line_protocol.py b/influxdb/tests/test_line_protocol.py index a3d84793..5b344990 100644 --- a/influxdb/tests/test_line_protocol.py +++ b/influxdb/tests/test_line_protocol.py @@ -6,10 +6,12 @@ from __future__ import print_function from __future__ import unicode_literals -from datetime import datetime import unittest -from pytz import UTC, timezone +from datetime import datetime +from decimal import Decimal + +from pytz import UTC, timezone from influxdb import line_protocol @@ -42,7 +44,7 @@ def test_make_lines(self): self.assertEqual( line_protocol.make_lines(data), - 'test,backslash_tag=C:\\\\ ,integer_tag=2,string_tag=hello ' + 'test,backslash_tag=C:\\\\,integer_tag=2,string_tag=hello ' 'bool_val=True,float_val=1.1,int_val=1i,string_val="hello!"\n' ) @@ -115,6 +117,45 @@ def test_make_lines_unicode(self): 'test,unicode_tag=\'Привет!\' unicode_val="Привет!"\n' ) + def test_make_lines_empty_field_string(self): + """Test make lines with an empty string field.""" + data = { + "points": [ + { + "measurement": "test", + "fields": { + "string": "", + } + } + ] + } + + self.assertEqual( + line_protocol.make_lines(data), + 'test string=""\n' + ) + + def test_tag_value_newline(self): + """Test make lines with tag value contains newline.""" + data = { + "tags": { + "t1": "line1\nline2" + }, + "points": [ + { + "measurement": "test", + "fields": { + "val": "hello" + } + } + ] + } + + self.assertEqual( + line_protocol.make_lines(data), + 'test,t1=line1\\nline2 val="hello"\n' + ) + def test_quote_ident(self): """Test quote indentation in TestLineProtocol object.""" self.assertEqual( @@ -145,3 +186,20 @@ def test_float_with_long_decimal_fraction(self): line_protocol.make_lines(data), 'test float_val=1.0000000000000009\n' ) + + def test_float_with_long_decimal_fraction_as_type_decimal(self): + """Ensure precision is preserved when casting Decimal into strings.""" + data = { + "points": [ + { + "measurement": "test", + "fields": { + "float_val": Decimal(0.8289445733333332), + } + } + ] + } + self.assertEqual( + line_protocol.make_lines(data), + 'test float_val=0.8289445733333332\n' + ) diff --git a/mypy.ini b/mypy.ini new file mode 100644 index 00000000..308aa62d --- /dev/null +++ b/mypy.ini @@ -0,0 +1,8 @@ +[mypy] +ignore_missing_imports = True +warn_unused_ignores = True +warn_unused_configs = True +warn_redundant_casts = True +warn_no_return = True +no_implicit_optional = True +strict_equality = True diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 00000000..1b68d94e --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,3 @@ +[build-system] +requires = ["setuptools>=42", "wheel"] +build-backend = "setuptools.build_meta" \ No newline at end of file diff --git a/requirements.txt b/requirements.txt index db5f6f85..a3df3154 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,4 +1,5 @@ python-dateutil>=2.6.0 -pytz +pytz>=2016.10 requests>=2.17.0 six>=1.10.0 +msgpack>=0.5.0 diff --git a/setup.py b/setup.py index cd6e4e9b..8ac7d1a7 100755 --- a/setup.py +++ b/setup.py @@ -23,6 +23,11 @@ with open('requirements.txt', 'r') as f: requires = [x.strip() for x in f if x.strip()] +# Debugging: Print the requires values +print("install_requires values:") +for req in requires: + print(f"- {req}") + with open('test-requirements.txt', 'r') as f: test_requires = [x.strip() for x in f if x.strip()] @@ -42,7 +47,7 @@ tests_require=test_requires, install_requires=requires, extras_require={'test': test_requires}, - classifiers=( + classifiers=[ 'Development Status :: 3 - Alpha', 'Intended Audience :: Developers', 'License :: OSI Approved :: MIT License', @@ -55,5 +60,5 @@ 'Programming Language :: Python :: 3.6', 'Topic :: Software Development :: Libraries', 'Topic :: Software Development :: Libraries :: Python Modules', - ), + ], ) diff --git a/tox.ini b/tox.ini index d0d87fec..a1005abb 100644 --- a/tox.ini +++ b/tox.ini @@ -1,14 +1,21 @@ [tox] -envlist = py27, py35, py36, pypy, pypy3, flake8, pep257, coverage, docs +envlist = py27, py35, py36, py37, pypy, pypy3, flake8, pep257, coverage, docs, mypy [testenv] passenv = INFLUXDB_PYTHON_INFLUXD_PATH setenv = INFLUXDB_PYTHON_SKIP_SERVER_TESTS=False deps = -r{toxinidir}/requirements.txt -r{toxinidir}/test-requirements.txt - py27,py34,py35,py36: pandas==0.20.1 - py27,py34,py35,py36: numpy==1.13.3 + py27: pandas==0.21.1 + py27: numpy==1.13.3 + py35: pandas==0.22.0 + py35: numpy==1.14.6 + py36: pandas==0.23.4 + py36: numpy==1.15.4 + py37: pandas>=0.24.2 + py37: numpy>=1.16.2 # Only install pandas with non-pypy interpreters +# Testing all combinations would be too expensive commands = nosetests -v --with-doctest {posargs} [testenv:flake8] @@ -24,21 +31,29 @@ commands = pydocstyle --count -ve examples influxdb [testenv:coverage] deps = -r{toxinidir}/requirements.txt -r{toxinidir}/test-requirements.txt - pandas + pandas==0.24.2 coverage - numpy==1.13.3 + numpy commands = nosetests -v --with-coverage --cover-html --cover-package=influxdb [testenv:docs] deps = -r{toxinidir}/requirements.txt - pandas==0.20.1 - numpy==1.13.3 - Sphinx==1.5.5 + pandas>=0.24.2 + numpy>=1.16.2 + Sphinx>=1.8.5 sphinx_rtd_theme commands = sphinx-build -b html docs/source docs/build +[testenv:mypy] +deps = -r{toxinidir}/test-requirements.txt + mypy==0.720 +commands = mypy --config-file mypy.ini -p influxdb + [flake8] -ignore = N802,F821,E402 -# E402: module level import not at top of file +ignore = W503,W504,W605,N802,F821,E402 +# W503: Line break occurred before a binary operator +# W504: Line break occurred after a binary operator +# W605: invalid escape sequence # N802: nosetests's setUp function # F821: False positive in intluxdb/dataframe_client.py +# E402: module level import not at top of file