Skip to content

bpo-45545: chdir __exit__ is not safe #29218

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 19 additions & 2 deletions Lib/contextlib.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import sys
import _collections_abc
from collections import deque
from errno import ENAMETOOLONG
from functools import wraps
from types import MethodType, GenericAlias

Expand Down Expand Up @@ -767,13 +768,29 @@ async def __aexit__(self, *excinfo):
class chdir(AbstractContextManager):
"""Non thread-safe context manager to change the current working directory."""

def __init__(self, path):
def __init__(self, path, *, ignore_errors=False):
self.path = path
self.ignore_errors = ignore_errors
self._old_cwd = []

def __enter__(self):
self._old_cwd.append(os.getcwd())
os.chdir(self.path)

def __exit__(self, *excinfo):
os.chdir(self._old_cwd.pop())
abs_return = self._old_cwd.pop()
try:
os.chdir(abs_return)
except OSError as exc:
if exc.errno == ENAMETOOLONG:
try:
cwd = os.getcwd()
if os.path.commonpath([abs_return, cwd]):
os.chdir(os.path.relpath(abs_return, cwd))
else:
raise
except OSError:
if not self.ignore_errors:
raise
elif not self.ignore_errors:
raise
83 changes: 83 additions & 0 deletions Lib/test/test_contextlib.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
"""Unit tests for contextlib.py, and other context managers."""

import io
import errno
import os
import shutil
import sys
import tempfile
import threading
Expand Down Expand Up @@ -1249,6 +1251,87 @@ def test_exception(self):
self.assertEqual(str(re), "boom")
self.assertEqual(os.getcwd(), old_cwd)

def test_with_os_chdir(self):
old_cwd = os.getcwd()
target1 = os.path.join(os.path.dirname(__file__), 'data')
target2 = os.path.join(os.path.dirname(__file__), 'ziptestdata')

with chdir(target1):
self.assertEqual(os.getcwd(), target1)
os.chdir(target2)
self.assertEqual(os.getcwd(), target2)
self.assertEqual(os.getcwd(), old_cwd)

def test_long_path(self):
if not (hasattr(os, 'pathconf_names')
and 'PC_NAME_MAX' in os.pathconf_names
and 'PC_PATH_MAX' in os.pathconf_names):
self.skipTest('Cannot determine a path max to test against.')

nmax = os.pathconf(os.getcwd(), 'PC_NAME_MAX')
pmax = os.pathconf(os.getcwd(), 'PC_PATH_MAX')
long_path = old_cwd = os.getcwd()
dirname = '_' * nmax

try:
for _ in range(pmax // (nmax + 1)):
long_path = os.path.join(long_path, dirname)
os.mkdir(dirname)
os.chdir(dirname)
os.mkdir(dirname)
try:
os.getcwd()
except OSError as exc:
if exc.errno == errno.ENOENT:
# This will rais an exception in __enter__ when we are
# testing for an exception in __exit__
self.skipTest('Cannot retrieve cwd that is longer than '
'PC_PATH_MAX')
raise

with chdir(dirname):
self.assertEqual(os.getcwd(), os.path.join(long_path, dirname))
self.assertEqual(os.getcwd(), long_path)
finally:
os.chdir(old_cwd)
shutil.rmtree(os.path.join(old_cwd, dirname), ignore_errors=True)


def test_deleted_dir_without_ignore(self):
old_cwd = os.getcwd()
os.mkdir('dead')
os.mkdir('alive')

try:
with self.assertRaises(FileNotFoundError):
with chdir('dead'):
with chdir(os.path.join(old_cwd, 'alive')):
os.rmdir(os.path.join(old_cwd, 'dead'))
self.assertEqual(os.getcwd(), old_cwd)
finally:
os.chdir(old_cwd)
if os.path.isdir('dead'):
os.rmdir('dead')
if os.path.isdir('alive'):
os.rmdir('alive')

def test_deleted_dir_with_ignore(self):
old_cwd = os.getcwd()
os.mkdir('dead')
os.mkdir('alive')

try:
with chdir('dead', ignore_errors=True):
with chdir(os.path.join(old_cwd, 'alive'), ignore_errors=True):
os.rmdir(os.path.join(old_cwd, 'dead'))
self.assertEqual(os.getcwd(), old_cwd)
finally:
os.chdir(old_cwd)
if os.path.isdir('dead'):
os.rmdir('dead')
if os.path.isdir('alive'):
os.rmdir('alive')


if __name__ == "__main__":
unittest.main()