diff --git a/tests/test_endless_data_attack.py b/tests/test_endless_data_attack.py index 759119ae9d..d37e422e5e 100755 --- a/tests/test_endless_data_attack.py +++ b/tests/test_endless_data_attack.py @@ -269,8 +269,10 @@ def test_with_tuf(self): self.repository_updater.refresh() except tuf.exceptions.NoWorkingMirrorError as exception: - for mirror_url, mirror_error in six.iteritems(exception.mirror_errors): - self.assertTrue(isinstance(mirror_error, securesystemslib.exceptions.Error)) + for _, mirror_error in six.iteritems(exception.mirror_errors): + # Throw tuf.exceptions.InvalidMetadataJSONError + # because the metadata is not a valud JSON file. + self.assertTrue(isinstance(mirror_error, tuf.exceptions.InvalidMetadataJSONError)) else: self.fail('TUF did not prevent an endless data attack.') diff --git a/tests/test_updater.py b/tests/test_updater.py index bf66cc5ca2..b4286e0bda 100755 --- a/tests/test_updater.py +++ b/tests/test_updater.py @@ -1742,10 +1742,55 @@ def test_11__verify_metadata_file(self): self.assertRaises(tuf.exceptions.InvalidMetadataJSONError, self.repository_updater._verify_metadata_file, - metadata_file_object, 'root') + metadata_file_object, 'root', None) - def test_13__targets_of_role(self): + def test_12__validate_metadata_version(self): + # Test for valid metadata version with expected_version. + self.repository_updater._validate_metadata_version( + expected_version=1, metadata_role='root', version_downloaded=1) + + # Test for valid metadata version without expected_version. + self.repository_updater._validate_metadata_version( + expected_version=None, metadata_role='root', version_downloaded=1) + + # Test for expected_version different than version downloaded. + self.assertRaises(tuf.exceptions.BadVersionNumberError, + self.repository_updater._validate_metadata_version, + expected_version=2, metadata_role='root', version_downloaded=1) + + # Test without expected_version and version_downloaded < current_version. + self.assertRaises(tuf.exceptions.ReplayedMetadataError, + self.repository_updater._validate_metadata_version, + expected_version=None, metadata_role='root', version_downloaded=0) + + + def test_13__validate_spec_version(self): + # Tests when metadata spec ver is compatible with tuf.SPECIFICATION_VERSION + + # metadata spec ver = tuf.SPECIFICATION_VERSION + self.repository_updater._validate_spec_version(tuf.SPECIFICATION_VERSION) + + code_spec_ver_split = tuf.SPECIFICATION_VERSION.split('.') + code_spec_major = int(code_spec_ver_split[0]) + code_spec_minor= int(code_spec_ver_split[1]) + + # metadata major ver is the same as tuf.SPECIFICATION_VERSION major ver + # but metadata minor ver != tuf.SPECIFICATION_VERSION minor ver + metadata_spec = [str(code_spec_major), str(code_spec_minor + 1), '0'] + separator = '.' + metadata_spec = separator.join(metadata_spec) + self.repository_updater._validate_spec_version(metadata_spec) + + # Test when metadata spec ver is NOT compatible + # with tuf.SPECIFICATION_VERSION + metadata_spec = [str(code_spec_major + 1), str(code_spec_minor), '0'] + metadata_spec = separator.join(metadata_spec) + self.assertRaises(tuf.exceptions.UnsupportedSpecificationError, + self.repository_updater._validate_spec_version, metadata_spec) + + + def test_15__targets_of_role(self): # Test case where a list of targets is given. By default, the 'targets' # parameter is None. targets = [{'filepath': 'file1.txt', 'fileinfo': {'length': 1, 'hashes': {'sha256': 'abc'}}}] diff --git a/tuf/client/updater.py b/tuf/client/updater.py index d9ffb1f0a9..c2aef4009a 100755 --- a/tuf/client/updater.py +++ b/tuf/client/updater.py @@ -1373,9 +1373,101 @@ def _verify_root_self_signed(self, signable): + def _validate_metadata_version(self, expected_version, metadata_role, + version_downloaded): + """ + + Non-public method validating the metadata version number. + If the expected_version is unspecified, ensure that the version number + downloaded is equal or greater than the currently trusted version number + for 'metadata_role'. + + + expected_version: + An integer or "None" value representing the expected and required + version number of the 'metadata_role' file downloaded. + + metadata_role: + The role name of the metadata (e.g., 'root', 'targets'). + + version_downloaded: + The version of the newly downloaded metadata file. + + + tuf.exceptions.BadVersionNumberError: + In case the expected_version is not None and version_downloaded + is not equal to it. + + tuf.exceptions.ReplayedMetadataError: + if expected_version is None and version_downloaded is lower than + the current version. + """ + + if expected_version is not None: + if version_downloaded != expected_version: + raise tuf.exceptions.BadVersionNumberError('Downloaded' + ' version number: ' + repr(version_downloaded) + '. Version' + ' number MUST be: ' + repr(expected_version)) + + else: + try: + current_version = self.metadata['current'][metadata_role]['version'] + + if version_downloaded < current_version: + raise tuf.exceptions.ReplayedMetadataError(metadata_role, + version_downloaded, current_version) + + except KeyError: + logger.info(metadata_role + ' not available locally.') + + + + def _validate_spec_version(self, metadata_spec_version_str): + """ + + Non-public method verifying a metadata specification version. + It is assumed that "spec_version" is in (major.minor.fix) format + and that releases with the same major version number maintain + backward compatibility. + + + metadata_spec_version: + A string representing the metadata spec version. It is assumed that + it is in semantic versioning (major.minor.fix) format. + + + tuf.exceptions.UnsupportedSpecificationError: + In case the metadata major spec version is not supported. + + securesystemslib.exceptions.FormatError: + In case the metadata_spec_version string is not in the expected format. + """ + + try: + metadata_spec_version = metadata_spec_version_str.split('.') + code_spec_version = tuf.SPECIFICATION_VERSION.split('.') + + if metadata_spec_version[0]!= code_spec_version[0]: + raise tuf.exceptions.UnsupportedSpecificationError( + 'Incompatible spec_version. Got "%s", expected "%s".' % + (metadata_spec_version_str, code_spec_version)) + + # Warn if spec_version minor version number does not not match + if metadata_spec_version[1]!= code_spec_version[1]: + logger.info("Received metadata has minor version mismatch. Got %s" + ", expected %s. Continuing" % (metadata_spec_version_str, + tuf.SPECIFICATION_VERSION)) + + except (ValueError, TypeError) as error: + six.raise_from(securesystemslib.exceptions.FormatError('Improperly' + ' formatted spec_version, Must be in semver (major.minor.patch)' + 'format'), + error) + + def _verify_metadata_file(self, metadata_file_object, - metadata_role): + metadata_role, expected_version): """ Non-public method that verifies a metadata file. An exception is @@ -1390,15 +1482,29 @@ def _verify_metadata_file(self, metadata_file_object, The role name of the metadata (e.g., 'root', 'targets', 'unclaimed'). + expected_version: + An integer or "None" value representing the expected and required + version number of the 'metadata_role' file downloaded. + securesystemslib.exceptions.FormatError: - In case the metadata file is valid JSON, but not valid TUF metadata. + In case the metadata file is valid JSON, but not valid TUF metadata + or when the metadata spec version is not in the rigth format. tuf.exceptions.InvalidMetadataJSONError: In case the metadata file is not valid JSON. + tuf.exceptions.UnsupportedSpecificationError: + In case the metadata spec version is not supported. + tuf.exceptions.ReplayedMetadataError: - In case the downloaded metadata file is older than the current one. + In case the downloaded metadata file is older than the current one or + if expected_version is None and version_downloaded is lower than the + trusted current version. + + tuf.exceptions.BadVersionNumberError: + In case the expected_version is not None and version_downloaded is not + equal to it. tuf.exceptions.RepositoryError: In case the repository is somehow inconsistent; e.g. a parent has not @@ -1429,6 +1535,11 @@ def _verify_metadata_file(self, metadata_file_object, # 'securesystemslib.exceptions.FormatError' if not. tuf.formats.check_signable_object_format(metadata_signable) + self._validate_spec_version(metadata_signable['signed']['spec_version']) + + self._validate_metadata_version(expected_version, metadata_role, + metadata_signable['signed']['version']) + # Is 'metadata_signable' expired? self._ensure_not_expired(metadata_signable['signed'], metadata_role) @@ -1482,8 +1593,8 @@ def _get_metadata_file(self, metadata_role, remote_filename, downloaded. expected_version: - The expected and required version number of the 'metadata_role' file - downloaded. 'expected_version' is an integer. + An integer representing the expected and required version number + of the 'metadata_role' file downloaded. tuf.exceptions.NoWorkingMirrorError: @@ -1510,85 +1621,9 @@ def _get_metadata_file(self, metadata_role, remote_filename, try: file_object = tuf.download.unsafe_download(file_mirror, upperbound_filelength) - file_object.seek(0) - - # Verify 'file_object' according to the callable function. - # 'file_object' is also verified if decompressed above (i.e., the - # uncompressed version). - metadata_signable = \ - securesystemslib.util.load_json_string(file_object.read().decode('utf-8')) - - # Determine if the specification version number is supported. It is - # assumed that "spec_version" is in (major.minor.fix) format, (for - # example: "1.4.3") and that releases with the same major version - # number maintain backwards compatibility. Consequently, if the major - # version number of new metadata equals our expected major version - # number, the new metadata is safe to parse. - try: - metadata_spec_version = metadata_signable['signed']['spec_version'] - metadata_spec_version_split = metadata_spec_version.split('.') - metadata_spec_major_version = int(metadata_spec_version_split[0]) - metadata_spec_minor_version = int(metadata_spec_version_split[1]) - - code_spec_version_split = tuf.SPECIFICATION_VERSION.split('.') - code_spec_major_version = int(code_spec_version_split[0]) - code_spec_minor_version = int(code_spec_version_split[1]) - - if metadata_spec_major_version != code_spec_major_version: - raise tuf.exceptions.UnsupportedSpecificationError( - 'Downloaded metadata that specifies an unsupported ' - 'spec_version. This code supports major version number: ' + - repr(code_spec_major_version) + '; however, the obtained ' - 'metadata lists version number: ' + str(metadata_spec_version)) - - #report to user if minor versions do not match, continue with update - if metadata_spec_minor_version != code_spec_minor_version: - logger.info("Downloaded metadata that specifies a different minor " + - "spec_version. This code has version " + - str(tuf.SPECIFICATION_VERSION) + - " and the metadata lists version number " + - str(metadata_spec_version) + - ". The update will continue as the major versions match.") - - except (ValueError, TypeError) as error: - six.raise_from(securesystemslib.exceptions.FormatError('Improperly' - ' formatted spec_version, which must be in major.minor.fix format'), - error) - - # If the version number is unspecified, ensure that the version number - # downloaded is greater than the currently trusted version number for - # 'metadata_role'. - version_downloaded = metadata_signable['signed']['version'] - - if expected_version is not None: - # Verify that the downloaded version matches the version expected by - # the caller. - if version_downloaded != expected_version: - raise tuf.exceptions.BadVersionNumberError('Downloaded' - ' version number: ' + repr(version_downloaded) + '. Version' - ' number MUST be: ' + repr(expected_version)) - - # The caller does not know which version to download. Verify that the - # downloaded version is at least greater than the one locally - # available. - else: - # Verify that the version number of the locally stored - # 'timestamp.json', if available, is less than what was downloaded. - # Otherwise, accept the new timestamp with version number - # 'version_downloaded'. - - try: - current_version = \ - self.metadata['current'][metadata_role]['version'] - - if version_downloaded < current_version: - raise tuf.exceptions.ReplayedMetadataError(metadata_role, - version_downloaded, current_version) - - except KeyError: - logger.info(metadata_role + ' not available locally.') - self._verify_metadata_file(file_object, metadata_role) + self._verify_metadata_file(file_object, metadata_role, + expected_version) except Exception as exception: # Remember the error from this mirror, and "reset" the target file.