Skip to content

Commit 3235d4e

Browse files
authored
feat: flux query profiler (influxdata#260)
feat: flux query profiler
1 parent 30b75c9 commit 3235d4e

File tree

10 files changed

+479
-29
lines changed

10 files changed

+479
-29
lines changed

README.rst

Lines changed: 112 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -176,6 +176,7 @@ The following options are supported:
176176
- ``ssl_ca_cert`` - set this to customize the certificate file to verify the peer
177177
- ``connection_pool_maxsize`` - set the number of connections to save that can be reused by urllib3
178178
- ``auth_basic`` - enable http basic authentication when talking to a InfluxDB 1.8.x without authentication but is accessed via reverse proxy with basic authentication (defaults to false)
179+
- ``profilers`` - set the list of enabled `Flux profilers <https://docs.influxdata.com/influxdb/v2.0/reference/flux/stdlib/profiler/>`_
179180

180181
.. code-block:: python
181182
@@ -204,11 +205,122 @@ Supported properties are:
204205
- ``INFLUXDB_V2_SSL_CA_CERT`` - set this to customize the certificate file to verify the peer
205206
- ``INFLUXDB_V2_CONNECTION_POOL_MAXSIZE`` - set the number of connections to save that can be reused by urllib3
206207
- ``INFLUXDB_V2_AUTH_BASIC`` - enable http basic authentication when talking to a InfluxDB 1.8.x without authentication but is accessed via reverse proxy with basic authentication (defaults to false)
208+
- ``INFLUXDB_V2_PROFILERS`` - set the list of enabled `Flux profilers <https://docs.influxdata.com/influxdb/v2.0/reference/flux/stdlib/profiler/>`_
207209

208210
.. code-block:: python
209211
210212
self.client = InfluxDBClient.from_env_properties()
211213
214+
Profile query
215+
^^^^^^^^^^^^^
216+
217+
The `Flux Profiler package <https://docs.influxdata.com/influxdb/v2.0/reference/flux/stdlib/profiler/>`_ provides
218+
performance profiling tools for Flux queries and operations.
219+
220+
You can enable printing profiler information of the Flux query in client library by:
221+
222+
- set QueryOptions.profilers in QueryApi,
223+
- set ``INFLUXDB_V2_PROFILERS`` environment variable,
224+
- set ``profilers`` option in configuration file.
225+
226+
When the profiler is enabled, the result of flux query contains additional tables "profiler/\*".
227+
In order to have consistent behaviour with enabled/disabled profiler, ``FluxCSVParser`` excludes "profiler/\*" measurements
228+
from result.
229+
230+
Example how to enable profilers using API:
231+
232+
.. code-block:: python
233+
234+
q = '''
235+
from(bucket: stringParam)
236+
|> range(start: -5m, stop: now())
237+
|> filter(fn: (r) => r._measurement == "mem")
238+
|> filter(fn: (r) => r._field == "available" or r._field == "free" or r._field == "used")
239+
|> aggregateWindow(every: 1m, fn: mean)
240+
|> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")
241+
'''
242+
p = {
243+
"stringParam": "my-bucket",
244+
}
245+
246+
query_api = client.query_api(query_options=QueryOptions(profilers=["query", "operator"]))
247+
csv_result = query_api.query(query=q, params=p)
248+
249+
250+
Example of a profiler output:
251+
252+
.. code-block::
253+
254+
===============
255+
Profiler: query
256+
===============
257+
258+
from(bucket: stringParam)
259+
|> range(start: -5m, stop: now())
260+
|> filter(fn: (r) => r._measurement == "mem")
261+
|> filter(fn: (r) => r._field == "available" or r._field == "free" or r._field == "used")
262+
|> aggregateWindow(every: 1m, fn: mean)
263+
|> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")
264+
265+
========================
266+
Profiler: profiler/query
267+
========================
268+
result : _profiler
269+
table : 0
270+
_measurement : profiler/query
271+
TotalDuration : 8924700
272+
CompileDuration : 350900
273+
QueueDuration : 33800
274+
PlanDuration : 0
275+
RequeueDuration : 0
276+
ExecuteDuration : 8486500
277+
Concurrency : 0
278+
MaxAllocated : 2072
279+
TotalAllocated : 0
280+
flux/query-plan :
281+
282+
digraph {
283+
ReadWindowAggregateByTime11
284+
// every = 1m, aggregates = [mean], createEmpty = true, timeColumn = "_stop"
285+
pivot8
286+
generated_yield
287+
288+
ReadWindowAggregateByTime11 -> pivot8
289+
pivot8 -> generated_yield
290+
}
291+
292+
293+
influxdb/scanned-bytes: 0
294+
influxdb/scanned-values: 0
295+
296+
===========================
297+
Profiler: profiler/operator
298+
===========================
299+
result : _profiler
300+
table : 1
301+
_measurement : profiler/operator
302+
Type : *universe.pivotTransformation
303+
Label : pivot8
304+
Count : 3
305+
MinDuration : 32600
306+
MaxDuration : 126200
307+
DurationSum : 193400
308+
MeanDuration : 64466.666666666664
309+
310+
===========================
311+
Profiler: profiler/operator
312+
===========================
313+
result : _profiler
314+
table : 1
315+
_measurement : profiler/operator
316+
Type : *influxdb.readWindowAggregateSource
317+
Label : ReadWindowAggregateByTime11
318+
Count : 1
319+
MinDuration : 940500
320+
MaxDuration : 940500
321+
DurationSum : 940500
322+
MeanDuration : 940500.0
323+
212324
.. marker-index-end
213325
214326

examples/query.py

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -62,15 +62,24 @@
6262
"""
6363
Query: using Stream
6464
"""
65-
records = query_api.query_stream('from(bucket:"my-bucket") |> range(start: -10m)')
65+
records = query_api.query_stream('''
66+
from(bucket:"my-bucket")
67+
|> range(start: -10m)
68+
|> filter(fn: (r) => r["_measurement"] == "my_measurement")
69+
''')
6670

6771
for record in records:
6872
print(f'Temperature in {record["location"]} is {record["_value"]}')
6973

7074
"""
7175
Interrupt a stream after retrieve a required data
7276
"""
73-
large_stream = query_api.query_stream('from(bucket:"my-bucket") |> range(start: -100d)')
77+
large_stream = query_api.query_stream('''
78+
from(bucket:"my-bucket")
79+
|> range(start: -100d)
80+
|> filter(fn: (r) => r["_measurement"] == "my_measurement")
81+
''')
82+
7483
for record in large_stream:
7584
if record["location"] == "New York":
7685
print(f'New York temperature: {record["_value"]}')

influxdb_client/client/flux_csv_parser.py

Lines changed: 51 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -46,13 +46,14 @@ class FluxCsvParser(object):
4646
"""Parse to processing response from InfluxDB to FluxStructures or DataFrame."""
4747

4848
def __init__(self, response: HTTPResponse, serialization_mode: FluxSerializationMode,
49-
data_frame_index: List[str] = None) -> None:
49+
data_frame_index: List[str] = None, profilers: List[str] = None) -> None:
5050
"""Initialize defaults."""
5151
self._response = response
5252
self.tables = []
5353
self._serialization_mode = serialization_mode
5454
self._data_frame_index = data_frame_index
5555
self._data_frame_values = []
56+
self._profilers = profilers
5657
pass
5758

5859
def __enter__(self):
@@ -101,7 +102,9 @@ def _parse_flux_response(self):
101102

102103
# Return already parsed DataFrame
103104
if (self._serialization_mode is FluxSerializationMode.dataFrame) & hasattr(self, '_data_frame'):
104-
yield self._prepare_data_frame()
105+
df = self._prepare_data_frame()
106+
if not self._is_profiler_table(table):
107+
yield df
105108

106109
start_new_table = True
107110
table = FluxTable()
@@ -152,6 +155,10 @@ def _parse_flux_response(self):
152155

153156
flux_record = self.parse_record(table_index - 1, table, csv)
154157

158+
if self._is_profiler_record(flux_record):
159+
self._print_profiler_info(flux_record)
160+
continue
161+
155162
if self._serialization_mode is FluxSerializationMode.tables:
156163
self.tables[table_index - 1].records.append(flux_record)
157164

@@ -164,7 +171,9 @@ def _parse_flux_response(self):
164171

165172
# Return latest DataFrame
166173
if (self._serialization_mode is FluxSerializationMode.dataFrame) & hasattr(self, '_data_frame'):
167-
yield self._prepare_data_frame()
174+
df = self._prepare_data_frame()
175+
if not self._is_profiler_table(table):
176+
yield df
168177

169178
def _prepare_data_frame(self):
170179
from ..extras import pd
@@ -256,3 +265,42 @@ def add_column_names_and_tags(table, csv):
256265
def _insert_table(self, table, table_index):
257266
if self._serialization_mode is FluxSerializationMode.tables:
258267
self.tables.insert(table_index, table)
268+
269+
def _is_profiler_record(self, flux_record: FluxRecord) -> bool:
270+
if not self._profilers:
271+
return False
272+
273+
for profiler in self._profilers:
274+
if "_measurement" in flux_record.values and flux_record["_measurement"] == "profiler/" + profiler:
275+
return True
276+
277+
return False
278+
279+
def _is_profiler_table(self, table: FluxTable) -> bool:
280+
281+
if not self._profilers:
282+
return False
283+
284+
return any(filter(lambda column: (column.default_value == "_profiler" and column.label == "result"),
285+
table.columns))
286+
287+
def table_list(self) -> List[FluxTable]:
288+
"""Get the list of flux tables."""
289+
if not self._profilers:
290+
return self.tables
291+
else:
292+
return list(filter(lambda table: not self._is_profiler_table(table), self.tables))
293+
294+
@staticmethod
295+
def _print_profiler_info(flux_record: FluxRecord):
296+
if flux_record.get_measurement().startswith("profiler/"):
297+
msg = "Profiler: " + flux_record.get_measurement()
298+
print("\n" + len(msg) * "=")
299+
print(msg)
300+
print(len(msg) * "=")
301+
for name in flux_record.values:
302+
val = flux_record[name]
303+
if isinstance(val, str) and len(val) > 50:
304+
print(f"{name:<20}: \n\n{val}")
305+
elif val is not None:
306+
print(f"{name:<20}: {val:<20}")

influxdb_client/client/influxdb_client.py

Lines changed: 24 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
from influxdb_client.client.delete_api import DeleteApi
1313
from influxdb_client.client.labels_api import LabelsApi
1414
from influxdb_client.client.organizations_api import OrganizationsApi
15-
from influxdb_client.client.query_api import QueryApi
15+
from influxdb_client.client.query_api import QueryApi, QueryOptions
1616
from influxdb_client.client.tasks_api import TasksApi
1717
from influxdb_client.client.users_api import UsersApi
1818
from influxdb_client.client.write_api import WriteApi, WriteOptions, PointSettings
@@ -45,6 +45,7 @@ def __init__(self, url, token, debug=None, timeout=10_000, enable_gzip=False, or
4545
:key bool auth_basic: Set this to true to enable basic authentication when talking to a InfluxDB 1.8.x that
4646
does not use auth-enabled but is protected by a reverse proxy with basic authentication.
4747
(defaults to false, don't set to true when talking to InfluxDB 2)
48+
:key list[str] profilers: list of enabled Flux profilers
4849
"""
4950
self.url = url
5051
self.token = token
@@ -75,6 +76,8 @@ def __init__(self, url, token, debug=None, timeout=10_000, enable_gzip=False, or
7576

7677
retries = kwargs.get('retries', False)
7778

79+
self.profilers = kwargs.get('profilers', None)
80+
7881
self.api_client = ApiClient(configuration=conf, header_name=auth_header_name,
7982
header_value=auth_header_value, retries=retries)
8083

@@ -111,6 +114,8 @@ def from_config_file(cls, config_file: str = "config.ini", debug=None, enable_gz
111114
- ssl_ca_cert
112115
- connection_pool_maxsize
113116
- auth_basic
117+
- profilers
118+
114119
115120
config.ini example::
116121
@@ -121,6 +126,7 @@ def from_config_file(cls, config_file: str = "config.ini", debug=None, enable_gz
121126
timeout=6000
122127
connection_pool_maxsize=25
123128
auth_basic=false
129+
profilers=query,operator
124130
125131
[tags]
126132
id = 132-987-655
@@ -136,6 +142,7 @@ def from_config_file(cls, config_file: str = "config.ini", debug=None, enable_gz
136142
timeout = 6000
137143
connection_pool_maxsize = 25
138144
auth_basic = false
145+
profilers="query, operator"
139146
140147
[tags]
141148
id = "132-987-655"
@@ -181,9 +188,14 @@ def config_value(key: str):
181188
tags = {k: v.strip('"') for k, v in config.items('tags')}
182189
default_tags = dict(tags)
183190

191+
profilers = None
192+
if config.has_option('influx2', 'profilers'):
193+
profilers = [x.strip() for x in config_value('profilers').split(',')]
194+
184195
return cls(url, token, debug=debug, timeout=_to_int(timeout), org=org, default_tags=default_tags,
185196
enable_gzip=enable_gzip, verify_ssl=_to_bool(verify_ssl), ssl_ca_cert=ssl_ca_cert,
186-
connection_pool_maxsize=_to_int(connection_pool_maxsize), auth_basic=_to_bool(auth_basic))
197+
connection_pool_maxsize=_to_int(connection_pool_maxsize), auth_basic=_to_bool(auth_basic),
198+
profilers=profilers)
187199

188200
@classmethod
189201
def from_env_properties(cls, debug=None, enable_gzip=False):
@@ -209,6 +221,11 @@ def from_env_properties(cls, debug=None, enable_gzip=False):
209221
connection_pool_maxsize = os.getenv('INFLUXDB_V2_CONNECTION_POOL_MAXSIZE', None)
210222
auth_basic = os.getenv('INFLUXDB_V2_AUTH_BASIC', "False")
211223

224+
prof = os.getenv("INFLUXDB_V2_PROFILERS", None)
225+
profilers = None
226+
if prof is not None:
227+
profilers = [x.strip() for x in prof.split(',')]
228+
212229
default_tags = dict()
213230

214231
for key, value in os.environ.items():
@@ -217,7 +234,8 @@ def from_env_properties(cls, debug=None, enable_gzip=False):
217234

218235
return cls(url, token, debug=debug, timeout=_to_int(timeout), org=org, default_tags=default_tags,
219236
enable_gzip=enable_gzip, verify_ssl=_to_bool(verify_ssl), ssl_ca_cert=ssl_ca_cert,
220-
connection_pool_maxsize=_to_int(connection_pool_maxsize), auth_basic=_to_bool(auth_basic))
237+
connection_pool_maxsize=_to_int(connection_pool_maxsize), auth_basic=_to_bool(auth_basic),
238+
profilers=profilers)
221239

222240
def write_api(self, write_options=WriteOptions(), point_settings=PointSettings()) -> WriteApi:
223241
"""
@@ -229,13 +247,14 @@ def write_api(self, write_options=WriteOptions(), point_settings=PointSettings()
229247
"""
230248
return WriteApi(influxdb_client=self, write_options=write_options, point_settings=point_settings)
231249

232-
def query_api(self) -> QueryApi:
250+
def query_api(self, query_options: QueryOptions = QueryOptions()) -> QueryApi:
233251
"""
234252
Create a Query API instance.
235253
254+
:param query_options: optional query api configuration
236255
:return: Query api instance
237256
"""
238-
return QueryApi(self)
257+
return QueryApi(self, query_options)
239258

240259
def close(self):
241260
"""Shutdown the client."""

0 commit comments

Comments
 (0)