Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: prevent celery from hanging due to spawned greenlet errors in greenlet drainers #9371

Open
wants to merge 21 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
30e4d84
add .tool-versions to .gitignore
linusphan Oct 21, 2024
43d8395
propagate event drainer errors to prevent infinite loop and require manual restart
linusphan Oct 21, 2024
eb46270
Merge branch 'main' of github.com:linusphan/celery into fix-result-backend-connection-error-handling
linusphan Oct 21, 2024
0cfb89d
remove typing
linusphan Oct 21, 2024
badb649
add tests
linusphan Oct 22, 2024
ac389cd
add tests and refactor implementation
linusphan Oct 22, 2024
297d6e3
remove test code and add pydoc for clarity
linusphan Oct 22, 2024
51bb91c
retrigger ci
linusphan Oct 23, 2024
29e0ea8
Merge branch 'main' into fix-result-backend-connection-error-handling
linusphan Oct 23, 2024
005f0a3
raise error in greenlet to ensure it exits, and add more test coverage
linusphan Oct 24, 2024
e65abec
calls `teardown_thread` when using `schedule_thread` in tests
linusphan Oct 24, 2024
d12c0ec
use wait() instead of while loop for clarity in teardown_thread for t…
linusphan Oct 24, 2024
87419c3
fix lint
linusphan Oct 24, 2024
7099666
Merge branch 'main' into fix-result-backend-connection-error-handling
Nusnus Nov 18, 2024
1967ab7
Merge branch 'main' into fix-result-backend-connection-error-handling
auvipy Dec 14, 2024
02e1f09
Merge branch 'main' into fix-result-backend-connection-error-handling
Nusnus Dec 17, 2024
7d0e768
Merge branch 'main' into fix-result-backend-connection-error-handling
auvipy Dec 18, 2024
75adf99
Merge branch 'main' into fix-result-backend-connection-error-handling
Nusnus Dec 19, 2024
6b83719
Merge branch 'main' into fix-result-backend-connection-error-handling
auvipy Dec 22, 2024
a9a1b4e
Merge branch 'main' into fix-result-backend-connection-error-handling
auvipy Dec 24, 2024
6f3a280
Merge branch 'main' into fix-result-backend-connection-error-handling
Nusnus Feb 11, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
propagate event drainer errors to prevent infinite loop and require manual restart

Co-authored-by: Linus Phan <13613724+linusphan@users.noreply.github.com>
Co-authored-by: Jack <57678801+mothershipper@users.noreply.github.com>
  • Loading branch information
3 people committed Oct 21, 2024
commit 43d8395be712e5f4f929259a4a59f108117f1624
43 changes: 34 additions & 9 deletions celery/backends/asynchronous.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@
import threading
import time
from collections import deque
from collections.abc import Callable
from queue import Empty
from time import sleep
from typing import Optional
from weakref import WeakKeyDictionary

from kombu.utils.compat import detect_environment
Expand All @@ -13,6 +15,10 @@
from celery.exceptions import TimeoutError
from celery.utils.threads import THREAD_TIMEOUT_MAX

E_CELERY_RESTART_REQUIRED = """
Celery must be restarted due to an error encountered while draining events.
"""

__all__ = (
'AsyncBackendMixin', 'BaseResultConsumer', 'Drainer',
'register_drainer',
Expand Down Expand Up @@ -64,7 +70,8 @@ def wait_for(self, p, wait, timeout=None):


class greenletDrainer(Drainer):
spawn = None
spawn: Callable[[Callable[[], None]], object]
_exc: Optional[Exception] = None
_g = None
_drain_complete_event = None # event, sended (and recreated) after every drain_events iteration

Expand All @@ -78,23 +85,37 @@ def _send_drain_complete_event(self):

def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._mutex = threading.Lock()
self._started = threading.Event()
self._stopped = threading.Event()
self._shutdown = threading.Event()
self._create_drain_complete_event()

def run(self):
self._started.set()
while not self._stopped.is_set():
try:
self.result_consumer.drain_events(timeout=1)
self._send_drain_complete_event()
self._create_drain_complete_event()
except socket.timeout:
pass
self._shutdown.set()

try:
while not self._stopped.is_set():
try:
self.result_consumer.drain_events(timeout=1)
self._send_drain_complete_event()
self._create_drain_complete_event()
except socket.timeout:
pass
except Exception as e:
with self._mutex:
self._exc = e
finally:
self._shutdown.set()

def start(self):
if self._shutdown.is_set():
with self._mutex:
if self._exc is not None:
raise self._exc
else:
raise Exception(E_CELERY_RESTART_REQUIRED)

if not self._started.is_set():
self._g = self.spawn(self.run)
self._started.wait()
Expand All @@ -109,6 +130,10 @@ def wait_for(self, p, wait, timeout=None):
if not p.ready:
self._drain_complete_event.wait(timeout=timeout)

with self._mutex:
if self._exc is not None:
raise self._exc


@register_drainer('eventlet')
class eventletDrainer(greenletDrainer):
Expand Down
4 changes: 2 additions & 2 deletions celery/backends/redis.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,9 +128,9 @@ def reconnect_on_error(self):
except self._connection_errors:
try:
self._ensure(self._reconnect_pubsub, ())
except self._connection_errors:
except self._connection_errors as e:
logger.critical(E_RETRY_LIMIT_EXCEEDED)
raise
raise RuntimeError(E_RETRY_LIMIT_EXCEEDED) from e

def _maybe_cancel_ready_task(self, meta):
if meta['status'] in states.READY_STATES:
Expand Down