Skip to content

Commit a40edf5

Browse files
hectorhdzgtoumorokoshi
authored andcommitted
Adding psycopg2 integration (open-telemetry#298)
1 parent 47d42aa commit a40edf5

File tree

10 files changed

+312
-60
lines changed

10 files changed

+312
-60
lines changed

ext/opentelemetry-ext-dbapi/README.rst

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,8 @@ Usage
1414
from opentelemetry.trace import tracer_source
1515
from opentelemetry.ext.dbapi import trace_integration
1616
17-
17+
trace.set_preferred_tracer_source_implementation(lambda T: TracerSource())
18+
tracer = trace.tracer_source().get_tracer(__name__)
1819
# Ex: mysql.connector
1920
trace_integration(tracer_source(), mysql.connector, "connect", "mysql")
2021

ext/opentelemetry-ext-dbapi/src/opentelemetry/ext/dbapi/__init__.py

Lines changed: 62 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
https://www.python.org/dev/peps/pep-0249/
1919
"""
2020

21+
import functools
2122
import logging
2223
import typing
2324

@@ -72,16 +73,13 @@ def wrap_connect(
7273

7374

7475
class DatabaseApiIntegration:
75-
# pylint: disable=unused-argument
7676
def __init__(
7777
self,
7878
tracer: Tracer,
7979
database_component: str,
8080
database_type: str = "sql",
8181
connection_attributes=None,
8282
):
83-
if tracer is None:
84-
raise ValueError("The tracer is not provided.")
8583
self.connection_attributes = connection_attributes
8684
if self.connection_attributes is None:
8785
self.connection_attributes = {
@@ -107,18 +105,40 @@ def wrapped_connection(
107105
"""Add object proxy to connection object.
108106
"""
109107
connection = connect_method(*args, **kwargs)
108+
self.get_connection_attributes(connection)
109+
traced_connection = TracedConnectionProxy(connection, self)
110+
return traced_connection
110111

112+
def get_connection_attributes(self, connection):
113+
# Populate span fields using connection
111114
for key, value in self.connection_attributes.items():
112-
attribute = getattr(connection, value, None)
115+
# Allow attributes nested in connection object
116+
attribute = functools.reduce(
117+
lambda attribute, attribute_value: getattr(
118+
attribute, attribute_value, None
119+
),
120+
value.split("."),
121+
connection,
122+
)
113123
if attribute:
114124
self.connection_props[key] = attribute
115-
traced_connection = TracedConnection(connection, self)
116-
return traced_connection
125+
self.name = self.database_component
126+
self.database = self.connection_props.get("database", "")
127+
if self.database:
128+
self.name += "." + self.database
129+
user = self.connection_props.get("user")
130+
if user is not None:
131+
self.span_attributes["db.user"] = user
132+
host = self.connection_props.get("host")
133+
if host is not None:
134+
self.span_attributes["net.peer.name"] = host
135+
port = self.connection_props.get("port")
136+
if port is not None:
137+
self.span_attributes["net.peer.port"] = port
117138

118139

119140
# pylint: disable=abstract-method
120-
class TracedConnection(wrapt.ObjectProxy):
121-
141+
class TracedConnectionProxy(wrapt.ObjectProxy):
122142
# pylint: disable=unused-argument
123143
def __init__(
124144
self,
@@ -130,62 +150,17 @@ def __init__(
130150
wrapt.ObjectProxy.__init__(self, connection)
131151
self._db_api_integration = db_api_integration
132152

133-
self._db_api_integration.name = (
134-
self._db_api_integration.database_component
135-
)
136-
self._db_api_integration.database = self._db_api_integration.connection_props.get(
137-
"database", ""
138-
)
139-
if self._db_api_integration.database:
140-
self._db_api_integration.name += (
141-
"." + self._db_api_integration.database
142-
)
143-
user = self._db_api_integration.connection_props.get("user")
144-
if user is not None:
145-
self._db_api_integration.span_attributes["db.user"] = user
146-
host = self._db_api_integration.connection_props.get("host")
147-
if host is not None:
148-
self._db_api_integration.span_attributes["net.peer.name"] = host
149-
port = self._db_api_integration.connection_props.get("port")
150-
if port is not None:
151-
self._db_api_integration.span_attributes["net.peer.port"] = port
152-
153153
def cursor(self, *args, **kwargs):
154-
return TracedCursor(
154+
return TracedCursorProxy(
155155
self.__wrapped__.cursor(*args, **kwargs), self._db_api_integration
156156
)
157157

158158

159-
# pylint: disable=abstract-method
160-
class TracedCursor(wrapt.ObjectProxy):
161-
162-
# pylint: disable=unused-argument
163-
def __init__(
164-
self,
165-
cursor,
166-
db_api_integration: DatabaseApiIntegration,
167-
*args,
168-
**kwargs
169-
):
170-
wrapt.ObjectProxy.__init__(self, cursor)
159+
class TracedCursor:
160+
def __init__(self, db_api_integration: DatabaseApiIntegration):
171161
self._db_api_integration = db_api_integration
172162

173-
def execute(self, *args, **kwargs):
174-
return self._traced_execution(
175-
self.__wrapped__.execute, *args, **kwargs
176-
)
177-
178-
def executemany(self, *args, **kwargs):
179-
return self._traced_execution(
180-
self.__wrapped__.executemany, *args, **kwargs
181-
)
182-
183-
def callproc(self, *args, **kwargs):
184-
return self._traced_execution(
185-
self.__wrapped__.callproc, *args, **kwargs
186-
)
187-
188-
def _traced_execution(
163+
def traced_execution(
189164
self,
190165
query_method: typing.Callable[..., any],
191166
*args: typing.Tuple[any, any],
@@ -223,3 +198,33 @@ def _traced_execution(
223198
except Exception as ex: # pylint: disable=broad-except
224199
span.set_status(Status(StatusCanonicalCode.UNKNOWN, str(ex)))
225200
raise ex
201+
202+
203+
# pylint: disable=abstract-method
204+
class TracedCursorProxy(wrapt.ObjectProxy):
205+
206+
# pylint: disable=unused-argument
207+
def __init__(
208+
self,
209+
cursor,
210+
db_api_integration: DatabaseApiIntegration,
211+
*args,
212+
**kwargs
213+
):
214+
wrapt.ObjectProxy.__init__(self, cursor)
215+
self._traced_cursor = TracedCursor(db_api_integration)
216+
217+
def execute(self, *args, **kwargs):
218+
return self._traced_cursor.traced_execution(
219+
self.__wrapped__.execute, *args, **kwargs
220+
)
221+
222+
def executemany(self, *args, **kwargs):
223+
return self._traced_cursor.traced_execution(
224+
self.__wrapped__.executemany, *args, **kwargs
225+
)
226+
227+
def callproc(self, *args, **kwargs):
228+
return self._traced_cursor.traced_execution(
229+
self.__wrapped__.callproc, *args, **kwargs
230+
)
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
OpenTelemetry Psycopg integration
2+
=================================
3+
4+
The integration with PostgreSQL supports the `Psycopg`_ library and is specified
5+
to ``trace_integration`` using ``'PostgreSQL'``.
6+
7+
.. Psycopg: http://initd.org/psycopg/
8+
9+
Usage
10+
-----
11+
12+
.. code:: python
13+
import psycopg2
14+
from opentelemetry import trace
15+
from opentelemetry.sdk.trace import TracerSource
16+
from opentelemetry.trace.ext.psycopg2 import trace_integration
17+
18+
trace.set_preferred_tracer_source_implementation(lambda T: TracerSource())
19+
tracer = trace.tracer_source().get_tracer(__name__)
20+
trace_integration(tracer)
21+
cnx = psycopg2.connect(database='Database')
22+
cursor = cnx.cursor()
23+
cursor.execute("INSERT INTO test (testField) VALUES (123)")
24+
cursor.close()
25+
cnx.close()
26+
27+
References
28+
----------
29+
* `OpenTelemetry Project <https://opentelemetry.io/>`_
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
# Copyright 2020, OpenTelemetry Authors
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
#
15+
[metadata]
16+
name = opentelemetry-ext-psycopg2
17+
description = OpenTelemetry psycopg2 integration
18+
long_description = file: README.rst
19+
long_description_content_type = text/x-rst
20+
author = OpenTelemetry Authors
21+
author_email = cncf-opentelemetry-contributors@lists.cncf.io
22+
url = https://github.com/open-telemetry/opentelemetry-python/ext/opentelemetry-ext-psycopg2
23+
platforms = any
24+
license = Apache-2.0
25+
classifiers =
26+
Development Status :: 3 - Alpha
27+
Intended Audience :: Developers
28+
License :: OSI Approved :: Apache Software License
29+
Programming Language :: Python
30+
Programming Language :: Python :: 3
31+
Programming Language :: Python :: 3.4
32+
Programming Language :: Python :: 3.5
33+
Programming Language :: Python :: 3.6
34+
Programming Language :: Python :: 3.7
35+
36+
[options]
37+
python_requires = >=3.4
38+
package_dir=
39+
=src
40+
packages=find_namespace:
41+
install_requires =
42+
opentelemetry-api >= 0.4.dev0
43+
psycopg2-binary >= 2.7.3.1
44+
wrapt >= 1.0.0, < 2.0.0
45+
46+
[options.packages.find]
47+
where = src
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
# Copyright 2020, OpenTelemetry Authors
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
import os
15+
16+
import setuptools
17+
18+
BASE_DIR = os.path.dirname(__file__)
19+
VERSION_FILENAME = os.path.join(
20+
BASE_DIR, "src", "opentelemetry", "ext", "psycopg2", "version.py"
21+
)
22+
PACKAGE_INFO = {}
23+
with open(VERSION_FILENAME) as f:
24+
exec(f.read(), PACKAGE_INFO)
25+
26+
setuptools.setup(version=PACKAGE_INFO["__version__"])
Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
# Copyright 2020, OpenTelemetry Authors
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
"""
16+
The opentelemetry-ext-psycopg2 package allows tracing PostgreSQL queries made by the
17+
Psycopg2 library.
18+
"""
19+
20+
import logging
21+
import typing
22+
23+
import psycopg2
24+
import wrapt
25+
from psycopg2.sql import Composable
26+
27+
from opentelemetry.ext.dbapi import DatabaseApiIntegration, TracedCursor
28+
from opentelemetry.trace import Tracer
29+
30+
logger = logging.getLogger(__name__)
31+
32+
DATABASE_COMPONENT = "postgresql"
33+
DATABASE_TYPE = "sql"
34+
35+
36+
def trace_integration(tracer):
37+
"""Integrate with PostgreSQL Psycopg library.
38+
Psycopg: http://initd.org/psycopg/
39+
"""
40+
41+
connection_attributes = {
42+
"database": "info.dbname",
43+
"port": "info.port",
44+
"host": "info.host",
45+
"user": "info.user",
46+
}
47+
db_integration = DatabaseApiIntegration(
48+
tracer,
49+
DATABASE_COMPONENT,
50+
database_type=DATABASE_TYPE,
51+
connection_attributes=connection_attributes,
52+
)
53+
54+
# pylint: disable=unused-argument
55+
def wrap_connect(
56+
connect_func: typing.Callable[..., any],
57+
instance: typing.Any,
58+
args: typing.Tuple[any, any],
59+
kwargs: typing.Dict[any, any],
60+
):
61+
connection = connect_func(*args, **kwargs)
62+
db_integration.get_connection_attributes(connection)
63+
connection.cursor_factory = PsycopgTraceCursor
64+
return connection
65+
66+
try:
67+
wrapt.wrap_function_wrapper(psycopg2, "connect", wrap_connect)
68+
except Exception as ex: # pylint: disable=broad-except
69+
logger.warning("Failed to integrate with pyscopg2. %s", str(ex))
70+
71+
class PsycopgTraceCursor(psycopg2.extensions.cursor):
72+
def __init__(self, *args, **kwargs):
73+
self._traced_cursor = TracedCursor(db_integration)
74+
super(PsycopgTraceCursor, self).__init__(*args, **kwargs)
75+
76+
# pylint: disable=redefined-builtin
77+
def execute(self, query, vars=None):
78+
if isinstance(query, Composable):
79+
query = query.as_string(self)
80+
return self._traced_cursor.traced_execution(
81+
super(PsycopgTraceCursor, self).execute, query, vars
82+
)
83+
84+
# pylint: disable=redefined-builtin
85+
def executemany(self, query, vars):
86+
if isinstance(query, Composable):
87+
query = query.as_string(self)
88+
return self._traced_cursor.traced_execution(
89+
super(PsycopgTraceCursor, self).executemany, query, vars
90+
)
91+
92+
# pylint: disable=redefined-builtin
93+
def callproc(self, procname, vars=None):
94+
return self._traced_cursor.traced_execution(
95+
super(PsycopgTraceCursor, self).callproc, procname, vars
96+
)

0 commit comments

Comments
 (0)