Skip to content

Commit f2c6c85

Browse files
sartxlzchenAlex Boten
authored
ext/aiopg: Add instrumentation for aiopg (open-telemetry#801)
Co-authored-by: Leighton Chen <lechen@microsoft.com> Co-authored-by: Alex Boten <aboten@lightstep.com>
1 parent 5ff9600 commit f2c6c85

File tree

17 files changed

+1535
-22
lines changed

17 files changed

+1535
-22
lines changed

docs-requirements.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ asgiref~=3.0
77
asyncpg>=0.12.0
88
ddtrace>=0.34.0
99
aiohttp~= 3.0
10+
aiopg>=0.13.0
1011
Deprecated>=1.2.6
1112
django>=2.2
1213
PyMySQL~=0.9.3

docs/ext/aiopg/aiopg.rst

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
OpenTelemetry aiopg instrumentation
2+
===================================
3+
4+
.. automodule:: opentelemetry.instrumentation.aiopg
5+
:members:
6+
:undoc-members:
7+
:show-inheritance:

ext/opentelemetry-ext-dbapi/src/opentelemetry/ext/dbapi/__init__.py

Lines changed: 22 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@
4646

4747
import wrapt
4848

49+
from opentelemetry import trace as trace_api
4950
from opentelemetry.ext.dbapi.version import __version__
5051
from opentelemetry.instrumentation.utils import unwrap
5152
from opentelemetry.trace import SpanKind, TracerProvider, get_tracer
@@ -300,37 +301,37 @@ class TracedCursor:
300301
def __init__(self, db_api_integration: DatabaseApiIntegration):
301302
self._db_api_integration = db_api_integration
302303

304+
def _populate_span(
305+
self, span: trace_api.Span, *args: typing.Tuple[typing.Any, typing.Any]
306+
):
307+
statement = args[0] if args else ""
308+
span.set_attribute(
309+
"component", self._db_api_integration.database_component
310+
)
311+
span.set_attribute("db.type", self._db_api_integration.database_type)
312+
span.set_attribute("db.instance", self._db_api_integration.database)
313+
span.set_attribute("db.statement", statement)
314+
315+
for (
316+
attribute_key,
317+
attribute_value,
318+
) in self._db_api_integration.span_attributes.items():
319+
span.set_attribute(attribute_key, attribute_value)
320+
321+
if len(args) > 1:
322+
span.set_attribute("db.statement.parameters", str(args[1]))
323+
303324
def traced_execution(
304325
self,
305326
query_method: typing.Callable[..., typing.Any],
306327
*args: typing.Tuple[typing.Any, typing.Any],
307328
**kwargs: typing.Dict[typing.Any, typing.Any]
308329
):
309330

310-
statement = args[0] if args else ""
311331
with self._db_api_integration.get_tracer().start_as_current_span(
312332
self._db_api_integration.name, kind=SpanKind.CLIENT
313333
) as span:
314-
span.set_attribute(
315-
"component", self._db_api_integration.database_component
316-
)
317-
span.set_attribute(
318-
"db.type", self._db_api_integration.database_type
319-
)
320-
span.set_attribute(
321-
"db.instance", self._db_api_integration.database
322-
)
323-
span.set_attribute("db.statement", statement)
324-
325-
for (
326-
attribute_key,
327-
attribute_value,
328-
) in self._db_api_integration.span_attributes.items():
329-
span.set_attribute(attribute_key, attribute_value)
330-
331-
if len(args) > 1:
332-
span.set_attribute("db.statement.parameters", str(args[1]))
333-
334+
self._populate_span(span, *args)
334335
try:
335336
result = query_method(*args, **kwargs)
336337
span.set_status(Status(StatusCanonicalCode.OK))
Lines changed: 200 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,200 @@
1+
# Copyright 2020, OpenTelemetry Authors
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
import asyncio
15+
import os
16+
import time
17+
18+
import aiopg
19+
import psycopg2
20+
import pytest
21+
22+
from opentelemetry import trace as trace_api
23+
from opentelemetry.instrumentation.aiopg import AiopgInstrumentor
24+
from opentelemetry.test.test_base import TestBase
25+
26+
POSTGRES_HOST = os.getenv("POSTGRESQL_HOST ", "localhost")
27+
POSTGRES_PORT = int(os.getenv("POSTGRESQL_PORT ", "5432"))
28+
POSTGRES_DB_NAME = os.getenv("POSTGRESQL_DB_NAME ", "opentelemetry-tests")
29+
POSTGRES_PASSWORD = os.getenv("POSTGRESQL_HOST ", "testpassword")
30+
POSTGRES_USER = os.getenv("POSTGRESQL_HOST ", "testuser")
31+
32+
33+
def async_call(coro):
34+
loop = asyncio.get_event_loop()
35+
return loop.run_until_complete(coro)
36+
37+
38+
class TestFunctionalAiopgConnect(TestBase):
39+
@classmethod
40+
def setUpClass(cls):
41+
super().setUpClass()
42+
cls._connection = None
43+
cls._cursor = None
44+
cls._tracer = cls.tracer_provider.get_tracer(__name__)
45+
AiopgInstrumentor().instrument(tracer_provider=cls.tracer_provider)
46+
cls._connection = async_call(
47+
aiopg.connect(
48+
dbname=POSTGRES_DB_NAME,
49+
user=POSTGRES_USER,
50+
password=POSTGRES_PASSWORD,
51+
host=POSTGRES_HOST,
52+
port=POSTGRES_PORT,
53+
)
54+
)
55+
cls._cursor = async_call(cls._connection.cursor())
56+
57+
@classmethod
58+
def tearDownClass(cls):
59+
if cls._cursor:
60+
cls._cursor.close()
61+
if cls._connection:
62+
cls._connection.close()
63+
AiopgInstrumentor().uninstrument()
64+
65+
def validate_spans(self):
66+
spans = self.memory_exporter.get_finished_spans()
67+
self.assertEqual(len(spans), 2)
68+
for span in spans:
69+
if span.name == "rootSpan":
70+
root_span = span
71+
else:
72+
child_span = span
73+
self.assertIsInstance(span.start_time, int)
74+
self.assertIsInstance(span.end_time, int)
75+
self.assertIsNotNone(root_span)
76+
self.assertIsNotNone(child_span)
77+
self.assertEqual(root_span.name, "rootSpan")
78+
self.assertEqual(child_span.name, "postgresql.opentelemetry-tests")
79+
self.assertIsNotNone(child_span.parent)
80+
self.assertIs(child_span.parent, root_span.get_context())
81+
self.assertIs(child_span.kind, trace_api.SpanKind.CLIENT)
82+
self.assertEqual(
83+
child_span.attributes["db.instance"], POSTGRES_DB_NAME
84+
)
85+
self.assertEqual(child_span.attributes["net.peer.name"], POSTGRES_HOST)
86+
self.assertEqual(child_span.attributes["net.peer.port"], POSTGRES_PORT)
87+
88+
def test_execute(self):
89+
"""Should create a child span for execute method
90+
"""
91+
with self._tracer.start_as_current_span("rootSpan"):
92+
async_call(
93+
self._cursor.execute(
94+
"CREATE TABLE IF NOT EXISTS test (id integer)"
95+
)
96+
)
97+
self.validate_spans()
98+
99+
def test_executemany(self):
100+
"""Should create a child span for executemany
101+
"""
102+
with pytest.raises(psycopg2.ProgrammingError):
103+
with self._tracer.start_as_current_span("rootSpan"):
104+
data = (("1",), ("2",), ("3",))
105+
stmt = "INSERT INTO test (id) VALUES (%s)"
106+
async_call(self._cursor.executemany(stmt, data))
107+
self.validate_spans()
108+
109+
def test_callproc(self):
110+
"""Should create a child span for callproc
111+
"""
112+
with self._tracer.start_as_current_span("rootSpan"), self.assertRaises(
113+
Exception
114+
):
115+
async_call(self._cursor.callproc("test", ()))
116+
self.validate_spans()
117+
118+
119+
class TestFunctionalAiopgCreatePool(TestBase):
120+
@classmethod
121+
def setUpClass(cls):
122+
super().setUpClass()
123+
cls._connection = None
124+
cls._cursor = None
125+
cls._tracer = cls.tracer_provider.get_tracer(__name__)
126+
AiopgInstrumentor().instrument(tracer_provider=cls.tracer_provider)
127+
cls._pool = async_call(
128+
aiopg.create_pool(
129+
dbname=POSTGRES_DB_NAME,
130+
user=POSTGRES_USER,
131+
password=POSTGRES_PASSWORD,
132+
host=POSTGRES_HOST,
133+
port=POSTGRES_PORT,
134+
)
135+
)
136+
cls._connection = async_call(cls._pool.acquire())
137+
cls._cursor = async_call(cls._connection.cursor())
138+
139+
@classmethod
140+
def tearDownClass(cls):
141+
if cls._cursor:
142+
cls._cursor.close()
143+
if cls._connection:
144+
cls._connection.close()
145+
if cls._pool:
146+
cls._pool.close()
147+
AiopgInstrumentor().uninstrument()
148+
149+
def validate_spans(self):
150+
spans = self.memory_exporter.get_finished_spans()
151+
self.assertEqual(len(spans), 2)
152+
for span in spans:
153+
if span.name == "rootSpan":
154+
root_span = span
155+
else:
156+
child_span = span
157+
self.assertIsInstance(span.start_time, int)
158+
self.assertIsInstance(span.end_time, int)
159+
self.assertIsNotNone(root_span)
160+
self.assertIsNotNone(child_span)
161+
self.assertEqual(root_span.name, "rootSpan")
162+
self.assertEqual(child_span.name, "postgresql.opentelemetry-tests")
163+
self.assertIsNotNone(child_span.parent)
164+
self.assertIs(child_span.parent, root_span.get_context())
165+
self.assertIs(child_span.kind, trace_api.SpanKind.CLIENT)
166+
self.assertEqual(
167+
child_span.attributes["db.instance"], POSTGRES_DB_NAME
168+
)
169+
self.assertEqual(child_span.attributes["net.peer.name"], POSTGRES_HOST)
170+
self.assertEqual(child_span.attributes["net.peer.port"], POSTGRES_PORT)
171+
172+
def test_execute(self):
173+
"""Should create a child span for execute method
174+
"""
175+
with self._tracer.start_as_current_span("rootSpan"):
176+
async_call(
177+
self._cursor.execute(
178+
"CREATE TABLE IF NOT EXISTS test (id integer)"
179+
)
180+
)
181+
self.validate_spans()
182+
183+
def test_executemany(self):
184+
"""Should create a child span for executemany
185+
"""
186+
with pytest.raises(psycopg2.ProgrammingError):
187+
with self._tracer.start_as_current_span("rootSpan"):
188+
data = (("1",), ("2",), ("3",))
189+
stmt = "INSERT INTO test (id) VALUES (%s)"
190+
async_call(self._cursor.executemany(stmt, data))
191+
self.validate_spans()
192+
193+
def test_callproc(self):
194+
"""Should create a child span for callproc
195+
"""
196+
with self._tracer.start_as_current_span("rootSpan"), self.assertRaises(
197+
Exception
198+
):
199+
async_call(self._cursor.callproc("test", ()))
200+
self.validate_spans()
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
# Changelog
2+
3+
## Unreleased
4+
5+
- Initial release

0 commit comments

Comments
 (0)