29
29
from typing import List , Optional
30
30
from urllib .parse import urljoin
31
31
32
- import pem
33
32
import requests
34
33
from cryptography .hazmat .primitives import hashes , serialization
35
34
from cryptography .x509 import (
36
35
Certificate ,
37
36
CertificateSigningRequest ,
38
- ExtensionNotFound ,
39
37
PrecertificateSignedCertificateTimestamps ,
40
38
load_pem_x509_certificate ,
41
39
)
51
49
52
50
DEFAULT_FULCIO_URL = "https://fulcio.sigstore.dev"
53
51
STAGING_FULCIO_URL = "https://fulcio.sigstage.dev"
54
- SIGNING_CERT_ENDPOINT = "/api/v1 /signingCert"
55
- ROOT_CERT_ENDPOINT = "/api/v1/rootCert "
52
+ SIGNING_CERT_ENDPOINT = "/api/v2 /signingCert"
53
+ TRUST_BUNDLE_ENDPOINT = "/api/v2/trustBundle "
56
54
57
55
58
56
class SCTHashAlgorithm (IntEnum ):
@@ -162,10 +160,10 @@ class FulcioCertificateSigningResponse:
162
160
163
161
164
162
@dataclass (frozen = True )
165
- class FulcioRootResponse :
166
- """Root certificate response"""
163
+ class FulcioTrustBundleResponse :
164
+ """Trust bundle response, containing a list of certificate chains """
167
165
168
- root_cert : Certificate
166
+ trust_bundle : List [ List [ Certificate ]]
169
167
170
168
171
169
class FulcioClientError (Exception ):
@@ -212,16 +210,29 @@ def post(
212
210
except (AttributeError , KeyError ):
213
211
raise FulcioClientError from http_error
214
212
213
+ if resp .json ().get ("signedCertificateEmbeddedSct" ):
214
+ sct_embedded = True
215
+ try :
216
+ certificates = resp .json ()["signedCertificateEmbeddedSct" ]["chain" ]["certificates" ]
217
+ except KeyError :
218
+ raise FulcioClientError ("Fulcio response missing certificate chain" )
219
+ else :
220
+ sct_embedded = False
221
+ try :
222
+ certificates = resp .json ()["signedCertificateDetachedSct" ]["chain" ]["certificates" ]
223
+ except KeyError :
224
+ raise FulcioClientError ("Fulcio response missing certificate chain" )
225
+
215
226
# Cryptography doesn't have chain verification/building built in
216
227
# https://github.com/pyca/cryptography/issues/2381
217
- try :
218
- cert_pem , * chain_pems = pem . parse ( resp . content )
219
- cert = load_pem_x509_certificate ( cert_pem . as_bytes ())
220
- chain = [ load_pem_x509_certificate ( c . as_bytes ()) for c in chain_pems ]
221
- except ValueError :
222
- raise FulcioClientError ( f"Did not find a cert in Fulcio response: { resp } " )
228
+ if len ( certificates ) < 2 :
229
+ raise FulcioClientError (
230
+ f"Certificate chain is too short: { len ( certificates ) } < 2"
231
+ )
232
+ cert = load_pem_x509_certificate ( certificates [ 0 ]. encode ())
233
+ chain = [ load_pem_x509_certificate ( c . encode ()) for c in certificates [ 1 :]]
223
234
224
- try :
235
+ if sct_embedded :
225
236
# Try to retrieve the embedded SCTs within the cert.
226
237
precert_scts_extension = cert .extensions .get_extension_for_class (
227
238
PrecertificateSignedCertificateTimestamps
@@ -262,16 +273,17 @@ def _opaque16(value: bytes) -> bytes:
262
273
raw_sct = _opaque16 (_opaque16 (raw_sct_list_bytes ))
263
274
264
275
sct = precert_scts_extension [0 ]
265
- except ExtensionNotFound :
276
+ else :
266
277
# If we don't have any embedded SCTs, then we might be dealing
267
278
# with a Fulcio instance that provides detached SCTs.
268
279
269
- # The SCT header is a base64-encoded payload, which in turn
280
+ # The detached SCT is a base64-encoded payload, which in turn
270
281
# is a JSON representation of the SignedCertificateTimestamp
271
282
# in RFC 6962 (subsec. 3.2).
272
- sct_b64 = resp .headers .get ("SCT" )
273
- if sct_b64 is None :
274
- raise FulcioClientError ("Fulcio response did not include a SCT header" )
283
+ try :
284
+ sct_b64 = resp .json ()["signedCertificateDetachedSct" ]["signedCertificateTimestamp" ]
285
+ except KeyError :
286
+ raise FulcioClientError ("Fulcio response did not include a detached SCT" )
275
287
276
288
try :
277
289
sct_json = json .loads (base64 .b64decode (sct_b64 ).decode ())
@@ -291,16 +303,24 @@ def _opaque16(value: bytes) -> bytes:
291
303
return FulcioCertificateSigningResponse (cert , chain , sct , raw_sct )
292
304
293
305
294
- class FulcioRootCert (Endpoint ):
295
- def get (self ) -> FulcioRootResponse :
296
- """Get the root certificate"""
306
+ class FulcioTrustBundle (Endpoint ):
307
+ def get (self ) -> FulcioTrustBundleResponse :
308
+ """Get the certificate chains from Fulcio """
297
309
resp : requests .Response = self .session .get (self .url )
298
310
try :
299
311
resp .raise_for_status ()
300
312
except requests .HTTPError as http_error :
301
313
raise FulcioClientError from http_error
302
- root_cert : Certificate = load_pem_x509_certificate (resp .content )
303
- return FulcioRootResponse (root_cert )
314
+
315
+ trust_bundle_json = resp .json ()
316
+ chains : List [List [Certificate ]] = []
317
+ for certificate_chain in trust_bundle_json ["chains" ]:
318
+ chain : List [Certificate ] = []
319
+ for certificate in certificate_chain ["certificates" ]:
320
+ cert : Certificate = load_pem_x509_certificate (certificate .encode ())
321
+ chain .append (cert )
322
+ chains .append (chain )
323
+ return FulcioTrustBundleResponse (chains )
304
324
305
325
306
326
class FulcioClient :
@@ -327,7 +347,7 @@ def signing_cert(self) -> FulcioSigningCert:
327
347
)
328
348
329
349
@property
330
- def root_cert (self ) -> FulcioRootCert :
331
- return FulcioRootCert (
332
- urljoin (self .url , ROOT_CERT_ENDPOINT ), session = self .session
350
+ def trust_bundle (self ) -> FulcioTrustBundle :
351
+ return FulcioTrustBundle (
352
+ urljoin (self .url , TRUST_BUNDLE_ENDPOINT ), session = self .session
333
353
)
0 commit comments