Skip to content

bpo-39850: Add support for abstract sockets in multiprocessing #18866

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Mar 9, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions Lib/multiprocessing/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,11 @@ def arbitrary_address(family):
if family == 'AF_INET':
return ('localhost', 0)
elif family == 'AF_UNIX':
# Prefer abstract sockets if possible to avoid problems with the address
# size. When coding portable applications, some implementations have
# sun_path as short as 92 bytes in the sockaddr_un struct.
if util.abstract_sockets_supported:
return f"\0listener-{os.getpid()}-{next(_mmap_counter)}"
return tempfile.mktemp(prefix='listener-', dir=util.get_temp_dir())
elif family == 'AF_PIPE':
return tempfile.mktemp(prefix=r'\\.\pipe\pyc-%d-%d-' %
Expand Down Expand Up @@ -102,7 +107,7 @@ def address_type(address):
return 'AF_INET'
elif type(address) is str and address.startswith('\\\\'):
return 'AF_PIPE'
elif type(address) is str:
elif type(address) is str or util.is_abstract_socket_namespace(address):
return 'AF_UNIX'
else:
raise ValueError('address type of %r unrecognized' % address)
Expand Down Expand Up @@ -597,7 +602,8 @@ def __init__(self, address, family, backlog=1):
self._family = family
self._last_accepted = None

if family == 'AF_UNIX':
if family == 'AF_UNIX' and not util.is_abstract_socket_namespace(address):
# Linux abstract socket namespaces do not need to be explicitly unlinked
self._unlink = util.Finalize(
self, os.unlink, args=(address,), exitpriority=0
)
Expand Down
6 changes: 4 additions & 2 deletions Lib/multiprocessing/forkserver.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,8 @@ def _stop_unlocked(self):
os.waitpid(self._forkserver_pid, 0)
self._forkserver_pid = None

os.unlink(self._forkserver_address)
if not util.is_abstract_socket_namespace(self._forkserver_address):
os.unlink(self._forkserver_address)
self._forkserver_address = None

def set_forkserver_preload(self, modules_names):
Expand Down Expand Up @@ -135,7 +136,8 @@ def ensure_running(self):
with socket.socket(socket.AF_UNIX) as listener:
address = connection.arbitrary_address('AF_UNIX')
listener.bind(address)
os.chmod(address, 0o600)
if not util.is_abstract_socket_namespace(address):
os.chmod(address, 0o600)
listener.listen()

# all client processes own the write end of the "alive" pipe;
Expand Down
6 changes: 5 additions & 1 deletion Lib/multiprocessing/managers.py
Original file line number Diff line number Diff line change
Expand Up @@ -1262,8 +1262,12 @@ class SharedMemoryServer(Server):

def __init__(self, *args, **kwargs):
Server.__init__(self, *args, **kwargs)
address = self.address
# The address of Linux abstract namespaces can be bytes
if isinstance(address, bytes):
address = os.fsdecode(address)
self.shared_memory_context = \
_SharedMemoryTracker(f"shmm_{self.address}_{getpid()}")
_SharedMemoryTracker(f"shm_{address}_{getpid()}")
util.debug(f"SharedMemoryServer started by pid {getpid()}")

def create(self, c, typeid, /, *args, **kwargs):
Expand Down
23 changes: 23 additions & 0 deletions Lib/multiprocessing/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,29 @@ def log_to_stderr(level=None):
_log_to_stderr = True
return _logger


# Abstract socket support

def _platform_supports_abstract_sockets():
if sys.platform == "linux":
return True
if hasattr(sys, 'getandroidapilevel'):
return True
return False


def is_abstract_socket_namespace(address):
if not address:
return False
if isinstance(address, bytes):
return address[0] == 0
elif isinstance(address, str):
return address[0] == "\0"
raise TypeError('address type of {address!r} unrecognized')


abstract_sockets_supported = _platform_supports_abstract_sockets()

#
# Function returning a temp directory which will be removed on exit
#
Expand Down
13 changes: 13 additions & 0 deletions Lib/test/_test_multiprocessing.py
Original file line number Diff line number Diff line change
Expand Up @@ -3274,6 +3274,19 @@ def test_context(self):
if self.TYPE == 'processes':
self.assertRaises(OSError, l.accept)

@unittest.skipUnless(util.abstract_sockets_supported,
"test needs abstract socket support")
def test_abstract_socket(self):
with self.connection.Listener("\0something") as listener:
with self.connection.Client(listener.address) as client:
with listener.accept() as d:
client.send(1729)
self.assertEqual(d.recv(), 1729)

if self.TYPE == 'processes':
self.assertRaises(OSError, listener.accept)


class _TestListenerClient(BaseTestCase):

ALLOWED_TYPES = ('processes', 'threads')
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
:mod:`multiprocessing` now supports abstract socket addresses (if abstract sockets
are supported in the running platform). When creating arbitrary addresses (like when
default-constructing :class:`multiprocessing.connection.Listener` objects) abstract
sockets are preferred to avoid the case when the temporary-file-generated address is
too large for an AF_UNIX socket address. Patch by Pablo Galindo.