Skip to content

Refactor _get_metadata_file() from updater.py #1205

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions tests/test_endless_data_attack.py
Original file line number Diff line number Diff line change
Expand Up @@ -269,8 +269,10 @@ def test_with_tuf(self):
self.repository_updater.refresh()

except tuf.exceptions.NoWorkingMirrorError as exception:
for mirror_url, mirror_error in six.iteritems(exception.mirror_errors):
self.assertTrue(isinstance(mirror_error, securesystemslib.exceptions.Error))
for _, mirror_error in six.iteritems(exception.mirror_errors):
# Throw tuf.exceptions.InvalidMetadataJSONError
# because the metadata is not a valud JSON file.
self.assertTrue(isinstance(mirror_error, tuf.exceptions.InvalidMetadataJSONError))

else:
self.fail('TUF did not prevent an endless data attack.')
Expand Down
49 changes: 47 additions & 2 deletions tests/test_updater.py
Original file line number Diff line number Diff line change
Expand Up @@ -1742,10 +1742,55 @@ def test_11__verify_metadata_file(self):

self.assertRaises(tuf.exceptions.InvalidMetadataJSONError,
self.repository_updater._verify_metadata_file,
metadata_file_object, 'root')
metadata_file_object, 'root', None)


def test_13__targets_of_role(self):
def test_12__validate_metadata_version(self):
# Test for valid metadata version with expected_version.
self.repository_updater._validate_metadata_version(
expected_version=1, metadata_role='root', version_downloaded=1)

# Test for valid metadata version without expected_version.
self.repository_updater._validate_metadata_version(
expected_version=None, metadata_role='root', version_downloaded=1)

# Test for expected_version different than version downloaded.
self.assertRaises(tuf.exceptions.BadVersionNumberError,
self.repository_updater._validate_metadata_version,
expected_version=2, metadata_role='root', version_downloaded=1)

# Test without expected_version and version_downloaded < current_version.
self.assertRaises(tuf.exceptions.ReplayedMetadataError,
self.repository_updater._validate_metadata_version,
expected_version=None, metadata_role='root', version_downloaded=0)


def test_13__validate_spec_version(self):
# Tests when metadata spec ver is compatible with tuf.SPECIFICATION_VERSION

# metadata spec ver = tuf.SPECIFICATION_VERSION
self.repository_updater._validate_spec_version(tuf.SPECIFICATION_VERSION)

code_spec_ver_split = tuf.SPECIFICATION_VERSION.split('.')
code_spec_major = int(code_spec_ver_split[0])
code_spec_minor= int(code_spec_ver_split[1])

# metadata major ver is the same as tuf.SPECIFICATION_VERSION major ver
# but metadata minor ver != tuf.SPECIFICATION_VERSION minor ver
metadata_spec = [str(code_spec_major), str(code_spec_minor + 1), '0']
separator = '.'
metadata_spec = separator.join(metadata_spec)
self.repository_updater._validate_spec_version(metadata_spec)

# Test when metadata spec ver is NOT compatible
# with tuf.SPECIFICATION_VERSION
metadata_spec = [str(code_spec_major + 1), str(code_spec_minor), '0']
metadata_spec = separator.join(metadata_spec)
self.assertRaises(tuf.exceptions.UnsupportedSpecificationError,
self.repository_updater._validate_spec_version, metadata_spec)


def test_15__targets_of_role(self):
# Test case where a list of targets is given. By default, the 'targets'
# parameter is None.
targets = [{'filepath': 'file1.txt', 'fileinfo': {'length': 1, 'hashes': {'sha256': 'abc'}}}]
Expand Down
201 changes: 118 additions & 83 deletions tuf/client/updater.py
Original file line number Diff line number Diff line change
Expand Up @@ -1373,9 +1373,101 @@ def _verify_root_self_signed(self, signable):



def _validate_metadata_version(self, expected_version, metadata_role,
version_downloaded):
"""
<Purpose>
Non-public method validating the metadata version number.
If the expected_version is unspecified, ensure that the version number
downloaded is equal or greater than the currently trusted version number
for 'metadata_role'.

<Arguments>
expected_version:
An integer or "None" value representing the expected and required
version number of the 'metadata_role' file downloaded.

metadata_role:
The role name of the metadata (e.g., 'root', 'targets').

version_downloaded:
The version of the newly downloaded metadata file.

<Exceptions>
tuf.exceptions.BadVersionNumberError:
In case the expected_version is not None and version_downloaded
is not equal to it.

tuf.exceptions.ReplayedMetadataError:
if expected_version is None and version_downloaded is lower than
the current version.
"""

if expected_version is not None:
if version_downloaded != expected_version:
raise tuf.exceptions.BadVersionNumberError('Downloaded'
' version number: ' + repr(version_downloaded) + '. Version'
' number MUST be: ' + repr(expected_version))

else:
try:
current_version = self.metadata['current'][metadata_role]['version']

if version_downloaded < current_version:
raise tuf.exceptions.ReplayedMetadataError(metadata_role,
version_downloaded, current_version)

except KeyError:
logger.info(metadata_role + ' not available locally.')



def _validate_spec_version(self, metadata_spec_version_str):
"""
<Purpose>
Non-public method verifying a metadata specification version.
It is assumed that "spec_version" is in (major.minor.fix) format
and that releases with the same major version number maintain
backward compatibility.

<Arguments>
metadata_spec_version:
A string representing the metadata spec version. It is assumed that
it is in semantic versioning (major.minor.fix) format.

<Exceptions>
tuf.exceptions.UnsupportedSpecificationError:
In case the metadata major spec version is not supported.

securesystemslib.exceptions.FormatError:
In case the metadata_spec_version string is not in the expected format.
"""

try:
metadata_spec_version = metadata_spec_version_str.split('.')
code_spec_version = tuf.SPECIFICATION_VERSION.split('.')

if metadata_spec_version[0]!= code_spec_version[0]:
raise tuf.exceptions.UnsupportedSpecificationError(
'Incompatible spec_version. Got "%s", expected "%s".' %
(metadata_spec_version_str, code_spec_version))

# Warn if spec_version minor version number does not not match
if metadata_spec_version[1]!= code_spec_version[1]:
logger.info("Received metadata has minor version mismatch. Got %s"
", expected %s. Continuing" % (metadata_spec_version_str,
tuf.SPECIFICATION_VERSION))

except (ValueError, TypeError) as error:
six.raise_from(securesystemslib.exceptions.FormatError('Improperly'
' formatted spec_version, Must be in semver (major.minor.patch)'
'format'),
error)



def _verify_metadata_file(self, metadata_file_object,
metadata_role):
metadata_role, expected_version):
"""
<Purpose>
Non-public method that verifies a metadata file. An exception is
Expand All @@ -1390,15 +1482,29 @@ def _verify_metadata_file(self, metadata_file_object,
The role name of the metadata (e.g., 'root', 'targets',
'unclaimed').

expected_version:
An integer or "None" value representing the expected and required
version number of the 'metadata_role' file downloaded.

<Exceptions>
securesystemslib.exceptions.FormatError:
In case the metadata file is valid JSON, but not valid TUF metadata.
In case the metadata file is valid JSON, but not valid TUF metadata
or when the metadata spec version is not in the rigth format.

tuf.exceptions.InvalidMetadataJSONError:
In case the metadata file is not valid JSON.

tuf.exceptions.UnsupportedSpecificationError:
In case the metadata spec version is not supported.

tuf.exceptions.ReplayedMetadataError:
In case the downloaded metadata file is older than the current one.
In case the downloaded metadata file is older than the current one or
if expected_version is None and version_downloaded is lower than the
trusted current version.

tuf.exceptions.BadVersionNumberError:
In case the expected_version is not None and version_downloaded is not
equal to it.

tuf.exceptions.RepositoryError:
In case the repository is somehow inconsistent; e.g. a parent has not
Expand Down Expand Up @@ -1429,6 +1535,11 @@ def _verify_metadata_file(self, metadata_file_object,
# 'securesystemslib.exceptions.FormatError' if not.
tuf.formats.check_signable_object_format(metadata_signable)

self._validate_spec_version(metadata_signable['signed']['spec_version'])

self._validate_metadata_version(expected_version, metadata_role,
metadata_signable['signed']['version'])

# Is 'metadata_signable' expired?
self._ensure_not_expired(metadata_signable['signed'], metadata_role)

Expand Down Expand Up @@ -1482,8 +1593,8 @@ def _get_metadata_file(self, metadata_role, remote_filename,
downloaded.

expected_version:
The expected and required version number of the 'metadata_role' file
downloaded. 'expected_version' is an integer.
An integer representing the expected and required version number
of the 'metadata_role' file downloaded.

<Exceptions>
tuf.exceptions.NoWorkingMirrorError:
Expand All @@ -1510,85 +1621,9 @@ def _get_metadata_file(self, metadata_role, remote_filename,
try:
file_object = tuf.download.unsafe_download(file_mirror,
upperbound_filelength)
file_object.seek(0)

# Verify 'file_object' according to the callable function.
# 'file_object' is also verified if decompressed above (i.e., the
# uncompressed version).
metadata_signable = \
securesystemslib.util.load_json_string(file_object.read().decode('utf-8'))

# Determine if the specification version number is supported. It is
# assumed that "spec_version" is in (major.minor.fix) format, (for
# example: "1.4.3") and that releases with the same major version
# number maintain backwards compatibility. Consequently, if the major
# version number of new metadata equals our expected major version
# number, the new metadata is safe to parse.
try:
metadata_spec_version = metadata_signable['signed']['spec_version']
metadata_spec_version_split = metadata_spec_version.split('.')
metadata_spec_major_version = int(metadata_spec_version_split[0])
metadata_spec_minor_version = int(metadata_spec_version_split[1])

code_spec_version_split = tuf.SPECIFICATION_VERSION.split('.')
code_spec_major_version = int(code_spec_version_split[0])
code_spec_minor_version = int(code_spec_version_split[1])

if metadata_spec_major_version != code_spec_major_version:
raise tuf.exceptions.UnsupportedSpecificationError(
'Downloaded metadata that specifies an unsupported '
'spec_version. This code supports major version number: ' +
repr(code_spec_major_version) + '; however, the obtained '
'metadata lists version number: ' + str(metadata_spec_version))

#report to user if minor versions do not match, continue with update
if metadata_spec_minor_version != code_spec_minor_version:
logger.info("Downloaded metadata that specifies a different minor " +
"spec_version. This code has version " +
str(tuf.SPECIFICATION_VERSION) +
" and the metadata lists version number " +
str(metadata_spec_version) +
". The update will continue as the major versions match.")

except (ValueError, TypeError) as error:
six.raise_from(securesystemslib.exceptions.FormatError('Improperly'
' formatted spec_version, which must be in major.minor.fix format'),
error)

# If the version number is unspecified, ensure that the version number
# downloaded is greater than the currently trusted version number for
# 'metadata_role'.
version_downloaded = metadata_signable['signed']['version']

if expected_version is not None:
# Verify that the downloaded version matches the version expected by
# the caller.
if version_downloaded != expected_version:
raise tuf.exceptions.BadVersionNumberError('Downloaded'
' version number: ' + repr(version_downloaded) + '. Version'
' number MUST be: ' + repr(expected_version))

# The caller does not know which version to download. Verify that the
# downloaded version is at least greater than the one locally
# available.
else:
# Verify that the version number of the locally stored
# 'timestamp.json', if available, is less than what was downloaded.
# Otherwise, accept the new timestamp with version number
# 'version_downloaded'.

try:
current_version = \
self.metadata['current'][metadata_role]['version']

if version_downloaded < current_version:
raise tuf.exceptions.ReplayedMetadataError(metadata_role,
version_downloaded, current_version)

except KeyError:
logger.info(metadata_role + ' not available locally.')

self._verify_metadata_file(file_object, metadata_role)
self._verify_metadata_file(file_object, metadata_role,
expected_version)

except Exception as exception:
# Remember the error from this mirror, and "reset" the target file.
Expand Down