diff --git a/.github/CODEOWNERS b/.github/CODEOWNERS
new file mode 100644
index 00000000..0248ade1
--- /dev/null
+++ b/.github/CODEOWNERS
@@ -0,0 +1 @@
+* @aviau @sebito91 @xginn8
diff --git a/.github/ISSUE_TEMPLATE.md b/.github/ISSUE_TEMPLATE.md
new file mode 100644
index 00000000..7a7927c1
--- /dev/null
+++ b/.github/ISSUE_TEMPLATE.md
@@ -0,0 +1,4 @@
+- **InfluxDB version:** e.g. 1.7.7 (output of the `influx version` command)
+- **InfluxDB-python version:** e.g. 5.2.2 (output of the `python -c "from __future__ import print_function; import influxdb; print(influxdb.__version__)"` command)
+- **Python version:** e.g. 3.7.4 (output of the `python --version` command)
+- **Operating system version:** e.g. Windows 10, Ubuntu 18.04, macOS 10.14.5
diff --git a/.github/PULL_REQUEST_TEMPLATE.md b/.github/PULL_REQUEST_TEMPLATE.md
new file mode 100644
index 00000000..84729d17
--- /dev/null
+++ b/.github/PULL_REQUEST_TEMPLATE.md
@@ -0,0 +1,5 @@
+---
+##### Contributor checklist
+
+- [ ] Builds are passing
+- [ ] New tests have been added (for feature additions)
diff --git a/.gitignore b/.gitignore
index 7720b658..d970c44c 100644
--- a/.gitignore
+++ b/.gitignore
@@ -7,6 +7,7 @@ build/
mock*/
nose*/
.pybuild/
+.mypy_cache/
debian/files
debian/python-influxdb.debhelper.log
debian/python-influxdb.postinst.debhelper
diff --git a/.travis.yml b/.travis.yml
index 7f3d4a5d..9d45f19b 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -4,14 +4,17 @@ python:
- "2.7"
- "3.5"
- "3.6"
- - "pypy-5.3.1"
+ - "3.7"
+ - "pypy"
- "pypy3"
env:
- - INFLUXDB_VER=1.2.4
- - INFLUXDB_VER=1.3.9
- - INFLUXDB_VER=1.4.2
- - INFLUXDB_VER=1.5.4
+ - INFLUXDB_VER=1.2.4 # 2017-05-08
+ - INFLUXDB_VER=1.3.9 # 2018-01-19
+ - INFLUXDB_VER=1.4.3 # 2018-01-30
+ - INFLUXDB_VER=1.5.4 # 2018-06-22
+ - INFLUXDB_VER=1.6.4 # 2018-10-24
+ - INFLUXDB_VER=1.7.4 # 2019-02-14
addons:
apt:
@@ -20,18 +23,20 @@ addons:
matrix:
include:
- - python: 2.7
+ - python: 3.7
env: TOX_ENV=pep257
- - python: 3.6
+ - python: 3.7
env: TOX_ENV=docs
- - python: 3.6
+ - python: 3.7
env: TOX_ENV=flake8
- - python: 3.6
+ - python: 3.7
env: TOX_ENV=coverage
+ - python: 3.7
+ env: TOX_ENV=mypy
install:
- pip install tox-travis
- - pip install setuptools==20.6.6
+ - pip install setuptools
- pip install coveralls
- mkdir -p "influxdb_install/${INFLUXDB_VER}"
- if [ -n "${INFLUXDB_VER}" ] ; then wget "https://dl.influxdata.com/influxdb/releases/influxdb_${INFLUXDB_VER}_amd64.deb" ; fi
diff --git a/CHANGELOG.md b/CHANGELOG.md
index c156b678..bfd27d38 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -4,7 +4,85 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/)
and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.html).
-## [Unreleased]
+## [v5.3.2] - 2024-04-17
+
+### Changed
+- Correctly serialize nanosecond dataframe timestamps (#926)
+
+## [v5.3.1] - 2022-11-14
+
+### Added
+- Add support for custom headers in the InfluxDBClient (#710 thx @nathanielatom)
+- Add support for custom indexes for query in the DataFrameClient (#785)
+
+### Changed
+- Amend retry to avoid sleep after last retry before raising exception (#790 thx @krzysbaranski)
+- Remove msgpack pinning for requirements (#818 thx @prometheanfire)
+- Update support for HTTP headers in the InfluxDBClient (#851 thx @bednar)
+
+### Removed
+
+## [v5.3.0] - 2020-04-10
+
+### Added
+- Add mypy testing framework (#756)
+- Add support for messagepack (#734 thx @lovasoa)
+- Add support for 'show series' (#357 thx @gaker)
+- Add support for custom request session in InfluxDBClient (#360 thx @dschien)
+- Add support for handling np.nan and np.inf values in DataFrameClient (#436 thx @nmerket)
+- Add support for optional `time_precision` in the SeriesHelper (#502 && #719 thx @appunni-dishq && @klDen)
+- Add ability to specify retention policy in SeriesHelper (#723 thx @csanz91)
+- Add gzip compression for post and response data (#732 thx @KEClaytor)
+- Add support for chunked responses in ResultSet (#753 and #538 thx @hrbonz && @psy0rz)
+- Add support for empty string fields (#766 thx @gregschrock)
+- Add support for context managers to InfluxDBClient (#721 thx @JustusAdam)
+
+### Changed
+- Clean up stale CI config (#755)
+- Add legacy client test (#752 & #318 thx @oldmantaiter & @sebito91)
+- Update make_lines section in line_protocol.py to split out core function (#375 thx @aisbaa)
+- Fix nanosecond time resolution for points (#407 thx @AndreCAndersen && @clslgrnc)
+- Fix import of distutils.spawn (#805 thx @Hawk777)
+- Update repr of float values including properly handling of boolean (#488 thx @ghost)
+- Update DataFrameClient to fix faulty empty tags (#770 thx @michelfripiat)
+- Update DataFrameClient to properly return `dropna` values (#778 thx @jgspiro)
+- Update DataFrameClient to test for pd.DataTimeIndex before blind conversion (#623 thx @testforvin)
+- Update client to type-set UDP port to int (#651 thx @yifeikong)
+- Update batched writing support for all iterables (#746 thx @JayH5)
+- Update SeriesHelper to enable class instantiation when not initialized (#772 thx @ocworld)
+- Update UDP test case to add proper timestamp to datapoints (#808 thx @shantanoo-desai)
+
+### Removed
+
+## [v5.2.3] - 2019-08-19
+
+### Added
+- Add consistency param to InfluxDBClient.write_points (#643 thx @RonRothman)
+- Add UDP example (#648 thx @shantanoo-desai)
+- Add consistency parameter to `write_points` (#664 thx @RonRothman)
+- The query() function now accepts a bind_params argument for parameter binding (#678 thx @clslgrnc)
+- Add `get_list_continuous_queries`, `drop_continuous_query`, and `create_continuous_query` management methods for
+ continuous queries (#681 thx @lukaszdudek-silvair && @smolse)
+- Mutual TLS authentication (#702 thx @LloydW93)
+
+### Changed
+- Update test suite to add support for Python 3.7 and InfluxDB v1.6.4 and 1.7.4 (#692 thx @clslgrnc)
+- Update supported versions of influxdb + python (#693 thx @clslgrnc)
+- Fix for the line protocol issue with leading comma (#694 thx @d3banjan)
+- Update classifiers tuple to list in setup.py (#697 thx @Hanaasagi)
+- Update documentation for empty `delete_series` confusion (#699 thx @xginn8)
+- Fix newline character issue in tag value (#716 thx @syhan)
+- Update examples/tutorial_pandas.py to reference the `line` protocol due to a bug with `json` (#737 thx @Aeium)
+
+### Removed
+
+## [v5.2.2] - 2019-03-14
+### Added
+
+### Changed
+- Fix 'TypeError: Already tz-aware' introduced with recent versions of Pandas (#671, #676, thx @f4bsch @clslgrnc)
+
+## [v5.2.1] - 2018-12-07
### Added
### Changed
diff --git a/LICENSE b/LICENSE
index 38ee2491..a49a5410 100644
--- a/LICENSE
+++ b/LICENSE
@@ -1,6 +1,6 @@
The MIT License (MIT)
-Copyright (c) 2013 InfluxDB
+Copyright (c) 2020 InfluxDB
Permission is hereby granted, free of charge, to any person obtaining a copy of
this software and associated documentation files (the "Software"), to deal in
diff --git a/README.rst b/README.rst
index d4f9611c..048db045 100644
--- a/README.rst
+++ b/README.rst
@@ -15,36 +15,45 @@ InfluxDB-Python
:target: https://pypi.python.org/pypi/influxdb
:alt: PyPI Status
-InfluxDB-Python is a client for interacting with InfluxDB_.
-Development of this library is maintained by:
+.. important::
-+-----------+-------------------------------+
-| Github ID | URL                           |
-+===========+===============================+
-| @aviau    | (https://github.com/aviau)    |
-+-----------+-------------------------------+
-| @xginn8   | (https://github.com/xginn8)   |
-+-----------+-------------------------------+
-| @sebito91 | (https://github.com/sebito91) |
-+-----------+-------------------------------+
+ **This project is no longer in development**
+
+ This v1 client library is for interacting with `InfluxDB 1.x `_ and 1.x-compatible endpoints in `InfluxDB 2.x `_.
+ Use it to:
+
+ - Write data in line protocol.
+ - Query data with `InfluxQL `_.
+
+ If you use `InfluxDB 2.x (TSM storage engine) `_ and `Flux `_, see the `v2 client library `_.
+
+ If you use `InfluxDB 3.0 `_, see the `v3 client library `_.
+
+ For new projects, consider using InfluxDB 3.0 and v3 client libraries.
+
+Description
+===========
+
+InfluxDB-python, the InfluxDB Python Client (1.x), is a client library for interacting with `InfluxDB 1.x `_ instances.
.. _readme-about:
-InfluxDB is an open-source distributed time series database, find more about InfluxDB_ at https://docs.influxdata.com/influxdb/latest
+`InfluxDB`_ is the time series platform designed to handle high write and query loads.
.. _installation:
-InfluxDB pre v1.1.0 users
--------------------------
-This module is tested with InfluxDB versions: v1.2.4, v1.3.9, v1.4.2, and v1.5.4.
+For InfluxDB pre-v1.1.0 users
+-----------------------------
-Those users still on InfluxDB v0.8.x users may still use the legacy client by importing ``from influxdb.influxdb08 import InfluxDBClient``.
+This module is tested with InfluxDB versions v1.2.4, v1.3.9, v1.4.3, v1.5.4, v1.6.4, and v1.7.4.
-Installation
-------------
+Users on InfluxDB v0.8.x may still use the legacy client by importing ``from influxdb.influxdb08 import InfluxDBClient``.
+
+For InfluxDB v1.1+ users
+------------------------
Install, upgrade and uninstall influxdb-python with these commands::
@@ -59,7 +68,7 @@ On Debian/Ubuntu, you can install it with this command::
Dependencies
------------
-The influxdb-python distribution is supported and tested on Python 2.7, 3.5, 3.6, PyPy and PyPy3.
+The influxdb-python distribution is supported and tested on Python 2.7, 3.5, 3.6, 3.7, PyPy and PyPy3.
-**Note:** Python <3.5 are currently untested. See ``.travis.yml``.
+**Note:** Python versions <3.5 are currently untested. See ``.travis.yml``.
@@ -152,21 +161,33 @@ We are also lurking on the following:
Development
-----------
+The v1 client libraries for InfluxDB 1.x were typically developed and maintained by InfluxDB community members. If you are an InfluxDB v1 user interested in maintaining this client library (at a minimum, keeping it updated with security patches) please contact the InfluxDB team on the `Community Forums `_ or
+`InfluxData Slack `_.
+
All development is done on Github_. Use Issues_ to report
problems or submit contributions.
.. _Github: https://github.com/influxdb/influxdb-python/
.. _Issues: https://github.com/influxdb/influxdb-python/issues
-Please note that we WILL get to your questions/issues/concerns as quickly as possible. We maintain many
-software repositories and sometimes things may get pushed to the backburner. Please don't take offense,
-we will do our best to reply as soon as possible!
+Please note that we will answer your questions as quickly as possible.
+
+Maintainers:
++-----------+-------------------------------+
+| Github ID | URL                           |
++===========+===============================+
+| @aviau    | (https://github.com/aviau)    |
++-----------+-------------------------------+
+| @xginn8   | (https://github.com/xginn8)   |
++-----------+-------------------------------+
+| @sebito91 | (https://github.com/sebito91) |
++-----------+-------------------------------+
Source code
-----------
-The source code is currently available on Github: https://github.com/influxdata/influxdb-python
+The source code for the InfluxDB Python Client (1.x) is currently available on Github: https://github.com/influxdata/influxdb-python
TODO
@@ -175,6 +196,6 @@ TODO
The TODO/Roadmap can be found in Github bug tracker: https://github.com/influxdata/influxdb-python/issues
-.. _InfluxDB: https://influxdata.com/time-series-platform/influxdb/
+.. _InfluxDB: https://influxdata.com/
.. _Sphinx: http://sphinx.pocoo.org/
.. _Tox: https://tox.readthedocs.org
diff --git a/docs/source/api-documentation.rst b/docs/source/api-documentation.rst
index d00600e6..35fdb291 100644
--- a/docs/source/api-documentation.rst
+++ b/docs/source/api-documentation.rst
@@ -30,7 +30,7 @@ These clients are initiated in the same way as the
client = DataFrameClient(host='127.0.0.1', port=8086, username='root', password='root', database='dbname')
-.. note:: Only when using UDP (use_udp=True) the connections is established.
+.. note:: Only when using UDP (use_udp=True) is the connection established.
.. _InfluxDBClient-api:
diff --git a/docs/source/conf.py b/docs/source/conf.py
index 231c776c..efc22f88 100644
--- a/docs/source/conf.py
+++ b/docs/source/conf.py
@@ -117,7 +117,8 @@
# Add any paths that contain custom themes here, relative to this directory.
#html_theme_path = []
-html_theme_path = [sphinx_rtd_theme.get_html_theme_path()]
+# Calling get_html_theme_path is deprecated.
+# html_theme_path = [sphinx_rtd_theme.get_html_theme_path()]
# The name for this set of Sphinx documents. If None, it defaults to
# " v documentation".
diff --git a/docs/source/examples.rst b/docs/source/examples.rst
index 2c85fbda..841ad8b1 100644
--- a/docs/source/examples.rst
+++ b/docs/source/examples.rst
@@ -25,3 +25,15 @@ Tutorials - SeriesHelper
.. literalinclude:: ../../examples/tutorial_serieshelper.py
:language: python
+
+Tutorials - UDP
+===============
+
+.. literalinclude:: ../../examples/tutorial_udp.py
+ :language: python
+
+Tutorials - Authorization by Token
+==================================
+
+.. literalinclude:: ../../examples/tutorial_authorization.py
+ :language: python
diff --git a/examples/tutorial.py b/examples/tutorial.py
index 4083bfc5..12cd49c1 100644
--- a/examples/tutorial.py
+++ b/examples/tutorial.py
@@ -13,7 +13,9 @@ def main(host='localhost', port=8086):
dbname = 'example'
dbuser = 'smly'
dbuser_password = 'my_secret_password'
- query = 'select value from cpu_load_short;'
+ query = 'select Float_value from cpu_load_short;'
+ query_where = 'select Int_value from cpu_load_short where host=$host;'
+ bind_params = {'host': 'server01'}
json_body = [
{
"measurement": "cpu_load_short",
@@ -50,6 +52,11 @@ def main(host='localhost', port=8086):
print("Result: {0}".format(result))
+ print("Querying data: " + query_where)
+ result = client.query(query_where, bind_params=bind_params)
+
+ print("Result: {0}".format(result))
+
print("Switch user: " + user)
client.switch_user(user, password)
diff --git a/examples/tutorial_authorization.py b/examples/tutorial_authorization.py
new file mode 100644
index 00000000..9d9a800f
--- /dev/null
+++ b/examples/tutorial_authorization.py
@@ -0,0 +1,32 @@
+# -*- coding: utf-8 -*-
+"""Tutorial how to authorize InfluxDB client by custom Authorization token."""
+
+import argparse
+from influxdb import InfluxDBClient
+
+
+def main(token='my-token'):
+ """Instantiate a connection to the InfluxDB."""
+ client = InfluxDBClient(username=None, password=None,
+ headers={"Authorization": token})
+
+ print("Use authorization token: " + token)
+
+ version = client.ping()
+ print("Successfully connected to InfluxDB: " + version)
+
+
+def parse_args():
+ """Parse the args from main."""
+ parser = argparse.ArgumentParser(
+ description='example code to play with InfluxDB')
+ parser.add_argument('--token', type=str, required=False,
+ default='my-token',
+                        help='Authorization token for the proxy in front of InfluxDB.')
+ return parser.parse_args()
+
+
+if __name__ == '__main__':
+ args = parse_args()
+ main(token=args.token)
diff --git a/examples/tutorial_pandas.py b/examples/tutorial_pandas.py
index 67a5457d..13e72f8c 100644
--- a/examples/tutorial_pandas.py
+++ b/examples/tutorial_pandas.py
@@ -12,7 +12,7 @@ def main(host='localhost', port=8086):
user = 'root'
password = 'root'
dbname = 'demo'
- protocol = 'json'
+ protocol = 'line'
client = DataFrameClient(host, port, user, password, dbname)
diff --git a/examples/tutorial_udp.py b/examples/tutorial_udp.py
new file mode 100644
index 00000000..93b923d7
--- /dev/null
+++ b/examples/tutorial_udp.py
@@ -0,0 +1,67 @@
+# -*- coding: utf-8 -*-
+"""Example for sending batch information to InfluxDB via UDP."""
+
+"""
+INFO: In order to use UDP, one should enable the UDP service from the
+`influxdb.conf` under section
+ [[udp]]
+ enabled = true
+ bind-address = ":8089" # port number for sending data via UDP
+ database = "udp1" # name of database to be stored
+ [[udp]]
+ enabled = true
+ bind-address = ":8090"
+ database = "udp2"
+"""
+
+
+import argparse
+
+from influxdb import InfluxDBClient
+
+
+def main(uport):
+ """Instantiate connection to the InfluxDB."""
+    # NOTE: the structure of the UDP packet differs from that of data
+    # sent via HTTP
+ json_body = {
+ "tags": {
+ "host": "server01",
+ "region": "us-west"
+ },
+ "points": [{
+ "measurement": "cpu_load_short",
+ "fields": {
+ "value": 0.64
+ },
+ "time": "2009-11-10T23:00:00Z",
+ },
+ {
+ "measurement": "cpu_load_short",
+ "fields": {
+ "value": 0.67
+ },
+ "time": "2009-11-10T23:05:00Z"
+ }]
+ }
+
+ # make `use_udp` True and add `udp_port` number from `influxdb.conf` file
+ # no need to mention the database name since it is already configured
+ client = InfluxDBClient(use_udp=True, udp_port=uport)
+
+ # Instead of `write_points` use `send_packet`
+ client.send_packet(json_body)
+
+
+def parse_args():
+ """Parse the args."""
+ parser = argparse.ArgumentParser(
+ description='example code to play with InfluxDB along with UDP Port')
+ parser.add_argument('--uport', type=int, required=True,
+                        help='UDP port of InfluxDB')
+ return parser.parse_args()
+
+
+if __name__ == '__main__':
+ args = parse_args()
+ main(uport=args.uport)
diff --git a/influxdb/__init__.py b/influxdb/__init__.py
index 03f74581..e66f80ea 100644
--- a/influxdb/__init__.py
+++ b/influxdb/__init__.py
@@ -18,4 +18,4 @@
]
-__version__ = '5.2.0'
+__version__ = '5.3.2'
diff --git a/influxdb/_dataframe_client.py b/influxdb/_dataframe_client.py
index 06da7ac4..907db2cb 100644
--- a/influxdb/_dataframe_client.py
+++ b/influxdb/_dataframe_client.py
@@ -59,6 +59,8 @@ def write_points(self,
:param dataframe: data points in a DataFrame
:param measurement: name of measurement
:param tags: dictionary of tags, with string key-values
+ :param tag_columns: [Optional, default None] List of data tag names
+        :param field_columns: [Optional, default None] List of data field names
:param time_precision: [Optional, default None] Either 's', 'ms', 'u'
or 'n'.
:param batch_size: [Optional] Value to write the points in batches
@@ -142,6 +144,7 @@ def write_points(self,
def query(self,
query,
params=None,
+ bind_params=None,
epoch=None,
expected_response_code=200,
database=None,
@@ -149,12 +152,23 @@ def query(self,
chunked=False,
chunk_size=0,
method="GET",
- dropna=True):
+ dropna=True,
+ data_frame_index=None):
"""
Query data into a DataFrame.
+ .. danger::
+ In order to avoid injection vulnerabilities (similar to `SQL
+ injection `_
+ vulnerabilities), do not directly include untrusted data into the
+ ``query`` parameter, use ``bind_params`` instead.
+
:param query: the actual query string
:param params: additional parameters for the request, defaults to {}
+ :param bind_params: bind parameters for the query:
+ any variable in the query written as ``'$var_name'`` will be
+ replaced with ``bind_params['var_name']``. Only works in the
+ ``WHERE`` clause and takes precedence over ``params['params']``
:param epoch: response timestamps to be in epoch format either 'h',
-            'm', 's', 'ms', 'u', or 'ns',defaults to `None` which is
+            'm', 's', 'ms', 'u', or 'ns', defaults to `None` which is
RFC3339 UTC format with nanosecond precision
@@ -168,10 +182,13 @@ def query(self,
containing all results within that chunk
:param chunk_size: Size of each chunk to tell InfluxDB to use.
:param dropna: drop columns where all values are missing
+ :param data_frame_index: the list of columns that
+ are used as DataFrame index
:returns: the queried data
:rtype: :class:`~.ResultSet`
"""
query_args = dict(params=params,
+ bind_params=bind_params,
epoch=epoch,
expected_response_code=expected_response_code,
raise_errors=raise_errors,
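As a sketch of how the new ``bind_params`` and ``data_frame_index`` arguments combine (a local server, a ``demo`` database, and the column name ``type`` are assumptions for illustration):

```python
from influxdb import DataFrameClient

# A minimal sketch, assuming a local InfluxDB and a 'demo' database.
client = DataFrameClient(host='localhost', port=8086, database='demo')

# bind_params keeps untrusted values out of the query string itself;
# data_frame_index selects the column(s) used as the DataFrame index
# in place of the default 'time' index.
result = client.query(
    'SELECT * FROM "cpu_load_short" WHERE host = $host',
    bind_params={'host': 'server01'},
    data_frame_index=['type'],  # hypothetical column name
)
```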
@@ -182,16 +199,18 @@ def query(self,
results = super(DataFrameClient, self).query(query, **query_args)
if query.strip().upper().startswith("SELECT"):
if len(results) > 0:
- return self._to_dataframe(results, dropna)
+ return self._to_dataframe(results, dropna,
+ data_frame_index=data_frame_index)
else:
return {}
else:
return results
- def _to_dataframe(self, rs, dropna=True):
+ def _to_dataframe(self, rs, dropna=True, data_frame_index=None):
result = defaultdict(list)
if isinstance(rs, list):
- return map(self._to_dataframe, rs)
+            return map(self._to_dataframe, rs,
+                       [dropna for _ in range(len(rs))],
+                       [data_frame_index for _ in range(len(rs))])
for key, data in rs.items():
name, tags = key
@@ -201,9 +220,15 @@ def _to_dataframe(self, rs, dropna=True):
key = (name, tuple(sorted(tags.items())))
df = pd.DataFrame(data)
df.time = pd.to_datetime(df.time)
- df.set_index('time', inplace=True)
- df.index = df.index.tz_localize('UTC')
- df.index.name = None
+
+ if data_frame_index:
+ df.set_index(data_frame_index, inplace=True)
+ else:
+ df.set_index('time', inplace=True)
+ if df.index.tzinfo is None:
+ df.index = df.index.tz_localize('UTC')
+ df.index.name = None
+
result[key].append(df)
for key, data in result.items():
df = pd.concat(data).sort_index()
@@ -238,7 +263,8 @@ def _convert_dataframe_to_json(dataframe,
field_columns = list(
set(dataframe.columns).difference(set(tag_columns)))
- dataframe.index = pd.to_datetime(dataframe.index)
+ if not isinstance(dataframe.index, pd.DatetimeIndex):
+ dataframe.index = pd.to_datetime(dataframe.index)
if dataframe.index.tzinfo is None:
dataframe.index = dataframe.index.tz_localize('UTC')
@@ -257,14 +283,31 @@ def _convert_dataframe_to_json(dataframe,
"h": 1e9 * 3600,
}.get(time_precision, 1)
+ if not tag_columns:
+ points = [
+ {'measurement': measurement,
+ 'fields':
+ rec.replace([np.inf, -np.inf], np.nan).dropna().to_dict(),
+ 'time': np.int64(ts.value / precision_factor)}
+ for ts, (_, rec) in zip(
+ dataframe.index,
+ dataframe[field_columns].iterrows()
+ )
+ ]
+
+ return points
+
points = [
{'measurement': measurement,
'tags': dict(list(tag.items()) + list(tags.items())),
- 'fields': rec,
+ 'fields':
+ rec.replace([np.inf, -np.inf], np.nan).dropna().to_dict(),
'time': np.int64(ts.value / precision_factor)}
- for ts, tag, rec in zip(dataframe.index,
- dataframe[tag_columns].to_dict('record'),
- dataframe[field_columns].to_dict('record'))
+ for ts, tag, (_, rec) in zip(
+ dataframe.index,
+                dataframe[tag_columns].to_dict('records'),
+ dataframe[field_columns].iterrows()
+ )
]
return points
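The per-point cleanup above can be illustrated in isolation: fields whose values are NaN or ±inf are dropped from each point rather than serialized.

```python
import numpy as np
import pandas as pd

# Mirrors the 'fields' expression above: replace ±inf with NaN, then drop.
rec = pd.Series({'value': 0.64, 'broken': np.inf, 'missing': np.nan})
fields = rec.replace([np.inf, -np.inf], np.nan).dropna().to_dict()
print(fields)  # {'value': 0.64}
```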
@@ -329,10 +372,10 @@ def _convert_dataframe_to_lines(self,
# Make array of timestamp ints
if isinstance(dataframe.index, pd.PeriodIndex):
- time = ((dataframe.index.to_timestamp().values.astype(np.int64) /
+ time = ((dataframe.index.to_timestamp().values.astype(np.int64) //
precision_factor).astype(np.int64).astype(str))
else:
- time = ((pd.to_datetime(dataframe.index).values.astype(np.int64) /
+ time = ((pd.to_datetime(dataframe.index).values.astype(np.int64) //
precision_factor).astype(np.int64).astype(str))
# If tag columns exist, make an array of formatted tag keys and values
@@ -350,7 +393,7 @@ def _convert_dataframe_to_lines(self,
tag_df = self._stringify_dataframe(
tag_df, numeric_precision, datatype='tag')
- # join preprendded tags, leaving None values out
+ # join prepended tags, leaving None values out
tags = tag_df.apply(
lambda s: [',' + s.name + '=' + v if v else '' for v in s])
tags = tags.sum(axis=1)
@@ -358,7 +401,8 @@ def _convert_dataframe_to_lines(self,
del tag_df
elif global_tags:
tag_string = ''.join(
- [",{}={}".format(k, _escape_tag(v)) if v else ''
+ [",{}={}".format(k, _escape_tag(v))
+ if v not in [None, ''] else ""
for k, v in sorted(global_tags.items())]
)
tags = pd.Series(tag_string, index=dataframe.index)
@@ -366,19 +410,18 @@ def _convert_dataframe_to_lines(self,
tags = ''
# Make an array of formatted field keys and values
- field_df = dataframe[field_columns]
- # Keep the positions where Null values are found
- mask_null = field_df.isnull().values
+ field_df = dataframe[field_columns].replace([np.inf, -np.inf], np.nan)
+ nans = pd.isnull(field_df)
field_df = self._stringify_dataframe(field_df,
numeric_precision,
datatype='field')
field_df = (field_df.columns.values + '=').tolist() + field_df
- field_df[field_df.columns[1:]] = ',' + field_df[
- field_df.columns[1:]]
- field_df = field_df.where(~mask_null, '') # drop Null entries
- fields = field_df.sum(axis=1)
+ field_df[field_df.columns[1:]] = ',' + field_df[field_df.columns[1:]]
+ field_df[nans] = ''
+
+ fields = field_df.sum(axis=1).map(lambda x: x.lstrip(','))
del field_df
# Generate line protocol string
diff --git a/influxdb/client.py b/influxdb/client.py
index 8f8b14ae..c535a3f1 100644
--- a/influxdb/client.py
+++ b/influxdb/client.py
@@ -6,14 +6,21 @@
from __future__ import print_function
from __future__ import unicode_literals
-import time
-import random
-
+import datetime
+import gzip
+import itertools
+import io
import json
+import random
import socket
+import struct
+import time
+from itertools import chain, islice
+
+import msgpack
import requests
import requests.exceptions
-from six.moves import xrange
+from requests.adapters import HTTPAdapter
from six.moves.urllib.parse import urlparse
from influxdb.line_protocol import make_lines, quote_ident, quote_literal
@@ -29,6 +36,9 @@ class InfluxDBClient(object):
connect to InfluxDB. Requests can be made to InfluxDB directly through
the client.
+    The client supports use as a `context manager
+ `_.
+
:param host: hostname to connect to InfluxDB, defaults to 'localhost'
:type host: str
:param port: port to connect to InfluxDB, defaults to 8086
@@ -50,8 +60,12 @@ class InfluxDBClient(object):
:param timeout: number of seconds Requests will wait for your client to
establish a connection, defaults to None
:type timeout: int
- :param retries: number of retries your client will try before aborting,
- defaults to 3. 0 indicates try until success
+ :param retries: number of attempts your client will make before aborting,
+ defaults to 3
+ 0 - try until success
+ 1 - attempt only once (without retry)
+ 2 - maximum two attempts (including one retry)
+ 3 - maximum three attempts (default option)
:type retries: int
:param use_udp: use UDP to connect to InfluxDB, defaults to False
:type use_udp: bool
@@ -61,6 +75,25 @@ class InfluxDBClient(object):
:type proxies: dict
:param path: path of InfluxDB on the server to connect, defaults to ''
:type path: str
+ :param cert: Path to client certificate information to use for mutual TLS
+ authentication. You can specify a local cert to use
+ as a single file containing the private key and the certificate, or as
+ a tuple of both files’ paths, defaults to None
+ :type cert: str
+ :param gzip: use gzip content encoding to compress requests
+ :type gzip: bool
+ :param session: allow for the new client request to use an existing
+ requests Session, defaults to None
+ :type session: requests.Session
+ :param headers: headers to add to Requests, will add 'Content-Type'
+ and 'Accept' unless these are already present, defaults to {}
+ :type headers: dict
+    :param socket_options: use custom TCP socket options.
+        If not specified, then defaults are loaded from
+ ``HTTPConnection.default_socket_options``
+ :type socket_options: list
+
+ :raises ValueError: if cert is provided but ssl is disabled (set to False)
"""
def __init__(self,
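Taken together, the new constructor parameters might be used as follows (a sketch; the custom header name is illustrative):

```python
import requests

from influxdb import InfluxDBClient

# Reuse an existing requests.Session, gzip request bodies, and send a
# custom header on every request.
session = requests.Session()
client = InfluxDBClient(
    host='localhost',
    port=8086,
    database='example',
    gzip=True,
    session=session,
    headers={'X-Custom-Header': 'my-value'},  # illustrative header
    retries=3,
)
```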
@@ -78,6 +111,11 @@ def __init__(self,
proxies=None,
pool_size=10,
path='',
+ cert=None,
+ gzip=False,
+ session=None,
+ headers=None,
+ socket_options=None,
):
"""Construct a new InfluxDBClient object."""
self.__host = host
@@ -91,11 +129,16 @@ def __init__(self,
self._verify_ssl = verify_ssl
self.__use_udp = use_udp
- self.__udp_port = udp_port
- self._session = requests.Session()
- adapter = requests.adapters.HTTPAdapter(
+ self.__udp_port = int(udp_port)
+
+ if not session:
+ session = requests.Session()
+
+ self._session = session
+ adapter = _SocketOptionsAdapter(
pool_connections=int(pool_size),
- pool_maxsize=int(pool_size)
+ pool_maxsize=int(pool_size),
+ socket_options=socket_options
)
if use_udp:
@@ -120,16 +163,35 @@ def __init__(self,
else:
self._proxies = proxies
+ if cert:
+ if not ssl:
+ raise ValueError(
+ "Client certificate provided but ssl is disabled."
+ )
+ else:
+ self._session.cert = cert
+
self.__baseurl = "{0}://{1}:{2}{3}".format(
self._scheme,
self._host,
self._port,
self._path)
- self._headers = {
- 'Content-Type': 'application/json',
- 'Accept': 'text/plain'
- }
+ if headers is None:
+ headers = {}
+ headers.setdefault('Content-Type', 'application/json')
+ headers.setdefault('Accept', 'application/x-msgpack')
+ self._headers = headers
+
+ self._gzip = gzip
+
+ def __enter__(self):
+ """Enter function as used by context manager."""
+ return self
+
+ def __exit__(self, _exc_type, _exc_value, _traceback):
+ """Exit function as used by context manager."""
+ self.close()
@property
def _baseurl(self):
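The new ``__enter__``/``__exit__`` pair enables the usual ``with`` idiom; a minimal sketch:

```python
from influxdb import InfluxDBClient

# The underlying session is closed automatically when the block exits.
with InfluxDBClient(host='localhost', port=8086, database='example') as client:
    client.ping()
```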
@@ -215,7 +277,7 @@ def switch_user(self, username, password):
self._username = username
self._password = password
- def request(self, url, method='GET', params=None, data=None,
+ def request(self, url, method='GET', params=None, data=None, stream=False,
expected_response_code=200, headers=None):
"""Make a HTTP request to the InfluxDB API.
@@ -227,6 +289,8 @@ def request(self, url, method='GET', params=None, data=None,
:type params: dict
:param data: the data of the request, defaults to None
:type data: str
+ :param stream: True if a query uses chunked responses
+ :type stream: bool
:param expected_response_code: the expected response code of
the request, defaults to 200
:type expected_response_code: int
@@ -250,17 +314,39 @@ def request(self, url, method='GET', params=None, data=None,
if isinstance(data, (dict, list)):
data = json.dumps(data)
+ if self._gzip:
+ # Receive and send compressed data
+ headers.update({
+ 'Accept-Encoding': 'gzip',
+ 'Content-Encoding': 'gzip',
+ })
+ if data is not None:
+                # For Py 2.7 compatibility use GzipFile
+ compressed = io.BytesIO()
+ with gzip.GzipFile(
+ compresslevel=9,
+ fileobj=compressed,
+ mode='w'
+ ) as f:
+ f.write(data)
+ data = compressed.getvalue()
+
# Try to send the request more than once by default (see #103)
retry = True
_try = 0
while retry:
try:
+ if "Authorization" in headers:
+ auth = (None, None)
+ else:
+ auth = (self._username, self._password)
response = self._session.request(
method=method,
url=url,
- auth=(self._username, self._password),
+ auth=auth if None not in auth else None,
params=params,
data=data,
+ stream=stream,
headers=headers,
proxies=self._proxies,
verify=self._verify_ssl,
@@ -273,17 +359,34 @@ def request(self, url, method='GET', params=None, data=None,
_try += 1
if self._retries != 0:
retry = _try < self._retries
- if method == "POST":
- time.sleep((2 ** _try) * random.random() / 100.0)
if not retry:
raise
+ if method == "POST":
+ time.sleep((2 ** _try) * random.random() / 100.0)
+
+ type_header = response.headers and response.headers.get("Content-Type")
+ if type_header == "application/x-msgpack" and response.content:
+ response._msgpack = msgpack.unpackb(
+ packed=response.content,
+ ext_hook=_msgpack_parse_hook,
+ raw=False)
+ else:
+ response._msgpack = None
+
+ def reformat_error(response):
+ if response._msgpack:
+ return json.dumps(response._msgpack, separators=(',', ':'))
+ else:
+ return response.content
+
# if there's not an error, there must have been a successful response
if 500 <= response.status_code < 600:
- raise InfluxDBServerError(response.content)
+ raise InfluxDBServerError(reformat_error(response))
elif response.status_code == expected_response_code:
return response
else:
- raise InfluxDBClientError(response.content, response.status_code)
+ err_msg = reformat_error(response)
+ raise InfluxDBClientError(err_msg, response.status_code)
def write(self, data, params=None, expected_response_code=204,
protocol='json'):
@@ -292,7 +395,7 @@ def write(self, data, params=None, expected_response_code=204,
:param data: the data to be written
:type data: (if protocol is 'json') dict
(if protocol is 'line') sequence of line protocol strings
- or single string
+ or single string
:param params: additional parameters for the request, defaults to None
:type params: dict
:param expected_response_code: the expected response code of the write
@@ -303,7 +406,7 @@ def write(self, data, params=None, expected_response_code=204,
:returns: True, if the write operation is successful
:rtype: bool
"""
- headers = self._headers
+ headers = self._headers.copy()
headers['Content-Type'] = 'application/octet-stream'
if params:
@@ -330,21 +433,22 @@ def write(self, data, params=None, expected_response_code=204,
@staticmethod
def _read_chunked_response(response, raise_errors=True):
- result_set = {}
for line in response.iter_lines():
if isinstance(line, bytes):
line = line.decode('utf-8')
data = json.loads(line)
+ result_set = {}
for result in data.get('results', []):
for _key in result:
if isinstance(result[_key], list):
result_set.setdefault(
_key, []).extend(result[_key])
- return ResultSet(result_set, raise_errors=raise_errors)
+ yield ResultSet(result_set, raise_errors=raise_errors)
def query(self,
query,
params=None,
+ bind_params=None,
epoch=None,
expected_response_code=200,
database=None,
@@ -354,6 +458,12 @@ def query(self,
method="GET"):
"""Send a query to InfluxDB.
+ .. danger::
+ In order to avoid injection vulnerabilities (similar to `SQL
+ injection `_
+ vulnerabilities), do not directly include untrusted data into the
+ ``query`` parameter, use ``bind_params`` instead.
+
:param query: the actual query string
:type query: str
@@ -361,6 +471,12 @@ def query(self,
defaults to {}
:type params: dict
+ :param bind_params: bind parameters for the query:
+ any variable in the query written as ``'$var_name'`` will be
+ replaced with ``bind_params['var_name']``. Only works in the
+ ``WHERE`` clause and takes precedence over ``params['params']``
+ :type bind_params: dict
+
:param epoch: response timestamps to be in epoch format either 'h',
-            'm', 's', 'ms', 'u', or 'ns',defaults to `None` which is
+            'm', 's', 'ms', 'u', or 'ns', defaults to `None` which is
RFC3339 UTC format with nanosecond precision
@@ -394,6 +510,11 @@ def query(self,
if params is None:
params = {}
+ if bind_params is not None:
+ params_dict = json.loads(params.get('params', '{}'))
+ params_dict.update(bind_params)
+ params['params'] = json.dumps(params_dict)
+
params['q'] = query
params['db'] = database or self._database
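The merge above means ``bind_params`` values end up JSON-encoded under ``params['params']`` and take precedence over anything already there; traced by hand:

```python
import json

# Replays the merge performed in query(); values here are illustrative.
params = {'params': json.dumps({'host': 'server01'})}
bind_params = {'region': 'us-west', 'host': 'server02'}

params_dict = json.loads(params.get('params', '{}'))
params_dict.update(bind_params)
params['params'] = json.dumps(params_dict)
print(params['params'])  # {"host": "server02", "region": "us-west"}
```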
@@ -413,13 +534,15 @@ def query(self,
method=method,
params=params,
data=None,
+ stream=chunked,
expected_response_code=expected_response_code
)
- if chunked:
- return self._read_chunked_response(response)
-
- data = response.json()
+ data = response._msgpack
+ if not data:
+ if chunked:
+ return self._read_chunked_response(response)
+ data = response.json()
results = [
ResultSet(result, raise_errors=raise_errors)
@@ -440,15 +563,17 @@ def write_points(self,
retention_policy=None,
tags=None,
batch_size=None,
- protocol='json'
+ protocol='json',
+ consistency=None
):
"""Write to multiple time series names.
:param points: the list of points to be written in the database
:type points: list of dictionaries, each dictionary represents a point
:type points: (if protocol is 'json') list of dicts, where each dict
- represents a point.
- (if protocol is 'line') sequence of line protocol strings.
+ represents a point.
+ (if protocol is 'line') sequence of line protocol strings.
+
-        :param time_precision: Either 's', 'm', 'ms' or 'u', defaults to None
+        :param time_precision: Either 'n', 'u', 'ms', 's', 'm' or 'h',
+            defaults to None
:type time_precision: str
:param database: the database to write the points to. Defaults to
@@ -468,6 +593,9 @@ def write_points(self,
:type batch_size: int
:param protocol: Protocol for writing data. Either 'line' or 'json'.
:type protocol: str
+ :param consistency: Consistency for the points.
+ One of {'any','one','quorum','all'}.
+ :type consistency: str
:returns: True, if the operation is successful
:rtype: bool
@@ -480,14 +608,16 @@ def write_points(self,
time_precision=time_precision,
database=database,
retention_policy=retention_policy,
- tags=tags, protocol=protocol)
+ tags=tags, protocol=protocol,
+ consistency=consistency)
return True
return self._write_points(points=points,
time_precision=time_precision,
database=database,
retention_policy=retention_policy,
- tags=tags, protocol=protocol)
+ tags=tags, protocol=protocol,
+ consistency=consistency)
def ping(self):
"""Check connectivity to InfluxDB.
@@ -504,8 +634,17 @@ def ping(self):
@staticmethod
def _batches(iterable, size):
- for i in xrange(0, len(iterable), size):
- yield iterable[i:i + size]
+ # Iterate over an iterable producing iterables of batches. Based on:
+ # http://code.activestate.com/recipes/303279-getting-items-in-batches/
+ iterator = iter(iterable)
+ while True:
+            try:  # Try to get the first element in the iterator...
+ head = (next(iterator),)
+ except StopIteration:
+ return # ...so that we can stop if there isn't one
+ # Otherwise, lazily slice the rest of the batch
+ rest = islice(iterator, size - 1)
+ yield chain(head, rest)
def _write_points(self,
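The recipe batches any iterable lazily, so generators work without being materialized; a standalone sketch of the same technique:

```python
from itertools import chain, islice


def batches(iterable, size):
    """Lazily yield successive size-sized batches from any iterable."""
    iterator = iter(iterable)
    while True:
        try:  # stop cleanly once the iterator is exhausted
            head = (next(iterator),)
        except StopIteration:
            return
        yield chain(head, islice(iterator, size - 1))


print([list(b) for b in batches(range(7), 3)])
# [[0, 1, 2], [3, 4, 5], [6]]
```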
points,
@@ -513,12 +652,16 @@ def _write_points(self,
database,
retention_policy,
tags,
- protocol='json'):
+ protocol='json',
+ consistency=None):
if time_precision not in ['n', 'u', 'ms', 's', 'm', 'h', None]:
raise ValueError(
"Invalid time precision is given. "
"(use 'n', 'u', 'ms', 's', 'm' or 'h')")
+ if consistency not in ['any', 'one', 'quorum', 'all', None]:
+ raise ValueError('Invalid consistency: {}'.format(consistency))
+
if protocol == 'json':
data = {
'points': points
@@ -533,6 +676,9 @@ def _write_points(self,
'db': database or self._database
}
+ if consistency is not None:
+ params['consistency'] = consistency
+
if time_precision is not None:
params['precision'] = time_precision
@@ -569,6 +715,40 @@ def get_list_database(self):
"""
return list(self.query("SHOW DATABASES").get_points())
+ def get_list_series(self, database=None, measurement=None, tags=None):
+        """
+        Query SHOW SERIES to return the distinct series in your database.
+
+        FROM and WHERE clauses are optional.
+
+        :param measurement: Show all series from a measurement
+        :type measurement: str
+        :param tags: Show all series that match given tags
+        :type tags: dict
+        :param database: the database from which the series should be
+            shown, defaults to client's current database
+        :type database: str
+        """
+ database = database or self._database
+ query_str = 'SHOW SERIES'
+
+ if measurement:
+ query_str += ' FROM "{0}"'.format(measurement)
+
+ if tags:
+ query_str += ' WHERE ' + ' and '.join(["{0}='{1}'".format(k, v)
+ for k, v in tags.items()])
+
+ return list(
+ itertools.chain.from_iterable(
+ [
+ x.values()
+ for x in (self.query(query_str, database=database)
+ .get_points())
+ ]
+ )
+ )
+
def create_database(self, dbname):
"""Create a new database in InfluxDB.
@@ -693,7 +873,7 @@ def alter_retention_policy(self, name, database=None,
query_string = (
"ALTER RETENTION POLICY {0} ON {1}"
).format(quote_ident(name),
- quote_ident(database or self._database), shard_duration)
+ quote_ident(database or self._database))
if duration:
query_string += " DURATION {0}".format(duration)
if shard_duration:
@@ -791,7 +971,7 @@ def drop_user(self, username):
:param username: the username to drop
:type username: str
"""
- text = "DROP USER {0}".format(quote_ident(username), method="POST")
+ text = "DROP USER {0}".format(quote_ident(username))
self.query(text, method="POST")
def set_user_password(self, username, password):
@@ -809,7 +989,9 @@ def set_user_password(self, username, password):
def delete_series(self, database=None, measurement=None, tags=None):
"""Delete series from a database.
- Series can be filtered by measurement and tags.
+        Series must be filtered by either measurement or tags (or both).
+        This method cannot be used to delete all series; use
+        `drop_database` instead.
:param database: the database from which the series should be
deleted, defaults to client's current database
@@ -908,6 +1090,98 @@ def get_list_privileges(self, username):
text = "SHOW GRANTS FOR {0}".format(quote_ident(username))
return list(self.query(text).get_points())
+ def get_list_continuous_queries(self):
+ """Get the list of continuous queries in InfluxDB.
+
+ :return: all CQs in InfluxDB
+ :rtype: list of dictionaries
+
+ :Example:
+
+ ::
+
+            >> cqs = client.get_list_continuous_queries()
+ >> cqs
+ [
+ {
+ u'db1': []
+ },
+ {
+ u'db2': [
+ {
+ u'name': u'vampire',
+ u'query': u'CREATE CONTINUOUS QUERY vampire ON '
+ 'mydb BEGIN SELECT count(dracula) INTO '
+ 'mydb.autogen.all_of_them FROM '
+ 'mydb.autogen.one GROUP BY time(5m) END'
+ }
+ ]
+ }
+ ]
+ """
+ query_string = "SHOW CONTINUOUS QUERIES"
+ return [{sk[0]: list(p)} for sk, p in self.query(query_string).items()]
+
+ def create_continuous_query(self, name, select, database=None,
+ resample_opts=None):
+ r"""Create a continuous query for a database.
+
+ :param name: the name of continuous query to create
+ :type name: str
+ :param select: select statement for the continuous query
+ :type select: str
+ :param database: the database for which the continuous query is
+ created. Defaults to current client's database
+ :type database: str
+ :param resample_opts: resample options
+ :type resample_opts: str
+
+ :Example:
+
+ ::
+
+ >> select_clause = 'SELECT mean("value") INTO "cpu_mean" ' \
+ ... 'FROM "cpu" GROUP BY time(1m)'
+ >> client.create_continuous_query(
+ ... 'cpu_mean', select_clause, 'db_name', 'EVERY 10s FOR 2m'
+ ... )
+ >> client.get_list_continuous_queries()
+ [
+ {
+ 'db_name': [
+ {
+ 'name': 'cpu_mean',
+ 'query': 'CREATE CONTINUOUS QUERY "cpu_mean" '
+ 'ON "db_name" '
+ 'RESAMPLE EVERY 10s FOR 2m '
+ 'BEGIN SELECT mean("value") '
+ 'INTO "cpu_mean" FROM "cpu" '
+ 'GROUP BY time(1m) END'
+ }
+ ]
+ }
+ ]
+ """
+ query_string = (
+ "CREATE CONTINUOUS QUERY {0} ON {1}{2} BEGIN {3} END"
+ ).format(quote_ident(name), quote_ident(database or self._database),
+ ' RESAMPLE ' + resample_opts if resample_opts else '', select)
+ self.query(query_string)
+
+ def drop_continuous_query(self, name, database=None):
+ """Drop an existing continuous query for a database.
+
+ :param name: the name of continuous query to drop
+ :type name: str
+ :param database: the database for which the continuous query is
+ dropped. Defaults to current client's database
+ :type database: str
+ """
+ query_string = (
+ "DROP CONTINUOUS QUERY {0} ON {1}"
+ ).format(quote_ident(name), quote_ident(database or self._database))
+ self.query(query_string)
+
def send_packet(self, packet, protocol='json', time_precision=None):
"""Send an UDP packet.
@@ -978,3 +1252,25 @@ def _parse_netloc(netloc):
'password': info.password or None,
'host': info.hostname or 'localhost',
'port': info.port or 8086}
+
+
+def _msgpack_parse_hook(code, data):
+ if code == 5:
+ (epoch_s, epoch_ns) = struct.unpack(">QI", data)
+ timestamp = datetime.datetime.utcfromtimestamp(epoch_s)
+ timestamp += datetime.timedelta(microseconds=(epoch_ns / 1000))
+ return timestamp.isoformat() + 'Z'
+ return msgpack.ExtType(code, data)
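A worked example of the ext type 5 decoding: the payload is a big-endian uint64 of epoch seconds followed by a uint32 of nanoseconds.

```python
import datetime
import struct

# 2009-11-10T23:00:00 UTC plus 123456000 ns, packed as the hook expects.
data = struct.pack(">QI", 1257894000, 123456000)
epoch_s, epoch_ns = struct.unpack(">QI", data)
ts = datetime.datetime.utcfromtimestamp(epoch_s)
ts += datetime.timedelta(microseconds=epoch_ns / 1000)
print(ts.isoformat() + 'Z')  # 2009-11-10T23:00:00.123456Z
```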
+
+
+class _SocketOptionsAdapter(HTTPAdapter):
+ """_SocketOptionsAdapter injects socket_options into HTTP Adapter."""
+
+ def __init__(self, *args, **kwargs):
+ self.socket_options = kwargs.pop("socket_options", None)
+ super(_SocketOptionsAdapter, self).__init__(*args, **kwargs)
+
+ def init_poolmanager(self, *args, **kwargs):
+ if self.socket_options is not None:
+ kwargs["socket_options"] = self.socket_options
+ super(_SocketOptionsAdapter, self).init_poolmanager(*args, **kwargs)
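A sketch of wiring TCP keepalive options through the adapter (names from urllib3's public API):

```python
import socket

from urllib3.connection import HTTPConnection

from influxdb import InfluxDBClient

# Extend urllib3's defaults rather than replacing them.
keepalive = HTTPConnection.default_socket_options + [
    (socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1),
]
client = InfluxDBClient(host='localhost', port=8086,
                        socket_options=keepalive)
```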
diff --git a/influxdb/dataframe_client.py b/influxdb/dataframe_client.py
index 97258644..babfe0dd 100644
--- a/influxdb/dataframe_client.py
+++ b/influxdb/dataframe_client.py
@@ -25,4 +25,4 @@ def __init__(self, *a, **kw):
raise ImportError("DataFrameClient requires Pandas "
"which couldn't be imported: %s" % self.err)
else:
- from ._dataframe_client import DataFrameClient
+ from ._dataframe_client import DataFrameClient # type: ignore
diff --git a/influxdb/helper.py b/influxdb/helper.py
index e622526d..138cf6e8 100644
--- a/influxdb/helper.py
+++ b/influxdb/helper.py
@@ -41,6 +41,12 @@ class Meta:
# Only applicable if autocommit is True.
autocommit = True
# If True and no bulk_size, then will set bulk_size to 1.
+ retention_policy = 'your_retention_policy'
+ # Specify the retention policy for the data points
+        time_precision = "h"|"m"|"s"|"ms"|"u"|"ns"
+        # Default is ns (nanoseconds)
+        # Sets the time precision used when writing points
+        # You should also make sure time is set in the given precision
"""
@@ -71,6 +77,15 @@ def __new__(cls, *args, **kwargs):
cls.__name__))
cls._autocommit = getattr(_meta, 'autocommit', False)
+ cls._time_precision = getattr(_meta, 'time_precision', None)
+
+ allowed_time_precisions = ['h', 'm', 's', 'ms', 'u', 'ns', None]
+ if cls._time_precision not in allowed_time_precisions:
+            raise AttributeError(
+                'In {}, time_precision is set but invalid; use any of {}.'
+                .format(cls.__name__,
+                        ','.join(str(p) for p in allowed_time_precisions)))
+
+ cls._retention_policy = getattr(_meta, 'retention_policy', None)
cls._client = getattr(_meta, 'client', None)
if cls._autocommit and not cls._client:
@@ -116,11 +131,11 @@ def __init__(self, **kw):
keys = set(kw.keys())
# all tags should be passed, and keys - tags should be a subset of keys
- if not(tags <= keys):
+ if not (tags <= keys):
raise NameError(
'Expected arguments to contain all tags {0}, instead got {1}.'
.format(cls._tags, kw.keys()))
- if not(keys - tags <= fields):
+ if not (keys - tags <= fields):
raise NameError('Got arguments not in tags or fields: {0}'
.format(keys - tags - fields))
@@ -143,7 +158,12 @@ def commit(cls, client=None):
"""
if not client:
client = cls._client
- rtn = client.write_points(cls._json_body_())
+
+ rtn = client.write_points(
+ cls._json_body_(),
+ time_precision=cls._time_precision,
+ retention_policy=cls._retention_policy)
+ # will be None if not set and will default to ns
cls._reset_()
return rtn
@@ -154,6 +174,8 @@ def _json_body_(cls):
:return: JSON body of these datapoints.
"""
json = []
+ if not cls.__initialized__:
+ cls._reset_()
for series_name, data in six.iteritems(cls._datapoints):
for point in data:
json_point = {
diff --git a/influxdb/influxdb08/client.py b/influxdb/influxdb08/client.py
index 965a91db..40c58145 100644
--- a/influxdb/influxdb08/client.py
+++ b/influxdb/influxdb08/client.py
@@ -292,10 +292,10 @@ def write_points(self, data, time_precision='s', *args, **kwargs):
:type batch_size: int
"""
- def list_chunks(l, n):
+ def list_chunks(data_list, n):
"""Yield successive n-sized chunks from l."""
- for i in xrange(0, len(l), n):
- yield l[i:i + n]
+ for i in xrange(0, len(data_list), n):
+ yield data_list[i:i + n]
batch_size = kwargs.get('batch_size')
if batch_size and batch_size > 0:
diff --git a/influxdb/influxdb08/helper.py b/influxdb/influxdb08/helper.py
index f3dec33c..5f2d4614 100644
--- a/influxdb/influxdb08/helper.py
+++ b/influxdb/influxdb08/helper.py
@@ -139,6 +139,8 @@ def _json_body_(cls):
:return: JSON body of the datapoints.
"""
json = []
+ if not cls.__initialized__:
+ cls._reset_()
for series_name, data in six.iteritems(cls._datapoints):
json.append({'name': series_name,
'columns': cls._fields,
diff --git a/influxdb/line_protocol.py b/influxdb/line_protocol.py
index e8816fc0..25dd2ad7 100644
--- a/influxdb/line_protocol.py
+++ b/influxdb/line_protocol.py
@@ -11,11 +11,19 @@
from pytz import UTC
from dateutil.parser import parse
-from six import iteritems, binary_type, text_type, integer_types, PY2
+from six import binary_type, text_type, integer_types, PY2
EPOCH = UTC.localize(datetime.utcfromtimestamp(0))
+def _to_nanos(timestamp):
+ delta = timestamp - EPOCH
+ nanos_in_days = delta.days * 86400 * 10 ** 9
+ nanos_in_seconds = delta.seconds * 10 ** 9
+ nanos_in_micros = delta.microseconds * 10 ** 3
+ return nanos_in_days + nanos_in_seconds + nanos_in_micros
+
+
def _convert_timestamp(timestamp, precision=None):
if isinstance(timestamp, Integral):
return timestamp # assume precision is correct if timestamp is int
@@ -27,19 +35,24 @@ def _convert_timestamp(timestamp, precision=None):
if not timestamp.tzinfo:
timestamp = UTC.localize(timestamp)
- ns = (timestamp - EPOCH).total_seconds() * 1e9
+ ns = _to_nanos(timestamp)
if precision is None or precision == 'n':
return ns
- elif precision == 'u':
- return ns / 1e3
- elif precision == 'ms':
- return ns / 1e6
- elif precision == 's':
- return ns / 1e9
- elif precision == 'm':
- return ns / 1e9 / 60
- elif precision == 'h':
- return ns / 1e9 / 3600
+
+ if precision == 'u':
+ return ns / 10**3
+
+ if precision == 'ms':
+ return ns / 10**6
+
+ if precision == 's':
+ return ns / 10**9
+
+ if precision == 'm':
+ return ns / 10**9 / 60
+
+ if precision == 'h':
+ return ns / 10**9 / 3600
raise ValueError(timestamp)
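The switch to integer arithmetic matters because the old ``total_seconds() * 1e9`` path rounds through a float, which cannot represent current nanosecond timestamps exactly; checking the helper's math by hand:

```python
from datetime import datetime

from pytz import UTC

EPOCH = UTC.localize(datetime.utcfromtimestamp(0))

ts = UTC.localize(datetime(2009, 11, 10, 23, 0, 0, 123456))
delta = ts - EPOCH
nanos = (delta.days * 86400 * 10 ** 9
         + delta.seconds * 10 ** 9
         + delta.microseconds * 10 ** 3)
print(nanos)                             # 1257894000123456000
print(int(delta.total_seconds() * 1e9))  # float path; may drift in the low digits
```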
@@ -54,6 +67,8 @@ def _escape_tag(tag):
",", "\\,"
).replace(
"=", "\\="
+ ).replace(
+ "\n", "\\n"
)
@@ -89,14 +104,21 @@ def _is_float(value):
def _escape_value(value):
- value = _get_unicode(value)
+ if value is None:
+ return ''
- if isinstance(value, text_type) and value != '':
+ value = _get_unicode(value)
+ if isinstance(value, text_type):
return quote_ident(value)
- elif isinstance(value, integer_types) and not isinstance(value, bool):
+
+ if isinstance(value, integer_types) and not isinstance(value, bool):
return str(value) + 'i'
- elif _is_float(value):
- return repr(value)
+
+ if isinstance(value, bool):
+ return str(value)
+
+ if _is_float(value):
+ return repr(float(value))
return str(value)
@@ -105,15 +127,60 @@ def _get_unicode(data, force=False):
"""Try to return a text aka unicode object from the given data."""
if isinstance(data, binary_type):
return data.decode('utf-8')
- elif data is None:
+
+ if data is None:
return ''
- elif force:
+
+ if force:
if PY2:
return unicode(data)
- else:
- return str(data)
- else:
- return data
+ return str(data)
+
+ return data
+
+
+def make_line(measurement, tags=None, fields=None, time=None, precision=None):
+    """Generate a line protocol string for a single point."""
+ tags = tags or {}
+ fields = fields or {}
+
+ line = _escape_tag(_get_unicode(measurement))
+
+ # tags should be sorted client-side to take load off server
+ tag_list = []
+ for tag_key in sorted(tags.keys()):
+ key = _escape_tag(tag_key)
+ value = _escape_tag(tags[tag_key])
+
+ if key != '' and value != '':
+ tag_list.append(
+ "{key}={value}".format(key=key, value=value)
+ )
+
+ if tag_list:
+ line += ',' + ','.join(tag_list)
+
+ field_list = []
+ for field_key in sorted(fields.keys()):
+ key = _escape_tag(field_key)
+ value = _escape_value(fields[field_key])
+
+ if key != '' and value != '':
+ field_list.append("{key}={value}".format(
+ key=key,
+ value=value
+ ))
+
+ if field_list:
+ line += ' ' + ','.join(field_list)
+
+ if time is not None:
+ timestamp = _get_unicode(str(int(
+ _convert_timestamp(time, precision)
+ )))
+ line += ' ' + timestamp
+
+ return line
def make_lines(data, precision=None):
@@ -125,48 +192,19 @@ def make_lines(data, precision=None):
lines = []
static_tags = data.get('tags')
for point in data['points']:
- elements = []
-
- # add measurement name
- measurement = _escape_tag(_get_unicode(
- point.get('measurement', data.get('measurement'))))
- key_values = [measurement]
-
- # add tags
if static_tags:
tags = dict(static_tags) # make a copy, since we'll modify
tags.update(point.get('tags') or {})
else:
tags = point.get('tags') or {}
- # tags should be sorted client-side to take load off server
- for tag_key, tag_value in sorted(iteritems(tags)):
- key = _escape_tag(tag_key)
- value = _escape_tag_value(tag_value)
-
- if key != '' and value != '':
- key_values.append(key + "=" + value)
-
- elements.append(','.join(key_values))
-
- # add fields
- field_values = []
- for field_key, field_value in sorted(iteritems(point['fields'])):
- key = _escape_tag(field_key)
- value = _escape_value(field_value)
-
- if key != '' and value != '':
- field_values.append(key + "=" + value)
-
- elements.append(','.join(field_values))
-
- # add timestamp
- if 'time' in point:
- timestamp = _get_unicode(str(int(
- _convert_timestamp(point['time'], precision))))
- elements.append(timestamp)
-
- line = ' '.join(elements)
+ line = make_line(
+ point.get('measurement', data.get('measurement')),
+ tags=tags,
+ fields=point.get('fields'),
+ precision=precision,
+ time=point.get('time')
+ )
lines.append(line)
return '\n'.join(lines) + '\n'
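A usage sketch of the refactored helpers; ``make_lines`` now delegates each point to ``make_line``:

```python
from influxdb.line_protocol import make_lines

data = {
    "tags": {"region": "us-west"},  # static tags merged into each point
    "points": [{
        "measurement": "cpu_load_short",
        "tags": {"host": "server01"},
        "fields": {"value": 0.64},
        "time": "2009-11-10T23:00:00Z",
    }],
}
print(make_lines(data), end='')
# cpu_load_short,host=server01,region=us-west value=0.64 1257894000000000000
```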
diff --git a/influxdb/tests/__init__.py b/influxdb/tests/__init__.py
index adf2f20c..f7c5dfb9 100644
--- a/influxdb/tests/__init__.py
+++ b/influxdb/tests/__init__.py
@@ -12,10 +12,10 @@
import unittest
using_pypy = hasattr(sys, "pypy_version_info")
-skipIfPYpy = unittest.skipIf(using_pypy, "Skipping this test on pypy.")
+skip_if_pypy = unittest.skipIf(using_pypy, "Skipping this test on pypy.")
_skip_server_tests = os.environ.get(
'INFLUXDB_PYTHON_SKIP_SERVER_TESTS',
None) == 'True'
-skipServerTests = unittest.skipIf(_skip_server_tests,
- "Skipping server tests...")
+skip_server_tests = unittest.skipIf(_skip_server_tests,
+ "Skipping server tests...")
diff --git a/influxdb/tests/client_test.py b/influxdb/tests/client_test.py
index e27eef17..115fbc48 100644
--- a/influxdb/tests/client_test.py
+++ b/influxdb/tests/client_test.py
@@ -24,6 +24,8 @@
import unittest
import warnings
+import io
+import gzip
import json
import mock
import requests
@@ -31,6 +33,7 @@
import requests_mock
from nose.tools import raises
+from urllib3.connection import HTTPConnection
from influxdb import InfluxDBClient
from influxdb.resultset import ResultSet
@@ -149,6 +152,14 @@ def test_dsn(self):
**{'ssl': False})
self.assertEqual('http://my.host.fr:1886', cli._baseurl)
+ def test_cert(self):
+ """Test mutual TLS authentication for TestInfluxDBClient object."""
+ cli = InfluxDBClient(ssl=True, cert='/etc/pki/tls/private/dummy.crt')
+ self.assertEqual(cli._session.cert, '/etc/pki/tls/private/dummy.crt')
+
+ with self.assertRaises(ValueError):
+ cli = InfluxDBClient(cert='/etc/pki/tls/private/dummy.crt')
+
def test_switch_database(self):
"""Test switch database in TestInfluxDBClient object."""
cli = InfluxDBClient('host', 8086, 'username', 'password', 'database')
@@ -206,6 +217,71 @@ def test_write_points(self):
m.last_request.body.decode('utf-8'),
)
+ def test_write_gzip(self):
+ """Test write in TestInfluxDBClient object."""
+ with requests_mock.Mocker() as m:
+ m.register_uri(
+ requests_mock.POST,
+ "http://localhost:8086/write",
+ status_code=204
+ )
+
+ cli = InfluxDBClient(database='db', gzip=True)
+ cli.write(
+ {"database": "mydb",
+ "retentionPolicy": "mypolicy",
+ "points": [{"measurement": "cpu_load_short",
+ "tags": {"host": "server01",
+ "region": "us-west"},
+ "time": "2009-11-10T23:00:00Z",
+ "fields": {"value": 0.64}}]}
+ )
+
+ compressed = io.BytesIO()
+ with gzip.GzipFile(
+ compresslevel=9,
+ fileobj=compressed,
+ mode='w'
+ ) as f:
+ f.write(
+ b"cpu_load_short,host=server01,region=us-west "
+ b"value=0.64 1257894000000000000\n"
+ )
+
+ self.assertEqual(
+ m.last_request.body,
+ compressed.getvalue(),
+ )
+
+ def test_write_points_gzip(self):
+ """Test write points for TestInfluxDBClient object."""
+ with requests_mock.Mocker() as m:
+ m.register_uri(
+ requests_mock.POST,
+ "http://localhost:8086/write",
+ status_code=204
+ )
+
+ cli = InfluxDBClient(database='db', gzip=True)
+ cli.write_points(
+ self.dummy_points,
+ )
+
+ compressed = io.BytesIO()
+ with gzip.GzipFile(
+ compresslevel=9,
+ fileobj=compressed,
+ mode='w'
+ ) as f:
+ f.write(
+ b'cpu_load_short,host=server01,region=us-west '
+ b'value=0.64 1257894000123456000\n'
+ )
+ self.assertEqual(
+ m.last_request.body,
+ compressed.getvalue(),
+ )
+
def test_write_points_toplevel_attributes(self):
"""Test write points attrs for TestInfluxDBClient object."""
with requests_mock.Mocker() as m:
@@ -257,6 +333,36 @@ def test_write_points_batch(self):
self.assertEqual(expected_last_body,
m.last_request.body.decode('utf-8'))
+ def test_write_points_batch_generator(self):
+ """Test write points batch from a generator for TestInfluxDBClient."""
+ dummy_points = [
+ {"measurement": "cpu_usage", "tags": {"unit": "percent"},
+ "time": "2009-11-10T23:00:00Z", "fields": {"value": 12.34}},
+ {"measurement": "network", "tags": {"direction": "in"},
+ "time": "2009-11-10T23:00:00Z", "fields": {"value": 123.00}},
+ {"measurement": "network", "tags": {"direction": "out"},
+ "time": "2009-11-10T23:00:00Z", "fields": {"value": 12.00}}
+ ]
+ dummy_points_generator = (point for point in dummy_points)
+ expected_last_body = (
+ "network,direction=out,host=server01,region=us-west "
+ "value=12.0 1257894000000000000\n"
+ )
+
+ with requests_mock.Mocker() as m:
+ m.register_uri(requests_mock.POST,
+ "http://localhost:8086/write",
+ status_code=204)
+ cli = InfluxDBClient(database='db')
+ cli.write_points(points=dummy_points_generator,
+ database='db',
+ tags={"host": "server01",
+ "region": "us-west"},
+ batch_size=2)
+ self.assertEqual(m.call_count, 2)
+ self.assertEqual(expected_last_body,
+ m.last_request.body.decode('utf-8'))
+
def test_write_points_udp(self):
"""Test write points UDP for TestInfluxDBClient object."""
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
@@ -337,6 +443,23 @@ def test_write_points_with_precision(self):
m.last_request.body,
)
+ def test_write_points_with_consistency(self):
+ """Test write points with consistency for TestInfluxDBClient object."""
+ with requests_mock.Mocker() as m:
+ m.register_uri(
+ requests_mock.POST,
+ 'http://localhost:8086/write',
+ status_code=204
+ )
+
+ cli = InfluxDBClient(database='db')
+
+ cli.write_points(self.dummy_points, consistency='any')
+ self.assertEqual(
+ m.last_request.qs,
+ {'db': ['db'], 'consistency': ['any']}
+ )
+
def test_write_points_with_precision_udp(self):
"""Test write points with precision for TestInfluxDBClient object."""
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
@@ -409,6 +532,15 @@ def test_write_points_bad_precision(self):
time_precision='g'
)
+ def test_write_points_bad_consistency(self):
+ """Test write points w/bad consistency value."""
+ cli = InfluxDBClient()
+ with self.assertRaises(ValueError):
+ cli.write_points(
+ self.dummy_points,
+ consistency='boo'
+ )
+
@raises(Exception)
def test_write_points_with_precision_fails(self):
"""Test write points w/precision fail for TestInfluxDBClient object."""
@@ -439,6 +571,29 @@ def test_query(self):
[{'value': 0.64, 'time': '2009-11-10T23:00:00Z'}]
)
+ def test_query_msgpack(self):
+ """Test query method with a messagepack response."""
+ example_response = bytes(bytearray.fromhex(
+ "81a7726573756c74739182ac73746174656d656e745f696400a673657269"
+ "65739183a46e616d65a161a7636f6c756d6e7392a474696d65a176a67661"
+ "6c7565739192c70c05000000005d26178a019096c8cb3ff0000000000000"
+ ))
+
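+        # The hex blob is a msgpack-encoded /query response; the "c70c05"
+        # sequence is a 12-byte msgpack ext (type 5) holding the
+        # nanosecond timestamp.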
+ with requests_mock.Mocker() as m:
+ m.register_uri(
+ requests_mock.GET,
+ "http://localhost:8086/query",
+ request_headers={"Accept": "application/x-msgpack"},
+ headers={"Content-Type": "application/x-msgpack"},
+ content=example_response
+ )
+ rs = self.cli.query('select * from a')
+
+ self.assertListEqual(
+ list(rs.get_points()),
+ [{'v': 1.0, 'time': '2019-07-10T16:51:22.026253Z'}]
+ )
+
def test_select_into_post(self):
"""Test SELECT.*INTO is POSTed."""
example_response = (
@@ -632,6 +787,66 @@ def test_get_list_measurements(self):
[{'name': 'cpu'}, {'name': 'disk'}]
)
+ def test_get_list_series(self):
+ """Test get a list of series from the database."""
+ data = {'results': [
+ {'series': [
+ {
+ 'values': [
+ ['cpu_load_short,host=server01,region=us-west'],
+ ['memory_usage,host=server02,region=us-east']],
+ 'columns': ['key']
+ }
+ ]}
+ ]}
+
+ with _mocked_session(self.cli, 'get', 200, json.dumps(data)):
+ self.assertListEqual(
+ self.cli.get_list_series(),
+ ['cpu_load_short,host=server01,region=us-west',
+ 'memory_usage,host=server02,region=us-east'])
+
+ def test_get_list_series_with_measurement(self):
+ """Test get a list of series from the database by filter."""
+ data = {'results': [
+ {'series': [
+ {
+ 'values': [
+ ['cpu_load_short,host=server01,region=us-west']],
+ 'columns': ['key']
+ }
+ ]}
+ ]}
+
+ with _mocked_session(self.cli, 'get', 200, json.dumps(data)):
+ self.assertListEqual(
+ self.cli.get_list_series(measurement='cpu_load_short'),
+ ['cpu_load_short,host=server01,region=us-west'])
+
+ def test_get_list_series_with_tags(self):
+ """Test get a list of series from the database by tags."""
+ data = {'results': [
+ {'series': [
+ {
+ 'values': [
+ ['cpu_load_short,host=server01,region=us-west']],
+ 'columns': ['key']
+ }
+ ]}
+ ]}
+
+ with _mocked_session(self.cli, 'get', 200, json.dumps(data)):
+ self.assertListEqual(
+ self.cli.get_list_series(tags={'region': 'us-west'}),
+ ['cpu_load_short,host=server01,region=us-west'])
+
+ @raises(Exception)
+ def test_get_list_series_fails(self):
+ """Test get a list of series from the database but fail."""
+ cli = InfluxDBClient('host', 8086, 'username', 'password')
+ with _mocked_session(cli, 'get', 401):
+ cli.get_list_series()
+
def test_create_retention_policy_default(self):
"""Test create default ret policy for TestInfluxDBClient object."""
example_response = '{"results":[{}]}'
@@ -672,6 +887,49 @@ def test_create_retention_policy(self):
'"db" duration 1d replication 4 shard duration 0s'
)
+ def test_create_retention_policy_shard_duration(self):
+ """Test create retention policy with a custom shard duration."""
+ example_response = '{"results":[{}]}'
+
+ with requests_mock.Mocker() as m:
+ m.register_uri(
+ requests_mock.POST,
+ "http://localhost:8086/query",
+ text=example_response
+ )
+ self.cli.create_retention_policy(
+ 'somename2', '1d', 4, database='db',
+ shard_duration='1h'
+ )
+
+ self.assertEqual(
+ m.last_request.qs['q'][0],
+ 'create retention policy "somename2" on '
+ '"db" duration 1d replication 4 shard duration 1h'
+ )
+
+ def test_create_retention_policy_shard_duration_default(self):
+ """Test create retention policy with a default shard duration."""
+ example_response = '{"results":[{}]}'
+
+ with requests_mock.Mocker() as m:
+ m.register_uri(
+ requests_mock.POST,
+ "http://localhost:8086/query",
+ text=example_response
+ )
+ self.cli.create_retention_policy(
+ 'somename3', '1d', 4, database='db',
+ shard_duration='1h', default=True
+ )
+
+ self.assertEqual(
+ m.last_request.qs['q'][0],
+ 'create retention policy "somename3" on '
+ '"db" duration 1d replication 4 shard duration 1h '
+ 'default'
+ )
+
def test_alter_retention_policy(self):
"""Test alter retention policy for TestInfluxDBClient object."""
example_response = '{"results":[{}]}'
@@ -1001,7 +1259,7 @@ def test_revoke_privilege_invalid(self):
self.cli.revoke_privilege('', 'testdb', 'test')
def test_get_list_privileges(self):
- """Tst get list of privs for TestInfluxDBClient object."""
+ """Test get list of privs for TestInfluxDBClient object."""
data = {'results': [
{'series': [
{'columns': ['database', 'privilege'],
@@ -1027,24 +1285,127 @@ def test_get_list_privileges_fails(self):
with _mocked_session(cli, 'get', 401):
cli.get_list_privileges('test')
+ def test_get_list_continuous_queries(self):
+ """Test getting a list of continuous queries."""
+ data = {
+ "results": [
+ {
+ "statement_id": 0,
+ "series": [
+ {
+ "name": "testdb01",
+ "columns": ["name", "query"],
+ "values": [["testname01", "testquery01"],
+ ["testname02", "testquery02"]]
+ },
+ {
+ "name": "testdb02",
+ "columns": ["name", "query"],
+ "values": [["testname03", "testquery03"]]
+ },
+ {
+ "name": "testdb03",
+ "columns": ["name", "query"]
+ }
+ ]
+ }
+ ]
+ }
+
+ with _mocked_session(self.cli, 'get', 200, json.dumps(data)):
+ self.assertListEqual(
+ self.cli.get_list_continuous_queries(),
+ [
+ {
+ 'testdb01': [
+ {'name': 'testname01', 'query': 'testquery01'},
+ {'name': 'testname02', 'query': 'testquery02'}
+ ]
+ },
+ {
+ 'testdb02': [
+ {'name': 'testname03', 'query': 'testquery03'}
+ ]
+ },
+ {
+ 'testdb03': []
+ }
+ ]
+ )
+
+ @raises(Exception)
+ def test_get_list_continuous_queries_fails(self):
+ """Test failing to get a list of continuous queries."""
+ with _mocked_session(self.cli, 'get', 400):
+ self.cli.get_list_continuous_queries()
+
+ def test_create_continuous_query(self):
+ """Test continuous query creation."""
+ data = {"results": [{}]}
+ with requests_mock.Mocker() as m:
+ m.register_uri(
+ requests_mock.GET,
+ "http://localhost:8086/query",
+ text=json.dumps(data)
+ )
+ query = 'SELECT count("value") INTO "6_months"."events" FROM ' \
+ '"events" GROUP BY time(10m)'
+ self.cli.create_continuous_query('cq_name', query, 'db_name')
+ self.assertEqual(
+ m.last_request.qs['q'][0],
+ 'create continuous query "cq_name" on "db_name" begin select '
+ 'count("value") into "6_months"."events" from "events" group '
+ 'by time(10m) end'
+ )
+ self.cli.create_continuous_query('cq_name', query, 'db_name',
+ 'EVERY 10s FOR 2m')
+ self.assertEqual(
+ m.last_request.qs['q'][0],
+ 'create continuous query "cq_name" on "db_name" resample '
+ 'every 10s for 2m begin select count("value") into '
+ '"6_months"."events" from "events" group by time(10m) end'
+ )
+
+ @raises(Exception)
+ def test_create_continuous_query_fails(self):
+ """Test failing to create a continuous query."""
+ with _mocked_session(self.cli, 'get', 400):
+ self.cli.create_continuous_query('cq_name', 'select', 'db_name')
+
+ def test_drop_continuous_query(self):
+ """Test dropping a continuous query."""
+ data = {"results": [{}]}
+ with requests_mock.Mocker() as m:
+ m.register_uri(
+ requests_mock.GET,
+ "http://localhost:8086/query",
+ text=json.dumps(data)
+ )
+ self.cli.drop_continuous_query('cq_name', 'db_name')
+ self.assertEqual(
+ m.last_request.qs['q'][0],
+ 'drop continuous query "cq_name" on "db_name"'
+ )
+
+ @raises(Exception)
+ def test_drop_continuous_query_fails(self):
+ """Test failing to drop a continuous query."""
+ with _mocked_session(self.cli, 'get', 400):
+ self.cli.drop_continuous_query('cq_name', 'db_name')
+
def test_invalid_port_fails(self):
"""Test invalid port fail for TestInfluxDBClient object."""
with self.assertRaises(ValueError):
InfluxDBClient('host', '80/redir', 'username', 'password')
def test_chunked_response(self):
- """Test chunked reponse for TestInfluxDBClient object."""
+ """Test chunked response for TestInfluxDBClient object."""
example_response = \
- u'{"results":[{"statement_id":0,"series":' \
- '[{"name":"cpu","columns":["fieldKey","fieldType"],"values":' \
- '[["value","integer"]]}],"partial":true}]}\n{"results":' \
- '[{"statement_id":0,"series":[{"name":"iops","columns":' \
- '["fieldKey","fieldType"],"values":[["value","integer"]]}],' \
- '"partial":true}]}\n{"results":[{"statement_id":0,"series":' \
- '[{"name":"load","columns":["fieldKey","fieldType"],"values":' \
- '[["value","integer"]]}],"partial":true}]}\n{"results":' \
- '[{"statement_id":0,"series":[{"name":"memory","columns":' \
- '["fieldKey","fieldType"],"values":[["value","integer"]]}]}]}\n'
+ u'{"results":[{"statement_id":0,"series":[{"columns":["key"],' \
+ '"values":[["cpu"],["memory"],["iops"],["network"]],"partial":' \
+ 'true}],"partial":true}]}\n{"results":[{"statement_id":0,' \
+ '"series":[{"columns":["key"],"values":[["qps"],["uptime"],' \
+ '["df"],["mount"]]}]}]}\n'
with requests_mock.Mocker() as m:
m.register_uri(
@@ -1052,23 +1413,125 @@ def test_chunked_response(self):
"http://localhost:8086/query",
text=example_response
)
- response = self.cli.query('show series limit 4 offset 0',
+ response = self.cli.query('show series',
chunked=True, chunk_size=4)
- self.assertTrue(len(response) == 4)
- self.assertEqual(response.__repr__(), ResultSet(
- {'series': [{'values': [['value', 'integer']],
- 'name': 'cpu',
- 'columns': ['fieldKey', 'fieldType']},
- {'values': [['value', 'integer']],
- 'name': 'iops',
- 'columns': ['fieldKey', 'fieldType']},
- {'values': [['value', 'integer']],
- 'name': 'load',
- 'columns': ['fieldKey', 'fieldType']},
- {'values': [['value', 'integer']],
- 'name': 'memory',
- 'columns': ['fieldKey', 'fieldType']}]}
- ).__repr__())
+ res = list(response)
+ self.assertTrue(len(res) == 2)
+ self.assertEqual(res[0].__repr__(), ResultSet(
+ {'series': [{
+ 'columns': ['key'],
+ 'values': [['cpu'], ['memory'], ['iops'], ['network']]
+ }]}).__repr__())
+ self.assertEqual(res[1].__repr__(), ResultSet(
+ {'series': [{
+ 'columns': ['key'],
+ 'values': [['qps'], ['uptime'], ['df'], ['mount']]
+ }]}).__repr__())
+
+ def test_auth_default(self):
+ """Test auth with default settings."""
+ with requests_mock.Mocker() as m:
+ m.register_uri(
+ requests_mock.GET,
+ "http://localhost:8086/ping",
+ status_code=204,
+ headers={'X-Influxdb-Version': '1.2.3'}
+ )
+
+ cli = InfluxDBClient()
+ cli.ping()
+
+ self.assertEqual(m.last_request.headers["Authorization"],
+ "Basic cm9vdDpyb290")
+
+ def test_auth_username_password(self):
+ """Test auth with custom username and password."""
+ with requests_mock.Mocker() as m:
+ m.register_uri(
+ requests_mock.GET,
+ "http://localhost:8086/ping",
+ status_code=204,
+ headers={'X-Influxdb-Version': '1.2.3'}
+ )
+
+ cli = InfluxDBClient(username='my-username',
+ password='my-password')
+ cli.ping()
+
+ self.assertEqual(m.last_request.headers["Authorization"],
+ "Basic bXktdXNlcm5hbWU6bXktcGFzc3dvcmQ=")
+
+ def test_auth_username_password_none(self):
+ """Test auth with not defined username or password."""
+ with requests_mock.Mocker() as m:
+ m.register_uri(
+ requests_mock.GET,
+ "http://localhost:8086/ping",
+ status_code=204,
+ headers={'X-Influxdb-Version': '1.2.3'}
+ )
+
+ cli = InfluxDBClient(username=None, password=None)
+ cli.ping()
+ self.assertFalse('Authorization' in m.last_request.headers)
+
+ cli = InfluxDBClient(username=None)
+ cli.ping()
+ self.assertFalse('Authorization' in m.last_request.headers)
+
+ cli = InfluxDBClient(password=None)
+ cli.ping()
+ self.assertFalse('Authorization' in m.last_request.headers)
+
+ def test_auth_token(self):
+ """Test auth with custom authorization header."""
+ with requests_mock.Mocker() as m:
+ m.register_uri(
+ requests_mock.GET,
+ "http://localhost:8086/ping",
+ status_code=204,
+ headers={'X-Influxdb-Version': '1.2.3'}
+ )
+
+ cli = InfluxDBClient(username=None, password=None,
+ headers={"Authorization": "my-token"})
+ cli.ping()
+ self.assertEqual(m.last_request.headers["Authorization"],
+ "my-token")
+
+ def test_custom_socket_options(self):
+ """Test custom socket options."""
+ test_socket_options = HTTPConnection.default_socket_options + \
+ [(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1),
+ (socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, 60),
+ (socket.IPPROTO_TCP, socket.TCP_KEEPCNT, 15)]
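+        # Each entry is a (level, optname, value) tuple that urllib3
+        # passes to socket.setsockopt() on newly opened connections.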
+
+ cli = InfluxDBClient(username=None, password=None,
+ socket_options=test_socket_options)
+
+        self.assertEqual(cli._session.adapters.get("http://").socket_options,
+                         test_socket_options)
+        self.assertEqual(cli._session.adapters.get("http://").poolmanager.
+                         connection_pool_kw.get("socket_options"),
+                         test_socket_options)
+
+ connection_pool = cli._session.adapters.get("http://").poolmanager \
+ .connection_from_url(
+ url="http://localhost:8086")
+ new_connection = connection_pool._new_conn()
+        self.assertEqual(new_connection.socket_options, test_socket_options)
+
+ def test_none_socket_options(self):
+ """Test default socket options."""
+ cli = InfluxDBClient(username=None, password=None)
+        self.assertIsNone(cli._session.adapters.get("http://").socket_options)
+ connection_pool = cli._session.adapters.get("http://").poolmanager \
+ .connection_from_url(
+ url="http://localhost:8086")
+ new_connection = connection_pool._new_conn()
+        self.assertEqual(new_connection.socket_options,
+                         HTTPConnection.default_socket_options)
class FakeClient(InfluxDBClient):
diff --git a/influxdb/tests/dataframe_client_test.py b/influxdb/tests/dataframe_client_test.py
index 72447c89..87b8e0d8 100644
--- a/influxdb/tests/dataframe_client_test.py
+++ b/influxdb/tests/dataframe_client_test.py
@@ -13,8 +13,8 @@
import warnings
import requests_mock
-from influxdb.tests import skipIfPYpy, using_pypy
from nose.tools import raises
+from influxdb.tests import skip_if_pypy, using_pypy
from .client_test import _mocked_session
@@ -22,9 +22,10 @@
import pandas as pd
from pandas.util.testing import assert_frame_equal
from influxdb import DataFrameClient
+ import numpy as np
-@skipIfPYpy
+@skip_if_pypy
class TestDataFrameClient(unittest.TestCase):
"""Set up a test DataFrameClient object."""
@@ -388,6 +389,71 @@ def test_write_points_from_dataframe_with_numeric_column_names(self):
self.assertEqual(m.last_request.body, expected)
+ def test_write_points_from_dataframe_with_leading_none_column(self):
+ """write_points detect erroneous leading comma for null first field."""
+ dataframe = pd.DataFrame(
+ dict(
+ first=[1, None, None, 8, 9],
+ second=[2, None, None, None, 10],
+ third=[3, 4.1, None, None, 11],
+ first_tag=["one", None, None, "eight", None],
+ second_tag=["two", None, None, None, None],
+ third_tag=["three", "four", None, None, None],
+ comment=[
+ "All columns filled",
+ "First two of three empty",
+ "All empty",
+ "Last two of three empty",
+ "Empty tags with values",
+ ]
+ ),
+ index=pd.date_range(
+ start=pd.to_datetime('2018-01-01'),
+ freq='1D',
+ periods=5,
+ )
+ )
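+        # None tags and fields must be omitted from each line without
+        # leaving a stray leading comma before the first field.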
+ expected = (
+ b'foo,first_tag=one,second_tag=two,third_tag=three'
+ b' comment="All columns filled",first=1.0,second=2.0,third=3.0'
+ b' 1514764800000000000\n'
+ b'foo,third_tag=four'
+ b' comment="First two of three empty",third=4.1'
+ b' 1514851200000000000\n'
+ b'foo comment="All empty" 1514937600000000000\n'
+ b'foo,first_tag=eight'
+ b' comment="Last two of three empty",first=8.0'
+ b' 1515024000000000000\n'
+ b'foo'
+ b' comment="Empty tags with values",first=9.0,second=10.0'
+ b',third=11.0'
+ b' 1515110400000000000\n'
+ )
+
+ with requests_mock.Mocker() as m:
+ m.register_uri(requests_mock.POST,
+ "http://localhost:8086/write",
+ status_code=204)
+
+ cli = DataFrameClient(database='db')
+
+ colnames = [
+ "first_tag",
+ "second_tag",
+ "third_tag",
+ "comment",
+ "first",
+ "second",
+ "third"
+ ]
+ cli.write_points(dataframe.loc[:, colnames], 'foo',
+ tag_columns=[
+ "first_tag",
+ "second_tag",
+ "third_tag"])
+
+ self.assertEqual(m.last_request.body, expected)
+
def test_write_points_from_dataframe_with_numeric_precision(self):
"""Test write points from df with numeric precision."""
now = pd.Timestamp('1970-01-01 00:00+00:00')
@@ -396,10 +462,16 @@ def test_write_points_from_dataframe_with_numeric_precision(self):
["2", 2, 2.2222222222222]],
index=[now, now + timedelta(hours=1)])
- expected_default_precision = (
- b'foo,hello=there 0=\"1\",1=1i,2=1.11111111111 0\n'
- b'foo,hello=there 0=\"2\",1=2i,2=2.22222222222 3600000000000\n'
- )
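+    # numpy 1.14 switched float64 formatting to the shortest round-trip
+    # repr, so newer versions serialize more significant digits.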
+ if np.lib.NumpyVersion(np.__version__) <= '1.13.3':
+ expected_default_precision = (
+ b'foo,hello=there 0=\"1\",1=1i,2=1.11111111111 0\n'
+ b'foo,hello=there 0=\"2\",1=2i,2=2.22222222222 3600000000000\n'
+ )
+ else:
+ expected_default_precision = (
+ b'foo,hello=there 0=\"1\",1=1i,2=1.1111111111111 0\n'
+            b'foo,hello=there 0=\"2\",1=2i,2=2.2222222222222 3600000000000\n'  # noqa: E501
+ )
expected_specified_precision = (
b'foo,hello=there 0=\"1\",1=1i,2=1.1111 0\n'
@@ -419,6 +491,9 @@ def test_write_points_from_dataframe_with_numeric_precision(self):
cli = DataFrameClient(database='db')
cli.write_points(dataframe, "foo", {"hello": "there"})
self.assertEqual(m.last_request.body, expected_default_precision)
cli = DataFrameClient(database='db')
@@ -802,7 +877,7 @@ def test_query_into_dataframe(self):
{"measurement": "network",
"tags": {"direction": ""},
"columns": ["time", "value"],
- "values":[["2009-11-10T23:00:00Z", 23422]]
+ "values": [["2009-11-10T23:00:00Z", 23422]]
},
{"measurement": "network",
"tags": {"direction": "in"},
@@ -818,13 +893,15 @@ def test_query_into_dataframe(self):
pd1 = pd.DataFrame(
[[23422]], columns=['value'],
index=pd.to_datetime(["2009-11-10T23:00:00Z"]))
- pd1.index = pd1.index.tz_localize('UTC')
+ if pd1.index.tzinfo is None:
+ pd1.index = pd1.index.tz_localize('UTC')
pd2 = pd.DataFrame(
[[23422], [23422], [23422]], columns=['value'],
index=pd.to_datetime(["2009-11-10T23:00:00Z",
"2009-11-10T23:00:00Z",
"2009-11-10T23:00:00Z"]))
- pd2.index = pd2.index.tz_localize('UTC')
+ if pd2.index.tzinfo is None:
+ pd2.index = pd2.index.tz_localize('UTC')
expected = {
('network', (('direction', ''),)): pd1,
('network', (('direction', 'in'),)): pd2
@@ -837,7 +914,7 @@ def test_query_into_dataframe(self):
assert_frame_equal(expected[k], result[k])
def test_multiquery_into_dataframe(self):
- """Test multiquyer into df for TestDataFrameClient object."""
+ """Test multiquery into df for TestDataFrameClient object."""
data = {
"results": [
{
@@ -871,22 +948,118 @@ def test_multiquery_into_dataframe(self):
index=pd.to_datetime([
"2015-01-29 21:55:43.702900257+0000",
"2015-01-29 21:55:43.702900257+0000",
- "2015-06-11 20:46:02+0000"])).tz_localize('UTC')
+ "2015-06-11 20:46:02+0000"]))
+ if pd1.index.tzinfo is None:
+ pd1.index = pd1.index.tz_localize('UTC')
pd2 = pd.DataFrame(
[[3]], columns=['count'],
- index=pd.to_datetime(["1970-01-01 00:00:00+00:00"]))\
- .tz_localize('UTC')
+ index=pd.to_datetime(["1970-01-01 00:00:00+00:00"]))
+ if pd2.index.tzinfo is None:
+ pd2.index = pd2.index.tz_localize('UTC')
expected = [{'cpu_load_short': pd1}, {'cpu_load_short': pd2}]
cli = DataFrameClient('host', 8086, 'username', 'password', 'db')
- iql = "SELECT value FROM cpu_load_short WHERE region='us-west';"\
- "SELECT count(value) FROM cpu_load_short WHERE region='us-west'"
+ iql = "SELECT value FROM cpu_load_short WHERE region=$region;"\
+ "SELECT count(value) FROM cpu_load_short WHERE region=$region"
+ bind_params = {'region': 'us-west'}
with _mocked_session(cli, 'GET', 200, data):
- result = cli.query(iql)
+ result = cli.query(iql, bind_params=bind_params)
for r, e in zip(result, expected):
for k in e:
assert_frame_equal(e[k], r[k])
+ def test_multiquery_into_dataframe_dropna(self):
+ """Test multiquery into df for TestDataFrameClient object."""
+ data = {
+ "results": [
+ {
+ "series": [
+ {
+ "name": "cpu_load_short",
+ "columns": ["time", "value", "value2", "value3"],
+ "values": [
+ ["2015-01-29T21:55:43.702900257Z",
+ 0.55, 0.254, np.NaN],
+ ["2015-01-29T21:55:43.702900257Z",
+ 23422, 122878, np.NaN],
+ ["2015-06-11T20:46:02Z",
+ 0.64, 0.5434, np.NaN]
+ ]
+ }
+ ]
+ }, {
+ "series": [
+ {
+ "name": "cpu_load_short",
+ "columns": ["time", "count"],
+ "values": [
+ ["1970-01-01T00:00:00Z", 3]
+ ]
+ }
+ ]
+ }
+ ]
+ }
+
+ pd1 = pd.DataFrame(
+ [[0.55, 0.254, np.NaN],
+ [23422.0, 122878, np.NaN],
+ [0.64, 0.5434, np.NaN]],
+ columns=['value', 'value2', 'value3'],
+ index=pd.to_datetime([
+ "2015-01-29 21:55:43.702900257+0000",
+ "2015-01-29 21:55:43.702900257+0000",
+ "2015-06-11 20:46:02+0000"]))
+
+ if pd1.index.tzinfo is None:
+ pd1.index = pd1.index.tz_localize('UTC')
+
+ pd1_dropna = pd.DataFrame(
+ [[0.55, 0.254], [23422.0, 122878], [0.64, 0.5434]],
+ columns=['value', 'value2'],
+ index=pd.to_datetime([
+ "2015-01-29 21:55:43.702900257+0000",
+ "2015-01-29 21:55:43.702900257+0000",
+ "2015-06-11 20:46:02+0000"]))
+
+ if pd1_dropna.index.tzinfo is None:
+ pd1_dropna.index = pd1_dropna.index.tz_localize('UTC')
+
+ pd2 = pd.DataFrame(
+ [[3]], columns=['count'],
+ index=pd.to_datetime(["1970-01-01 00:00:00+00:00"]))
+
+ if pd2.index.tzinfo is None:
+ pd2.index = pd2.index.tz_localize('UTC')
+
+ expected_dropna_true = [
+ {'cpu_load_short': pd1_dropna},
+ {'cpu_load_short': pd2}]
+ expected_dropna_false = [
+ {'cpu_load_short': pd1},
+ {'cpu_load_short': pd2}]
+
+ cli = DataFrameClient('host', 8086, 'username', 'password', 'db')
+ iql = "SELECT value FROM cpu_load_short WHERE region=$region;" \
+ "SELECT count(value) FROM cpu_load_short WHERE region=$region"
+ bind_params = {'region': 'us-west'}
+
+ for dropna in [True, False]:
+ with _mocked_session(cli, 'GET', 200, data):
+ result = cli.query(iql, bind_params=bind_params, dropna=dropna)
+ expected = \
+ expected_dropna_true if dropna else expected_dropna_false
+ for r, e in zip(result, expected):
+ for k in e:
+ assert_frame_equal(e[k], r[k])
+
+ # test default value (dropna = True)
+ with _mocked_session(cli, 'GET', 200, data):
+ result = cli.query(iql, bind_params=bind_params)
+ for r, e in zip(result, expected_dropna_true):
+ for k in e:
+ assert_frame_equal(e[k], r[k])
+
def test_query_with_empty_result(self):
"""Test query with empty results in TestDataFrameClient object."""
cli = DataFrameClient('host', 8086, 'username', 'password', 'db')
@@ -951,3 +1124,225 @@ def test_dsn_constructor(self):
client = DataFrameClient.from_dsn('influxdb://localhost:8086')
self.assertIsInstance(client, DataFrameClient)
self.assertEqual('http://localhost:8086', client._baseurl)
+
+ def test_write_points_from_dataframe_with_nan_line(self):
+ """Test write points from dataframe with Nan lines."""
+ now = pd.Timestamp('1970-01-01 00:00+00:00')
+ dataframe = pd.DataFrame(data=[["1", 1, np.inf], ["2", 2, np.nan]],
+ index=[now, now + timedelta(hours=1)],
+ columns=["column_one", "column_two",
+ "column_three"])
+ expected = (
+ b"foo column_one=\"1\",column_two=1i 0\n"
+ b"foo column_one=\"2\",column_two=2i "
+ b"3600000000000\n"
+ )
+
+ with requests_mock.Mocker() as m:
+ m.register_uri(requests_mock.POST,
+ "http://localhost:8086/write",
+ status_code=204)
+
+ cli = DataFrameClient(database='db')
+
+ cli.write_points(dataframe, 'foo', protocol='line')
+ self.assertEqual(m.last_request.body, expected)
+
+ cli.write_points(dataframe, 'foo', tags=None, protocol='line')
+ self.assertEqual(m.last_request.body, expected)
+
+ def test_write_points_from_dataframe_with_nan_json(self):
+ """Test write points from json with NaN lines."""
+ now = pd.Timestamp('1970-01-01 00:00+00:00')
+ dataframe = pd.DataFrame(data=[["1", 1, np.inf], ["2", 2, np.nan]],
+ index=[now, now + timedelta(hours=1)],
+ columns=["column_one", "column_two",
+ "column_three"])
+ expected = (
+ b"foo column_one=\"1\",column_two=1i 0\n"
+ b"foo column_one=\"2\",column_two=2i "
+ b"3600000000000\n"
+ )
+
+ with requests_mock.Mocker() as m:
+ m.register_uri(requests_mock.POST,
+ "http://localhost:8086/write",
+ status_code=204)
+
+ cli = DataFrameClient(database='db')
+
+ cli.write_points(dataframe, 'foo', protocol='json')
+ self.assertEqual(m.last_request.body, expected)
+
+ cli.write_points(dataframe, 'foo', tags=None, protocol='json')
+ self.assertEqual(m.last_request.body, expected)
+
+ def test_write_points_from_dataframe_with_tags_and_nan_line(self):
+ """Test write points from dataframe with NaN lines and tags."""
+ now = pd.Timestamp('1970-01-01 00:00+00:00')
+ dataframe = pd.DataFrame(data=[['blue', 1, "1", 1, np.inf],
+ ['red', 0, "2", 2, np.nan]],
+ index=[now, now + timedelta(hours=1)],
+ columns=["tag_one", "tag_two", "column_one",
+ "column_two", "column_three"])
+ expected = (
+ b"foo,tag_one=blue,tag_two=1 "
+ b"column_one=\"1\",column_two=1i "
+ b"0\n"
+ b"foo,tag_one=red,tag_two=0 "
+ b"column_one=\"2\",column_two=2i "
+ b"3600000000000\n"
+ )
+
+ with requests_mock.Mocker() as m:
+ m.register_uri(requests_mock.POST,
+ "http://localhost:8086/write",
+ status_code=204)
+
+ cli = DataFrameClient(database='db')
+
+ cli.write_points(dataframe, 'foo', protocol='line',
+ tag_columns=['tag_one', 'tag_two'])
+ self.assertEqual(m.last_request.body, expected)
+
+ cli.write_points(dataframe, 'foo', tags=None, protocol='line',
+ tag_columns=['tag_one', 'tag_two'])
+ self.assertEqual(m.last_request.body, expected)
+
+ def test_write_points_from_dataframe_with_tags_and_nan_json(self):
+ """Test write points from json with NaN lines and tags."""
+ now = pd.Timestamp('1970-01-01 00:00+00:00')
+ dataframe = pd.DataFrame(data=[['blue', 1, "1", 1, np.inf],
+ ['red', 0, "2", 2, np.nan]],
+ index=[now, now + timedelta(hours=1)],
+ columns=["tag_one", "tag_two", "column_one",
+ "column_two", "column_three"])
+ expected = (
+ b"foo,tag_one=blue,tag_two=1 "
+ b"column_one=\"1\",column_two=1i "
+ b"0\n"
+ b"foo,tag_one=red,tag_two=0 "
+ b"column_one=\"2\",column_two=2i "
+ b"3600000000000\n"
+ )
+
+ with requests_mock.Mocker() as m:
+ m.register_uri(requests_mock.POST,
+ "http://localhost:8086/write",
+ status_code=204)
+
+ cli = DataFrameClient(database='db')
+
+ cli.write_points(dataframe, 'foo', protocol='json',
+ tag_columns=['tag_one', 'tag_two'])
+ self.assertEqual(m.last_request.body, expected)
+
+ cli.write_points(dataframe, 'foo', tags=None, protocol='json',
+ tag_columns=['tag_one', 'tag_two'])
+ self.assertEqual(m.last_request.body, expected)
+
+ def test_query_custom_index(self):
+ """Test query with custom indexes."""
+ data = {
+ "results": [
+ {
+ "series": [
+ {
+ "name": "cpu_load_short",
+ "columns": ["time", "value", "host"],
+ "values": [
+ [1, 0.55, "local"],
+ [2, 23422, "local"],
+ [3, 0.64, "local"]
+ ]
+ }
+ ]
+ }
+ ]
+ }
+
+ cli = DataFrameClient('host', 8086, 'username', 'password', 'db')
+ iql = "SELECT value FROM cpu_load_short WHERE region=$region;" \
+ "SELECT count(value) FROM cpu_load_short WHERE region=$region"
+ bind_params = {'region': 'us-west'}
+ with _mocked_session(cli, 'GET', 200, data):
+ result = cli.query(iql, bind_params=bind_params,
+ data_frame_index=["time", "host"])
+
+ _data_frame = result['cpu_load_short']
+
+ self.assertListEqual(["time", "host"],
+ list(_data_frame.index.names))
+
+ def test_dataframe_nanosecond_precision(self):
+ """Test nanosecond precision."""
+ for_df_dict = {
+ "nanFloats": [1.1, float('nan'), 3.3, 4.4],
+ "onlyFloats": [1.1, 2.2, 3.3, 4.4],
+ "strings": ['one_one', 'two_two', 'three_three', 'four_four']
+ }
+ df = pd.DataFrame.from_dict(for_df_dict)
+ df['time'] = ['2019-10-04 06:27:19.850557111+00:00',
+ '2019-10-04 06:27:19.850557184+00:00',
+ '2019-10-04 06:27:42.251396864+00:00',
+ '2019-10-04 06:27:42.251396974+00:00']
+ df['time'] = pd.to_datetime(df['time'], unit='ns')
+ df = df.set_index('time')
+
+ expected = (
+            b'foo nanFloats=1.1,onlyFloats=1.1,strings="one_one" 1570170439850557111\n'  # noqa: E501
+            b'foo onlyFloats=2.2,strings="two_two" 1570170439850557184\n'  # noqa: E501
+            b'foo nanFloats=3.3,onlyFloats=3.3,strings="three_three" 1570170462251396864\n'  # noqa: E501
+            b'foo nanFloats=4.4,onlyFloats=4.4,strings="four_four" 1570170462251396974\n'  # noqa: E501
+ )
+
+ with requests_mock.Mocker() as m:
+ m.register_uri(
+ requests_mock.POST,
+ "http://localhost:8086/write",
+ status_code=204
+ )
+
+ cli = DataFrameClient(database='db')
+ cli.write_points(df, 'foo', time_precision='n')
+
+ self.assertEqual(m.last_request.body, expected)
+
+ def test_dataframe_nanosecond_precision_one_microsecond(self):
+ """Test nanosecond precision within one microsecond."""
+ # 1 microsecond = 1000 nanoseconds
+ start = np.datetime64('2019-10-04T06:27:19.850557000')
+ end = np.datetime64('2019-10-04T06:27:19.850558000')
+
+ # generate timestamps with nanosecond precision
+ timestamps = np.arange(
+ start,
+ end + np.timedelta64(1, 'ns'),
+ np.timedelta64(1, 'ns')
+ )
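+        # np.arange excludes the stop value, so adding one extra
+        # nanosecond makes the range inclusive: 1001 timestamps in total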
+ # generate values
+ values = np.arange(0.0, len(timestamps))
+
+ df = pd.DataFrame({'value': values}, index=timestamps)
+ with requests_mock.Mocker() as m:
+ m.register_uri(
+ requests_mock.POST,
+ "http://localhost:8086/write",
+ status_code=204
+ )
+
+ cli = DataFrameClient(database='db')
+ cli.write_points(df, 'foo', time_precision='n')
+
+ lines = m.last_request.body.decode('utf-8').split('\n')
+ self.assertEqual(len(lines), 1002)
+
+ for index, line in enumerate(lines):
+ if index == 1001:
+ self.assertEqual(line, '')
+ continue
+ self.assertEqual(
+ line,
+ f"foo value={index}.0 157017043985055{7000 + index:04}"
+ )
diff --git a/influxdb/tests/helper_test.py b/influxdb/tests/helper_test.py
index 6f24e85d..6737f921 100644
--- a/influxdb/tests/helper_test.py
+++ b/influxdb/tests/helper_test.py
@@ -47,6 +47,14 @@ class Meta:
TestSeriesHelper.MySeriesHelper = MySeriesHelper
+ def setUp(self):
+ """Check that MySeriesHelper has empty datapoints."""
+ super(TestSeriesHelper, self).setUp()
+ self.assertEqual(
+ TestSeriesHelper.MySeriesHelper._json_body_(),
+ [],
+ 'Resetting helper in teardown did not empty datapoints.')
+
def tearDown(self):
"""Deconstruct the TestSeriesHelper object."""
super(TestSeriesHelper, self).tearDown()
@@ -310,8 +318,19 @@ class Meta:
series_name = 'events.stats.{server_name}'
+ class InvalidTimePrecision(SeriesHelper):
+ """Define instance of SeriesHelper for invalid time precision."""
+
+ class Meta:
+ """Define metadata for InvalidTimePrecision."""
+
+ series_name = 'events.stats.{server_name}'
+ time_precision = "ks"
+ fields = ['time', 'server_name']
+ autocommit = True
+
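+        # "ks" is not a supported precision, so instantiating
+        # InvalidTimePrecision must raise AttributeError like the rest.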
for cls in [MissingMeta, MissingClient, MissingFields,
- MissingSeriesName]:
+ MissingSeriesName, InvalidTimePrecision]:
self.assertRaises(
AttributeError, cls, **{'time': 159,
'server_name': 'us.east-1'})
@@ -365,3 +384,54 @@ class Meta:
.format(WarnBulkSizeNoEffect))
self.assertIn('has no affect', str(w[-1].message),
'Warning message did not contain "has not affect".')
+
+ def testSeriesWithRetentionPolicy(self):
+ """Test that the data is saved with the specified retention policy."""
+ my_policy = 'my_policy'
+
+ class RetentionPolicySeriesHelper(SeriesHelper):
+
+ class Meta:
+ client = InfluxDBClient()
+ series_name = 'events.stats.{server_name}'
+ fields = ['some_stat', 'time']
+ tags = ['server_name', 'other_tag']
+ bulk_size = 2
+ autocommit = True
+ retention_policy = my_policy
+
+ fake_write_points = mock.MagicMock()
+ RetentionPolicySeriesHelper(
+ server_name='us.east-1', some_stat=159, other_tag='gg')
+ RetentionPolicySeriesHelper._client.write_points = fake_write_points
+ RetentionPolicySeriesHelper(
+ server_name='us.east-1', some_stat=158, other_tag='aa')
+
+ kall = fake_write_points.call_args
+ args, kwargs = kall
+        self.assertIn('retention_policy', kwargs)
+ self.assertEqual(kwargs['retention_policy'], my_policy)
+
+ def testSeriesWithoutRetentionPolicy(self):
+ """Test that the data is saved without any retention policy."""
+ class NoRetentionPolicySeriesHelper(SeriesHelper):
+
+ class Meta:
+ client = InfluxDBClient()
+ series_name = 'events.stats.{server_name}'
+ fields = ['some_stat', 'time']
+ tags = ['server_name', 'other_tag']
+ bulk_size = 2
+ autocommit = True
+
+ fake_write_points = mock.MagicMock()
+ NoRetentionPolicySeriesHelper(
+ server_name='us.east-1', some_stat=159, other_tag='gg')
+ NoRetentionPolicySeriesHelper._client.write_points = fake_write_points
+ NoRetentionPolicySeriesHelper(
+ server_name='us.east-1', some_stat=158, other_tag='aa')
+
+ kall = fake_write_points.call_args
+ args, kwargs = kall
+        self.assertIn('retention_policy', kwargs)
+        self.assertIsNone(kwargs['retention_policy'])
diff --git a/influxdb/tests/influxdb08/dataframe_client_test.py b/influxdb/tests/influxdb08/dataframe_client_test.py
index 6e6fa2cc..0a766af0 100644
--- a/influxdb/tests/influxdb08/dataframe_client_test.py
+++ b/influxdb/tests/influxdb08/dataframe_client_test.py
@@ -12,7 +12,7 @@
from nose.tools import raises
-from influxdb.tests import skipIfPYpy, using_pypy
+from influxdb.tests import skip_if_pypy, using_pypy
from .client_test import _mocked_session
@@ -22,7 +22,7 @@
from influxdb.influxdb08 import DataFrameClient
-@skipIfPYpy
+@skip_if_pypy
class TestDataFrameClient(unittest.TestCase):
"""Define the DataFramClient test object."""
diff --git a/influxdb/tests/server_tests/base.py b/influxdb/tests/server_tests/base.py
index f4bd3ff9..45a9ec80 100644
--- a/influxdb/tests/server_tests/base.py
+++ b/influxdb/tests/server_tests/base.py
@@ -36,6 +36,15 @@ def _setup_influxdb_server(inst):
database='db')
+def _setup_gzip_client(inst):
+ inst.cli = InfluxDBClient('localhost',
+ inst.influxd_inst.http_port,
+ 'root',
+ '',
+ database='db',
+ gzip=True)
+
+
def _teardown_influxdb_server(inst):
remove_tree = sys.exc_info() == (None, None, None)
inst.influxd_inst.close(remove_tree=remove_tree)
@@ -51,8 +60,15 @@ class SingleTestCaseWithServerMixin(object):
# 'influxdb_template_conf' attribute must be set
# on the TestCase class or instance.
- setUp = _setup_influxdb_server
- tearDown = _teardown_influxdb_server
+ @classmethod
+ def setUp(cls):
+ """Set up an instance of the SingleTestCaseWithServerMixin."""
+ _setup_influxdb_server(cls)
+
+ @classmethod
+ def tearDown(cls):
+ """Tear down an instance of the SingleTestCaseWithServerMixin."""
+ _teardown_influxdb_server(cls)
class ManyTestCasesWithServerMixin(object):
@@ -82,3 +98,41 @@ def tearDownClass(cls):
def tearDown(self):
"""Deconstruct an instance of ManyTestCasesWithServerMixin."""
self.cli.drop_database('db')
+
+
+class SingleTestCaseWithServerGzipMixin(object):
+ """Define the single testcase with server with gzip client mixin.
+
+ Same as the SingleTestCaseWithServerGzipMixin but the InfluxDBClient has
+ gzip=True
+ """
+
+ @classmethod
+ def setUp(cls):
+ """Set up an instance of the SingleTestCaseWithServerGzipMixin."""
+ _setup_influxdb_server(cls)
+ _setup_gzip_client(cls)
+
+ @classmethod
+ def tearDown(cls):
+ """Tear down an instance of the SingleTestCaseWithServerMixin."""
+ _teardown_influxdb_server(cls)
+
+
+class ManyTestCasesWithServerGzipMixin(object):
+ """Define the many testcase with server with gzip client mixin.
+
+ Same as the ManyTestCasesWithServerMixin but the InfluxDBClient has
+ gzip=True.
+ """
+
+ @classmethod
+ def setUpClass(cls):
+ """Set up an instance of the ManyTestCasesWithServerGzipMixin."""
+ _setup_influxdb_server(cls)
+ _setup_gzip_client(cls)
+
+ @classmethod
+ def tearDown(cls):
+ """Tear down an instance of the SingleTestCaseWithServerMixin."""
+ _teardown_influxdb_server(cls)
diff --git a/influxdb/tests/server_tests/client_test_with_server.py b/influxdb/tests/server_tests/client_test_with_server.py
index 2f8a2097..a0263243 100644
--- a/influxdb/tests/server_tests/client_test_with_server.py
+++ b/influxdb/tests/server_tests/client_test_with_server.py
@@ -23,9 +23,11 @@
from influxdb import InfluxDBClient
from influxdb.exceptions import InfluxDBClientError
-from influxdb.tests import skipIfPYpy, using_pypy, skipServerTests
+from influxdb.tests import skip_if_pypy, using_pypy, skip_server_tests
from influxdb.tests.server_tests.base import ManyTestCasesWithServerMixin
from influxdb.tests.server_tests.base import SingleTestCaseWithServerMixin
+from influxdb.tests.server_tests.base import ManyTestCasesWithServerGzipMixin
+from influxdb.tests.server_tests.base import SingleTestCaseWithServerGzipMixin
# By default, raise exceptions on warnings
warnings.simplefilter('error', FutureWarning)
@@ -82,7 +84,7 @@ def point(series_name, timestamp=None, tags=None, **fields):
]
if not using_pypy:
- dummy_pointDF = {
+ dummy_point_df = {
"measurement": "cpu_load_short",
"tags": {"host": "server01",
"region": "us-west"},
@@ -90,7 +92,7 @@ def point(series_name, timestamp=None, tags=None, **fields):
[[0.64]], columns=['value'],
index=pd.to_datetime(["2009-11-10T23:00:00Z"]))
}
- dummy_pointsDF = [{
+ dummy_points_df = [{
"measurement": "cpu_load_short",
"tags": {"host": "server01", "region": "us-west"},
"dataframe": pd.DataFrame(
@@ -120,7 +122,7 @@ def point(series_name, timestamp=None, tags=None, **fields):
]
-@skipServerTests
+@skip_server_tests
class SimpleTests(SingleTestCaseWithServerMixin, unittest.TestCase):
"""Define the class of simple tests."""
@@ -267,7 +269,7 @@ def test_invalid_port_fails(self):
InfluxDBClient('host', '80/redir', 'username', 'password')
-@skipServerTests
+@skip_server_tests
class CommonTests(ManyTestCasesWithServerMixin, unittest.TestCase):
"""Define a class to handle common tests for the server."""
@@ -293,15 +295,15 @@ def test_write_points(self):
"""Test writing points to the server."""
self.assertIs(True, self.cli.write_points(dummy_point))
- @skipIfPYpy
+ @skip_if_pypy
def test_write_points_DF(self):
"""Test writing points with dataframe."""
self.assertIs(
True,
self.cliDF.write_points(
- dummy_pointDF['dataframe'],
- dummy_pointDF['measurement'],
- dummy_pointDF['tags']
+ dummy_point_df['dataframe'],
+ dummy_point_df['measurement'],
+ dummy_point_df['tags']
)
)
@@ -342,7 +344,7 @@ def test_write_points_check_read_DF(self):
rsp = self.cliDF.query('SELECT * FROM cpu_load_short')
assert_frame_equal(
rsp['cpu_load_short'],
- dummy_pointDF['dataframe']
+ dummy_point_df['dataframe']
)
# Query with Tags
@@ -351,7 +353,7 @@ def test_write_points_check_read_DF(self):
assert_frame_equal(
rsp[('cpu_load_short',
(('host', 'server01'), ('region', 'us-west')))],
- dummy_pointDF['dataframe']
+ dummy_point_df['dataframe']
)
def test_write_multiple_points_different_series(self):
@@ -407,21 +409,21 @@ def test_write_multiple_points_different_series_DF(self):
for i in range(2):
self.assertIs(
True, self.cliDF.write_points(
- dummy_pointsDF[i]['dataframe'],
- dummy_pointsDF[i]['measurement'],
- dummy_pointsDF[i]['tags']))
+ dummy_points_df[i]['dataframe'],
+ dummy_points_df[i]['measurement'],
+ dummy_points_df[i]['tags']))
time.sleep(1)
rsp = self.cliDF.query('SELECT * FROM cpu_load_short')
assert_frame_equal(
rsp['cpu_load_short'],
- dummy_pointsDF[0]['dataframe']
+ dummy_points_df[0]['dataframe']
)
rsp = self.cliDF.query('SELECT * FROM memory')
assert_frame_equal(
rsp['memory'],
- dummy_pointsDF[1]['dataframe']
+ dummy_points_df[1]['dataframe']
)
def test_write_points_batch(self):
@@ -440,7 +442,36 @@ def test_write_points_batch(self):
batch_size=2)
time.sleep(5)
net_in = self.cli.query("SELECT value FROM network "
- "WHERE direction='in'").raw
+ "WHERE direction=$dir",
+ bind_params={'dir': 'in'}
+ ).raw
+ net_out = self.cli.query("SELECT value FROM network "
+ "WHERE direction='out'").raw
+ cpu = self.cli.query("SELECT value FROM cpu_usage").raw
+ self.assertIn(123, net_in['series'][0]['values'][0])
+ self.assertIn(12, net_out['series'][0]['values'][0])
+ self.assertIn(12.34, cpu['series'][0]['values'][0])
+
+ def test_write_points_batch_generator(self):
+ """Test writing points in a batch from a generator."""
+ dummy_points = [
+ {"measurement": "cpu_usage", "tags": {"unit": "percent"},
+ "time": "2009-11-10T23:00:00Z", "fields": {"value": 12.34}},
+ {"measurement": "network", "tags": {"direction": "in"},
+ "time": "2009-11-10T23:00:00Z", "fields": {"value": 123.00}},
+ {"measurement": "network", "tags": {"direction": "out"},
+ "time": "2009-11-10T23:00:00Z", "fields": {"value": 12.00}}
+ ]
+ dummy_points_generator = (point for point in dummy_points)
+ self.cli.write_points(points=dummy_points_generator,
+ tags={"host": "server01",
+ "region": "us-west"},
+ batch_size=2)
+ time.sleep(5)
+ net_in = self.cli.query("SELECT value FROM network "
+ "WHERE direction=$dir",
+ bind_params={'dir': 'in'}
+ ).raw
net_out = self.cli.query("SELECT value FROM network "
"WHERE direction='out'").raw
cpu = self.cli.query("SELECT value FROM cpu_usage").raw
@@ -720,6 +751,36 @@ def test_drop_retention_policy(self):
rsp
)
+ def test_create_continuous_query(self):
+ """Test continuous query creation."""
+ self.cli.create_retention_policy('some_rp', '1d', 1)
+ query = 'select count("value") into "some_rp"."events" from ' \
+ '"events" group by time(10m)'
+ self.cli.create_continuous_query('test_cq', query, 'db')
+ cqs = self.cli.get_list_continuous_queries()
+ expected_cqs = [
+ {
+ 'db': [
+ {
+ 'name': 'test_cq',
+ 'query': 'CREATE CONTINUOUS QUERY test_cq ON db '
+ 'BEGIN SELECT count(value) INTO '
+ 'db.some_rp.events FROM db.autogen.events '
+ 'GROUP BY time(10m) END'
+ }
+ ]
+ }
+ ]
+ self.assertEqual(cqs, expected_cqs)
+
+ def test_drop_continuous_query(self):
+ """Test continuous query drop."""
+ self.test_create_continuous_query()
+ self.cli.drop_continuous_query('test_cq', 'db')
+ cqs = self.cli.get_list_continuous_queries()
+ expected_cqs = [{'db': []}]
+ self.assertEqual(cqs, expected_cqs)
+
def test_issue_143(self):
"""Test for PR#143 from repo."""
pt = partial(point, 'a_series_name', timestamp='2015-03-30T16:16:37Z')
@@ -785,8 +846,66 @@ def test_query_multiple_series(self):
]
self.cli.write_points(pts)
+ def test_get_list_series(self):
+ """Test get a list of series from the database."""
+ dummy_points = [
+ {
+ "measurement": "cpu_load_short",
+ "tags": {
+ "host": "server01",
+ "region": "us-west"
+ },
+ "time": "2009-11-10T23:00:00.123456Z",
+ "fields": {
+ "value": 0.64
+ }
+ }
+ ]
-@skipServerTests
+ dummy_points_2 = [
+ {
+ "measurement": "memory_usage",
+ "tags": {
+ "host": "server02",
+ "region": "us-east"
+ },
+ "time": "2009-11-10T23:00:00.123456Z",
+ "fields": {
+ "value": 80
+ }
+ }
+ ]
+
+ self.cli.write_points(dummy_points)
+ self.cli.write_points(dummy_points_2)
+
+        self.assertEqual(
+ self.cli.get_list_series(),
+ ['cpu_load_short,host=server01,region=us-west',
+ 'memory_usage,host=server02,region=us-east']
+ )
+
+        self.assertEqual(
+            self.cli.get_list_series(measurement='memory_usage'),
+            ['memory_usage,host=server02,region=us-east']
+        )
+
+        self.assertEqual(
+ self.cli.get_list_series(tags={'host': 'server02'}),
+ ['memory_usage,host=server02,region=us-east'])
+
+        self.assertEqual(
+ self.cli.get_list_series(
+ measurement='cpu_load_short', tags={'host': 'server02'}),
+ [])
+
+
+@skip_server_tests
class UdpTests(ManyTestCasesWithServerMixin, unittest.TestCase):
"""Define a class to test UDP series."""
@@ -823,3 +942,25 @@ def test_write_points_udp(self):
],
list(rsp['cpu_load_short'])
)
+
+
+# Run the tests again, but with gzip enabled this time
+@skip_server_tests
+class GzipSimpleTests(SimpleTests, SingleTestCaseWithServerGzipMixin):
+ """Repeat the simple tests with InfluxDBClient where gzip=True."""
+
+ pass
+
+
+@skip_server_tests
+class GzipCommonTests(CommonTests, ManyTestCasesWithServerGzipMixin):
+ """Repeat the common tests with InfluxDBClient where gzip=True."""
+
+ pass
+
+
+@skip_server_tests
+class GzipUdpTests(UdpTests, ManyTestCasesWithServerGzipMixin):
+ """Repeat the UDP tests with InfluxDBClient where gzip=True."""
+
+ pass
diff --git a/influxdb/tests/server_tests/influxdb_instance.py b/influxdb/tests/server_tests/influxdb_instance.py
index 1dcd7567..2dd823ff 100644
--- a/influxdb/tests/server_tests/influxdb_instance.py
+++ b/influxdb/tests/server_tests/influxdb_instance.py
@@ -7,7 +7,7 @@
from __future__ import unicode_literals
import datetime
-import distutils
+import distutils.spawn
import os
import tempfile
import shutil
diff --git a/influxdb/tests/test_line_protocol.py b/influxdb/tests/test_line_protocol.py
index a3d84793..5b344990 100644
--- a/influxdb/tests/test_line_protocol.py
+++ b/influxdb/tests/test_line_protocol.py
@@ -6,10 +6,12 @@
from __future__ import print_function
from __future__ import unicode_literals
-from datetime import datetime
import unittest
-from pytz import UTC, timezone
+from datetime import datetime
+from decimal import Decimal
+
+from pytz import UTC, timezone
from influxdb import line_protocol
@@ -42,7 +44,7 @@ def test_make_lines(self):
self.assertEqual(
line_protocol.make_lines(data),
- 'test,backslash_tag=C:\\\\ ,integer_tag=2,string_tag=hello '
+ 'test,backslash_tag=C:\\\\,integer_tag=2,string_tag=hello '
'bool_val=True,float_val=1.1,int_val=1i,string_val="hello!"\n'
)
@@ -115,6 +117,45 @@ def test_make_lines_unicode(self):
'test,unicode_tag=\'Привет!\' unicode_val="Привет!"\n'
)
+ def test_make_lines_empty_field_string(self):
+ """Test make lines with an empty string field."""
+ data = {
+ "points": [
+ {
+ "measurement": "test",
+ "fields": {
+ "string": "",
+ }
+ }
+ ]
+ }
+
+ self.assertEqual(
+ line_protocol.make_lines(data),
+ 'test string=""\n'
+ )
+
+ def test_tag_value_newline(self):
+ """Test make lines with tag value contains newline."""
+ data = {
+ "tags": {
+ "t1": "line1\nline2"
+ },
+ "points": [
+ {
+ "measurement": "test",
+ "fields": {
+ "val": "hello"
+ }
+ }
+ ]
+ }
+
+ self.assertEqual(
+ line_protocol.make_lines(data),
+ 'test,t1=line1\\nline2 val="hello"\n'
+ )
+
def test_quote_ident(self):
"""Test quote indentation in TestLineProtocol object."""
self.assertEqual(
@@ -145,3 +186,20 @@ def test_float_with_long_decimal_fraction(self):
line_protocol.make_lines(data),
'test float_val=1.0000000000000009\n'
)
+
+ def test_float_with_long_decimal_fraction_as_type_decimal(self):
+ """Ensure precision is preserved when casting Decimal into strings."""
+ data = {
+ "points": [
+ {
+ "measurement": "test",
+ "fields": {
+ "float_val": Decimal(0.8289445733333332),
+ }
+ }
+ ]
+ }
+ self.assertEqual(
+ line_protocol.make_lines(data),
+ 'test float_val=0.8289445733333332\n'
+ )
diff --git a/mypy.ini b/mypy.ini
new file mode 100644
index 00000000..308aa62d
--- /dev/null
+++ b/mypy.ini
@@ -0,0 +1,8 @@
+[mypy]
+ignore_missing_imports = True
+warn_unused_ignores = True
+warn_unused_configs = True
+warn_redundant_casts = True
+warn_no_return = True
+no_implicit_optional = True
+strict_equality = True
diff --git a/pyproject.toml b/pyproject.toml
new file mode 100644
index 00000000..1b68d94e
--- /dev/null
+++ b/pyproject.toml
@@ -0,0 +1,3 @@
+[build-system]
+requires = ["setuptools>=42", "wheel"]
+build-backend = "setuptools.build_meta"
\ No newline at end of file
diff --git a/requirements.txt b/requirements.txt
index db5f6f85..a3df3154 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,4 +1,5 @@
python-dateutil>=2.6.0
-pytz
+pytz>=2016.10
requests>=2.17.0
six>=1.10.0
+msgpack>=0.5.0
diff --git a/setup.py b/setup.py
index cd6e4e9b..8ac7d1a7 100755
--- a/setup.py
+++ b/setup.py
@@ -23,6 +23,11 @@
with open('requirements.txt', 'r') as f:
requires = [x.strip() for x in f if x.strip()]
with open('test-requirements.txt', 'r') as f:
test_requires = [x.strip() for x in f if x.strip()]
@@ -42,7 +47,7 @@
tests_require=test_requires,
install_requires=requires,
extras_require={'test': test_requires},
- classifiers=(
+ classifiers=[
'Development Status :: 3 - Alpha',
'Intended Audience :: Developers',
'License :: OSI Approved :: MIT License',
@@ -55,5 +60,5 @@
'Programming Language :: Python :: 3.6',
'Topic :: Software Development :: Libraries',
'Topic :: Software Development :: Libraries :: Python Modules',
- ),
+ ],
)
diff --git a/tox.ini b/tox.ini
index d0d87fec..a1005abb 100644
--- a/tox.ini
+++ b/tox.ini
@@ -1,14 +1,21 @@
[tox]
-envlist = py27, py35, py36, pypy, pypy3, flake8, pep257, coverage, docs
+envlist = py27, py35, py36, py37, pypy, pypy3, flake8, pep257, coverage, docs, mypy
[testenv]
passenv = INFLUXDB_PYTHON_INFLUXD_PATH
setenv = INFLUXDB_PYTHON_SKIP_SERVER_TESTS=False
deps = -r{toxinidir}/requirements.txt
-r{toxinidir}/test-requirements.txt
- py27,py34,py35,py36: pandas==0.20.1
- py27,py34,py35,py36: numpy==1.13.3
+ py27: pandas==0.21.1
+ py27: numpy==1.13.3
+ py35: pandas==0.22.0
+ py35: numpy==1.14.6
+ py36: pandas==0.23.4
+ py36: numpy==1.15.4
+ py37: pandas>=0.24.2
+ py37: numpy>=1.16.2
# Only install pandas with non-pypy interpreters
+# Testing all combinations would be too expensive
commands = nosetests -v --with-doctest {posargs}
[testenv:flake8]
@@ -24,21 +31,29 @@ commands = pydocstyle --count -ve examples influxdb
[testenv:coverage]
deps = -r{toxinidir}/requirements.txt
-r{toxinidir}/test-requirements.txt
- pandas
+ pandas==0.24.2
coverage
- numpy==1.13.3
+ numpy
commands = nosetests -v --with-coverage --cover-html --cover-package=influxdb
[testenv:docs]
deps = -r{toxinidir}/requirements.txt
- pandas==0.20.1
- numpy==1.13.3
- Sphinx==1.5.5
+ pandas>=0.24.2
+ numpy>=1.16.2
+ Sphinx>=1.8.5
sphinx_rtd_theme
commands = sphinx-build -b html docs/source docs/build
+[testenv:mypy]
+deps = -r{toxinidir}/test-requirements.txt
+ mypy==0.720
+commands = mypy --config-file mypy.ini -p influxdb
+
[flake8]
-ignore = N802,F821,E402
-# E402: module level import not at top of file
+ignore = W503,W504,W605,N802,F821,E402
+# W503: Line break occurred before a binary operator
+# W504: Line break occurred after a binary operator
+# W605: invalid escape sequence
# N802: nosetests's setUp function
# F821: False positive in influxdb/dataframe_client.py
+# E402: module level import not at top of file