6
6
7
7
from sentry_sdk .hub import Hub
8
8
from sentry_sdk .utils import capture_internal_exceptions , event_from_exception
9
+ from sentry_sdk .tracing import SpanContext
9
10
from sentry_sdk ._compat import reraise
10
11
from sentry_sdk .integrations import Integration
11
12
from sentry_sdk .integrations .logging import ignore_logger
14
15
class CeleryIntegration (Integration ):
15
16
identifier = "celery"
16
17
18
+ def __init__ (self , propagate_traces = True ):
19
+ self .propagate_traces = propagate_traces
20
+
17
21
@staticmethod
18
22
def setup_once ():
19
23
import celery .app .trace as trace # type: ignore
@@ -25,6 +29,7 @@ def sentry_build_tracer(name, task, *args, **kwargs):
25
29
# short-circuits to task.run if it thinks it's safe.
26
30
task .__call__ = _wrap_task_call (task , task .__call__ )
27
31
task .run = _wrap_task_call (task , task .run )
32
+ task .apply_async = _wrap_apply_async (task , task .apply_async )
28
33
return _wrap_tracer (task , old_build_tracer (name , task , * args , ** kwargs ))
29
34
30
35
trace .build_tracer = sentry_build_tracer
@@ -37,6 +42,23 @@ def sentry_build_tracer(name, task, *args, **kwargs):
37
42
ignore_logger ("celery.worker.job" )
38
43
39
44
45
+ def _wrap_apply_async (task , f ):
46
+ def apply_async (self , * args , ** kwargs ):
47
+ hub = Hub .current
48
+ integration = hub .get_integration (CeleryIntegration )
49
+ if integration is not None and integration .propagate_traces :
50
+ headers = None
51
+ for key , value in hub .iter_trace_propagation_headers ():
52
+ if headers is None :
53
+ headers = dict (kwargs .get ("headers" ) or {})
54
+ headers [key ] = value
55
+ if headers is not None :
56
+ kwargs ["headers" ] = headers
57
+ return f (self , * args , ** kwargs )
58
+
59
+ return apply_async
60
+
61
+
40
62
def _wrap_tracer (task , f ):
41
63
# Need to wrap tracer for pushing the scope before prerun is sent, and
42
64
# popping it after postrun is sent.
@@ -52,13 +74,22 @@ def _inner(*args, **kwargs):
52
74
with hub .push_scope () as scope :
53
75
scope ._name = "celery"
54
76
scope .clear_breadcrumbs ()
77
+ _continue_trace (args [3 ].get ("headers" ) or {}, scope )
55
78
scope .add_event_processor (_make_event_processor (task , * args , ** kwargs ))
56
79
57
80
return f (* args , ** kwargs )
58
81
59
82
return _inner
60
83
61
84
85
+ def _continue_trace (headers , scope ):
86
+ if headers :
87
+ span_context = SpanContext .continue_from_headers (headers )
88
+ else :
89
+ span_context = SpanContext .start_trace ()
90
+ scope .set_span_context (span_context )
91
+
92
+
62
93
def _wrap_task_call (task , f ):
63
94
# Need to wrap task call because the exception is caught before we get to
64
95
# see it. Also celery's reported stacktrace is untrustworthy.
0 commit comments