Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 47 additions & 27 deletions sigstore/_internal/fulcio/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,11 @@
from typing import List, Optional
from urllib.parse import urljoin

import pem
import requests
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.x509 import (
Certificate,
CertificateSigningRequest,
ExtensionNotFound,
PrecertificateSignedCertificateTimestamps,
load_pem_x509_certificate,
)
Expand All @@ -51,8 +49,8 @@

DEFAULT_FULCIO_URL = "https://fulcio.sigstore.dev"
STAGING_FULCIO_URL = "https://fulcio.sigstage.dev"
SIGNING_CERT_ENDPOINT = "/api/v1/signingCert"
ROOT_CERT_ENDPOINT = "/api/v1/rootCert"
SIGNING_CERT_ENDPOINT = "/api/v2/signingCert"
TRUST_BUNDLE_ENDPOINT = "/api/v2/trustBundle"


class SCTHashAlgorithm(IntEnum):
Expand Down Expand Up @@ -162,10 +160,10 @@ class FulcioCertificateSigningResponse:


@dataclass(frozen=True)
class FulcioRootResponse:
"""Root certificate response"""
class FulcioTrustBundleResponse:
"""Trust bundle response, containing a list of certificate chains"""

root_cert: Certificate
trust_bundle: List[List[Certificate]]


class FulcioClientError(Exception):
Expand Down Expand Up @@ -212,16 +210,29 @@ def post(
except (AttributeError, KeyError):
raise FulcioClientError from http_error

if resp.json().get("signedCertificateEmbeddedSct"):
sct_embedded = True
try:
certificates = resp.json()["signedCertificateEmbeddedSct"]["chain"]["certificates"]
except KeyError:
raise FulcioClientError("Fulcio response missing certificate chain")
else:
sct_embedded = False
try:
certificates = resp.json()["signedCertificateDetachedSct"]["chain"]["certificates"]
except KeyError:
raise FulcioClientError("Fulcio response missing certificate chain")

# Cryptography doesn't have chain verification/building built in
# https://github.com/pyca/cryptography/issues/2381
try:
cert_pem, *chain_pems = pem.parse(resp.content)
cert = load_pem_x509_certificate(cert_pem.as_bytes())
chain = [load_pem_x509_certificate(c.as_bytes()) for c in chain_pems]
except ValueError:
raise FulcioClientError(f"Did not find a cert in Fulcio response: {resp}")
if len(certificates) < 2:
raise FulcioClientError(
f"Certificate chain is too short: {len(certificates)} < 2"
)
cert = load_pem_x509_certificate(certificates[0].encode())
chain = [load_pem_x509_certificate(c.encode()) for c in certificates[1:]]

try:
if sct_embedded:
# Try to retrieve the embedded SCTs within the cert.
precert_scts_extension = cert.extensions.get_extension_for_class(
PrecertificateSignedCertificateTimestamps
Expand Down Expand Up @@ -262,16 +273,17 @@ def _opaque16(value: bytes) -> bytes:
raw_sct = _opaque16(_opaque16(raw_sct_list_bytes))

sct = precert_scts_extension[0]
except ExtensionNotFound:
else:
# If we don't have any embedded SCTs, then we might be dealing
# with a Fulcio instance that provides detached SCTs.

# The SCT header is a base64-encoded payload, which in turn
# The detached SCT is a base64-encoded payload, which in turn
# is a JSON representation of the SignedCertificateTimestamp
# in RFC 6962 (subsec. 3.2).
sct_b64 = resp.headers.get("SCT")
if sct_b64 is None:
raise FulcioClientError("Fulcio response did not include a SCT header")
try:
sct_b64 = resp.json()["signedCertificateDetachedSct"]["signedCertificateTimestamp"]
except KeyError:
raise FulcioClientError("Fulcio response did not include a detached SCT")

try:
sct_json = json.loads(base64.b64decode(sct_b64).decode())
Expand All @@ -291,16 +303,24 @@ def _opaque16(value: bytes) -> bytes:
return FulcioCertificateSigningResponse(cert, chain, sct, raw_sct)


class FulcioRootCert(Endpoint):
def get(self) -> FulcioRootResponse:
"""Get the root certificate"""
class FulcioTrustBundle(Endpoint):
def get(self) -> FulcioTrustBundleResponse:
"""Get the certificate chains from Fulcio"""
resp: requests.Response = self.session.get(self.url)
try:
resp.raise_for_status()
except requests.HTTPError as http_error:
raise FulcioClientError from http_error
root_cert: Certificate = load_pem_x509_certificate(resp.content)
return FulcioRootResponse(root_cert)

trust_bundle_json = resp.json()
chains: List[List[Certificate]] = []
for certificate_chain in trust_bundle_json["chains"]:
chain: List[Certificate] = []
for certificate in certificate_chain["certificates"]:
cert: Certificate = load_pem_x509_certificate(certificate.encode())
chain.append(cert)
chains.append(chain)
return FulcioTrustBundleResponse(chains)


class FulcioClient:
Expand All @@ -327,7 +347,7 @@ def signing_cert(self) -> FulcioSigningCert:
)

@property
def root_cert(self) -> FulcioRootCert:
return FulcioRootCert(
urljoin(self.url, ROOT_CERT_ENDPOINT), session=self.session
def trust_bundle(self) -> FulcioTrustBundle:
return FulcioTrustBundle(
urljoin(self.url, TRUST_BUNDLE_ENDPOINT), session=self.session
)