Skip to content

Commit 7f74a89

Browse files
thomasdesralrex
and
alrex
authored
ext/asyncpg: Shouldn't capture query parameters by default (open-telemetry#854)
* Update CHANGELOG.md Co-authored-by: alrex <aboten@lightstep.com>
1 parent 090fb7c commit 7f74a89

File tree

3 files changed

+150
-95
lines changed

3 files changed

+150
-95
lines changed

ext/opentelemetry-ext-asyncpg/CHANGELOG.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,9 @@
22

33
## Unreleased
44

5+
- Shouldn't capture query parameters by default
6+
([#854](https://github.com/open-telemetry/opentelemetry-python/pull/854))
7+
58
## Version 0.10b0
69

710
Released 2020-06-23

ext/opentelemetry-ext-asyncpg/src/opentelemetry/ext/asyncpg/__init__.py

Lines changed: 36 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -74,36 +74,11 @@ def _hydrate_span_from_args(connection, query, parameters) -> dict:
7474
return span_attributes
7575

7676

77-
async def _do_execute(func, instance, args, kwargs):
78-
span_attributes = _hydrate_span_from_args(instance, args[0], args[1:])
79-
tracer = getattr(asyncpg, _APPLIED)
80-
81-
exception = None
82-
83-
with tracer.start_as_current_span(
84-
"postgresql", kind=SpanKind.CLIENT
85-
) as span:
86-
87-
for attribute, value in span_attributes.items():
88-
span.set_attribute(attribute, value)
89-
90-
try:
91-
result = await func(*args, **kwargs)
92-
except Exception as exc: # pylint: disable=W0703
93-
exception = exc
94-
raise
95-
finally:
96-
if exception is not None:
97-
span.set_status(
98-
Status(_exception_to_canonical_code(exception))
99-
)
100-
else:
101-
span.set_status(Status(StatusCanonicalCode.OK))
102-
103-
return result
104-
105-
10677
class AsyncPGInstrumentor(BaseInstrumentor):
78+
def __init__(self, capture_parameters=False):
79+
super().__init__()
80+
self.capture_parameters = capture_parameters
81+
10782
def _instrument(self, **kwargs):
10883
tracer_provider = kwargs.get(
10984
"tracer_provider", trace.get_tracer_provider()
@@ -113,6 +88,7 @@ def _instrument(self, **kwargs):
11388
_APPLIED,
11489
tracer_provider.get_tracer("asyncpg", __version__),
11590
)
91+
11692
for method in [
11793
"Connection.execute",
11894
"Connection.executemany",
@@ -121,7 +97,7 @@ def _instrument(self, **kwargs):
12197
"Connection.fetchrow",
12298
]:
12399
wrapt.wrap_function_wrapper(
124-
"asyncpg.connection", method, _do_execute
100+
"asyncpg.connection", method, self._do_execute
125101
)
126102

127103
def _uninstrument(self, **__):
@@ -134,3 +110,33 @@ def _uninstrument(self, **__):
134110
"fetchrow",
135111
]:
136112
unwrap(asyncpg.Connection, method)
113+
114+
async def _do_execute(self, func, instance, args, kwargs):
115+
span_attributes = _hydrate_span_from_args(
116+
instance, args[0], args[1:] if self.capture_parameters else None,
117+
)
118+
tracer = getattr(asyncpg, _APPLIED)
119+
120+
exception = None
121+
122+
with tracer.start_as_current_span(
123+
"postgresql", kind=SpanKind.CLIENT
124+
) as span:
125+
126+
for attribute, value in span_attributes.items():
127+
span.set_attribute(attribute, value)
128+
129+
try:
130+
result = await func(*args, **kwargs)
131+
except Exception as exc: # pylint: disable=W0703
132+
exception = exc
133+
raise
134+
finally:
135+
if exception is not None:
136+
span.set_status(
137+
Status(_exception_to_canonical_code(exception))
138+
)
139+
else:
140+
span.set_status(Status(StatusCanonicalCode.OK))
141+
142+
return result

ext/opentelemetry-ext-docker-tests/tests/asyncpg/test_asyncpg_functional.py

Lines changed: 111 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ def _await(coro):
1919
return loop.run_until_complete(coro)
2020

2121

22-
class TestFunctionalPsycopg(TestBase):
22+
class TestFunctionalAsyncPG(TestBase):
2323
@classmethod
2424
def setUpClass(cls):
2525
super().setUpClass()
@@ -58,24 +58,6 @@ def test_instrumented_execute_method_without_arguments(self, *_, **__):
5858
},
5959
)
6060

61-
def test_instrumented_execute_method_with_arguments(self, *_, **__):
62-
_await(self._connection.execute("SELECT $1;", "1"))
63-
spans = self.memory_exporter.get_finished_spans()
64-
self.assertEqual(len(spans), 1)
65-
self.assertEqual(
66-
StatusCanonicalCode.OK, spans[0].status.canonical_code
67-
)
68-
self.assertEqual(
69-
spans[0].attributes,
70-
{
71-
"db.type": "sql",
72-
"db.user": POSTGRES_USER,
73-
"db.statement.parameters": "('1',)",
74-
"db.instance": POSTGRES_DB_NAME,
75-
"db.statement": "SELECT $1;",
76-
},
77-
)
78-
7961
def test_instrumented_fetch_method_without_arguments(self, *_, **__):
8062
_await(self._connection.fetch("SELECT 42;"))
8163
spans = self.memory_exporter.get_finished_spans()
@@ -90,52 +72,6 @@ def test_instrumented_fetch_method_without_arguments(self, *_, **__):
9072
},
9173
)
9274

93-
def test_instrumented_fetch_method_with_arguments(self, *_, **__):
94-
_await(self._connection.fetch("SELECT $1;", "1"))
95-
spans = self.memory_exporter.get_finished_spans()
96-
self.assertEqual(len(spans), 1)
97-
self.assertEqual(
98-
spans[0].attributes,
99-
{
100-
"db.type": "sql",
101-
"db.user": POSTGRES_USER,
102-
"db.statement.parameters": "('1',)",
103-
"db.instance": POSTGRES_DB_NAME,
104-
"db.statement": "SELECT $1;",
105-
},
106-
)
107-
108-
def test_instrumented_executemany_method_with_arguments(self, *_, **__):
109-
_await(self._connection.executemany("SELECT $1;", [["1"], ["2"]]))
110-
spans = self.memory_exporter.get_finished_spans()
111-
self.assertEqual(len(spans), 1)
112-
self.assertEqual(
113-
{
114-
"db.type": "sql",
115-
"db.statement": "SELECT $1;",
116-
"db.statement.parameters": "([['1'], ['2']],)",
117-
"db.user": POSTGRES_USER,
118-
"db.instance": POSTGRES_DB_NAME,
119-
},
120-
spans[0].attributes,
121-
)
122-
123-
def test_instrumented_execute_interface_error_method(self, *_, **__):
124-
with self.assertRaises(asyncpg.InterfaceError):
125-
_await(self._connection.execute("SELECT 42;", 1, 2, 3))
126-
spans = self.memory_exporter.get_finished_spans()
127-
self.assertEqual(len(spans), 1)
128-
self.assertEqual(
129-
spans[0].attributes,
130-
{
131-
"db.type": "sql",
132-
"db.instance": POSTGRES_DB_NAME,
133-
"db.user": POSTGRES_USER,
134-
"db.statement.parameters": "(1, 2, 3)",
135-
"db.statement": "SELECT 42;",
136-
},
137-
)
138-
13975
def test_instrumented_transaction_method(self, *_, **__):
14076
async def _transaction_execute():
14177
async with self._connection.transaction():
@@ -229,3 +165,113 @@ async def _transaction_execute():
229165
self.assertEqual(
230166
StatusCanonicalCode.OK, spans[2].status.canonical_code
231167
)
168+
169+
def test_instrumented_method_doesnt_capture_parameters(self, *_, **__):
170+
_await(self._connection.execute("SELECT $1;", "1"))
171+
spans = self.memory_exporter.get_finished_spans()
172+
self.assertEqual(len(spans), 1)
173+
self.assertEqual(
174+
StatusCanonicalCode.OK, spans[0].status.canonical_code
175+
)
176+
self.assertEqual(
177+
spans[0].attributes,
178+
{
179+
"db.type": "sql",
180+
"db.user": POSTGRES_USER,
181+
# This shouldn't be set because we don't capture parameters by
182+
# default
183+
#
184+
# "db.statement.parameters": "('1',)",
185+
"db.instance": POSTGRES_DB_NAME,
186+
"db.statement": "SELECT $1;",
187+
},
188+
)
189+
190+
191+
class TestFunctionalAsyncPG_CaptureParameters(TestBase):
192+
@classmethod
193+
def setUpClass(cls):
194+
super().setUpClass()
195+
cls._connection = None
196+
cls._cursor = None
197+
cls._tracer = cls.tracer_provider.get_tracer(__name__)
198+
AsyncPGInstrumentor(capture_parameters=True).instrument(
199+
tracer_provider=cls.tracer_provider
200+
)
201+
cls._connection = _await(
202+
asyncpg.connect(
203+
database=POSTGRES_DB_NAME,
204+
user=POSTGRES_USER,
205+
password=POSTGRES_PASSWORD,
206+
host=POSTGRES_HOST,
207+
port=POSTGRES_PORT,
208+
)
209+
)
210+
211+
@classmethod
212+
def tearDownClass(cls):
213+
AsyncPGInstrumentor().uninstrument()
214+
215+
def test_instrumented_execute_method_with_arguments(self, *_, **__):
216+
_await(self._connection.execute("SELECT $1;", "1"))
217+
spans = self.memory_exporter.get_finished_spans()
218+
self.assertEqual(len(spans), 1)
219+
self.assertEqual(
220+
StatusCanonicalCode.OK, spans[0].status.canonical_code
221+
)
222+
self.assertEqual(
223+
spans[0].attributes,
224+
{
225+
"db.type": "sql",
226+
"db.user": POSTGRES_USER,
227+
"db.statement.parameters": "('1',)",
228+
"db.instance": POSTGRES_DB_NAME,
229+
"db.statement": "SELECT $1;",
230+
},
231+
)
232+
233+
def test_instrumented_fetch_method_with_arguments(self, *_, **__):
234+
_await(self._connection.fetch("SELECT $1;", "1"))
235+
spans = self.memory_exporter.get_finished_spans()
236+
self.assertEqual(len(spans), 1)
237+
self.assertEqual(
238+
spans[0].attributes,
239+
{
240+
"db.type": "sql",
241+
"db.user": POSTGRES_USER,
242+
"db.statement.parameters": "('1',)",
243+
"db.instance": POSTGRES_DB_NAME,
244+
"db.statement": "SELECT $1;",
245+
},
246+
)
247+
248+
def test_instrumented_executemany_method_with_arguments(self, *_, **__):
249+
_await(self._connection.executemany("SELECT $1;", [["1"], ["2"]]))
250+
spans = self.memory_exporter.get_finished_spans()
251+
self.assertEqual(len(spans), 1)
252+
self.assertEqual(
253+
{
254+
"db.type": "sql",
255+
"db.statement": "SELECT $1;",
256+
"db.statement.parameters": "([['1'], ['2']],)",
257+
"db.user": POSTGRES_USER,
258+
"db.instance": POSTGRES_DB_NAME,
259+
},
260+
spans[0].attributes,
261+
)
262+
263+
def test_instrumented_execute_interface_error_method(self, *_, **__):
264+
with self.assertRaises(asyncpg.InterfaceError):
265+
_await(self._connection.execute("SELECT 42;", 1, 2, 3))
266+
spans = self.memory_exporter.get_finished_spans()
267+
self.assertEqual(len(spans), 1)
268+
self.assertEqual(
269+
spans[0].attributes,
270+
{
271+
"db.type": "sql",
272+
"db.instance": POSTGRES_DB_NAME,
273+
"db.user": POSTGRES_USER,
274+
"db.statement.parameters": "(1, 2, 3)",
275+
"db.statement": "SELECT 42;",
276+
},
277+
)

0 commit comments

Comments
 (0)