Skip to content

Commit 1b28f3e

Browse files
committed
Fixes dump_request example. thanks to ojii
1 parent ddc65f9 commit 1b28f3e

File tree

6 files changed

+61
-30
lines changed

6 files changed

+61
-30
lines changed

celery/bin/base.py

Lines changed: 8 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@
6969

7070
import celery
7171
from celery.exceptions import CDeprecationWarning, CPendingDeprecationWarning
72-
from celery.platforms import EX_FAILURE, EX_USAGE
72+
from celery.platforms import EX_FAILURE, EX_USAGE, maybe_patch_concurrency
7373
from celery.utils import text
7474
from celery.utils.imports import symbol_by_name, import_from_cwd
7575

@@ -165,9 +165,7 @@ def execute_from_commandline(self, argv=None):
165165
if argv is None:
166166
argv = list(sys.argv)
167167
# Should we load any special concurrency environment?
168-
pool_option = self.with_pool_option(argv)
169-
if pool_option:
170-
self.maybe_patch_concurrency(argv, *pool_option)
168+
self.maybe_patch_concurrency(argv)
171169
self.on_concurrency_setup()
172170

173171
# Dump version and exit if '--version' arg set.
@@ -176,26 +174,12 @@ def execute_from_commandline(self, argv=None):
176174
prog_name = os.path.basename(argv[0])
177175
return self.handle_argv(prog_name, argv[1:])
178176

179-
def _find_option_with_arg(self, argv, short_opts=None, long_opts=None):
180-
for i, arg in enumerate(argv):
181-
if arg.startswith('-'):
182-
if long_opts and arg.startswith('--'):
183-
name, _, val = arg.partition('=')
184-
if name in long_opts:
185-
return val
186-
if short_opts and arg in short_opts:
187-
return argv[i + 1]
188-
raise KeyError('|'.join(short_opts or [] + long_opts or []))
189-
190-
def maybe_patch_concurrency(self, argv, short_opts=None, long_opts=None):
191-
try:
192-
pool = self._find_option_with_arg(argv, short_opts, long_opts)
193-
except KeyError:
194-
pass
195-
else:
196-
from celery import concurrency
197-
# set up eventlet/gevent environments ASAP.
198-
concurrency.get_implementation(pool)
177+
def maybe_patch_concurrency(self, argv=None):
178+
argv = argv or sys.argv
179+
pool_option = self.with_pool_option(argv)
180+
if pool_option:
181+
maybe_patch_concurrency(argv, *pool_option)
182+
short_opts, long_opts = pool_option
199183

200184
def on_concurrency_setup(self):
201185
pass

celery/bin/celery.py

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,11 +9,13 @@
99
from __future__ import absolute_import
1010
from __future__ import with_statement
1111

12-
import anyjson
1312
import sys
13+
from celery.platforms import maybe_patch_concurrency
14+
maybe_patch_concurrency(sys.argv, ['-P'], ['--pool'])
15+
16+
import anyjson
1417
import warnings
1518

16-
from billiard import freeze_support
1719
from importlib import import_module
1820
from pprint import pformat
1921

@@ -933,8 +935,11 @@ def main():
933935
try:
934936
if __name__ != '__main__': # pragma: no cover
935937
sys.modules['__main__'] = sys.modules[__name__]
938+
cmd = CeleryCommand()
939+
cmd.maybe_patch_concurrency()
940+
from billiard import freeze_support
936941
freeze_support()
937-
CeleryCommand().execute_from_commandline()
942+
cmd.execute_from_commandline()
938943
except KeyboardInterrupt:
939944
pass
940945

celery/bin/celeryd.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,9 @@
116116
from __future__ import absolute_import
117117

118118
import sys
119+
import sys
120+
from celery.platforms import maybe_patch_concurrency
121+
maybe_patch_concurrency(sys.argv, ['-P'], ['--pool'])
119122

120123
from billiard import freeze_support
121124

celery/platforms.py

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222

2323
from .local import try_import
2424

25-
from billiard import current_process
2625
from kombu.utils.limits import TokenBucket
2726

2827
_setproctitle = try_import('setproctitle')
@@ -67,6 +66,29 @@ def pyimplementation():
6766
return 'CPython'
6867

6968

69+
def _find_option_with_arg(argv, short_opts=None, long_opts=None):
70+
for i, arg in enumerate(argv):
71+
if arg.startswith('-'):
72+
if long_opts and arg.startswith('--'):
73+
name, _, val = arg.partition('=')
74+
if name in long_opts:
75+
return val
76+
if short_opts and arg in short_opts:
77+
return argv[i + 1]
78+
raise KeyError('|'.join(short_opts or [] + long_opts or []))
79+
80+
81+
def maybe_patch_concurrency(argv, short_opts=None, long_opts=None):
82+
try:
83+
pool = _find_option_with_arg(argv, short_opts, long_opts)
84+
except KeyError:
85+
pass
86+
else:
87+
# set up eventlet/gevent environments ASAP.
88+
from celery import concurrency
89+
concurrency.get_implementation(pool)
90+
91+
7092
class LockFailed(Exception):
7193
"""Raised if a pidlock can't be acquired."""
7294
pass
@@ -594,6 +616,7 @@ def set_mp_process_title(progname, info=None, hostname=None, # noqa
594616
595617
"""
596618
if not rate_limit or _setps_bucket.can_consume(1):
619+
from billiard import current_process
597620
if hostname:
598621
progname = '%s@%s' % (progname, hostname.split('.')[0])
599622
return set_process_title(

celery/utils/__init__.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@
1212
import operator
1313
import os
1414
import sys
15-
import threading
1615
import traceback
1716
import warnings
1817

@@ -147,6 +146,8 @@ def cry(): # pragma: no cover
147146
From https://gist.github.com/737056
148147
149148
"""
149+
import threading
150+
150151
tmap = {}
151152
main_thread = None
152153
# get a map of threads by their ID so we can print their names

docs/userguide/tasks.rst

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -228,9 +228,24 @@ An example task accessing information in the context is:
228228
.. code-block:: python
229229
230230
@celery.task()
231-
def dump_context(x, y):
231+
def add(x, y):
232232
print('Executing task id %r, args: %r kwargs: %r' % (
233233
add.request.id, add.request.args, add.request.kwargs))
234+
return x + y
235+
236+
237+
:data:`~celery.current_task` can also be used:
238+
239+
.. code-block:: python
240+
241+
from celery import current_task
242+
243+
@celery.task()
244+
def add(x, y):
245+
request = current_task.request
246+
print('Executing task id %r, args: %r kwargs: %r' % (
247+
request.id, request.args, request.kwargs))
248+
return x + y
234249
235250
.. _task-logging:
236251

0 commit comments

Comments
 (0)