Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
117 changes: 100 additions & 17 deletions src/distro/distro.py
Original file line number Diff line number Diff line change
Expand Up @@ -1091,6 +1091,80 @@ def uname_attr(self, attribute: str) -> str:
"""
return self._uname_info.get(attribute, "")

@staticmethod
def __abs_path_join(root_path: str, abs_path: str) -> str:
rel_path = os.path.splitdrive(abs_path)[1].lstrip(os.sep)
if os.altsep is not None:
rel_path = rel_path.lstrip(os.altsep)

return os.path.join(root_path, rel_path)

def __resolve_chroot_symlink_as_needed(self, link_location: str) -> str:
"""
Resolves a potential symlink in ``link_location`` against
``self.root_dir`` if inside the chroot, else just return the original
path.
We're doing this check at a central place, to making the calling code
more readable and to de-duplicate.
"""
if self.root_dir is None:
return link_location

# resolve `self.root_dir`, once and for all.
root_dir = os.path.realpath(self.root_dir)

# consider non-absolute `link_location` relative to `root_dir` (as
# `os.path.commonpath` does not support mixing absolute and relative
# paths).
if not os.path.isabs(link_location):
link_location = self.__abs_path_join(root_dir, link_location)

seen_paths = set()
while True:
# while `link_location` _should_ be relative to chroot (either
# passed from trusted code or already resolved by previous loop
# iteration), we enforce this check as `self.os_release_file` and
# `self.distro_release_file` may be user-supplied.
if os.path.commonpath([root_dir, link_location]) != root_dir:
raise FileNotFoundError

if not os.path.islink(link_location):
# assert _final_ path is actually inside chroot (this is
# required to address `..` usages, potentially leading to
# outside, after subsequent link resolutions).
if (
os.path.commonpath([root_dir, os.path.realpath(link_location)])
!= root_dir
):
raise FileNotFoundError

return link_location

resolved = os.readlink(link_location)
if not os.path.isabs(resolved):
# compute resolved path relatively to previous `link_location`
# and accordingly to chroot. We also canonize "top" `..`
# components (relatively to `root_dir`), as they would
# legitimately resolve to chroot itself).
resolved = os.path.relpath(
os.path.join(os.path.dirname(link_location), resolved),
start=root_dir,
).lstrip(
os.pardir
+ os.pathsep
+ (os.altsep if os.altsep is not None else "")
)

# "move" back (absolute) path inside the chroot
resolved = self.__abs_path_join(root_dir, resolved)

# prevent symlinks infinite loop
if resolved in seen_paths:
raise FileNotFoundError

seen_paths.add(link_location)
link_location = resolved

@cached_property
def _os_release_info(self) -> Dict[str, str]:
"""
Expand All @@ -1099,10 +1173,14 @@ def _os_release_info(self) -> Dict[str, str]:
Returns:
A dictionary containing all information items.
"""
if os.path.isfile(self.os_release_file):
with open(self.os_release_file, encoding="utf-8") as release_file:
try:
with open(
self.__resolve_chroot_symlink_as_needed(self.os_release_file),
encoding="utf-8",
) as release_file:
return self._parse_os_release_content(release_file)
return {}
except FileNotFoundError:
return {}

@staticmethod
def _parse_os_release_content(lines: TextIO) -> Dict[str, str]:
Expand Down Expand Up @@ -1223,7 +1301,10 @@ def _oslevel_info(self) -> str:
def _debian_version(self) -> str:
try:
with open(
os.path.join(self.etc_dir, "debian_version"), encoding="ascii"
self.__resolve_chroot_symlink_as_needed(
os.path.join(self.etc_dir, "debian_version")
),
encoding="ascii",
) as fp:
return fp.readline().rstrip()
except FileNotFoundError:
Expand All @@ -1233,7 +1314,10 @@ def _debian_version(self) -> str:
def _armbian_version(self) -> str:
try:
with open(
os.path.join(self.etc_dir, "armbian-release"), encoding="ascii"
self.__resolve_chroot_symlink_as_needed(
os.path.join(self.etc_dir, "armbian-release")
),
encoding="ascii",
) as fp:
return self._parse_os_release_content(fp).get("version", "")
except FileNotFoundError:
Expand Down Expand Up @@ -1285,9 +1369,10 @@ def _distro_release_info(self) -> Dict[str, str]:
try:
basenames = [
basename
for basename in os.listdir(self.etc_dir)
for basename in os.listdir(
self.__resolve_chroot_symlink_as_needed(self.etc_dir)
)
if basename not in _DISTRO_RELEASE_IGNORE_BASENAMES
and os.path.isfile(os.path.join(self.etc_dir, basename))
]
# We sort for repeatability in cases where there are multiple
# distro specific files; e.g. CentOS, Oracle, Enterprise all
Expand All @@ -1303,12 +1388,13 @@ def _distro_release_info(self) -> Dict[str, str]:
match = _DISTRO_RELEASE_BASENAME_PATTERN.match(basename)
if match is None:
continue
filepath = os.path.join(self.etc_dir, basename)
distro_info = self._parse_distro_release_file(filepath)
# NOTE: _parse_distro_release_file below will be resolving for us
unresolved_filepath = os.path.join(self.etc_dir, basename)
distro_info = self._parse_distro_release_file(unresolved_filepath)
# The name is always present if the pattern matches.
if "name" not in distro_info:
continue
self.distro_release_file = filepath
self.distro_release_file = unresolved_filepath
break
else: # the loop didn't "break": no candidate.
return {}
Expand Down Expand Up @@ -1342,7 +1428,9 @@ def _parse_distro_release_file(self, filepath: str) -> Dict[str, str]:
A dictionary containing all information items.
"""
try:
with open(filepath, encoding="utf-8") as fp:
with open(
self.__resolve_chroot_symlink_as_needed(filepath), encoding="utf-8"
) as fp:
# Only parse the first line. For instance, on SLES there
# are multiple lines. We don't want them...
return self._parse_distro_release_content(fp.readline())
Expand Down Expand Up @@ -1402,12 +1490,7 @@ def main() -> None:
args = parser.parse_args()

if args.root_dir:
dist = LinuxDistribution(
include_lsb=False,
include_uname=False,
include_oslevel=False,
root_dir=args.root_dir,
)
dist = LinuxDistribution(root_dir=args.root_dir)
else:
dist = _distro

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
ID=absolute_symlinks
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
ID=root_dir_non_escape
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
ID=root_dir_os_release_file
55 changes: 52 additions & 3 deletions tests/test_distro.py
Original file line number Diff line number Diff line change
Expand Up @@ -559,9 +559,6 @@ def setup_method(self, test_method: FunctionType) -> None:
dist = test_method.__name__.split("_")[1]
root_dir = os.path.join(DISTROS_DIR, dist)
self.distro = distro.LinuxDistribution(
include_lsb=False,
include_uname=False,
include_oslevel=False,
os_release_file="",
distro_release_file="path-to-non-existing-file",
root_dir=root_dir,
Expand Down Expand Up @@ -774,6 +771,58 @@ def test_empty_release(self) -> None:
desired_outcome = {"id": "empty"}
self._test_outcome(desired_outcome)

def test_root_dir_os_release_file_relative(self) -> None:
self.distro = distro.LinuxDistribution(
root_dir=os.path.join(TESTDISTROS, "distro", "root_dir_os_release_file"),
os_release_file="tmp/os-release",
)
desired_outcome = {"id": "root_dir_os_release_file"}
self._test_outcome(desired_outcome)

def test_root_dir_os_release_file_absolute(self) -> None:
self.distro = distro.LinuxDistribution(
root_dir=os.path.join(TESTDISTROS, "distro", "root_dir_os_release_file"),
os_release_file="/tmp/os-release",
)
# as we honor `os_release_file`, loading existing file outside of root_dir has
# been prevented (empty data)
self._test_outcome({})

def test_root_dir_absolute_symlinks(self) -> None:
self.distro = distro.LinuxDistribution(
root_dir=os.path.join(TESTDISTROS, "distro", "root_dir_absolute_symlinks")
)
desired_outcome = {"id": "absolute_symlinks"}
self._test_outcome(desired_outcome)

def test_root_dir_escape(self) -> None:
self.distro = distro.LinuxDistribution(
root_dir=os.path.join(TESTDISTROS, "distro", "root_dir_escape")
)
# loading existing file outside of root_dir has been prevented (empty data)
self._test_outcome({})

def test_root_dir_escape_abs(self) -> None:
self.distro = distro.LinuxDistribution(
root_dir=os.path.join(TESTDISTROS, "distro", "root_dir_escape_abs")
)
# loading existing file outside of root_dir has been prevented (empty data)
self._test_outcome({})

def test_root_dir_non_escape(self) -> None:
self.distro = distro.LinuxDistribution(
root_dir=os.path.join(TESTDISTROS, "distro", "root_dir_non_escape")
)
desired_outcome = {"id": "root_dir_non_escape"}
self._test_outcome(desired_outcome)

def test_root_dir_symlinks_loop(self) -> None:
self.distro = distro.LinuxDistribution(
root_dir=os.path.join(TESTDISTROS, "distro", "root_dir_symlinks_loop")
)
# due to symbolic links loop, loading of file has been prevented (empty data)
self._test_outcome({})

def test_dontincludeuname(self) -> None:
self._setup_for_distro(os.path.join(TESTDISTROS, "distro", "dontincludeuname"))

Expand Down