Skip to content

Commit 41d90c9

Browse files
committed
Use celery.__main__ to really patch eventlet as *early* as possible.
1 parent 5ac16ce commit 41d90c9

File tree

4 files changed

+255
-23
lines changed

4 files changed

+255
-23
lines changed

celery/__main__.py

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
from __future__ import absolute_import
2+
3+
import sys
4+
5+
6+
def main():
7+
from celery.platforms import maybe_patch_concurrency
8+
maybe_patch_concurrency(sys.argv, ['-P'], ['--pool'])
9+
from celery.bin.celery import main
10+
main()
11+
12+
if __name__ == '__main__':
13+
main()

celery/bin/celery.py

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -8,12 +8,9 @@
88
"""
99
from __future__ import absolute_import, print_function
1010

11-
import sys
12-
from celery.platforms import maybe_patch_concurrency
13-
maybe_patch_concurrency(sys.argv, ['-P'], ['--pool'])
14-
1511
import anyjson
1612
import warnings
13+
import sys
1714

1815
from future_builtins import map
1916

@@ -932,7 +929,7 @@ def determine_exit_status(ret):
932929
return EX_OK if ret else EX_FAILURE
933930

934931

935-
def main():
932+
def main(argv=None):
936933
# Fix for setuptools generated scripts, so that it will
937934
# work with multiprocessing fork emulation.
938935
# (see multiprocessing.forking.get_preparation_data())
@@ -943,7 +940,7 @@ def main():
943940
cmd.maybe_patch_concurrency()
944941
from billiard import freeze_support
945942
freeze_support()
946-
cmd.execute_from_commandline()
943+
cmd.execute_from_commandline(argv)
947944
except KeyboardInterrupt:
948945
pass
949946

celery/utils/threads.py

Lines changed: 231 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515

1616
from kombu.syn import detect_environment
1717

18+
from celery.local import Proxy
19+
1820
USE_PURE_LOCALS = os.environ.get('USE_PURE_LOCALS')
1921

2022

@@ -70,20 +72,234 @@ def stop(self):
7072
if self.is_alive():
7173
self.join(1e100)
7274

75+
try:
76+
from greenlet import getcurrent as get_ident
77+
except ImportError: # pragma: no cover
78+
try:
79+
from thread import get_ident # noqa
80+
except ImportError: # pragma: no cover
81+
try:
82+
from dummy_thread import get_ident # noqa
83+
except ImportError: # pragma: no cover
84+
from _thread import get_ident # noqa
85+
86+
87+
def release_local(local):
88+
"""Releases the contents of the local for the current context.
89+
This makes it possible to use locals without a manager.
90+
91+
Example::
92+
93+
>>> loc = Local()
94+
>>> loc.foo = 42
95+
>>> release_local(loc)
96+
>>> hasattr(loc, 'foo')
97+
False
98+
99+
With this function one can release :class:`Local` objects as well
100+
as :class:`StackLocal` objects. However it is not possible to
101+
release data held by proxies that way, one always has to retain
102+
a reference to the underlying local object in order to be able
103+
to release it.
104+
105+
.. versionadded:: 0.6.1
106+
"""
107+
local.__release_local__()
108+
109+
110+
class Local(object):
111+
__slots__ = ('__storage__', '__ident_func__')
112+
113+
def __init__(self):
114+
object.__setattr__(self, '__storage__', {})
115+
object.__setattr__(self, '__ident_func__', get_ident)
116+
117+
def __iter__(self):
118+
return iter(self.__storage__.items())
119+
120+
def __call__(self, proxy):
121+
"""Create a proxy for a name."""
122+
return Proxy(self, proxy)
123+
124+
def __release_local__(self):
125+
self.__storage__.pop(self.__ident_func__(), None)
126+
127+
def __getattr__(self, name):
128+
try:
129+
return self.__storage__[self.__ident_func__()][name]
130+
except KeyError:
131+
raise AttributeError(name)
132+
133+
def __setattr__(self, name, value):
134+
ident = self.__ident_func__()
135+
storage = self.__storage__
136+
try:
137+
storage[ident][name] = value
138+
except KeyError:
139+
storage[ident] = {name: value}
140+
141+
def __delattr__(self, name):
142+
try:
143+
del self.__storage__[self.__ident_func__()][name]
144+
except KeyError:
145+
raise AttributeError(name)
146+
147+
148+
class _LocalStack(object):
149+
"""This class works similar to a :class:`Local` but keeps a stack
150+
of objects instead. This is best explained with an example::
151+
152+
>>> ls = LocalStack()
153+
>>> ls.push(42)
154+
>>> ls.top
155+
42
156+
>>> ls.push(23)
157+
>>> ls.top
158+
23
159+
>>> ls.pop()
160+
23
161+
>>> ls.top
162+
42
163+
164+
They can be force released by using a :class:`LocalManager` or with
165+
the :func:`release_local` function but the correct way is to pop the
166+
item from the stack after using. When the stack is empty it will
167+
no longer be bound to the current context (and as such released).
168+
169+
By calling the stack without arguments it returns a proxy that
170+
resolves to the topmost item on the stack.
171+
172+
"""
173+
174+
def __init__(self):
175+
self._local = Local()
176+
177+
def __release_local__(self):
178+
self._local.__release_local__()
179+
180+
def _get__ident_func__(self):
181+
return self._local.__ident_func__
182+
183+
def _set__ident_func__(self, value):
184+
object.__setattr__(self._local, '__ident_func__', value)
185+
__ident_func__ = property(_get__ident_func__, _set__ident_func__)
186+
del _get__ident_func__, _set__ident_func__
187+
188+
def __call__(self):
189+
def _lookup():
190+
rv = self.top
191+
if rv is None:
192+
raise RuntimeError('object unbound')
193+
return rv
194+
return Proxy(_lookup)
195+
196+
def push(self, obj):
197+
"""Pushes a new item to the stack"""
198+
rv = getattr(self._local, 'stack', None)
199+
if rv is None:
200+
self._local.stack = rv = []
201+
rv.append(obj)
202+
return rv
203+
204+
def pop(self):
205+
"""Removes the topmost item from the stack, will return the
206+
old value or `None` if the stack was already empty.
207+
"""
208+
stack = getattr(self._local, 'stack', None)
209+
if stack is None:
210+
return None
211+
elif len(stack) == 1:
212+
release_local(self._local)
213+
return stack[-1]
214+
else:
215+
return stack.pop()
216+
217+
@property
218+
def stack(self):
219+
"""get_current_worker_task uses this to find
220+
the original task that was executed by the worker."""
221+
stack = getattr(self._local, 'stack', None)
222+
if stack is not None:
223+
return stack
224+
return []
225+
226+
@property
227+
def top(self):
228+
"""The topmost item on the stack. If the stack is empty,
229+
`None` is returned.
230+
"""
231+
try:
232+
return self._local.stack[-1]
233+
except (AttributeError, IndexError):
234+
return None
235+
236+
237+
class LocalManager(object):
238+
"""Local objects cannot manage themselves. For that you need a local
239+
manager. You can pass a local manager multiple locals or add them
240+
later by appending them to `manager.locals`. Everytime the manager
241+
cleans up it, will clean up all the data left in the locals for this
242+
context.
243+
244+
The `ident_func` parameter can be added to override the default ident
245+
function for the wrapped locals.
246+
247+
"""
248+
249+
def __init__(self, locals=None, ident_func=None):
250+
if locals is None:
251+
self.locals = []
252+
elif isinstance(locals, Local):
253+
self.locals = [locals]
254+
else:
255+
self.locals = list(locals)
256+
if ident_func is not None:
257+
self.ident_func = ident_func
258+
for local in self.locals:
259+
object.__setattr__(local, '__ident_func__', ident_func)
260+
else:
261+
self.ident_func = get_ident
262+
263+
def get_ident(self):
264+
"""Return the context identifier the local objects use internally
265+
for this context. You cannot override this method to change the
266+
behavior but use it to link other context local objects (such as
267+
SQLAlchemy's scoped sessions) to the Werkzeug locals."""
268+
return self.ident_func()
269+
270+
def cleanup(self):
271+
"""Manually clean up the data in the locals for this context.
272+
273+
Call this at the end of the request or use `make_middleware()`.
274+
275+
"""
276+
for local in self.locals:
277+
release_local(local)
278+
279+
def __repr__(self):
280+
return '<{0} storages: {1}>'.format(
281+
self.__class__.__name__, len(self.locals))
282+
283+
284+
class _FastLocalStack(threading.local):
285+
286+
def __init__(self):
287+
self.stack = []
288+
self.push = self.stack.append
289+
self.pop = self.stack.pop
290+
291+
@property
292+
def top(self):
293+
try:
294+
return self.stack[-1]
295+
except (AttributeError, IndexError):
296+
return None
297+
73298
if detect_environment() == 'default' and not USE_PURE_LOCALS:
74-
class LocalStack(threading.local):
75-
76-
def __init__(self):
77-
self.stack = []
78-
self.push = self.stack.append
79-
self.pop = self.stack.pop
80-
81-
@property
82-
def top(self):
83-
try:
84-
return self.stack[-1]
85-
except (AttributeError, IndexError):
86-
return None
299+
LocalStack = _FastLocalStack
87300
else:
88-
# See #706
89-
from celery.local import LocalStack # noqa
301+
# - See #706
302+
# since each thread has its own greenlet we can just use those as
303+
# identifiers for the context. If greenlets are not available we
304+
# fall back to the current thread ident.
305+
LocalStack = _LocalStack # noqa

setup.py

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515
import sys
1616
import codecs
1717

18+
CELERY_COMPAT_PROGRAMS = os.environ.get('CELERY_COMPAT_PROGRAMS')
19+
1820
if sys.version_info < (2, 6):
1921
raise Exception('Celery 3.1 requires Python 2.6 or higher.')
2022

@@ -175,14 +177,18 @@ def reqs(f):
175177
# -*- Entry Points -*- #
176178

177179
console_scripts = entrypoints['console_scripts'] = [
178-
'celery = celery.bin.celery:main',
180+
'celery = celery.__main__:main',
181+
]
182+
183+
if CELERY_COMPAT_PROGRAMS:
184+
console_scripts.extend([
179185
'celeryd = celery.bin.celeryd:main',
180186
'celerybeat = celery.bin.celerybeat:main',
181187
'camqadm = celery.bin.camqadm:main',
182188
'celeryev = celery.bin.celeryev:main',
183189
'celeryctl = celery.bin.celeryctl:main',
184190
'celeryd-multi = celery.bin.celeryd_multi:main',
185-
]
191+
])
186192

187193
# bundles: Only relevant for Celery developers.
188194
entrypoints['bundle.bundles'] = ['celery = celery.contrib.bundles:bundles']

0 commit comments

Comments
 (0)