1
1
import os
2
2
import shutil
3
3
import tempfile
4
+ import time
5
+ from datetime import datetime , timedelta
6
+ from io import BytesIO
4
7
from pathlib import Path
5
- from typing import Callable , Tuple , TypeVar
8
+ from typing import Callable , Optional , Tuple , TypeVar
9
+ from zipfile import ZipFile
6
10
7
11
import pytest # type: ignore
12
+ import requests
8
13
9
14
from .client import (BundleMaterials , SignatureCertificateMaterials ,
10
15
SigstoreClient , VerificationMaterials )
13
18
_MakeMaterialsByType = Callable [[str , _M ], Tuple [Path , _M ]]
14
19
_MakeMaterials = Callable [[str ], Tuple [Path , VerificationMaterials ]]
15
20
21
+ _OIDC_BEACON_API_URL = (
22
+ "https://api.github.com/repos/sigstore-conformance/extremely-dangerous-public-oidc-beacon/"
23
+ "actions"
24
+ )
25
+ _OIDC_BEACON_WORKFLOW_ID = 55399612
26
+
27
+
28
+ class OidcTokenError (Exception ):
29
+ pass
30
+
16
31
17
32
def pytest_addoption (parser ):
18
33
"""
19
- Add the `--entrypoint`, `--identity -token`, and `--skip-signing` flags to
34
+ Add the `--entrypoint`, `--github -token`, and `--skip-signing` flags to
20
35
the `pytest` CLI.
21
36
"""
22
37
parser .addoption (
@@ -27,9 +42,9 @@ def pytest_addoption(parser):
27
42
type = str ,
28
43
)
29
44
parser .addoption (
30
- "--identity -token" ,
45
+ "--github -token" ,
31
46
action = "store" ,
32
- help = "the OIDC token to supply to the Sigstore client under test" ,
47
+ help = "the GitHub token to supply to the Sigstore client under test" ,
33
48
required = True ,
34
49
type = str ,
35
50
)
@@ -54,12 +69,91 @@ def pytest_configure(config):
54
69
55
70
56
71
@pytest .fixture
57
- def client (pytestconfig ):
72
+ def identity_token (pytestconfig ):
73
+ gh_token = pytestconfig .getoption ("--github-token" )
74
+ session = requests .Session ()
75
+ headers = {
76
+ "Accept" : "application/vnd.github+json" ,
77
+ "X-GitHub-Api-Version" : "2022-11-28" ,
78
+ "Authorization" : f"Bearer { gh_token } " ,
79
+ }
80
+
81
+ workflow_time : Optional [datetime ] = None
82
+ run_id : str
83
+
84
+ # We need a token that was generated in the last 5 minutes. Keep checking until we find one.
85
+ while workflow_time is None or datetime .now () - workflow_time >= timedelta (
86
+ minutes = 5
87
+ ):
88
+ # If there's a lot of traffic in the GitHub Actions cron queue, we might not have a valid
89
+ # token to use. In that case, wait for 30 seconds and try again.
90
+ if workflow_time is not None :
91
+ # FIXME(jl): logging in pytest?
92
+ # _log("Couldn't find a recent token, waiting...")
93
+ time .sleep (30 )
94
+
95
+ resp : requests .Response = session .get (
96
+ url = _OIDC_BEACON_API_URL + f"/workflows/{ _OIDC_BEACON_WORKFLOW_ID } /runs" ,
97
+ headers = headers ,
98
+ )
99
+ resp .raise_for_status ()
100
+
101
+ resp_json = resp .json ()
102
+ workflow_runs = resp_json ["workflow_runs" ]
103
+ if not workflow_runs :
104
+ raise OidcTokenError (f"Found no workflow runs: { resp_json } " )
105
+
106
+ workflow_run = workflow_runs [0 ]
107
+
108
+ # If the job is still running, the token artifact won't have been generated yet.
109
+ if workflow_run ["status" ] != "completed" :
110
+ continue
111
+
112
+ run_id = workflow_run ["id" ]
113
+ workflow_time = datetime .strptime (
114
+ workflow_run ["run_started_at" ], "%Y-%m-%dT%H:%M:%SZ"
115
+ )
116
+
117
+ resp = session .get (
118
+ url = _OIDC_BEACON_API_URL + f"/runs/{ run_id } /artifacts" ,
119
+ headers = headers ,
120
+ )
121
+ resp .raise_for_status ()
122
+
123
+ resp_json = resp .json ()
124
+ artifacts = resp_json ["artifacts" ]
125
+ if len (artifacts ) != 1 :
126
+ raise OidcTokenError (
127
+ f"Found unexpected number of artifacts on OIDC beacon run: { artifacts } "
128
+ )
129
+
130
+ oidc_artifact = artifacts [0 ]
131
+ if oidc_artifact ["name" ] != "oidc-token" :
132
+ raise OidcTokenError (
133
+ f"Found unexpected artifact on OIDC beacon run: { oidc_artifact ['name' ]} "
134
+ )
135
+ artifact_id = oidc_artifact ["id" ]
136
+
137
+ # Download the OIDC token artifact and unzip the archive.
138
+ resp = session .get (
139
+ url = _OIDC_BEACON_API_URL + f"/artifacts/{ artifact_id } /zip" ,
140
+ headers = headers ,
141
+ )
142
+ resp .raise_for_status ()
143
+
144
+ with ZipFile (BytesIO (resp .content )) as artifact_zip :
145
+ artifact_file = artifact_zip .open ("oidc-token.txt" )
146
+
147
+ # Strip newline.
148
+ return artifact_file .read ().decode ().rstrip ()
149
+
150
+
151
+ @pytest .fixture
152
+ def client (pytestconfig , identity_token ):
58
153
"""
59
154
Parametrize each test with the client under test.
60
155
"""
61
156
entrypoint = pytestconfig .getoption ("--entrypoint" )
62
- identity_token = pytestconfig .getoption ("--identity-token" )
63
157
return SigstoreClient (entrypoint , identity_token )
64
158
65
159
0 commit comments