Skip to content

Commit 633dba9

Browse files
authored
fix(celery): Fix dropped transactions under Celery 4.2+ (getsentry#825)
* Work around celery/celery#4875 which causes us to lose transaction events. Fix getsentry#824 * Rewrite celery testsuite to use redis backend and test transactions too. This is better because it works on more celery versions (memory backend is often broken). However, this still does not trigger the bug, so I guess for this to be properly tested we'd need to install rabbitmq into CI? No thanks
1 parent 93f6d33 commit 633dba9

File tree

6 files changed

+85
-41
lines changed

6 files changed

+85
-41
lines changed

.travis.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ dist: xenial
44

55
services:
66
- postgresql
7+
- redis-server
78

89
language: python
910

sentry_sdk/integrations/celery.py

Lines changed: 29 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -93,15 +93,23 @@ def apply_async(*args, **kwargs):
9393
hub = Hub.current
9494
integration = hub.get_integration(CeleryIntegration)
9595
if integration is not None and integration.propagate_traces:
96-
headers = None
97-
for key, value in hub.iter_trace_propagation_headers():
98-
if headers is None:
99-
headers = dict(kwargs.get("headers") or {})
100-
headers[key] = value
101-
if headers is not None:
102-
kwargs["headers"] = headers
103-
10496
with hub.start_span(op="celery.submit", description=task.name):
97+
with capture_internal_exceptions():
98+
headers = dict(hub.iter_trace_propagation_headers())
99+
if headers:
100+
kwarg_headers = kwargs.setdefault("headers", {})
101+
kwarg_headers.update(headers)
102+
103+
# https://github.com/celery/celery/issues/4875
104+
#
105+
# Need to setdefault the inner headers too since other
106+
# tracing tools (dd-trace-py) also employ this exact
107+
# workaround and we don't want to break them.
108+
#
109+
# This is not reproducible outside of AMQP, therefore no
110+
# tests!
111+
kwarg_headers.setdefault("headers", {}).update(headers)
112+
105113
return f(*args, **kwargs)
106114
else:
107115
return f(*args, **kwargs)
@@ -130,19 +138,22 @@ def _inner(*args, **kwargs):
130138
scope.clear_breadcrumbs()
131139
scope.add_event_processor(_make_event_processor(task, *args, **kwargs))
132140

133-
transaction = Transaction.continue_from_headers(
134-
args[3].get("headers") or {},
135-
op="celery.task",
136-
name="unknown celery task",
137-
)
138-
139-
# Could possibly use a better hook than this one
140-
transaction.set_status("ok")
141+
transaction = None
141142

143+
# Celery task objects are not a thing to be trusted. Even
144+
# something such as attribute access can fail.
142145
with capture_internal_exceptions():
143-
# Celery task objects are not a thing to be trusted. Even
144-
# something such as attribute access can fail.
146+
transaction = Transaction.continue_from_headers(
147+
args[3].get("headers") or {},
148+
op="celery.task",
149+
name="unknown celery task",
150+
)
151+
145152
transaction.name = task.name
153+
transaction.set_status("ok")
154+
155+
if transaction is None:
156+
return f(*args, **kwargs)
146157

147158
with hub.start_transaction(transaction):
148159
return f(*args, **kwargs)

sentry_sdk/tracing.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -318,7 +318,7 @@ def set_status(self, value):
318318

319319
def set_http_status(self, http_status):
320320
# type: (int) -> None
321-
self.set_tag("http.status_code", http_status)
321+
self.set_tag("http.status_code", str(http_status))
322322

323323
if http_status < 400:
324324
self.set_status("ok")

tests/conftest.py

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -197,7 +197,7 @@ def append_event(event):
197197
def append_envelope(envelope):
198198
for item in envelope:
199199
if item.headers.get("type") in ("event", "transaction"):
200-
events.append(item.payload.json)
200+
test_client.transport.capture_event(item.payload.json)
201201
return old_capture_envelope(envelope)
202202

203203
monkeypatch.setattr(test_client.transport, "capture_event", append_event)
@@ -233,8 +233,14 @@ def append_envelope(envelope):
233233

234234

235235
@pytest.fixture
236-
def capture_events_forksafe(monkeypatch):
236+
def capture_events_forksafe(monkeypatch, capture_events, request):
237237
def inner():
238+
in_process_events = capture_events()
239+
240+
@request.addfinalizer
241+
def _():
242+
assert not in_process_events
243+
238244
events_r, events_w = os.pipe()
239245
events_r = os.fdopen(events_r, "rb", 0)
240246
events_w = os.fdopen(events_w, "wb", 0)

tests/integrations/celery/test_celery.py

Lines changed: 43 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -22,17 +22,41 @@ def inner(signal, f):
2222

2323

2424
@pytest.fixture
25-
def init_celery(sentry_init):
26-
def inner(propagate_traces=True, **kwargs):
25+
def init_celery(sentry_init, request):
26+
def inner(propagate_traces=True, backend="always_eager", **kwargs):
2727
sentry_init(
2828
integrations=[CeleryIntegration(propagate_traces=propagate_traces)],
2929
**kwargs
3030
)
3131
celery = Celery(__name__)
32-
if VERSION < (4,):
33-
celery.conf.CELERY_ALWAYS_EAGER = True
32+
33+
if backend == "always_eager":
34+
if VERSION < (4,):
35+
celery.conf.CELERY_ALWAYS_EAGER = True
36+
else:
37+
celery.conf.task_always_eager = True
38+
elif backend == "redis":
39+
# broken on celery 3
40+
if VERSION < (4,):
41+
pytest.skip("Redis backend broken for some reason")
42+
43+
# this backend requires capture_events_forksafe
44+
celery.conf.worker_max_tasks_per_child = 1
45+
celery.conf.broker_url = "redis://127.0.0.1:6379"
46+
celery.conf.result_backend = "redis://127.0.0.1:6379"
47+
celery.conf.task_always_eager = False
48+
49+
Hub.main.bind_client(Hub.current.client)
50+
request.addfinalizer(lambda: Hub.main.bind_client(None))
51+
52+
# Once we drop celery 3 we can use the celery_worker fixture
53+
w = worker.worker(app=celery)
54+
t = threading.Thread(target=w.run)
55+
t.daemon = True
56+
t.start()
3457
else:
35-
celery.conf.task_always_eager = True
58+
raise ValueError(backend)
59+
3660
return celery
3761

3862
return inner
@@ -273,15 +297,10 @@ def dummy_task(self):
273297

274298

275299
@pytest.mark.forked
276-
@pytest.mark.skipif(VERSION < (4,), reason="in-memory backend broken")
277-
def test_transport_shutdown(request, celery, capture_events_forksafe, tmpdir):
278-
events = capture_events_forksafe()
300+
def test_redis_backend(init_celery, capture_events_forksafe, tmpdir):
301+
celery = init_celery(traces_sample_rate=1.0, backend="redis", debug=True)
279302

280-
celery.conf.worker_max_tasks_per_child = 1
281-
celery.conf.broker_url = "memory://localhost/"
282-
celery.conf.broker_backend = "memory"
283-
celery.conf.result_backend = "file://{}".format(tmpdir.mkdir("celery-results"))
284-
celery.conf.task_always_eager = False
303+
events = capture_events_forksafe()
285304

286305
runs = []
287306

@@ -290,21 +309,26 @@ def dummy_task(self):
290309
runs.append(1)
291310
1 / 0
292311

293-
res = dummy_task.delay()
294-
295-
w = worker.worker(app=celery)
296-
t = threading.Thread(target=w.run)
297-
t.daemon = True
298-
t.start()
312+
# Curious: Cannot use delay() here or py2.7-celery-4.2 crashes
313+
res = dummy_task.apply_async()
299314

300315
with pytest.raises(Exception):
301316
# Celery 4.1 raises a gibberish exception
302317
res.wait()
303318

319+
# if this is nonempty, the worker never really forked
320+
assert not runs
321+
304322
event = events.read_event()
305323
(exception,) = event["exception"]["values"]
306324
assert exception["type"] == "ZeroDivisionError"
307325

326+
transaction = events.read_event()
327+
assert (
328+
transaction["contexts"]["trace"]["trace_id"]
329+
== event["contexts"]["trace"]["trace_id"]
330+
)
331+
308332
events.read_flush()
309333

310334
# if this is nonempty, the worker never really forked

tox.ini

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,8 @@ envlist =
3838
{py3.6,py3.7}-sanic-19
3939

4040
# TODO: Add py3.9
41-
{pypy,py2.7,py3.5,py3.6,py3.7,py3.8}-celery-{4.1,4.2,4.3,4.4}
41+
{pypy,py2.7,py3.5,py3.6}-celery-{4.1,4.2}
42+
{pypy,py2.7,py3.5,py3.6,py3.7,py3.8}-celery-{4.3,4.4}
4243
{pypy,py2.7}-celery-3
4344

4445
{py2.7,py3.7}-beam-{2.12,2.13}
@@ -128,6 +129,7 @@ deps =
128129
beam-2.13: apache-beam>=2.13.0, <2.14.0
129130
beam-master: git+https://github.com/apache/beam#egg=apache-beam&subdirectory=sdks/python
130131

132+
celery: redis
131133
celery-3: Celery>=3.1,<4.0
132134
celery-4.1: Celery>=4.1,<4.2
133135
celery-4.2: Celery>=4.2,<4.3

0 commit comments

Comments
 (0)