Skip to content

Commit 2870071

Browse files
authored
feat: query bind parameters (influxdata#220)
1 parent 8df7b18 commit 2870071

File tree

7 files changed

+366
-29
lines changed

7 files changed

+366
-29
lines changed

CHANGELOG.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
11
## 1.17.0 [unreleased]
22

3+
### Features
4+
1. [#203](https://github.com/influxdata/influxdb-client-python/issues/219): Bind query parameters
5+
36
## 1.16.0 [2021-04-01]
47

58
### Features

README.rst

Lines changed: 32 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -464,9 +464,10 @@ Queries
464464
The result retrieved by `QueryApi <https://github.com/influxdata/influxdb-client-python/blob/master/influxdb_client/client/query_api.py>`_ could be formatted as a:
465465

466466
1. Flux data structure: `FluxTable <https://github.com/influxdata/influxdb-client-python/blob/master/influxdb_client/client/flux_table.py#L5>`_, `FluxColumn <https://github.com/influxdata/influxdb-client-python/blob/master/influxdb_client/client/flux_table.py#L22>`_ and `FluxRecord <https://github.com/influxdata/influxdb-client-python/blob/master/influxdb_client/client/flux_table.py#L31>`_
467-
2. `csv.reader <https://docs.python.org/3.4/library/csv.html#reader-objects>`__ which will iterate over CSV lines
468-
3. Raw unprocessed results as a ``str`` iterator
469-
4. `Pandas DataFrame <https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.html>`_
467+
2. Query bind parameters
468+
3. `csv.reader <https://docs.python.org/3.4/library/csv.html#reader-objects>`__ which will iterate over CSV lines
469+
4. Raw unprocessed results as a ``str`` iterator
470+
5. `Pandas DataFrame <https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.html>`_
470471

471472
The API also support streaming ``FluxRecord`` via `query_stream <https://github.com/influxdata/influxdb-client-python/blob/master/influxdb_client/client/query_api.py#L77>`_, see example below:
472473

@@ -502,6 +503,34 @@ The API also support streaming ``FluxRecord`` via `query_stream <https://github.
502503
print()
503504
print()
504505
506+
"""
507+
Query: using Bind parameters
508+
"""
509+
510+
p = {"_start": datetime.timedelta(hours=-1),
511+
"_location": "Prague",
512+
"_desc": True,
513+
"_floatParam": 25.1,
514+
"_every": datetime.timedelta(minutes=5)
515+
}
516+
517+
tables = query_api.query('''
518+
from(bucket:"my-bucket") |> range(start: _start)
519+
|> filter(fn: (r) => r["_measurement"] == "my_measurement")
520+
|> filter(fn: (r) => r["_field"] == "temperature")
521+
|> filter(fn: (r) => r["location"] == _location and r["_value"] > _floatParam)
522+
|> aggregateWindow(every: _every, fn: mean, createEmpty: true)
523+
|> sort(columns: ["_time"], desc: _desc)
524+
''', params=p)
525+
526+
for table in tables:
527+
print(table)
528+
for record in table.records:
529+
print(str(record["_time"]) + " - " + record["location"] + ": " + str(record["_value"]))
530+
531+
print()
532+
print()
533+
505534
"""
506535
Query: using Stream
507536
"""

examples/query.py

Lines changed: 38 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
1+
import datetime as datetime
2+
13
from influxdb_client import InfluxDBClient, Point, Dialect
24
from influxdb_client.client.write_api import SYNCHRONOUS
35

4-
client = InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org")
6+
client = InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org",debug=True)
57

68
write_api = client.write_api(write_options=SYNCHRONOUS)
79
query_api = client.query_api()
@@ -28,6 +30,34 @@
2830
print()
2931
print()
3032

33+
"""
34+
Query: using Bind parameters
35+
"""
36+
37+
p = {"_start": datetime.timedelta(hours=-1),
38+
"_location": "Prague",
39+
"_desc": True,
40+
"_floatParam": 25.1,
41+
"_every": datetime.timedelta(minutes=5)
42+
}
43+
44+
tables = query_api.query('''
45+
from(bucket:"my-bucket") |> range(start: _start)
46+
|> filter(fn: (r) => r["_measurement"] == "my_measurement")
47+
|> filter(fn: (r) => r["_field"] == "temperature")
48+
|> filter(fn: (r) => r["location"] == _location and r["_value"] > _floatParam)
49+
|> aggregateWindow(every: _every, fn: mean, createEmpty: true)
50+
|> sort(columns: ["_time"], desc: _desc)
51+
''', params=p)
52+
53+
for table in tables:
54+
print(table)
55+
for record in table.records:
56+
print(str(record["_time"]) + " - " + record["location"] + ": " + str(record["_value"]))
57+
58+
print()
59+
print()
60+
3161
"""
3262
Query: using Stream
3363
"""
@@ -66,10 +96,13 @@
6696
"""
6797
Query: using Pandas DataFrame
6898
"""
69-
data_frame = query_api.query_data_frame('from(bucket:"my-bucket") '
70-
'|> range(start: -10m) '
71-
'|> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value") '
72-
'|> keep(columns: ["location", "temperature"])')
99+
data_frame = query_api.query_data_frame('''
100+
from(bucket:"my-bucket")
101+
|> range(start: -10m)
102+
|> filter(fn: (r) => r["_measurement"] == "my_measurement")
103+
|> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")
104+
|> keep(columns: ["_time","location", "temperature"])
105+
''')
73106
print(data_frame.to_string())
74107

75108
"""

influxdb_client/client/query_api.py

Lines changed: 67 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -6,12 +6,15 @@
66

77
import codecs
88
import csv
9+
from datetime import datetime, timedelta
910
from typing import List, Generator, Any
1011

11-
from influxdb_client import Dialect
12+
from influxdb_client import Dialect, IntegerLiteral, BooleanLiteral, FloatLiteral, DateTimeLiteral, StringLiteral, \
13+
VariableAssignment, Identifier, OptionStatement, File, DurationLiteral, Duration, UnaryExpression
1214
from influxdb_client import Query, QueryService
1315
from influxdb_client.client.flux_csv_parser import FluxCsvParser, FluxSerializationMode
1416
from influxdb_client.client.flux_table import FluxTable, FluxRecord
17+
from influxdb_client.client.util.date_utils import get_date_helper
1518

1619

1720
class QueryApi(object):
@@ -29,51 +32,54 @@ def __init__(self, influxdb_client):
2932
self._influxdb_client = influxdb_client
3033
self._query_api = QueryService(influxdb_client.api_client)
3134

32-
def query_csv(self, query: str, org=None, dialect: Dialect = default_dialect):
35+
def query_csv(self, query: str, org=None, dialect: Dialect = default_dialect, params: dict = None):
3336
"""
3437
Execute the Flux query and return results as a CSV iterator. Each iteration returns a row of the CSV file.
3538
3639
:param query: a Flux query
3740
:param org: organization name (optional if already specified in InfluxDBClient)
3841
:param dialect: csv dialect format
42+
:param params: bind parameters
3943
:return: The returned object is an iterator. Each iteration returns a row of the CSV file
4044
(which can span multiple input lines).
4145
"""
4246
if org is None:
4347
org = self._influxdb_client.org
44-
response = self._query_api.post_query(org=org, query=self._create_query(query, dialect), async_req=False,
45-
_preload_content=False)
48+
response = self._query_api.post_query(org=org, query=self._create_query(query, dialect, params),
49+
async_req=False, _preload_content=False)
4650

4751
return csv.reader(codecs.iterdecode(response, 'utf-8'))
4852

49-
def query_raw(self, query: str, org=None, dialect=default_dialect):
53+
def query_raw(self, query: str, org=None, dialect=default_dialect, params: dict = None):
5054
"""
5155
Execute synchronous Flux query and return result as raw unprocessed result as a str.
5256
5357
:param query: a Flux query
5458
:param org: organization name (optional if already specified in InfluxDBClient)
5559
:param dialect: csv dialect format
60+
:param params: bind parameters
5661
:return: str
5762
"""
5863
if org is None:
5964
org = self._influxdb_client.org
60-
result = self._query_api.post_query(org=org, query=self._create_query(query, dialect), async_req=False,
65+
result = self._query_api.post_query(org=org, query=self._create_query(query, dialect, params), async_req=False,
6166
_preload_content=False)
6267

6368
return result
6469

65-
def query(self, query: str, org=None) -> List['FluxTable']:
70+
def query(self, query: str, org=None, params: dict = None) -> List['FluxTable']:
6671
"""
6772
Execute synchronous Flux query and return result as a List['FluxTable'].
6873
6974
:param query: the Flux query
7075
:param org: organization name (optional if already specified in InfluxDBClient)
76+
:param params: bind parameters
7177
:return:
7278
"""
7379
if org is None:
7480
org = self._influxdb_client.org
7581

76-
response = self._query_api.post_query(org=org, query=self._create_query(query, self.default_dialect),
82+
response = self._query_api.post_query(org=org, query=self._create_query(query, self.default_dialect, params),
7783
async_req=False, _preload_content=False, _return_http_data_only=False)
7884

7985
_parser = FluxCsvParser(response=response, serialization_mode=FluxSerializationMode.tables)
@@ -82,25 +88,26 @@ def query(self, query: str, org=None) -> List['FluxTable']:
8288

8389
return _parser.tables
8490

85-
def query_stream(self, query: str, org=None) -> Generator['FluxRecord', Any, None]:
91+
def query_stream(self, query: str, org=None, params: dict = None) -> Generator['FluxRecord', Any, None]:
8692
"""
8793
Execute synchronous Flux query and return stream of FluxRecord as a Generator['FluxRecord'].
8894
8995
:param query: the Flux query
9096
:param org: organization name (optional if already specified in InfluxDBClient)
97+
:param params: bind parameters
9198
:return:
9299
"""
93100
if org is None:
94101
org = self._influxdb_client.org
95102

96-
response = self._query_api.post_query(org=org, query=self._create_query(query, self.default_dialect),
103+
response = self._query_api.post_query(org=org, query=self._create_query(query, self.default_dialect, params),
97104
async_req=False, _preload_content=False, _return_http_data_only=False)
98105

99106
_parser = FluxCsvParser(response=response, serialization_mode=FluxSerializationMode.stream)
100107

101108
return _parser.generator()
102109

103-
def query_data_frame(self, query: str, org=None, data_frame_index: List[str] = None):
110+
def query_data_frame(self, query: str, org=None, data_frame_index: List[str] = None, params: dict = None):
104111
"""
105112
Execute synchronous Flux query and return Pandas DataFrame.
106113
@@ -109,11 +116,12 @@ def query_data_frame(self, query: str, org=None, data_frame_index: List[str] = N
109116
:param query: the Flux query
110117
:param org: organization name (optional if already specified in InfluxDBClient)
111118
:param data_frame_index: the list of columns that are used as DataFrame index
119+
:param params: bind parameters
112120
:return:
113121
"""
114122
from ..extras import pd
115123

116-
_generator = self.query_data_frame_stream(query, org=org, data_frame_index=data_frame_index)
124+
_generator = self.query_data_frame_stream(query, org=org, data_frame_index=data_frame_index, params=params)
117125
_dataFrames = list(_generator)
118126

119127
if len(_dataFrames) == 0:
@@ -123,7 +131,7 @@ def query_data_frame(self, query: str, org=None, data_frame_index: List[str] = N
123131
else:
124132
return _dataFrames
125133

126-
def query_data_frame_stream(self, query: str, org=None, data_frame_index: List[str] = None):
134+
def query_data_frame_stream(self, query: str, org=None, data_frame_index: List[str] = None, params: dict = None):
127135
"""
128136
Execute synchronous Flux query and return stream of Pandas DataFrame as a Generator['pd.DataFrame'].
129137
@@ -132,12 +140,13 @@ def query_data_frame_stream(self, query: str, org=None, data_frame_index: List[s
132140
:param query: the Flux query
133141
:param org: organization name (optional if already specified in InfluxDBClient)
134142
:param data_frame_index: the list of columns that are used as DataFrame index
143+
:param params: bind parameters
135144
:return:
136145
"""
137146
if org is None:
138147
org = self._influxdb_client.org
139148

140-
response = self._query_api.post_query(org=org, query=self._create_query(query, self.default_dialect),
149+
response = self._query_api.post_query(org=org, query=self._create_query(query, self.default_dialect, params),
141150
async_req=False, _preload_content=False, _return_http_data_only=False)
142151

143152
_parser = FluxCsvParser(response=response, serialization_mode=FluxSerializationMode.dataFrame,
@@ -146,10 +155,52 @@ def query_data_frame_stream(self, query: str, org=None, data_frame_index: List[s
146155

147156
# private helper for c
148157
@staticmethod
149-
def _create_query(query, dialect=default_dialect):
150-
created = Query(query=query, dialect=dialect)
158+
def _create_query(query, dialect=default_dialect, params: dict = None):
159+
created = Query(query=query, dialect=dialect, extern=QueryApi._build_flux_ast(params))
151160
return created
152161

162+
@staticmethod
163+
def _params_to_extern_ast(params: dict) -> List['OptionStatement']:
164+
165+
statements = []
166+
for key, value in params.items():
167+
if value is None:
168+
continue
169+
170+
if isinstance(value, bool):
171+
literal = BooleanLiteral("BooleanLiteral", value)
172+
elif isinstance(value, int):
173+
literal = IntegerLiteral("IntegerLiteral", str(value))
174+
elif isinstance(value, float):
175+
literal = FloatLiteral("FloatLiteral", value)
176+
elif isinstance(value, datetime):
177+
value = get_date_helper().to_utc(value)
178+
literal = DateTimeLiteral("DateTimeLiteral", value.strftime('%Y-%m-%dT%H:%M:%S.%fZ'))
179+
elif isinstance(value, timedelta):
180+
# convert to microsecodns
181+
_micro_delta = int(value / timedelta(microseconds=1))
182+
if _micro_delta < 0:
183+
literal = UnaryExpression("UnaryExpression", argument=DurationLiteral("DurationLiteral", [
184+
Duration(magnitude=-_micro_delta, unit="us")]), operator="-")
185+
else:
186+
literal = DurationLiteral("DurationLiteral", [Duration(magnitude=_micro_delta, unit="us")])
187+
elif isinstance(value, str):
188+
literal = StringLiteral("StringLiteral", str(value))
189+
else:
190+
literal = value
191+
192+
statements.append(OptionStatement("OptionStatement",
193+
VariableAssignment("VariableAssignment", Identifier("Identifier", key),
194+
literal)))
195+
return statements
196+
197+
@staticmethod
198+
def _build_flux_ast(params: dict = None):
199+
if params is None:
200+
return None
201+
202+
return File(package=None, name=None, type=None, imports=[], body=QueryApi._params_to_extern_ast(params))
203+
153204
def __del__(self):
154205
"""Close QueryAPI."""
155206
pass

influxdb_client/client/util/date_utils.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
"""Utils to get right Date parsing function."""
2+
import datetime
23

34
from dateutil import parser
5+
from pytz import UTC
46

57
date_helper = None
68

@@ -30,6 +32,18 @@ def to_nanoseconds(self, delta):
3032

3133
return nanoseconds_in_days + nanoseconds_in_seconds + nanoseconds_in_micros
3234

35+
def to_utc(self, value: datetime):
36+
"""
37+
Convert datetime to UTC timezone.
38+
39+
:param value: datetime
40+
:return: datetime in UTC
41+
"""
42+
if not value.tzinfo:
43+
return UTC.localize(value)
44+
else:
45+
return value.astimezone(UTC)
46+
3347

3448
def get_date_helper() -> DateHelper:
3549
"""

influxdb_client/client/write/point.py

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -204,11 +204,7 @@ def _convert_timestamp(timestamp, precision=DEFAULT_WRITE_PRECISION):
204204
if isinstance(timestamp, timedelta) or isinstance(timestamp, datetime):
205205

206206
if isinstance(timestamp, datetime):
207-
if not timestamp.tzinfo:
208-
timestamp = UTC.localize(timestamp)
209-
else:
210-
timestamp = timestamp.astimezone(UTC)
211-
timestamp = timestamp - EPOCH
207+
timestamp = date_helper.to_utc(timestamp) - EPOCH
212208

213209
ns = date_helper.to_nanoseconds(timestamp)
214210

0 commit comments

Comments
 (0)