|
15 | 15 |
|
16 | 16 | from kombu.syn import detect_environment
|
17 | 17 |
|
| 18 | +from celery.local import Proxy |
| 19 | + |
18 | 20 | USE_PURE_LOCALS = os.environ.get('USE_PURE_LOCALS')
|
19 | 21 |
|
20 | 22 |
|
@@ -70,20 +72,234 @@ def stop(self):
|
70 | 72 | if self.is_alive():
|
71 | 73 | self.join(1e100)
|
72 | 74 |
|
| 75 | +try: |
| 76 | + from greenlet import getcurrent as get_ident |
| 77 | +except ImportError: # pragma: no cover |
| 78 | + try: |
| 79 | + from thread import get_ident # noqa |
| 80 | + except ImportError: # pragma: no cover |
| 81 | + try: |
| 82 | + from dummy_thread import get_ident # noqa |
| 83 | + except ImportError: # pragma: no cover |
| 84 | + from _thread import get_ident # noqa |
| 85 | + |
| 86 | + |
| 87 | +def release_local(local): |
| 88 | + """Releases the contents of the local for the current context. |
| 89 | + This makes it possible to use locals without a manager. |
| 90 | +
|
| 91 | + Example:: |
| 92 | +
|
| 93 | + >>> loc = Local() |
| 94 | + >>> loc.foo = 42 |
| 95 | + >>> release_local(loc) |
| 96 | + >>> hasattr(loc, 'foo') |
| 97 | + False |
| 98 | +
|
| 99 | + With this function one can release :class:`Local` objects as well |
| 100 | + as :class:`StackLocal` objects. However it is not possible to |
| 101 | + release data held by proxies that way, one always has to retain |
| 102 | + a reference to the underlying local object in order to be able |
| 103 | + to release it. |
| 104 | +
|
| 105 | + .. versionadded:: 0.6.1 |
| 106 | + """ |
| 107 | + local.__release_local__() |
| 108 | + |
| 109 | + |
| 110 | +class Local(object): |
| 111 | + __slots__ = ('__storage__', '__ident_func__') |
| 112 | + |
| 113 | + def __init__(self): |
| 114 | + object.__setattr__(self, '__storage__', {}) |
| 115 | + object.__setattr__(self, '__ident_func__', get_ident) |
| 116 | + |
| 117 | + def __iter__(self): |
| 118 | + return iter(self.__storage__.items()) |
| 119 | + |
| 120 | + def __call__(self, proxy): |
| 121 | + """Create a proxy for a name.""" |
| 122 | + return Proxy(self, proxy) |
| 123 | + |
| 124 | + def __release_local__(self): |
| 125 | + self.__storage__.pop(self.__ident_func__(), None) |
| 126 | + |
| 127 | + def __getattr__(self, name): |
| 128 | + try: |
| 129 | + return self.__storage__[self.__ident_func__()][name] |
| 130 | + except KeyError: |
| 131 | + raise AttributeError(name) |
| 132 | + |
| 133 | + def __setattr__(self, name, value): |
| 134 | + ident = self.__ident_func__() |
| 135 | + storage = self.__storage__ |
| 136 | + try: |
| 137 | + storage[ident][name] = value |
| 138 | + except KeyError: |
| 139 | + storage[ident] = {name: value} |
| 140 | + |
| 141 | + def __delattr__(self, name): |
| 142 | + try: |
| 143 | + del self.__storage__[self.__ident_func__()][name] |
| 144 | + except KeyError: |
| 145 | + raise AttributeError(name) |
| 146 | + |
| 147 | + |
| 148 | +class _LocalStack(object): |
| 149 | + """This class works similar to a :class:`Local` but keeps a stack |
| 150 | + of objects instead. This is best explained with an example:: |
| 151 | +
|
| 152 | + >>> ls = LocalStack() |
| 153 | + >>> ls.push(42) |
| 154 | + >>> ls.top |
| 155 | + 42 |
| 156 | + >>> ls.push(23) |
| 157 | + >>> ls.top |
| 158 | + 23 |
| 159 | + >>> ls.pop() |
| 160 | + 23 |
| 161 | + >>> ls.top |
| 162 | + 42 |
| 163 | +
|
| 164 | + They can be force released by using a :class:`LocalManager` or with |
| 165 | + the :func:`release_local` function but the correct way is to pop the |
| 166 | + item from the stack after using. When the stack is empty it will |
| 167 | + no longer be bound to the current context (and as such released). |
| 168 | +
|
| 169 | + By calling the stack without arguments it returns a proxy that |
| 170 | + resolves to the topmost item on the stack. |
| 171 | +
|
| 172 | + """ |
| 173 | + |
| 174 | + def __init__(self): |
| 175 | + self._local = Local() |
| 176 | + |
| 177 | + def __release_local__(self): |
| 178 | + self._local.__release_local__() |
| 179 | + |
| 180 | + def _get__ident_func__(self): |
| 181 | + return self._local.__ident_func__ |
| 182 | + |
| 183 | + def _set__ident_func__(self, value): |
| 184 | + object.__setattr__(self._local, '__ident_func__', value) |
| 185 | + __ident_func__ = property(_get__ident_func__, _set__ident_func__) |
| 186 | + del _get__ident_func__, _set__ident_func__ |
| 187 | + |
| 188 | + def __call__(self): |
| 189 | + def _lookup(): |
| 190 | + rv = self.top |
| 191 | + if rv is None: |
| 192 | + raise RuntimeError('object unbound') |
| 193 | + return rv |
| 194 | + return Proxy(_lookup) |
| 195 | + |
| 196 | + def push(self, obj): |
| 197 | + """Pushes a new item to the stack""" |
| 198 | + rv = getattr(self._local, 'stack', None) |
| 199 | + if rv is None: |
| 200 | + self._local.stack = rv = [] |
| 201 | + rv.append(obj) |
| 202 | + return rv |
| 203 | + |
| 204 | + def pop(self): |
| 205 | + """Removes the topmost item from the stack, will return the |
| 206 | + old value or `None` if the stack was already empty. |
| 207 | + """ |
| 208 | + stack = getattr(self._local, 'stack', None) |
| 209 | + if stack is None: |
| 210 | + return None |
| 211 | + elif len(stack) == 1: |
| 212 | + release_local(self._local) |
| 213 | + return stack[-1] |
| 214 | + else: |
| 215 | + return stack.pop() |
| 216 | + |
| 217 | + @property |
| 218 | + def stack(self): |
| 219 | + """get_current_worker_task uses this to find |
| 220 | + the original task that was executed by the worker.""" |
| 221 | + stack = getattr(self._local, 'stack', None) |
| 222 | + if stack is not None: |
| 223 | + return stack |
| 224 | + return [] |
| 225 | + |
| 226 | + @property |
| 227 | + def top(self): |
| 228 | + """The topmost item on the stack. If the stack is empty, |
| 229 | + `None` is returned. |
| 230 | + """ |
| 231 | + try: |
| 232 | + return self._local.stack[-1] |
| 233 | + except (AttributeError, IndexError): |
| 234 | + return None |
| 235 | + |
| 236 | + |
| 237 | +class LocalManager(object): |
| 238 | + """Local objects cannot manage themselves. For that you need a local |
| 239 | + manager. You can pass a local manager multiple locals or add them |
| 240 | + later by appending them to `manager.locals`. Everytime the manager |
| 241 | + cleans up it, will clean up all the data left in the locals for this |
| 242 | + context. |
| 243 | +
|
| 244 | + The `ident_func` parameter can be added to override the default ident |
| 245 | + function for the wrapped locals. |
| 246 | +
|
| 247 | + """ |
| 248 | + |
| 249 | + def __init__(self, locals=None, ident_func=None): |
| 250 | + if locals is None: |
| 251 | + self.locals = [] |
| 252 | + elif isinstance(locals, Local): |
| 253 | + self.locals = [locals] |
| 254 | + else: |
| 255 | + self.locals = list(locals) |
| 256 | + if ident_func is not None: |
| 257 | + self.ident_func = ident_func |
| 258 | + for local in self.locals: |
| 259 | + object.__setattr__(local, '__ident_func__', ident_func) |
| 260 | + else: |
| 261 | + self.ident_func = get_ident |
| 262 | + |
| 263 | + def get_ident(self): |
| 264 | + """Return the context identifier the local objects use internally |
| 265 | + for this context. You cannot override this method to change the |
| 266 | + behavior but use it to link other context local objects (such as |
| 267 | + SQLAlchemy's scoped sessions) to the Werkzeug locals.""" |
| 268 | + return self.ident_func() |
| 269 | + |
| 270 | + def cleanup(self): |
| 271 | + """Manually clean up the data in the locals for this context. |
| 272 | +
|
| 273 | + Call this at the end of the request or use `make_middleware()`. |
| 274 | +
|
| 275 | + """ |
| 276 | + for local in self.locals: |
| 277 | + release_local(local) |
| 278 | + |
| 279 | + def __repr__(self): |
| 280 | + return '<{0} storages: {1}>'.format( |
| 281 | + self.__class__.__name__, len(self.locals)) |
| 282 | + |
| 283 | + |
| 284 | +class _FastLocalStack(threading.local): |
| 285 | + |
| 286 | + def __init__(self): |
| 287 | + self.stack = [] |
| 288 | + self.push = self.stack.append |
| 289 | + self.pop = self.stack.pop |
| 290 | + |
| 291 | + @property |
| 292 | + def top(self): |
| 293 | + try: |
| 294 | + return self.stack[-1] |
| 295 | + except (AttributeError, IndexError): |
| 296 | + return None |
| 297 | + |
73 | 298 | if detect_environment() == 'default' and not USE_PURE_LOCALS:
|
74 |
| - class LocalStack(threading.local): |
75 |
| - |
76 |
| - def __init__(self): |
77 |
| - self.stack = [] |
78 |
| - self.push = self.stack.append |
79 |
| - self.pop = self.stack.pop |
80 |
| - |
81 |
| - @property |
82 |
| - def top(self): |
83 |
| - try: |
84 |
| - return self.stack[-1] |
85 |
| - except (AttributeError, IndexError): |
86 |
| - return None |
| 299 | + LocalStack = _FastLocalStack |
87 | 300 | else:
|
88 |
| - # See #706 |
89 |
| - from celery.local import LocalStack # noqa |
| 301 | + # - See #706 |
| 302 | + # since each thread has its own greenlet we can just use those as |
| 303 | + # identifiers for the context. If greenlets are not available we |
| 304 | + # fall back to the current thread ident. |
| 305 | + LocalStack = _LocalStack # noqa |
0 commit comments