Skip to content

Commit 9beb8a3

Browse files
committed
Metadata API: add dsse support
Add TUF-specific DSSE (`Envelope`) implementation and define abstract interface (`BaseMetadata`) for common `Envelope` and `Metadata` operations: - get_payload() -> Signed - sign() -> Signature - verify_delegate() -> None **Details** - `Envelope` inherits and calls generic methods from base `Envelope` in securesystemslib to sign and verify using the DSSE protocol. - `Envelope` overrides `sign` to add an `append` option, which is not available in the base `Envelope`. - `Envelope` provides a `from_signed` factory method, which serializes a `Signed` instance as payload. - `Envelope.get_payload` takes a `SignedDeserializer` instance to deserialize the payload contents (default: `SignedJSONDeserializer`). `Metadata.get_payload` just returns the already deserialized `signed` attribute. - `Metadata.[sign|verify_delegate]` methods take a `SignedSerializer` instance to serialize the payload prior to signing/verifying (default: `CanonicalJSONSerializer`). `Envelope.[sign|verify_delegate]` just signs/verifies the already serialized payload. - `BaseMetadata` subclasses inherit `[to|from]_[bytes|file]` convenience methods from `SerializationMixin`. In turn they must provide `_default_[de|s]erializer`s to be used by those methods. - `BaseMetadata` provides default `JSON[Des|S]erializer` for both `Envelope` and `Metadata`. - `JSONSerializer` requires a class to implement a `to_dict` method, which is defined by the `JSONSerializable` interface. `BaseMetadata` classes are `JSONSerializable`. - `JSONDeserializer` can deserialize json bytes into both `Envelope` and `Metadata`. It case handles based on the presence of a certain field. Signed-off-by: Lukas Puehringer <lukas.puehringer@nyu.edu>
1 parent 7319f29 commit 9beb8a3

File tree

3 files changed

+200
-60
lines changed

3 files changed

+200
-60
lines changed

tuf/api/metadata.py

Lines changed: 141 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -51,14 +51,16 @@
5151

5252
from securesystemslib import exceptions as sslib_exceptions
5353
from securesystemslib import hash as sslib_hash
54+
from securesystemslib.metadata import Envelope as BaseEnvelope
5455
from securesystemslib.serialization import JSONSerializable
5556
from securesystemslib.signer import Key, Signature, Signer
5657

5758
from tuf.api.exceptions import LengthOrHashMismatchError, UnsignedMetadataError
5859
from tuf.api.serialization import (
59-
MetadataDeserializer,
60-
MetadataSerializer,
60+
BaseDeserializer,
61+
BaseSerializer,
6162
SerializationMixin,
63+
SignedDeserializer,
6264
SignedSerializer,
6365
)
6466

@@ -80,7 +82,138 @@
8082
T = TypeVar("T", "Root", "Timestamp", "Snapshot", "Targets")
8183

8284

83-
class Metadata(Generic[T], JSONSerializable, SerializationMixin):
85+
class BaseMetadata(SerializationMixin, JSONSerializable, metaclass=abc.ABCMeta):
86+
"""A common metadata interface for Envelope (DSSE) and Metadata objects."""
87+
88+
@staticmethod
89+
def _default_deserializer() -> BaseDeserializer:
90+
"""Default deserializer for Serialization Mixin."""
91+
# pylint: disable=import-outside-toplevel
92+
from tuf.api.serialization.json import JSONDeserializer
93+
94+
return JSONDeserializer()
95+
96+
@staticmethod
97+
def _default_serializer() -> BaseSerializer:
98+
"""Default serializer for Serialization Mixin."""
99+
# pylint: disable=import-outside-toplevel
100+
from tuf.api.serialization.json import JSONSerializer
101+
102+
return JSONSerializer(compact=True)
103+
104+
@staticmethod
105+
def _get_role_and_keys(
106+
signed: "Signed", delegated_role: str
107+
) -> Tuple["Role", Dict[str, Key]]:
108+
"""Return the keys and role for delegated_role"""
109+
110+
role: Optional[Role] = None
111+
if isinstance(signed, Root):
112+
keys = signed.keys
113+
role = signed.roles.get(delegated_role)
114+
elif isinstance(signed, Targets):
115+
if signed.delegations is None:
116+
raise ValueError(f"No delegation found for {delegated_role}")
117+
118+
keys = signed.delegations.keys
119+
if signed.delegations.roles is not None:
120+
role = signed.delegations.roles.get(delegated_role)
121+
elif signed.delegations.succinct_roles is not None:
122+
if signed.delegations.succinct_roles.is_delegated_role(
123+
delegated_role
124+
):
125+
role = signed.delegations.succinct_roles
126+
else:
127+
raise TypeError("Call is valid only on delegator metadata")
128+
129+
if role is None:
130+
raise ValueError(f"No delegation found for {delegated_role}")
131+
132+
return (role, keys)
133+
134+
@abc.abstractmethod
135+
def get_signed(self) -> "Signed":
136+
raise NotImplementedError
137+
138+
@abc.abstractmethod
139+
def sign(
140+
self,
141+
signer: Signer,
142+
) -> Signature:
143+
raise NotImplementedError
144+
145+
@abc.abstractmethod
146+
def verify_delegate(
147+
self,
148+
delegated_role: str,
149+
delegated_metadata: "BaseMetadata",
150+
) -> None:
151+
raise NotImplementedError
152+
153+
154+
class Envelope(Generic[T], BaseMetadata, BaseEnvelope):
155+
"""DSSE Envelope for tuf payloads."""
156+
157+
DEFAULT_PAYLOAD_TYPE = "application/vnd.tuf"
158+
159+
@classmethod
160+
def from_signed(
161+
cls, signed: "Signed", serializer: SignedSerializer = None
162+
) -> "Envelope":
163+
"""Creates DSSE envelope with signed bytes as payload."""
164+
165+
if serializer is None:
166+
# Use local scope import to avoid circular import errors
167+
# pylint: disable=import-outside-toplevel
168+
from tuf.api.serialization.json import JSONSerializer
169+
170+
serializer = JSONSerializer(compact=True)
171+
172+
return cls(
173+
payload=serializer.serialize(signed),
174+
payload_type=cls.DEFAULT_PAYLOAD_TYPE,
175+
signatures=[],
176+
)
177+
178+
def get_signed(self, deserializer: SignedDeserializer = None) -> "Signed":
179+
if deserializer is None:
180+
# Use local scope import to avoid circular import errors
181+
# pylint: disable=import-outside-toplevel
182+
from tuf.api.serialization.json import SignedJSONDeserializer
183+
184+
deserializer = SignedJSONDeserializer()
185+
186+
return BaseEnvelope.get_payload(self, deserializer)
187+
188+
def sign(
189+
self,
190+
signer: Signer,
191+
append: bool = False,
192+
) -> Signature:
193+
194+
if not append:
195+
self.signatures.clear()
196+
197+
return BaseEnvelope.sign(self, signer)
198+
199+
def verify_delegate(
200+
self,
201+
delegated_role: str,
202+
delegated_metadata: "BaseMetadata",
203+
) -> None:
204+
signed = self.get_signed(None)
205+
role, keys = self._get_role_and_keys(signed, delegated_role)
206+
207+
try:
208+
_ = BaseEnvelope.verify(
209+
delegated_metadata, keys.values(), role.threshold
210+
)
211+
212+
except sslib_exceptions.UnverifiedSignatureError as e:
213+
raise UnsignedMetadataError from e
214+
215+
216+
class Metadata(Generic[T], BaseMetadata):
84217
"""A container for signed TUF metadata.
85218
86219
Provides methods to convert to and from dictionary, read and write to and
@@ -149,6 +282,9 @@ def __eq__(self, other: Any) -> bool:
149282
and self.unrecognized_fields == other.unrecognized_fields
150283
)
151284

285+
def get_signed(self) -> "Signed":
286+
return self.signed
287+
152288
@classmethod
153289
def from_dict(cls, metadata: Dict[str, Any]) -> "Metadata[T]":
154290
"""Creates ``Metadata`` object from its json/dict representation.
@@ -198,22 +334,6 @@ def from_dict(cls, metadata: Dict[str, Any]) -> "Metadata[T]":
198334
unrecognized_fields=metadata,
199335
)
200336

201-
@staticmethod
202-
def _default_deserializer() -> MetadataDeserializer:
203-
"""Default Deserializer to be used for deserialization."""
204-
# pylint: disable=import-outside-toplevel
205-
from tuf.api.serialization.json import JSONDeserializer
206-
207-
return JSONDeserializer()
208-
209-
@staticmethod
210-
def _default_serializer() -> MetadataSerializer:
211-
"""Default Serializer to be used for serialization."""
212-
# pylint: disable=import-outside-toplevel
213-
from tuf.api.serialization.json import JSONSerializer
214-
215-
return JSONSerializer(compact=True)
216-
217337
def to_dict(self) -> Dict[str, Any]:
218338
"""Returns the dict representation of self."""
219339

@@ -275,35 +395,6 @@ def sign(
275395

276396
return signature
277397

278-
def _get_role_and_keys(
279-
self, delegated_role: str
280-
) -> Tuple["Role", Dict[str, Key]]:
281-
"""Return the keys and role for delegated_role"""
282-
283-
role: Optional[Role] = None
284-
if isinstance(self.signed, Root):
285-
keys = self.signed.keys
286-
role = self.signed.roles.get(delegated_role)
287-
elif isinstance(self.signed, Targets):
288-
if self.signed.delegations is None:
289-
raise ValueError(f"No delegation found for {delegated_role}")
290-
291-
keys = self.signed.delegations.keys
292-
if self.signed.delegations.roles is not None:
293-
role = self.signed.delegations.roles.get(delegated_role)
294-
elif self.signed.delegations.succinct_roles is not None:
295-
if self.signed.delegations.succinct_roles.is_delegated_role(
296-
delegated_role
297-
):
298-
role = self.signed.delegations.succinct_roles
299-
else:
300-
raise TypeError("Call is valid only on delegator metadata")
301-
302-
if role is None:
303-
raise ValueError(f"No delegation found for {delegated_role}")
304-
305-
return (role, keys)
306-
307398
def verify_delegate(
308399
self,
309400
delegated_role: str,
@@ -333,14 +424,15 @@ def verify_delegate(
333424
signed_serializer = CanonicalJSONSerializer()
334425

335426
data = signed_serializer.serialize(delegated_metadata.signed)
336-
role, keys = self._get_role_and_keys(delegated_role)
427+
role, keys = self._get_role_and_keys(self.signed, delegated_role)
337428

338429
# verify that delegated_metadata is signed by threshold of unique keys
339430
signing_keys = set()
340431
for keyid in role.keyids:
341432
if keyid not in keys:
342433
logger.info("No key for keyid %s", keyid)
343434
continue
435+
344436
if keyid not in delegated_metadata.signatures:
345437
logger.info("No signature for keyid %s", keyid)
346438
continue

tuf/api/serialization/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
MetadataSerializer: TypeAlias = BaseSerializer
3333
MetadataDeserializer: TypeAlias = BaseDeserializer
3434
SignedSerializer: TypeAlias = BaseSerializer
35+
SignedDeserializer: TypeAlias = BaseDeserializer
3536

3637

3738
class SerializationError(RepositoryError):

tuf/api/serialization/json.py

Lines changed: 58 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
metadata to the OLPC Canonical JSON format for signature generation and
88
verification.
99
"""
10-
from typing import Optional
10+
from typing import Optional, Type
1111

1212
from securesystemslib.formats import encode_canonical
1313
from securesystemslib.serialization import (
@@ -19,7 +19,16 @@
1919
# ... to allow de/serializing Metadata and Signed objects here, while also
2020
# creating default de/serializers there (see metadata local scope imports).
2121
# NOTE: A less desirable alternative would be to add more abstraction layers.
22-
from tuf.api.metadata import Metadata, Signed
22+
from tuf.api.metadata import (
23+
BaseMetadata,
24+
Envelope,
25+
Metadata,
26+
Root,
27+
Signed,
28+
Snapshot,
29+
Targets,
30+
Timestamp,
31+
)
2332
from tuf.api.serialization import (
2433
DeserializationError,
2534
SerializationError,
@@ -28,23 +37,31 @@
2837

2938

3039
class JSONDeserializer(BaseJSONDeserializer):
31-
"""Provides JSON to Metadata deserialize method."""
40+
"""Provides JSON to ``BaseMetadata`` deserialize method."""
41+
42+
def deserialize(self, raw_data: bytes) -> BaseMetadata:
43+
"""Deserialize utf-8 encoded JSON bytes into ``BaseMetadata`` instance.
3244
33-
def deserialize(self, raw_data: bytes) -> Metadata:
34-
"""Deserialize utf-8 encoded JSON bytes into Metadata object."""
45+
Creates ``Metadata`` or ``Envelope`` instance based on presence of
46+
``payload`` or ``signed`` field."""
3547

3648
try:
3749
json_dict = super().deserialize(raw_data)
38-
metadata_obj = Metadata.from_dict(json_dict)
50+
51+
if "payload" in json_dict:
52+
return Envelope.from_dict(json_dict)
53+
54+
if "signed" in json_dict:
55+
return Metadata.from_dict(json_dict)
56+
57+
raise ValueError("unrecognized metadata")
3958

4059
except Exception as e:
4160
raise DeserializationError("Failed to deserialize JSON") from e
4261

43-
return metadata_obj
44-
4562

4663
class JSONSerializer(BaseJSONSerializer):
47-
"""Provides Metadata to JSON serialize method.
64+
"""Provides ``BaseMetadata`` to JSON serialize method.
4865
4966
Args:
5067
compact: A boolean indicating if the JSON bytes generated in
@@ -59,8 +76,8 @@ def __init__(self, compact: bool = False, validate: Optional[bool] = False):
5976
super().__init__(compact)
6077
self.validate = validate
6178

62-
def serialize(self, obj: Metadata) -> bytes:
63-
"""Serialize Metadata object into utf-8 encoded JSON bytes."""
79+
def serialize(self, obj: BaseMetadata) -> bytes:
80+
"""Serialize ``BaseMetadata`` object into utf-8 encoded JSON bytes."""
6481

6582
try:
6683
json_bytes = BaseJSONSerializer.serialize(self, obj)
@@ -81,6 +98,36 @@ def serialize(self, obj: Metadata) -> bytes:
8198
return json_bytes
8299

83100

101+
class SignedJSONDeserializer(BaseJSONDeserializer):
102+
"""Provides JSON to ``Signed`` deserialize method."""
103+
104+
def deserialize(self, raw_data: bytes) -> Signed:
105+
"""Deserialize utf-8 encoded JSON bytes into ``Signed`` instance.
106+
107+
Creates ``Targets``, ``Snapshot``, ``Timestamp`` or ``Root`` instance
108+
based on value in ``_type`` field."""
109+
try:
110+
json_dict = super().deserialize(raw_data)
111+
112+
_type = json_dict["_type"]
113+
114+
if _type == Targets.type:
115+
_cls: Type[Signed] = Targets
116+
elif _type == Snapshot.type:
117+
_cls = Snapshot
118+
elif _type == Timestamp.type:
119+
_cls = Timestamp
120+
elif _type == Root.type:
121+
_cls = Root
122+
else:
123+
raise ValueError(f'unrecognized metadata type "{_type}"')
124+
125+
except Exception as e:
126+
raise SerializationError("Failed to serialize JSON") from e
127+
128+
return _cls.from_dict(json_dict)
129+
130+
84131
class CanonicalJSONSerializer(SignedSerializer):
85132
"""Provides Signed to OLPC Canonical JSON serialize method."""
86133

0 commit comments

Comments
 (0)