Skip to content

bpo-43468: Per instance locking for functools.cached_property #27609

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 16 commits into from
Closed
83 changes: 66 additions & 17 deletions Lib/functools.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
from collections import namedtuple
# import types, weakref # Deferred to single_dispatch()
from reprlib import recursive_repr
from _thread import RLock
from threading import RLock, Condition, get_ident
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This change introduces an import cycle which causes tests to fail.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is a race condition on 980-983.
Scenario:

  • 3 threads, 1 registered in updaters
  • 2nd and 3rd will wait
  • both waiting threads will see registration of registered thread disappear
    Mutual exclusion breaks.

from types import GenericAlias


Expand Down Expand Up @@ -930,13 +930,15 @@ def __isabstractmethod__(self):

_NOT_FOUND = object()


class cached_property:

def __init__(self, func):
self.func = func
self.attrname = None
self.__doc__ = func.__doc__
self.lock = RLock()
self.updater_lock = RLock()
self.cv = Condition(RLock())
self.updater = {}

def __set_name__(self, owner, name):
if self.attrname is None:
Expand All @@ -961,21 +963,68 @@ def __get__(self, instance, owner=None):
f"instance to cache {self.attrname!r} property."
)
raise TypeError(msg) from None

# Quickly and atomically determine which thread is reponsible
# for doing the update so that other threads can wait for the
# update to complete. If the current thread is already running,
# allow the reentrant thread to proceed rather than waiting.
key = id(self)
this_thread = get_ident()
with self.updater_lock:
reentrant = self.updater.get(key) == this_thread
wait = self.updater.setdefault(key, this_thread) != this_thread

# Only if this particular instance is currently being updated,
# block and wait for the updating thread to finish. Other
# instances won't have to wait.
while wait:
with self.cv:
while this_thread != self.updater.get(key, this_thread):
self.cv.wait()
with self.updater_lock:
reentrant = self.updater.get(key) == this_thread
wait = self.updater.setdefault(key, this_thread) != this_thread

# If a value has already been stored, use it.
val = cache.get(self.attrname, _NOT_FOUND)
if val is _NOT_FOUND:
with self.lock:
# check if another thread filled cache while we awaited lock
val = cache.get(self.attrname, _NOT_FOUND)
if val is _NOT_FOUND:
val = self.func(instance)
try:
cache[self.attrname] = val
except TypeError:
msg = (
f"The '__dict__' attribute on {type(instance).__name__!r} instance "
f"does not support item assignment for caching {self.attrname!r} property."
)
raise TypeError(msg) from None
if val is not _NOT_FOUND:
if not reentrant:
with self.updater_lock:
self.updater.pop(key, None)
with self.cv:
self.cv.notify_all()
return val

# Call the underlying function to compute the value.
try:
val = self.func(instance)
except Exception:
with self.updater_lock:
self.updater.pop(key, None)
with self.cv:
self.cv.notify_all()
raise

# Attempt to store the value
try:
cache[self.attrname] = val
except TypeError:
with self.updater_lock:
self.updater.pop(key, None)
with self.cv:
self.cv.notify_all()
msg = (
f"The '__dict__' attribute on {type(instance).__name__!r} instance "
f"does not support item assignment for caching {self.attrname!r} property."
)
raise TypeError(msg) from None

# Value has been computed and cached. Now return it.
if not reentrant:
with self.updater_lock:
self.updater.pop(key, None)
with self.cv:
self.cv.notify_all()
return val

__class_getitem__ = classmethod(GenericAlias)
3 changes: 2 additions & 1 deletion Lib/threading.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import os as _os
import sys as _sys
import _thread
import functools

from time import monotonic as _time
from _weakrefset import WeakSet
Expand Down Expand Up @@ -1477,6 +1476,8 @@ def _register_atexit(func, *arg, **kwargs):
if _SHUTTING_DOWN:
raise RuntimeError("can't register atexit after shutdown")

# Lazy import to prevent an import cycle
import functools
call = functools.partial(func, *arg, **kwargs)
_threading_atexits.append(call)

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
Fixed locking for functools.cached_property() to have instance specific
locking. Formerly, it blocked all instances of the same class, making it
unsuitable for parallel I/O bound computations (which were a key use case).