|
51 | 51 |
|
52 | 52 | from securesystemslib import exceptions as sslib_exceptions
|
53 | 53 | from securesystemslib import hash as sslib_hash
|
| 54 | +from securesystemslib.metadata import Envelope as BaseEnvelope |
54 | 55 | from securesystemslib.serialization import JSONSerializable
|
55 | 56 | from securesystemslib.signer import Key, Signature, Signer
|
56 | 57 |
|
57 | 58 | from tuf.api.exceptions import LengthOrHashMismatchError, UnsignedMetadataError
|
58 | 59 | from tuf.api.serialization import (
|
59 |
| - MetadataDeserializer, |
60 |
| - MetadataSerializer, |
| 60 | + BaseDeserializer, |
| 61 | + BaseSerializer, |
61 | 62 | SerializationMixin,
|
| 63 | + SignedDeserializer, |
62 | 64 | SignedSerializer,
|
63 | 65 | )
|
64 | 66 |
|
|
80 | 82 | T = TypeVar("T", "Root", "Timestamp", "Snapshot", "Targets")
|
81 | 83 |
|
82 | 84 |
|
83 |
| -class Metadata(Generic[T], JSONSerializable, SerializationMixin): |
| 85 | +class BaseMetadata(SerializationMixin, JSONSerializable, metaclass=abc.ABCMeta): |
| 86 | + """A common metadata interface for Envelope (DSSE) and Metadata objects.""" |
| 87 | + |
| 88 | + @staticmethod |
| 89 | + def _default_deserializer() -> BaseDeserializer: |
| 90 | + """Default deserializer for Serialization Mixin.""" |
| 91 | + # pylint: disable=import-outside-toplevel |
| 92 | + from tuf.api.serialization.json import JSONDeserializer |
| 93 | + |
| 94 | + return JSONDeserializer() |
| 95 | + |
| 96 | + @staticmethod |
| 97 | + def _default_serializer() -> BaseSerializer: |
| 98 | + """Default serializer for Serialization Mixin.""" |
| 99 | + # pylint: disable=import-outside-toplevel |
| 100 | + from tuf.api.serialization.json import JSONSerializer |
| 101 | + |
| 102 | + return JSONSerializer(compact=True) |
| 103 | + |
| 104 | + @staticmethod |
| 105 | + def _get_role_and_keys( |
| 106 | + signed: "Signed", delegated_role: str |
| 107 | + ) -> Tuple["Role", Dict[str, Key]]: |
| 108 | + """Return the keys and role for delegated_role""" |
| 109 | + |
| 110 | + role: Optional[Role] = None |
| 111 | + if isinstance(signed, Root): |
| 112 | + keys = signed.keys |
| 113 | + role = signed.roles.get(delegated_role) |
| 114 | + elif isinstance(signed, Targets): |
| 115 | + if signed.delegations is None: |
| 116 | + raise ValueError(f"No delegation found for {delegated_role}") |
| 117 | + |
| 118 | + keys = signed.delegations.keys |
| 119 | + if signed.delegations.roles is not None: |
| 120 | + role = signed.delegations.roles.get(delegated_role) |
| 121 | + elif signed.delegations.succinct_roles is not None: |
| 122 | + if signed.delegations.succinct_roles.is_delegated_role( |
| 123 | + delegated_role |
| 124 | + ): |
| 125 | + role = signed.delegations.succinct_roles |
| 126 | + else: |
| 127 | + raise TypeError("Call is valid only on delegator metadata") |
| 128 | + |
| 129 | + if role is None: |
| 130 | + raise ValueError(f"No delegation found for {delegated_role}") |
| 131 | + |
| 132 | + return (role, keys) |
| 133 | + |
| 134 | + @abc.abstractmethod |
| 135 | + def get_signed(self) -> "Signed": |
| 136 | + raise NotImplementedError |
| 137 | + |
| 138 | + @abc.abstractmethod |
| 139 | + def sign( |
| 140 | + self, |
| 141 | + signer: Signer, |
| 142 | + ) -> Signature: |
| 143 | + raise NotImplementedError |
| 144 | + |
| 145 | + @abc.abstractmethod |
| 146 | + def verify_delegate( |
| 147 | + self, |
| 148 | + delegated_role: str, |
| 149 | + delegated_metadata: "BaseMetadata", |
| 150 | + ) -> None: |
| 151 | + raise NotImplementedError |
| 152 | + |
| 153 | + |
| 154 | +class Envelope(Generic[T], BaseMetadata, BaseEnvelope): |
| 155 | + """DSSE Envelope for tuf payloads.""" |
| 156 | + |
| 157 | + DEFAULT_PAYLOAD_TYPE = "application/vnd.tuf" |
| 158 | + |
| 159 | + @classmethod |
| 160 | + def from_signed( |
| 161 | + cls, signed: "Signed", serializer: SignedSerializer = None |
| 162 | + ) -> "Envelope": |
| 163 | + """Creates DSSE envelope with signed bytes as payload.""" |
| 164 | + |
| 165 | + if serializer is None: |
| 166 | + # Use local scope import to avoid circular import errors |
| 167 | + # pylint: disable=import-outside-toplevel |
| 168 | + from tuf.api.serialization.json import JSONSerializer |
| 169 | + |
| 170 | + serializer = JSONSerializer(compact=True) |
| 171 | + |
| 172 | + return cls( |
| 173 | + payload=serializer.serialize(signed), |
| 174 | + payload_type=cls.DEFAULT_PAYLOAD_TYPE, |
| 175 | + signatures=[], |
| 176 | + ) |
| 177 | + |
| 178 | + def get_signed(self, deserializer: SignedDeserializer = None) -> "Signed": |
| 179 | + if deserializer is None: |
| 180 | + # Use local scope import to avoid circular import errors |
| 181 | + # pylint: disable=import-outside-toplevel |
| 182 | + from tuf.api.serialization.json import SignedJSONDeserializer |
| 183 | + |
| 184 | + deserializer = SignedJSONDeserializer() |
| 185 | + |
| 186 | + return BaseEnvelope.get_payload(self, deserializer) |
| 187 | + |
| 188 | + def sign( |
| 189 | + self, |
| 190 | + signer: Signer, |
| 191 | + append: bool = False, |
| 192 | + ) -> Signature: |
| 193 | + |
| 194 | + if not append: |
| 195 | + self.signatures.clear() |
| 196 | + |
| 197 | + return BaseEnvelope.sign(self, signer) |
| 198 | + |
| 199 | + def verify_delegate( |
| 200 | + self, |
| 201 | + delegated_role: str, |
| 202 | + delegated_metadata: "BaseMetadata", |
| 203 | + ) -> None: |
| 204 | + signed = self.get_signed(None) |
| 205 | + role, keys = self._get_role_and_keys(signed, delegated_role) |
| 206 | + |
| 207 | + try: |
| 208 | + _ = BaseEnvelope.verify( |
| 209 | + delegated_metadata, keys.values(), role.threshold |
| 210 | + ) |
| 211 | + |
| 212 | + except sslib_exceptions.UnverifiedSignatureError as e: |
| 213 | + raise UnsignedMetadataError from e |
| 214 | + |
| 215 | + |
| 216 | +class Metadata(Generic[T], BaseMetadata): |
84 | 217 | """A container for signed TUF metadata.
|
85 | 218 |
|
86 | 219 | Provides methods to convert to and from dictionary, read and write to and
|
@@ -149,6 +282,9 @@ def __eq__(self, other: Any) -> bool:
|
149 | 282 | and self.unrecognized_fields == other.unrecognized_fields
|
150 | 283 | )
|
151 | 284 |
|
| 285 | + def get_signed(self) -> "Signed": |
| 286 | + return self.signed |
| 287 | + |
152 | 288 | @classmethod
|
153 | 289 | def from_dict(cls, metadata: Dict[str, Any]) -> "Metadata[T]":
|
154 | 290 | """Creates ``Metadata`` object from its json/dict representation.
|
@@ -198,22 +334,6 @@ def from_dict(cls, metadata: Dict[str, Any]) -> "Metadata[T]":
|
198 | 334 | unrecognized_fields=metadata,
|
199 | 335 | )
|
200 | 336 |
|
201 |
| - @staticmethod |
202 |
| - def _default_deserializer() -> MetadataDeserializer: |
203 |
| - """Default Deserializer to be used for deserialization.""" |
204 |
| - # pylint: disable=import-outside-toplevel |
205 |
| - from tuf.api.serialization.json import JSONDeserializer |
206 |
| - |
207 |
| - return JSONDeserializer() |
208 |
| - |
209 |
| - @staticmethod |
210 |
| - def _default_serializer() -> MetadataSerializer: |
211 |
| - """Default Serializer to be used for serialization.""" |
212 |
| - # pylint: disable=import-outside-toplevel |
213 |
| - from tuf.api.serialization.json import JSONSerializer |
214 |
| - |
215 |
| - return JSONSerializer(compact=True) |
216 |
| - |
217 | 337 | def to_dict(self) -> Dict[str, Any]:
|
218 | 338 | """Returns the dict representation of self."""
|
219 | 339 |
|
@@ -275,35 +395,6 @@ def sign(
|
275 | 395 |
|
276 | 396 | return signature
|
277 | 397 |
|
278 |
| - def _get_role_and_keys( |
279 |
| - self, delegated_role: str |
280 |
| - ) -> Tuple["Role", Dict[str, Key]]: |
281 |
| - """Return the keys and role for delegated_role""" |
282 |
| - |
283 |
| - role: Optional[Role] = None |
284 |
| - if isinstance(self.signed, Root): |
285 |
| - keys = self.signed.keys |
286 |
| - role = self.signed.roles.get(delegated_role) |
287 |
| - elif isinstance(self.signed, Targets): |
288 |
| - if self.signed.delegations is None: |
289 |
| - raise ValueError(f"No delegation found for {delegated_role}") |
290 |
| - |
291 |
| - keys = self.signed.delegations.keys |
292 |
| - if self.signed.delegations.roles is not None: |
293 |
| - role = self.signed.delegations.roles.get(delegated_role) |
294 |
| - elif self.signed.delegations.succinct_roles is not None: |
295 |
| - if self.signed.delegations.succinct_roles.is_delegated_role( |
296 |
| - delegated_role |
297 |
| - ): |
298 |
| - role = self.signed.delegations.succinct_roles |
299 |
| - else: |
300 |
| - raise TypeError("Call is valid only on delegator metadata") |
301 |
| - |
302 |
| - if role is None: |
303 |
| - raise ValueError(f"No delegation found for {delegated_role}") |
304 |
| - |
305 |
| - return (role, keys) |
306 |
| - |
307 | 398 | def verify_delegate(
|
308 | 399 | self,
|
309 | 400 | delegated_role: str,
|
@@ -333,14 +424,15 @@ def verify_delegate(
|
333 | 424 | signed_serializer = CanonicalJSONSerializer()
|
334 | 425 |
|
335 | 426 | data = signed_serializer.serialize(delegated_metadata.signed)
|
336 |
| - role, keys = self._get_role_and_keys(delegated_role) |
| 427 | + role, keys = self._get_role_and_keys(self.signed, delegated_role) |
337 | 428 |
|
338 | 429 | # verify that delegated_metadata is signed by threshold of unique keys
|
339 | 430 | signing_keys = set()
|
340 | 431 | for keyid in role.keyids:
|
341 | 432 | if keyid not in keys:
|
342 | 433 | logger.info("No key for keyid %s", keyid)
|
343 | 434 | continue
|
| 435 | + |
344 | 436 | if keyid not in delegated_metadata.signatures:
|
345 | 437 | logger.info("No signature for keyid %s", keyid)
|
346 | 438 | continue
|
|
0 commit comments