
Commit cd6ef0c

untitaker and sentry-bot authored
fix: Fix deadlock in transport due to GC running (getsentry#814)
Co-authored-by: sentry-bot <markus+ghbot@sentry.io>
1 parent bb8960a commit cd6ef0c

File tree

5 files changed: +252 -31 lines changed


mypy.ini

Lines changed: 4 additions & 0 deletions
@@ -54,3 +54,7 @@ ignore_missing_imports = True
 ignore_missing_imports = True
 [mypy-pure_eval.*]
 ignore_missing_imports = True
+
+[mypy-sentry_sdk._queue]
+ignore_missing_imports = True
+disallow_untyped_defs = False

sentry_sdk/_compat.py

Lines changed: 0 additions & 2 deletions
@@ -19,7 +19,6 @@
     import urlparse  # noqa

     text_type = unicode  # noqa
-    import Queue as queue  # noqa

     string_types = (str, text_type)
     number_types = (int, long, float)  # noqa
@@ -37,7 +36,6 @@ def implements_str(cls):

 else:
     import urllib.parse as urlparse  # noqa
-    import queue  # noqa

     text_type = str
     string_types = (text_type,)  # type: Tuple[type]

sentry_sdk/_queue.py

Lines changed: 227 additions & 0 deletions
@@ -0,0 +1,227 @@
(new file: all 227 lines below are additions)

"""
A fork of Python 3.6's stdlib queue with Lock swapped out for RLock to avoid a
deadlock while garbage collecting.

See
https://codewithoutrules.com/2017/08/16/concurrency-python/
https://bugs.python.org/issue14976
https://github.com/sqlalchemy/sqlalchemy/blob/4eb747b61f0c1b1c25bdee3856d7195d10a0c227/lib/sqlalchemy/queue.py#L1

We also vendor the code to evade eventlet's broken monkeypatching, see
https://github.com/getsentry/sentry-python/pull/484
"""

import threading

from collections import deque
from time import time

from sentry_sdk._types import MYPY

if MYPY:
    from typing import Any

__all__ = ["Empty", "Full", "Queue"]


class Empty(Exception):
    "Exception raised by Queue.get(block=0)/get_nowait()."
    pass


class Full(Exception):
    "Exception raised by Queue.put(block=0)/put_nowait()."
    pass


class Queue(object):
    """Create a queue object with a given maximum size.

    If maxsize is <= 0, the queue size is infinite.
    """

    def __init__(self, maxsize=0):
        self.maxsize = maxsize
        self._init(maxsize)

        # mutex must be held whenever the queue is mutating. All methods
        # that acquire mutex must release it before returning. mutex
        # is shared between the three conditions, so acquiring and
        # releasing the conditions also acquires and releases mutex.
        self.mutex = threading.RLock()

        # Notify not_empty whenever an item is added to the queue; a
        # thread waiting to get is notified then.
        self.not_empty = threading.Condition(self.mutex)

        # Notify not_full whenever an item is removed from the queue;
        # a thread waiting to put is notified then.
        self.not_full = threading.Condition(self.mutex)

        # Notify all_tasks_done whenever the number of unfinished tasks
        # drops to zero; thread waiting to join() is notified to resume
        self.all_tasks_done = threading.Condition(self.mutex)
        self.unfinished_tasks = 0

    def task_done(self):
        """Indicate that a formerly enqueued task is complete.

        Used by Queue consumer threads. For each get() used to fetch a task,
        a subsequent call to task_done() tells the queue that the processing
        on the task is complete.

        If a join() is currently blocking, it will resume when all items
        have been processed (meaning that a task_done() call was received
        for every item that had been put() into the queue).

        Raises a ValueError if called more times than there were items
        placed in the queue.
        """
        with self.all_tasks_done:
            unfinished = self.unfinished_tasks - 1
            if unfinished <= 0:
                if unfinished < 0:
                    raise ValueError("task_done() called too many times")
                self.all_tasks_done.notify_all()
            self.unfinished_tasks = unfinished

    def join(self):
        """Blocks until all items in the Queue have been gotten and processed.

        The count of unfinished tasks goes up whenever an item is added to the
        queue. The count goes down whenever a consumer thread calls task_done()
        to indicate the item was retrieved and all work on it is complete.

        When the count of unfinished tasks drops to zero, join() unblocks.
        """
        with self.all_tasks_done:
            while self.unfinished_tasks:
                self.all_tasks_done.wait()

    def qsize(self):
        """Return the approximate size of the queue (not reliable!)."""
        with self.mutex:
            return self._qsize()

    def empty(self):
        """Return True if the queue is empty, False otherwise (not reliable!).

        This method is likely to be removed at some point. Use qsize() == 0
        as a direct substitute, but be aware that either approach risks a race
        condition where a queue can grow before the result of empty() or
        qsize() can be used.

        To create code that needs to wait for all queued tasks to be
        completed, the preferred technique is to use the join() method.
        """
        with self.mutex:
            return not self._qsize()

    def full(self):
        """Return True if the queue is full, False otherwise (not reliable!).

        This method is likely to be removed at some point. Use qsize() >= n
        as a direct substitute, but be aware that either approach risks a race
        condition where a queue can shrink before the result of full() or
        qsize() can be used.
        """
        with self.mutex:
            return 0 < self.maxsize <= self._qsize()

    def put(self, item, block=True, timeout=None):
        """Put an item into the queue.

        If optional args 'block' is true and 'timeout' is None (the default),
        block if necessary until a free slot is available. If 'timeout' is
        a non-negative number, it blocks at most 'timeout' seconds and raises
        the Full exception if no free slot was available within that time.
        Otherwise ('block' is false), put an item on the queue if a free slot
        is immediately available, else raise the Full exception ('timeout'
        is ignored in that case).
        """
        with self.not_full:
            if self.maxsize > 0:
                if not block:
                    if self._qsize() >= self.maxsize:
                        raise Full()
                elif timeout is None:
                    while self._qsize() >= self.maxsize:
                        self.not_full.wait()
                elif timeout < 0:
                    raise ValueError("'timeout' must be a non-negative number")
                else:
                    endtime = time() + timeout
                    while self._qsize() >= self.maxsize:
                        remaining = endtime - time()
                        if remaining <= 0.0:
                            raise Full
                        self.not_full.wait(remaining)
            self._put(item)
            self.unfinished_tasks += 1
            self.not_empty.notify()

    def get(self, block=True, timeout=None):
        """Remove and return an item from the queue.

        If optional args 'block' is true and 'timeout' is None (the default),
        block if necessary until an item is available. If 'timeout' is
        a non-negative number, it blocks at most 'timeout' seconds and raises
        the Empty exception if no item was available within that time.
        Otherwise ('block' is false), return an item if one is immediately
        available, else raise the Empty exception ('timeout' is ignored
        in that case).
        """
        with self.not_empty:
            if not block:
                if not self._qsize():
                    raise Empty()
            elif timeout is None:
                while not self._qsize():
                    self.not_empty.wait()
            elif timeout < 0:
                raise ValueError("'timeout' must be a non-negative number")
            else:
                endtime = time() + timeout
                while not self._qsize():
                    remaining = endtime - time()
                    if remaining <= 0.0:
                        raise Empty()
                    self.not_empty.wait(remaining)
            item = self._get()
            self.not_full.notify()
            return item

    def put_nowait(self, item):
        """Put an item into the queue without blocking.

        Only enqueue the item if a free slot is immediately available.
        Otherwise raise the Full exception.
        """
        return self.put(item, block=False)

    def get_nowait(self):
        """Remove and return an item from the queue without blocking.

        Only get an item if one is immediately available. Otherwise
        raise the Empty exception.
        """
        return self.get(block=False)

    # Override these methods to implement other queue organizations
    # (e.g. stack or priority queue).
    # These will only be called with appropriate locks held

    # Initialize the queue representation
    def _init(self, maxsize):
        self.queue = deque()  # type: Any

    def _qsize(self):
        return len(self.queue)

    # Put a new item in the queue
    def _put(self, item):
        self.queue.append(item)

    # Get an item from the queue
    def _get(self):
        return self.queue.popleft()
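
Why the vendored queue swaps Lock for RLock, per the module docstring and the linked issue: a garbage-collection pass can run an object's __del__ on whatever thread happens to be executing, including a thread that is currently holding the queue's mutex; if that finalizer ends up calling back into the SDK and touching the same queue, a non-reentrant Lock leaves the thread waiting on a lock it already owns, which is the deadlock this commit fixes. A minimal, standalone sketch of the difference (not part of the commit; the helper name is illustrative):

import threading

def try_reenter(mutex):
    # Simulates a GC-triggered finalizer re-entering queue code on the same
    # thread that already holds the queue's mutex. With a blocking acquire,
    # the plain-Lock case would hang here instead of returning False.
    acquired = mutex.acquire(False)
    if acquired:
        mutex.release()
    return acquired

lock = threading.Lock()
with lock:
    print(try_reenter(lock))   # False: a Lock cannot be re-acquired by its owner

rlock = threading.RLock()
with rlock:
    print(try_reenter(rlock))  # True: RLock lets the owning thread acquire it again

With RLock, a re-entrant put() on the same thread simply nests inside the outer acquisition instead of blocking forever, which is the behavior the fork above relies on.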

sentry_sdk/worker.py

Lines changed: 14 additions & 28 deletions
@@ -1,14 +1,14 @@
 import os
+import threading

-from threading import Thread, Lock
 from time import sleep, time
-from sentry_sdk._compat import queue, check_thread_support
+from sentry_sdk._compat import check_thread_support
+from sentry_sdk._queue import Queue, Full
 from sentry_sdk.utils import logger

 from sentry_sdk._types import MYPY

 if MYPY:
-    from queue import Queue
     from typing import Any
     from typing import Optional
     from typing import Callable
@@ -18,12 +18,12 @@


 class BackgroundWorker(object):
-    def __init__(self):
-        # type: () -> None
+    def __init__(self, queue_size=30):
+        # type: (int) -> None
         check_thread_support()
-        self._queue = queue.Queue(30)  # type: Queue[Any]
-        self._lock = Lock()
-        self._thread = None  # type: Optional[Thread]
+        self._queue = Queue(queue_size)  # type: Queue
+        self._lock = threading.Lock()
+        self._thread = None  # type: Optional[threading.Thread]
         self._thread_for_pid = None  # type: Optional[int]

     @property
@@ -45,38 +45,24 @@ def _timed_queue_join(self, timeout):
         deadline = time() + timeout
         queue = self._queue

-        real_all_tasks_done = getattr(
-            queue, "all_tasks_done", None
-        )  # type: Optional[Any]
-        if real_all_tasks_done is not None:
-            real_all_tasks_done.acquire()
-            all_tasks_done = real_all_tasks_done  # type: Optional[Any]
-        elif queue.__module__.startswith("eventlet."):
-            all_tasks_done = getattr(queue, "_cond", None)
-        else:
-            all_tasks_done = None
+        queue.all_tasks_done.acquire()

         try:
             while queue.unfinished_tasks:
                 delay = deadline - time()
                 if delay <= 0:
                     return False
-                if all_tasks_done is not None:
-                    all_tasks_done.wait(timeout=delay)
-                else:
-                    # worst case, we just poll the number of remaining tasks
-                    sleep(0.1)
+                queue.all_tasks_done.wait(timeout=delay)

             return True
         finally:
-            if real_all_tasks_done is not None:
-                real_all_tasks_done.release()
+            queue.all_tasks_done.release()

     def start(self):
         # type: () -> None
         with self._lock:
             if not self.is_alive:
-                self._thread = Thread(
+                self._thread = threading.Thread(
                     target=self._target, name="raven-sentry.BackgroundWorker"
                 )
                 self._thread.setDaemon(True)
@@ -94,7 +80,7 @@ def kill(self):
         if self._thread:
             try:
                 self._queue.put_nowait(_TERMINATOR)
-            except queue.Full:
+            except Full:
                 logger.debug("background worker queue full, kill failed")

             self._thread = None
@@ -123,7 +109,7 @@ def submit(self, callback):
         self._ensure_thread()
         try:
             self._queue.put_nowait(callback)
-        except queue.Full:
+        except Full:
             logger.debug("background worker queue full, dropping event")

     def _target(self):
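
Because the vendored queue always exposes unfinished_tasks and all_tasks_done, the simplified _timed_queue_join above no longer needs the eventlet special case or the sleep(0.1) polling fallback. A standalone sketch of the same wait-with-deadline pattern against sentry_sdk._queue.Queue (the consumer thread and work items here are illustrative, not SDK code):

import threading
from time import sleep, time

from sentry_sdk._queue import Queue

q = Queue(30)

def consumer():
    # Drain callbacks and mark each one done, much as the background worker thread does.
    while True:
        callback = q.get()
        try:
            callback()
        finally:
            q.task_done()

worker = threading.Thread(target=consumer)
worker.setDaemon(True)
worker.start()

for _ in range(5):
    q.put_nowait(lambda: sleep(0.01))  # illustrative work items

def timed_join(queue, timeout):
    # Same shape as the simplified _timed_queue_join in the diff above: wait on
    # all_tasks_done until unfinished_tasks hits zero or the deadline passes.
    deadline = time() + timeout
    queue.all_tasks_done.acquire()
    try:
        while queue.unfinished_tasks:
            delay = deadline - time()
            if delay <= 0:
                return False
            queue.all_tasks_done.wait(timeout=delay)
        return True
    finally:
        queue.all_tasks_done.release()

print(timed_join(q, 2.0))  # True once all five callbacks have run

A timeout shorter than the remaining work makes timed_join return False instead of blocking indefinitely.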

tox.ini

Lines changed: 7 additions & 1 deletion
@@ -246,7 +246,13 @@ basepython =
     py3.7: python3.7
     py3.8: python3.8
     py3.9: python3.9
-    linters: python3
+
+    # Python version is pinned here because flake8 actually behaves differently
+    # depending on which version is used. You can patch this out to point to
+    # some random Python 3 binary, but then you get guaranteed mismatches with
+    # CI. Other tools such as mypy and black have options that pin the Python
+    # version.
+    linters: python3.8
     pypy: pypy

 commands =
