Skip to content

bpo-38119: fix shared memory's resource tracking #23174

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions Include/internal/pycore_atomic.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,12 @@ typedef struct _Py_atomic_int {
#define _Py_atomic_load_explicit(ATOMIC_VAL, ORDER) \
atomic_load_explicit(&((ATOMIC_VAL)->_value), ORDER)

#define _Py_atomic_fetch_add_explicit(ATOMIC_VAL, VAL, ORDER) \
atomic_fetch_add_explicit(&((ATOMIC_VAL)->_value), VAL, ORDER)

#define _Py_atomic_fetch_sub_explicit(ATOMIC_VAL, VAL, ORDER) \
atomic_fetch_sub_explicit(&((ATOMIC_VAL)->_value), VAL, ORDER)

/* Use builtin atomic operations in GCC >= 4.7 */
#elif defined(HAVE_BUILTIN_ATOMIC)

Expand Down Expand Up @@ -100,6 +106,20 @@ typedef struct _Py_atomic_int {
|| (ORDER) == __ATOMIC_CONSUME), \
__atomic_load_n(&((ATOMIC_VAL)->_value), ORDER))

#define _Py_atomic_fetch_add_explicit(ATOMIC_VAL, VAL, ORDER) \
(assert((ORDER) == __ATOMIC_RELAXED \
|| (ORDER) == __ATOMIC_SEQ_CST \
|| (ORDER) == __ATOMIC_ACQUIRE \
|| (ORDER) == __ATOMIC_CONSUME), \
__atomic_fetch_add(&((ATOMIC_VAL)->_value), VAL, ORDER))

#define _Py_atomic_fetch_sub_explicit(ATOMIC_VAL, VAL, ORDER) \
(assert((ORDER) == __ATOMIC_RELAXED \
|| (ORDER) == __ATOMIC_SEQ_CST \
|| (ORDER) == __ATOMIC_ACQUIRE \
|| (ORDER) == __ATOMIC_CONSUME), \
__atomic_fetch_sub(&((ATOMIC_VAL)->_value), VAL, ORDER))

/* Only support GCC (for expression statements) and x86 (for simple
* atomic semantics) and MSVC x86/x64/ARM */
#elif defined(__GNUC__) && (defined(__i386__) || defined(__amd64))
Expand Down Expand Up @@ -551,6 +571,12 @@ typedef struct _Py_atomic_int {
#define _Py_atomic_load_relaxed(ATOMIC_VAL) \
_Py_atomic_load_explicit((ATOMIC_VAL), _Py_memory_order_relaxed)

#define _Py_atomic_fetch_add_relaxed(ATOMIC_VAL, VAL) \
_Py_atomic_fetch_add_explicit((ATOMIC_VAL), (VAL), _Py_memory_order_relaxed)

#define _Py_atomic_fetch_sub_relaxed(ATOMIC_VAL, VAL) \
_Py_atomic_fetch_sub_explicit((ATOMIC_VAL), (VAL), _Py_memory_order_relaxed)

#ifdef __cplusplus
}
#endif
Expand Down
7 changes: 5 additions & 2 deletions Lib/multiprocessing/resource_tracker.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,11 @@
if os.name == 'posix':
import _multiprocessing
import _posixshmem
from multiprocessing.shared_memory import cleanup_shared_memory, shm_inc_refcount

_CLEANUP_FUNCS.update({
'semaphore': _multiprocessing.sem_unlink,
'shared_memory': _posixshmem.shm_unlink,
'shared_memory': cleanup_shared_memory,
})


Expand Down Expand Up @@ -196,7 +197,9 @@ def main(fd):
f'unknown resource type {rtype}')

if cmd == 'REGISTER':
cache[rtype].add(name)
if rtype == "shared_memory" and name not in cache[rtype]:
cache[rtype].add(name)
shm_inc_refcount(name)
elif cmd == 'UNREGISTER':
cache[rtype].remove(name)
elif cmd == 'PROBE':
Expand Down
48 changes: 39 additions & 9 deletions Lib/multiprocessing/shared_memory.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,9 @@ class SharedMemory:
_flags = os.O_RDWR
_mode = 0o600
_prepend_leading_slash = True if _USE_POSIX else False
_track_resource = True

def __init__(self, name=None, create=False, size=0):
def __init__(self, name=None, create=False, size=0, track_resource=True):
if not size >= 0:
raise ValueError("'size' must be a positive integer")
if create:
Expand All @@ -81,6 +82,7 @@ def __init__(self, name=None, create=False, size=0):
if name is None and not self._flags & os.O_EXCL:
raise ValueError("'name' can only be None if create=True")

self._track_resource = track_resource
if _USE_POSIX:

# POSIX Shared Memory
Expand Down Expand Up @@ -108,6 +110,7 @@ def __init__(self, name=None, create=False, size=0):
self._name = name
try:
if create and size:
size += _posixshmem.REFCOUNT_SIZE
os.ftruncate(self._fd, size)
stats = os.fstat(self._fd)
size = stats.st_size
Expand All @@ -116,8 +119,13 @@ def __init__(self, name=None, create=False, size=0):
self.unlink()
raise

from .resource_tracker import register
register(self._name, "shared_memory")
self._size = size
self._refcount = memoryview(self._mmap)[0:_posixshmem.REFCOUNT_SIZE]
self._buf = memoryview(self._mmap)[_posixshmem.REFCOUNT_SIZE:]

if self._track_resource:
from .resource_tracker import register
register(self.name, "shared_memory")

else:

Expand Down Expand Up @@ -176,8 +184,9 @@ def __init__(self, name=None, create=False, size=0):
size = _winapi.VirtualQuerySize(p_buf)
self._mmap = mmap.mmap(-1, size, tagname=name)

self._size = size
self._buf = memoryview(self._mmap)
self._size = size
self._buf = memoryview(self._mmap)
self._refcount = None

def __del__(self):
try:
Expand Down Expand Up @@ -215,11 +224,17 @@ def name(self):
@property
def size(self):
"Size in bytes."
return self._size
if _USE_POSIX:
return self._size - _posixshmem.REFCOUNT_SIZE
else:
return self._size

def close(self):
"""Closes access to the shared memory from this instance but does
not destroy the shared memory block."""
if self._refcount is not None:
self._refcount.release()
self._refcount = None
if self._buf is not None:
self._buf.release()
self._buf = None
Expand All @@ -237,10 +252,25 @@ def unlink(self):
called once (and only once) across all processes which have access
to the shared memory block."""
if _USE_POSIX and self._name:
from .resource_tracker import unregister
_posixshmem.shm_unlink(self._name)
unregister(self._name, "shared_memory")

if self._track_resource:
from .resource_tracker import unregister
unregister(self.name, "shared_memory")

def cleanup_shared_memory(name):
try:
shm = SharedMemory(name, track_resource=False)
refcount = _posixshmem.shm_dec_refcount(shm._refcount)
if refcount == 0: shm.unlink()
except FileNotFoundError: # Segment with name has already been unlinked
pass

def shm_inc_refcount(name):
try:
shm = SharedMemory(name, track_resource=False)
_posixshmem.shm_inc_refcount(shm._refcount)
except FileNotFoundError: # Segment with name has already been unlinked
pass

_encoding = "utf8"

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix shared_memory's resource tracking by using reference counting
1 change: 1 addition & 0 deletions Modules/Setup
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ _signal -DPy_BUILD_CORE_BUILTIN -I$(srcdir)/Include/internal signalmodule.c
_stat _stat.c # stat.h interface
time -DPy_BUILD_CORE_BUILTIN -I$(srcdir)/Include/internal timemodule.c # -lm # time operations and variables
_thread -DPy_BUILD_CORE_BUILTIN -I$(srcdir)/Include/internal _threadmodule.c # low-level threading interface
_posixshmem -DPy_BUILD_CORE_BUILTIN -I$(srcdir)/Include/internal _multiprocessing/posixshmem.c # shared memory interface dependent on 'pycore_atomic.h'

# access to ISO C locale support
_locale -DPy_BUILD_CORE_BUILTIN _localemodule.c # -lintl
Expand Down
76 changes: 75 additions & 1 deletion Modules/_multiprocessing/clinic/posixshmem.c.h
Original file line number Diff line number Diff line change
Expand Up @@ -113,11 +113,85 @@ _posixshmem_shm_unlink(PyObject *module, PyObject *const *args, Py_ssize_t nargs

#endif /* defined(HAVE_SHM_UNLINK) */

PyDoc_STRVAR(_posixshmem_shm_inc_refcount__doc__,
"shm_inc_refcount($module, /, ptr)\n"
"--\n"
"\n"
"Increment Reference Count of the memoryview object");

#define _POSIXSHMEM_SHM_INC_REFCOUNT_METHODDEF \
{"shm_inc_refcount", (PyCFunction)(void(*)(void))_posixshmem_shm_inc_refcount, METH_FASTCALL|METH_KEYWORDS, _posixshmem_shm_inc_refcount__doc__},

static int
_posixshmem_shm_inc_refcount_impl(PyObject *module, PyObject *ptr);

static PyObject *
_posixshmem_shm_inc_refcount(PyObject *module, PyObject *const *args, Py_ssize_t nargs, PyObject *kwnames)
{
PyObject *return_value = NULL;
static const char * const _keywords[] = {"ptr", NULL};
static _PyArg_Parser _parser = {NULL, _keywords, "shm_inc_refcount", 0};
PyObject *argsbuf[1];
PyObject *ptr;
int _return_value;

args = _PyArg_UnpackKeywords(args, nargs, NULL, kwnames, &_parser, 1, 1, 0, argsbuf);
if (!args) {
goto exit;
}
ptr = args[0];
_return_value = _posixshmem_shm_inc_refcount_impl(module, ptr);
if ((_return_value == -1) && PyErr_Occurred()) {
goto exit;
}
return_value = PyLong_FromLong((long)_return_value);

exit:
return return_value;
}

PyDoc_STRVAR(_posixshmem_shm_dec_refcount__doc__,
"shm_dec_refcount($module, /, ptr)\n"
"--\n"
"\n"
"Decrement Reference Count of the memoryview object");

#define _POSIXSHMEM_SHM_DEC_REFCOUNT_METHODDEF \
{"shm_dec_refcount", (PyCFunction)(void(*)(void))_posixshmem_shm_dec_refcount, METH_FASTCALL|METH_KEYWORDS, _posixshmem_shm_dec_refcount__doc__},

static int
_posixshmem_shm_dec_refcount_impl(PyObject *module, PyObject *ptr);

static PyObject *
_posixshmem_shm_dec_refcount(PyObject *module, PyObject *const *args, Py_ssize_t nargs, PyObject *kwnames)
{
PyObject *return_value = NULL;
static const char * const _keywords[] = {"ptr", NULL};
static _PyArg_Parser _parser = {NULL, _keywords, "shm_dec_refcount", 0};
PyObject *argsbuf[1];
PyObject *ptr;
int _return_value;

args = _PyArg_UnpackKeywords(args, nargs, NULL, kwnames, &_parser, 1, 1, 0, argsbuf);
if (!args) {
goto exit;
}
ptr = args[0];
_return_value = _posixshmem_shm_dec_refcount_impl(module, ptr);
if ((_return_value == -1) && PyErr_Occurred()) {
goto exit;
}
return_value = PyLong_FromLong((long)_return_value);

exit:
return return_value;
}

#ifndef _POSIXSHMEM_SHM_OPEN_METHODDEF
#define _POSIXSHMEM_SHM_OPEN_METHODDEF
#endif /* !defined(_POSIXSHMEM_SHM_OPEN_METHODDEF) */

#ifndef _POSIXSHMEM_SHM_UNLINK_METHODDEF
#define _POSIXSHMEM_SHM_UNLINK_METHODDEF
#endif /* !defined(_POSIXSHMEM_SHM_UNLINK_METHODDEF) */
/*[clinic end generated code: output=bca8e78d0f43ef1a input=a9049054013a1b77]*/
/*[clinic end generated code: output=cd4bd3692d1ce532 input=a9049054013a1b77]*/
41 changes: 41 additions & 0 deletions Modules/_multiprocessing/posixshmem.c
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ posixshmem - A Python extension that provides shm_open() and shm_unlink()
#define PY_SSIZE_T_CLEAN

#include <Python.h>
#include <stdio.h>
#include "pycore_atomic.h"

// for shm_open() and shm_unlink()
#ifdef HAVE_SYS_MMAN_H
Expand Down Expand Up @@ -101,11 +103,44 @@ _posixshmem_shm_unlink_impl(PyObject *module, PyObject *path)
}
#endif /* HAVE_SHM_UNLINK */

/*[clinic input]
_posixshmem.shm_inc_refcount -> int
ptr: object

Increment Reference Count of the memoryview object

[clinic start generated code]*/

static int
_posixshmem_shm_inc_refcount_impl(PyObject *module, PyObject *ptr)
/*[clinic end generated code: output=9ed5b4d016975d06 input=b7c1fe6ce39b7bb4]*/
{
Py_buffer *buf = PyMemoryView_GET_BUFFER(ptr);
return _Py_atomic_fetch_add_relaxed((_Py_atomic_int*)(buf->buf), 1) + 1;
}
/*[clinic input]
_posixshmem.shm_dec_refcount -> int
ptr: object

Decrement Reference Count of the memoryview object

[clinic start generated code]*/

static int
_posixshmem_shm_dec_refcount_impl(PyObject *module, PyObject *ptr)
/*[clinic end generated code: output=16ab284487281c72 input=0aab6ded127aa5c3]*/
{
Py_buffer *buf = PyMemoryView_GET_BUFFER(ptr);
return _Py_atomic_fetch_sub_relaxed((_Py_atomic_int*)(buf->buf), 1) - 1;
}

#include "clinic/posixshmem.c.h"

static PyMethodDef module_methods[ ] = {
_POSIXSHMEM_SHM_OPEN_METHODDEF
_POSIXSHMEM_SHM_UNLINK_METHODDEF
_POSIXSHMEM_SHM_INC_REFCOUNT_METHODDEF
_POSIXSHMEM_SHM_DEC_REFCOUNT_METHODDEF
{NULL} /* Sentinel */
};

Expand All @@ -118,6 +153,8 @@ static struct PyModuleDef this_module = {
module_methods, // m_methods
};

const char *NAME_REFCOUNT_SIZE = "REFCOUNT_SIZE";

/* Module init function */
PyMODINIT_FUNC
PyInit__posixshmem(void) {
Expand All @@ -126,5 +163,9 @@ PyInit__posixshmem(void) {
if (!module) {
return NULL;
}
if (PyModule_AddIntConstant(module, NAME_REFCOUNT_SIZE, sizeof(_Py_atomic_int))) {
Py_XDECREF(module);
return NULL;
}
return module;
}
1 change: 1 addition & 0 deletions Tools/c-analyzer/cpython/_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ def clean_lines(text):
Modules/main.c Py_BUILD_CORE 1
Modules/posixmodule.c Py_BUILD_CORE 1
Modules/signalmodule.c Py_BUILD_CORE 1
Modules/_multiprocessing/posixshmem.c Py_BUILD_CORE 1
Modules/_threadmodule.c Py_BUILD_CORE 1
Modules/_tracemalloc.c Py_BUILD_CORE 1
Modules/_asynciomodule.c Py_BUILD_CORE 1
Expand Down