From ce1e36ae66c8ccd2195f64c72c09611ed9057f54 Mon Sep 17 00:00:00 2001 From: Raymond Hettinger Date: Wed, 4 Aug 2021 00:15:53 -0500 Subject: [PATCH 01/15] Draft --- Lib/functools.py | 79 +++++++++++++++++++++++++++++++++++++----------- 1 file changed, 61 insertions(+), 18 deletions(-) diff --git a/Lib/functools.py b/Lib/functools.py index 357c1dfd909fa9..199c3a005fab53 100644 --- a/Lib/functools.py +++ b/Lib/functools.py @@ -18,7 +18,7 @@ from collections import namedtuple # import types, weakref # Deferred to single_dispatch() from reprlib import recursive_repr -from _thread import RLock +from threading import RLock, Condition, get_ident from types import GenericAlias @@ -929,14 +929,16 @@ def __isabstractmethod__(self): ################################################################################ _NOT_FOUND = object() - +_EXCEPTION_RAISED = object() class cached_property: def __init__(self, func): self.func = func self.attrname = None self.__doc__ = func.__doc__ - self.lock = RLock() + self.updater_lock = RLock() + self.cv = Condition(RLock()) + self.updater = {} def __set_name__(self, owner, name): if self.attrname is None: @@ -961,21 +963,62 @@ def __get__(self, instance, owner=None): f"instance to cache {self.attrname!r} property." ) raise TypeError(msg) from None - val = cache.get(self.attrname, _NOT_FOUND) - if val is _NOT_FOUND: - with self.lock: - # check if another thread filled cache while we awaited lock - val = cache.get(self.attrname, _NOT_FOUND) - if val is _NOT_FOUND: - val = self.func(instance) - try: - cache[self.attrname] = val - except TypeError: - msg = ( - f"The '__dict__' attribute on {type(instance).__name__!r} instance " - f"does not support item assignment for caching {self.attrname!r} property." - ) - raise TypeError(msg) from None + + # Quickly and atomically determine which thread is reponsible + # for doing the update, so other threads can wait for that + # update to complete. If the update is already done, don't + # wait. If the updating thread is reentrant, don't wait. + key = id(self) + this_thread = get_ident() + with self.updater_lock: + val = cache.get(self.attrname, _NOT_FOUND) + if val is not _NOT_FOUND: + return val + wait = self.updater.setdefault(key, this_thread) != this_thread + + # ONLY if this instance currently being updated, block and wait + # for the computed result. Other instances won't have to wait. + # If an exception occurred, stop waiting. + if wait: + with self.cv: + while cache.get(self.attrname, _NOT_FOUND) is _NOT_FOUND: + self.cv.wait() + val = cache[self.attrname] + if val is not _EXCEPTION_RAISED: + return val + + # Call the underlying function to compute the value. + try: + val = self.func(instance) + except Exception: + val = _EXCEPTION_RAISED + + # Attempt to store the value + try: + cache[self.attrname] = val + except TypeError: + # Note: we have no way to communicate this exception to + # threads waiting on the condition variable. However, the + # inability to store an attribute is a programming problem + # rather than a runtime problem -- this exception would + # likely occur early in testing rather than being runtime + # event triggered by specific data. + msg = ( + f"The '__dict__' attribute on {type(instance).__name__!r} instance " + f"does not support item assignment for caching {self.attrname!r} property." + ) + raise TypeError(msg) from None + + # Now that the value is stored, threads waiting on the condition + # variable can be awakened and the updater dictionary can be + # cleaned up. + with self.updater_lock: + self.updater.pop(key, None) + cache[self.attrname] = _EXCEPTION_RAISED + self.cv.notify_all() + + if val is _EXCEPTION_RAISED: + raise return val __class_getitem__ = classmethod(GenericAlias) From d06d532c4bf3ac74e4caf64bd2fdbc7bc122c5f4 Mon Sep 17 00:00:00 2001 From: Raymond Hettinger Date: Wed, 4 Aug 2021 15:13:00 -0500 Subject: [PATCH 02/15] Prevent deadlock for non-writeable attributes --- Lib/functools.py | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/Lib/functools.py b/Lib/functools.py index 199c3a005fab53..d799cb4e5f902c 100644 --- a/Lib/functools.py +++ b/Lib/functools.py @@ -939,6 +939,7 @@ def __init__(self, func): self.updater_lock = RLock() self.cv = Condition(RLock()) self.updater = {} + self.writeable = True # Assume attrname is writeable until shown otherwise def __set_name__(self, owner, name): if self.attrname is None: @@ -978,10 +979,12 @@ def __get__(self, instance, owner=None): # ONLY if this instance currently being updated, block and wait # for the computed result. Other instances won't have to wait. - # If an exception occurred, stop waiting. + # If an exception occurred, stop waiting. If attrname turns + # out not to be writeable, don't wait. Just recompute. if wait: with self.cv: - while cache.get(self.attrname, _NOT_FOUND) is _NOT_FOUND: + while (self.writeable + and cache.get(self.attrname, _NOT_FOUND) is _NOT_FOUND): self.cv.wait() val = cache[self.attrname] if val is not _EXCEPTION_RAISED: @@ -997,12 +1000,8 @@ def __get__(self, instance, owner=None): try: cache[self.attrname] = val except TypeError: - # Note: we have no way to communicate this exception to - # threads waiting on the condition variable. However, the - # inability to store an attribute is a programming problem - # rather than a runtime problem -- this exception would - # likely occur early in testing rather than being runtime - # event triggered by specific data. + self.writeable = False + self.cv.notifyall() msg = ( f"The '__dict__' attribute on {type(instance).__name__!r} instance " f"does not support item assignment for caching {self.attrname!r} property." From d5ea2451bb735119560aaa2dd6a3f82a50070c41 Mon Sep 17 00:00:00 2001 From: Raymond Hettinger Date: Wed, 4 Aug 2021 16:42:20 -0500 Subject: [PATCH 03/15] Add "writeable" attribute --- Lib/functools.py | 1 + 1 file changed, 1 insertion(+) diff --git a/Lib/functools.py b/Lib/functools.py index d799cb4e5f902c..50c5e5504c9429 100644 --- a/Lib/functools.py +++ b/Lib/functools.py @@ -986,6 +986,7 @@ def __get__(self, instance, owner=None): while (self.writeable and cache.get(self.attrname, _NOT_FOUND) is _NOT_FOUND): self.cv.wait() + val = cache[self.attrname] if val is not _EXCEPTION_RAISED: return val From e5825ceb7521541d52663a055d24d0cfcc83b7ea Mon Sep 17 00:00:00 2001 From: Raymond Hettinger Date: Wed, 4 Aug 2021 17:15:32 -0500 Subject: [PATCH 04/15] Simplified logic --- Lib/functools.py | 45 ++++++++++++++++++--------------------------- 1 file changed, 18 insertions(+), 27 deletions(-) diff --git a/Lib/functools.py b/Lib/functools.py index 50c5e5504c9429..c878f5aebb59fa 100644 --- a/Lib/functools.py +++ b/Lib/functools.py @@ -929,7 +929,6 @@ def __isabstractmethod__(self): ################################################################################ _NOT_FOUND = object() -_EXCEPTION_RAISED = object() class cached_property: def __init__(self, func): @@ -967,41 +966,40 @@ def __get__(self, instance, owner=None): # Quickly and atomically determine which thread is reponsible # for doing the update, so other threads can wait for that - # update to complete. If the update is already done, don't - # wait. If the updating thread is reentrant, don't wait. + # update to complete. XXX If the current thread is already running, + # allow the reentrant thread to proceed rather than waiting. key = id(self) this_thread = get_ident() with self.updater_lock: - val = cache.get(self.attrname, _NOT_FOUND) - if val is not _NOT_FOUND: - return val wait = self.updater.setdefault(key, this_thread) != this_thread # ONLY if this instance currently being updated, block and wait - # for the computed result. Other instances won't have to wait. - # If an exception occurred, stop waiting. If attrname turns - # out not to be writeable, don't wait. Just recompute. + # for the main thread to finish. Other instances won't have to wait. if wait: with self.cv: - while (self.writeable - and cache.get(self.attrname, _NOT_FOUND) is _NOT_FOUND): + while key in self.updater: self.cv.wait() - - val = cache[self.attrname] - if val is not _EXCEPTION_RAISED: - return val + + # If a value has already been stored, use it + val = cache.get(self.attrname, _NOT_FOUND) + if val is not _NOT_FOUND: + self.updater.pop(key, None) + self.cv.notify_all() + return val # Call the underlying function to compute the value. try: val = self.func(instance) except Exception: - val = _EXCEPTION_RAISED + self.updater.pop(key, None) + self.cv.notify_all() + raise # Attempt to store the value try: cache[self.attrname] = val except TypeError: - self.writeable = False + self.updater.pop(key, None) self.cv.notifyall() msg = ( f"The '__dict__' attribute on {type(instance).__name__!r} instance " @@ -1009,16 +1007,9 @@ def __get__(self, instance, owner=None): ) raise TypeError(msg) from None - # Now that the value is stored, threads waiting on the condition - # variable can be awakened and the updater dictionary can be - # cleaned up. - with self.updater_lock: - self.updater.pop(key, None) - cache[self.attrname] = _EXCEPTION_RAISED - self.cv.notify_all() - - if val is _EXCEPTION_RAISED: - raise + # Value has been computed and cached. Now return it. + self.updater.pop(key, None) + self.cv.notify_all() return val __class_getitem__ = classmethod(GenericAlias) From 8e7f03cf8bda7f8b12ced47ff2b6d48c399b42d2 Mon Sep 17 00:00:00 2001 From: Raymond Hettinger Date: Wed, 4 Aug 2021 17:32:47 -0500 Subject: [PATCH 05/15] Passing tests --- Lib/functools.py | 32 +++++++++++++++++++++++--------- 1 file changed, 23 insertions(+), 9 deletions(-) diff --git a/Lib/functools.py b/Lib/functools.py index c878f5aebb59fa..25e35add5c57da 100644 --- a/Lib/functools.py +++ b/Lib/functools.py @@ -971,7 +971,13 @@ def __get__(self, instance, owner=None): key = id(self) this_thread = get_ident() with self.updater_lock: - wait = self.updater.setdefault(key, this_thread) != this_thread + if self.updater.get(key) == this_thread: + raise RuntimeError('reentrant') + if self.updater.get(key) is None: + self.updater[key] = this_thread + wait = False + else: + wait = True # ONLY if this instance currently being updated, block and wait # for the main thread to finish. Other instances won't have to wait. @@ -983,24 +989,30 @@ def __get__(self, instance, owner=None): # If a value has already been stored, use it val = cache.get(self.attrname, _NOT_FOUND) if val is not _NOT_FOUND: - self.updater.pop(key, None) - self.cv.notify_all() + with self.updater_lock: + self.updater.pop(key, None) + with self.cv: + self.cv.notify_all() return val # Call the underlying function to compute the value. try: val = self.func(instance) except Exception: - self.updater.pop(key, None) - self.cv.notify_all() + with self.updater_lock: + self.updater.pop(key, None) + with self.cv: + self.cv.notify_all() raise # Attempt to store the value try: cache[self.attrname] = val except TypeError: - self.updater.pop(key, None) - self.cv.notifyall() + with self.updater_lock: + self.updater.pop(key, None) + with self.cv: + self.cv.notify_all() msg = ( f"The '__dict__' attribute on {type(instance).__name__!r} instance " f"does not support item assignment for caching {self.attrname!r} property." @@ -1008,8 +1020,10 @@ def __get__(self, instance, owner=None): raise TypeError(msg) from None # Value has been computed and cached. Now return it. - self.updater.pop(key, None) - self.cv.notify_all() + with self.updater_lock: + self.updater.pop(key, None) + with self.cv: + self.cv.notify_all() return val __class_getitem__ = classmethod(GenericAlias) From e90cfdf23901e2a5aa4d6ef9cc47fe77a0ab6c89 Mon Sep 17 00:00:00 2001 From: Raymond Hettinger Date: Wed, 4 Aug 2021 17:38:12 -0500 Subject: [PATCH 06/15] Support reentrancy --- Lib/functools.py | 24 ++++++++++++++---------- 1 file changed, 14 insertions(+), 10 deletions(-) diff --git a/Lib/functools.py b/Lib/functools.py index 25e35add5c57da..b446dc3faadc95 100644 --- a/Lib/functools.py +++ b/Lib/functools.py @@ -966,13 +966,15 @@ def __get__(self, instance, owner=None): # Quickly and atomically determine which thread is reponsible # for doing the update, so other threads can wait for that - # update to complete. XXX If the current thread is already running, + # update to complete. If the current thread is already running, # allow the reentrant thread to proceed rather than waiting. key = id(self) this_thread = get_ident() + reentrant = False with self.updater_lock: if self.updater.get(key) == this_thread: - raise RuntimeError('reentrant') + reentrant = True + wait = False if self.updater.get(key) is None: self.updater[key] = this_thread wait = False @@ -989,10 +991,11 @@ def __get__(self, instance, owner=None): # If a value has already been stored, use it val = cache.get(self.attrname, _NOT_FOUND) if val is not _NOT_FOUND: - with self.updater_lock: - self.updater.pop(key, None) - with self.cv: - self.cv.notify_all() + if not reentrant: + with self.updater_lock: + self.updater.pop(key, None) + with self.cv: + self.cv.notify_all() return val # Call the underlying function to compute the value. @@ -1020,10 +1023,11 @@ def __get__(self, instance, owner=None): raise TypeError(msg) from None # Value has been computed and cached. Now return it. - with self.updater_lock: - self.updater.pop(key, None) - with self.cv: - self.cv.notify_all() + if not reentrant: + with self.updater_lock: + self.updater.pop(key, None) + with self.cv: + self.cv.notify_all() return val __class_getitem__ = classmethod(GenericAlias) From f126087adba04f2115d0d919aa4c7ae1c0437e63 Mon Sep 17 00:00:00 2001 From: Raymond Hettinger Date: Wed, 4 Aug 2021 17:38:57 -0500 Subject: [PATCH 07/15] Remove unused variables --- Lib/functools.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Lib/functools.py b/Lib/functools.py index b446dc3faadc95..5417b5569a1c41 100644 --- a/Lib/functools.py +++ b/Lib/functools.py @@ -931,6 +931,7 @@ def __isabstractmethod__(self): _NOT_FOUND = object() class cached_property: + def __init__(self, func): self.func = func self.attrname = None @@ -938,7 +939,6 @@ def __init__(self, func): self.updater_lock = RLock() self.cv = Condition(RLock()) self.updater = {} - self.writeable = True # Assume attrname is writeable until shown otherwise def __set_name__(self, owner, name): if self.attrname is None: From 61f5f5812dac687dbd3228c13ff31e483fe39e46 Mon Sep 17 00:00:00 2001 From: Raymond Hettinger Date: Wed, 4 Aug 2021 17:41:29 -0500 Subject: [PATCH 08/15] Use setdefault() to simply "wait" logic --- Lib/functools.py | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/Lib/functools.py b/Lib/functools.py index 5417b5569a1c41..916595cc2ecb4f 100644 --- a/Lib/functools.py +++ b/Lib/functools.py @@ -974,12 +974,7 @@ def __get__(self, instance, owner=None): with self.updater_lock: if self.updater.get(key) == this_thread: reentrant = True - wait = False - if self.updater.get(key) is None: - self.updater[key] = this_thread - wait = False - else: - wait = True + wait = self.updater.setdefault(key, this_thread) != this_thread # ONLY if this instance currently being updated, block and wait # for the main thread to finish. Other instances won't have to wait. From 83273d544201198f156e723906f02786f8f36d62 Mon Sep 17 00:00:00 2001 From: Raymond Hettinger Date: Wed, 4 Aug 2021 17:44:21 -0500 Subject: [PATCH 09/15] Improve comments --- Lib/functools.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/Lib/functools.py b/Lib/functools.py index 916595cc2ecb4f..9b54d2af1758fa 100644 --- a/Lib/functools.py +++ b/Lib/functools.py @@ -976,14 +976,14 @@ def __get__(self, instance, owner=None): reentrant = True wait = self.updater.setdefault(key, this_thread) != this_thread - # ONLY if this instance currently being updated, block and wait - # for the main thread to finish. Other instances won't have to wait. + # Only if this particular instance is currently being updated, block and wait + # for the updating thread to finish. Other instances won't have to wait. if wait: with self.cv: while key in self.updater: self.cv.wait() - # If a value has already been stored, use it + # If a value has already been stored, use it. val = cache.get(self.attrname, _NOT_FOUND) if val is not _NOT_FOUND: if not reentrant: From b76c78a0aa9bfe33b1b7fa26c2fd63aeced72509 Mon Sep 17 00:00:00 2001 From: Raymond Hettinger Date: Wed, 4 Aug 2021 18:30:40 -0500 Subject: [PATCH 10/15] Add NEWS entry --- .../next/Library/2021-08-04-18-30-25.bpo-43468.PlU--J.rst | 3 +++ 1 file changed, 3 insertions(+) create mode 100644 Misc/NEWS.d/next/Library/2021-08-04-18-30-25.bpo-43468.PlU--J.rst diff --git a/Misc/NEWS.d/next/Library/2021-08-04-18-30-25.bpo-43468.PlU--J.rst b/Misc/NEWS.d/next/Library/2021-08-04-18-30-25.bpo-43468.PlU--J.rst new file mode 100644 index 00000000000000..dc22a8bf8ffcea --- /dev/null +++ b/Misc/NEWS.d/next/Library/2021-08-04-18-30-25.bpo-43468.PlU--J.rst @@ -0,0 +1,3 @@ +Fixed locking for functools.cached_property() to have instance specific +locking. Formerly, it blocked all instances of the same class, making it +suitable for parallel I/O bound computations (which were a key use case). From fa4b8e0fbbb8bcb58cd9b38058d563ce3710ace0 Mon Sep 17 00:00:00 2001 From: Raymond Hettinger Date: Wed, 11 Aug 2021 09:19:26 -0500 Subject: [PATCH 11/15] Simplify logic for setting flags --- Lib/functools.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/Lib/functools.py b/Lib/functools.py index 9b54d2af1758fa..5a40cbea067816 100644 --- a/Lib/functools.py +++ b/Lib/functools.py @@ -970,10 +970,8 @@ def __get__(self, instance, owner=None): # allow the reentrant thread to proceed rather than waiting. key = id(self) this_thread = get_ident() - reentrant = False with self.updater_lock: - if self.updater.get(key) == this_thread: - reentrant = True + reentrant = self.updater.get(key) == this_thread wait = self.updater.setdefault(key, this_thread) != this_thread # Only if this particular instance is currently being updated, block and wait From de08f9f550a7056485dca3a8116b6f583b6eea55 Mon Sep 17 00:00:00 2001 From: Raymond Hettinger Date: Wed, 11 Aug 2021 09:22:43 -0500 Subject: [PATCH 12/15] Neaten-up comments --- Lib/functools.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/Lib/functools.py b/Lib/functools.py index 5a40cbea067816..69cbc2398b11a1 100644 --- a/Lib/functools.py +++ b/Lib/functools.py @@ -965,7 +965,7 @@ def __get__(self, instance, owner=None): raise TypeError(msg) from None # Quickly and atomically determine which thread is reponsible - # for doing the update, so other threads can wait for that + # for doing the update so that other threads can wait for the # update to complete. If the current thread is already running, # allow the reentrant thread to proceed rather than waiting. key = id(self) @@ -974,8 +974,9 @@ def __get__(self, instance, owner=None): reentrant = self.updater.get(key) == this_thread wait = self.updater.setdefault(key, this_thread) != this_thread - # Only if this particular instance is currently being updated, block and wait - # for the updating thread to finish. Other instances won't have to wait. + # Only if this particular instance is currently being updated, + # block and wait for the updating thread to finish. Other + # instances won't have to wait. if wait: with self.cv: while key in self.updater: From f0893efd7e0c84dc516598a214df2dadeea464c9 Mon Sep 17 00:00:00 2001 From: Raymond Hettinger Date: Fri, 27 Aug 2021 08:13:39 -0500 Subject: [PATCH 13/15] Fix typo in Misc/NEWS --- .../next/Library/2021-08-04-18-30-25.bpo-43468.PlU--J.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Misc/NEWS.d/next/Library/2021-08-04-18-30-25.bpo-43468.PlU--J.rst b/Misc/NEWS.d/next/Library/2021-08-04-18-30-25.bpo-43468.PlU--J.rst index dc22a8bf8ffcea..700f40d280b455 100644 --- a/Misc/NEWS.d/next/Library/2021-08-04-18-30-25.bpo-43468.PlU--J.rst +++ b/Misc/NEWS.d/next/Library/2021-08-04-18-30-25.bpo-43468.PlU--J.rst @@ -1,3 +1,3 @@ Fixed locking for functools.cached_property() to have instance specific locking. Formerly, it blocked all instances of the same class, making it -suitable for parallel I/O bound computations (which were a key use case). +unsuitable for parallel I/O bound computations (which were a key use case). From 014a5c309c691437c54858a7a4d9a086a4d7645b Mon Sep 17 00:00:00 2001 From: Raymond Hettinger Date: Fri, 27 Aug 2021 08:18:01 -0500 Subject: [PATCH 14/15] Resolve import cycle --- Lib/threading.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/Lib/threading.py b/Lib/threading.py index c2b94a5045514f..797b7ed883f472 100644 --- a/Lib/threading.py +++ b/Lib/threading.py @@ -3,7 +3,6 @@ import os as _os import sys as _sys import _thread -import functools from time import monotonic as _time from _weakrefset import WeakSet @@ -1477,6 +1476,8 @@ def _register_atexit(func, *arg, **kwargs): if _SHUTTING_DOWN: raise RuntimeError("can't register atexit after shutdown") + # Lazy import to prevent an import cycle + import functools call = functools.partial(func, *arg, **kwargs) _threading_atexits.append(call) From 22db0d45e2ee418a593cfc9b6a5171dd494ea4a9 Mon Sep 17 00:00:00 2001 From: joncrall Date: Mon, 30 Aug 2021 21:09:38 -0400 Subject: [PATCH 15/15] candidate fix for deadlock --- Lib/functools.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/Lib/functools.py b/Lib/functools.py index 69cbc2398b11a1..39948554ae4ade 100644 --- a/Lib/functools.py +++ b/Lib/functools.py @@ -977,10 +977,13 @@ def __get__(self, instance, owner=None): # Only if this particular instance is currently being updated, # block and wait for the updating thread to finish. Other # instances won't have to wait. - if wait: + while wait: with self.cv: - while key in self.updater: + while this_thread != self.updater.get(key, this_thread): self.cv.wait() + with self.updater_lock: + reentrant = self.updater.get(key) == this_thread + wait = self.updater.setdefault(key, this_thread) != this_thread # If a value has already been stored, use it. val = cache.get(self.attrname, _NOT_FOUND)