6
6
7
7
import codecs
8
8
import csv
9
+ from datetime import datetime , timedelta
9
10
from typing import List , Generator , Any
10
11
11
- from influxdb_client import Dialect
12
+ from influxdb_client import Dialect , IntegerLiteral , BooleanLiteral , FloatLiteral , DateTimeLiteral , StringLiteral , \
13
+ VariableAssignment , Identifier , OptionStatement , File , DurationLiteral , Duration , UnaryExpression
12
14
from influxdb_client import Query , QueryService
13
15
from influxdb_client .client .flux_csv_parser import FluxCsvParser , FluxSerializationMode
14
16
from influxdb_client .client .flux_table import FluxTable , FluxRecord
17
+ from influxdb_client .client .util .date_utils import get_date_helper
15
18
16
19
17
20
class QueryApi (object ):
@@ -29,51 +32,54 @@ def __init__(self, influxdb_client):
29
32
self ._influxdb_client = influxdb_client
30
33
self ._query_api = QueryService (influxdb_client .api_client )
31
34
32
- def query_csv (self , query : str , org = None , dialect : Dialect = default_dialect ):
35
+ def query_csv (self , query : str , org = None , dialect : Dialect = default_dialect , params : dict = None ):
33
36
"""
34
37
Execute the Flux query and return results as a CSV iterator. Each iteration returns a row of the CSV file.
35
38
36
39
:param query: a Flux query
37
40
:param org: organization name (optional if already specified in InfluxDBClient)
38
41
:param dialect: csv dialect format
42
+ :param params: bind parameters
39
43
:return: The returned object is an iterator. Each iteration returns a row of the CSV file
40
44
(which can span multiple input lines).
41
45
"""
42
46
if org is None :
43
47
org = self ._influxdb_client .org
44
- response = self ._query_api .post_query (org = org , query = self ._create_query (query , dialect ), async_req = False ,
45
- _preload_content = False )
48
+ response = self ._query_api .post_query (org = org , query = self ._create_query (query , dialect , params ) ,
49
+ async_req = False , _preload_content = False )
46
50
47
51
return csv .reader (codecs .iterdecode (response , 'utf-8' ))
48
52
49
- def query_raw (self , query : str , org = None , dialect = default_dialect ):
53
+ def query_raw (self , query : str , org = None , dialect = default_dialect , params : dict = None ):
50
54
"""
51
55
Execute synchronous Flux query and return result as raw unprocessed result as a str.
52
56
53
57
:param query: a Flux query
54
58
:param org: organization name (optional if already specified in InfluxDBClient)
55
59
:param dialect: csv dialect format
60
+ :param params: bind parameters
56
61
:return: str
57
62
"""
58
63
if org is None :
59
64
org = self ._influxdb_client .org
60
- result = self ._query_api .post_query (org = org , query = self ._create_query (query , dialect ), async_req = False ,
65
+ result = self ._query_api .post_query (org = org , query = self ._create_query (query , dialect , params ), async_req = False ,
61
66
_preload_content = False )
62
67
63
68
return result
64
69
65
- def query (self , query : str , org = None ) -> List ['FluxTable' ]:
70
+ def query (self , query : str , org = None , params : dict = None ) -> List ['FluxTable' ]:
66
71
"""
67
72
Execute synchronous Flux query and return result as a List['FluxTable'].
68
73
69
74
:param query: the Flux query
70
75
:param org: organization name (optional if already specified in InfluxDBClient)
76
+ :param params: bind parameters
71
77
:return:
72
78
"""
73
79
if org is None :
74
80
org = self ._influxdb_client .org
75
81
76
- response = self ._query_api .post_query (org = org , query = self ._create_query (query , self .default_dialect ),
82
+ response = self ._query_api .post_query (org = org , query = self ._create_query (query , self .default_dialect , params ),
77
83
async_req = False , _preload_content = False , _return_http_data_only = False )
78
84
79
85
_parser = FluxCsvParser (response = response , serialization_mode = FluxSerializationMode .tables )
@@ -82,25 +88,26 @@ def query(self, query: str, org=None) -> List['FluxTable']:
82
88
83
89
return _parser .tables
84
90
85
- def query_stream (self , query : str , org = None ) -> Generator ['FluxRecord' , Any , None ]:
91
+ def query_stream (self , query : str , org = None , params : dict = None ) -> Generator ['FluxRecord' , Any , None ]:
86
92
"""
87
93
Execute synchronous Flux query and return stream of FluxRecord as a Generator['FluxRecord'].
88
94
89
95
:param query: the Flux query
90
96
:param org: organization name (optional if already specified in InfluxDBClient)
97
+ :param params: bind parameters
91
98
:return:
92
99
"""
93
100
if org is None :
94
101
org = self ._influxdb_client .org
95
102
96
- response = self ._query_api .post_query (org = org , query = self ._create_query (query , self .default_dialect ),
103
+ response = self ._query_api .post_query (org = org , query = self ._create_query (query , self .default_dialect , params ),
97
104
async_req = False , _preload_content = False , _return_http_data_only = False )
98
105
99
106
_parser = FluxCsvParser (response = response , serialization_mode = FluxSerializationMode .stream )
100
107
101
108
return _parser .generator ()
102
109
103
- def query_data_frame (self , query : str , org = None , data_frame_index : List [str ] = None ):
110
+ def query_data_frame (self , query : str , org = None , data_frame_index : List [str ] = None , params : dict = None ):
104
111
"""
105
112
Execute synchronous Flux query and return Pandas DataFrame.
106
113
@@ -109,11 +116,12 @@ def query_data_frame(self, query: str, org=None, data_frame_index: List[str] = N
109
116
:param query: the Flux query
110
117
:param org: organization name (optional if already specified in InfluxDBClient)
111
118
:param data_frame_index: the list of columns that are used as DataFrame index
119
+ :param params: bind parameters
112
120
:return:
113
121
"""
114
122
from ..extras import pd
115
123
116
- _generator = self .query_data_frame_stream (query , org = org , data_frame_index = data_frame_index )
124
+ _generator = self .query_data_frame_stream (query , org = org , data_frame_index = data_frame_index , params = params )
117
125
_dataFrames = list (_generator )
118
126
119
127
if len (_dataFrames ) == 0 :
@@ -123,7 +131,7 @@ def query_data_frame(self, query: str, org=None, data_frame_index: List[str] = N
123
131
else :
124
132
return _dataFrames
125
133
126
- def query_data_frame_stream (self , query : str , org = None , data_frame_index : List [str ] = None ):
134
+ def query_data_frame_stream (self , query : str , org = None , data_frame_index : List [str ] = None , params : dict = None ):
127
135
"""
128
136
Execute synchronous Flux query and return stream of Pandas DataFrame as a Generator['pd.DataFrame'].
129
137
@@ -132,12 +140,13 @@ def query_data_frame_stream(self, query: str, org=None, data_frame_index: List[s
132
140
:param query: the Flux query
133
141
:param org: organization name (optional if already specified in InfluxDBClient)
134
142
:param data_frame_index: the list of columns that are used as DataFrame index
143
+ :param params: bind parameters
135
144
:return:
136
145
"""
137
146
if org is None :
138
147
org = self ._influxdb_client .org
139
148
140
- response = self ._query_api .post_query (org = org , query = self ._create_query (query , self .default_dialect ),
149
+ response = self ._query_api .post_query (org = org , query = self ._create_query (query , self .default_dialect , params ),
141
150
async_req = False , _preload_content = False , _return_http_data_only = False )
142
151
143
152
_parser = FluxCsvParser (response = response , serialization_mode = FluxSerializationMode .dataFrame ,
@@ -146,10 +155,52 @@ def query_data_frame_stream(self, query: str, org=None, data_frame_index: List[s
146
155
147
156
# private helper for c
148
157
@staticmethod
149
- def _create_query (query , dialect = default_dialect ):
150
- created = Query (query = query , dialect = dialect )
158
+ def _create_query (query , dialect = default_dialect , params : dict = None ):
159
+ created = Query (query = query , dialect = dialect , extern = QueryApi . _build_flux_ast ( params ) )
151
160
return created
152
161
162
+ @staticmethod
163
+ def _params_to_extern_ast (params : dict ) -> List ['OptionStatement' ]:
164
+
165
+ statements = []
166
+ for key , value in params .items ():
167
+ if value is None :
168
+ continue
169
+
170
+ if isinstance (value , bool ):
171
+ literal = BooleanLiteral ("BooleanLiteral" , value )
172
+ elif isinstance (value , int ):
173
+ literal = IntegerLiteral ("IntegerLiteral" , str (value ))
174
+ elif isinstance (value , float ):
175
+ literal = FloatLiteral ("FloatLiteral" , value )
176
+ elif isinstance (value , datetime ):
177
+ value = get_date_helper ().to_utc (value )
178
+ literal = DateTimeLiteral ("DateTimeLiteral" , value .strftime ('%Y-%m-%dT%H:%M:%S.%fZ' ))
179
+ elif isinstance (value , timedelta ):
180
+ # convert to microsecodns
181
+ _micro_delta = int (value / timedelta (microseconds = 1 ))
182
+ if _micro_delta < 0 :
183
+ literal = UnaryExpression ("UnaryExpression" , argument = DurationLiteral ("DurationLiteral" , [
184
+ Duration (magnitude = - _micro_delta , unit = "us" )]), operator = "-" )
185
+ else :
186
+ literal = DurationLiteral ("DurationLiteral" , [Duration (magnitude = _micro_delta , unit = "us" )])
187
+ elif isinstance (value , str ):
188
+ literal = StringLiteral ("StringLiteral" , str (value ))
189
+ else :
190
+ literal = value
191
+
192
+ statements .append (OptionStatement ("OptionStatement" ,
193
+ VariableAssignment ("VariableAssignment" , Identifier ("Identifier" , key ),
194
+ literal )))
195
+ return statements
196
+
197
+ @staticmethod
198
+ def _build_flux_ast (params : dict = None ):
199
+ if params is None :
200
+ return None
201
+
202
+ return File (package = None , name = None , type = None , imports = [], body = QueryApi ._params_to_extern_ast (params ))
203
+
153
204
def __del__ (self ):
154
205
"""Close QueryAPI."""
155
206
pass
0 commit comments