Skip to content

Commit cbf3549

Browse files
committed
feat: use provenance to find commits for supported PURL types.
Signed-off-by: Ben Selwyn-Smith <[email protected]>
1 parent 40abe9e commit cbf3549

File tree

10 files changed

+840
-20
lines changed

10 files changed

+840
-20
lines changed
Lines changed: 273 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,273 @@
1+
# Copyright (c) 2024 - 2024, Oracle and/or its affiliates. All rights reserved.
2+
# Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/.
3+
4+
"""This module contains methods for extracting repository and commit metadata from provenance files."""
5+
import logging
6+
from typing import overload
7+
8+
from macaron.slsa_analyzer.provenance import intoto
9+
from macaron.slsa_analyzer.provenance.intoto import InTotoPayload, InTotoV1Payload, InTotoV01Payload
10+
from macaron.util import JsonType
11+
12+
logger: logging.Logger = logging.getLogger(__name__)
13+
14+
15+
def extract_repo_and_commit_from_provenance(payload: InTotoPayload) -> tuple[str, str]:
    """Extract the repository and commit metadata from the passed provenance payload.

    The extractor is selected from the in-toto payload class together with the ``predicateType`` of its statement.
    Unsupported combinations are logged at debug level and produce a pair of empty strings.

    Parameters
    ----------
    payload: InTotoPayload
        The payload to extract from.

    Returns
    -------
    tuple[str, str]
        The repository URL and commit hash if found, a pair of empty strings otherwise.
    """
    predicate_type = payload.statement.get("predicateType")
    if isinstance(payload, InTotoV1Payload):
        if predicate_type == "https://slsa.dev/provenance/v1":
            return _extract_from_slsa_v1(payload)
    elif isinstance(payload, InTotoV01Payload):
        if predicate_type == "https://slsa.dev/provenance/v0.2":
            return _extract_from_slsa_v02(payload)
        if predicate_type == "https://slsa.dev/provenance/v0.1":
            return _extract_from_slsa_v01(payload)
        if predicate_type == "https://witness.testifysec.com/attestation-collection/v0.1":
            return _extract_from_witness_provenance(payload)

    # No extractor matched the payload class / predicate type combination.
    logger.debug(
        "Extraction from provenance not supported for versions: predicate_type %s, in-toto %s.",
        predicate_type,
        payload.__class__,
    )
    return "", ""
47+
48+
49+
def _extract_from_slsa_v01(payload: InTotoV01Payload) -> tuple[str, str]:
    """Extract the repository and commit metadata from the slsa v01 provenance payload.

    Parameters
    ----------
    payload: InTotoV01Payload
        The payload to extract from.

    Returns
    -------
    tuple[str, str]
        The repository URL and commit hash if found, a pair of empty strings otherwise.
    """
    predicate: dict[str, JsonType] | None = payload.statement.get("predicate")
    if not predicate:
        return "", ""

    # The repository URL and commit are stored inside an entry in the list of predicate -> materials.
    # In predicate -> recipe -> definedInMaterial we find the list index that points to the correct entry.
    list_index = _json_extract(predicate, ["recipe", "definedInMaterial"], int)
    # Index 0 is a valid position, so only a missing value is rejected here.
    if list_index is None:
        return "", ""

    material_list = _json_extract(predicate, ["materials"], list)
    if not material_list:
        return "", ""

    # Guard against an index that does not point inside the materials list.
    if not 0 <= list_index < len(material_list):
        logger.debug("Material index %s is out of range.", list_index)
        return "", ""

    material = material_list[list_index]
    if not material or not isinstance(material, dict):
        return "", ""

    uri = material.get("uri")
    if not uri or not isinstance(uri, str):
        logger.debug("Could not extract repository URL.")
        return "", ""
    repo = _clean_spdx(uri)

    digest_set = material.get("digest")
    if not digest_set or not isinstance(digest_set, dict):
        return "", ""
    commit = _extract_commit_from_digest(digest_set)
    if not commit:
        logger.debug("Could not extract commit.")
        return "", ""

    return repo, commit
83+
84+
85+
def _extract_from_slsa_v02(payload: InTotoV01Payload) -> tuple[str, str]:
    """Return the repository URL and commit hash held in a slsa v02 provenance payload.

    Both values are read from the predicate -> invocation -> configSource object.
    See https://slsa.dev/spec/v0.2/provenance

    Parameters
    ----------
    payload: InTotoV01Payload
        The payload to extract from.

    Returns
    -------
    tuple[str, str]
        The repository URL and commit hash if found, a pair of empty strings otherwise.
    """
    not_found = ("", "")

    predicate: dict[str, JsonType] | None = payload.statement.get("predicate")
    if not predicate:
        return not_found

    config_source_path = ["invocation", "configSource"]

    source_uri = _json_extract(predicate, [*config_source_path, "uri"], str)
    if not source_uri:
        logger.debug("Could not extract repo URL.")
        return not_found
    repository = _clean_spdx(source_uri)

    digests = _json_extract(predicate, [*config_source_path, "digest"], dict)
    if not digests:
        return not_found

    commit_hash = _extract_commit_from_digest(digests)
    if commit_hash:
        return repository, commit_hash

    logger.debug("Could not extract commit.")
    return not_found
108+
109+
110+
def _extract_from_slsa_v1(payload: InTotoV1Payload) -> tuple[str, str]:
    """Extract the repository and commit metadata from the slsa v1 provenance payload.

    Only the GCB triggered-build and GitHub Actions workflow build types are handled. The repository is read
    from the build definition's external parameters, and the commit from the first resolved dependency whose
    cleaned URI equals that repository and that carries a usable digest.

    Parameters
    ----------
    payload: InTotoV1Payload
        The payload to extract from.

    Returns
    -------
    tuple[str, str]
        The repository URL and commit hash if found, a pair of empty strings otherwise.
    """
    gcb_build_type = "https://slsa-framework.github.io/gcb-buildtypes/triggered-build/v1"
    gha_build_type = "https://slsa-framework.github.io/github-actions-buildtypes/workflow/v1"

    predicate: dict[str, JsonType] | None = payload.statement.get("predicate")
    if not predicate:
        return "", ""

    build_def = _json_extract(predicate, ["buildDefinition"], dict)
    if not build_def:
        return "", ""
    build_type = _json_extract(build_def, ["buildType"], str)
    if not build_type:
        return "", ""

    # Extract the repository URL.
    repo = None
    if build_type == gcb_build_type:
        repo = _json_extract(build_def, ["externalParameters", "sourceToBuild", "repository"], str)
        if not repo:
            repo = _json_extract(build_def, ["externalParameters", "configSource", "repository"], str)
    elif build_type == gha_build_type:
        repo = _json_extract(build_def, ["externalParameters", "workflow", "repository"], str)

    if not repo:
        logger.debug("Failed to extract repository URL from provenance.")
        return "", ""

    # Extract the commit hash.
    commit = None
    deps = _json_extract(build_def, ["resolvedDependencies"], list)
    if not deps:
        return "", ""
    for dep in deps:
        if not isinstance(dep, dict):
            continue
        # A dependency without a string URI cannot be matched against the repository; skip it instead of failing.
        uri = dep.get("uri")
        if not isinstance(uri, str):
            continue
        if _clean_spdx(uri) != repo:
            continue
        if build_type == gcb_build_type:
            commit_dict = _json_extract(dep, ["digest"], dict)
            if not commit_dict:
                continue
            commit = _extract_commit_from_digest(commit_dict)
        elif build_type == gha_build_type:
            commit = _json_extract(dep, ["digest", "gitCommit"], str)
        # Stop at the first usable commit so a later matching entry without one cannot discard it.
        if commit:
            break

    if not commit:
        logger.debug("Failed to extract commit hash from provenance.")
        return "", ""

    return repo, commit
161+
162+
163+
def _extract_commit_from_digest(digest: dict[str, JsonType]) -> str | None:
    """Return a commit hash from the passed DigestSet, or None when there is no usable entry.

    The DigestSet is an in-toto object that maps algorithm types to commit hashes (digests).
    Entries are visited in dictionary order; the first one whose algorithm is listed in
    ``intoto.v1.VALID_ALGORITHMS`` and whose value is a string is returned.
    """
    # TODO decide on a preference for which algorithm to accept.
    if len(digest) > 1:
        logger.debug("DigestSet contains multiple algorithms: %s", digest.keys())

    for algorithm, candidate in digest.items():
        if algorithm in intoto.v1.VALID_ALGORITHMS and isinstance(candidate, str):
            return candidate
    return None
178+
179+
180+
def _clean_spdx(uri: str) -> str:
181+
"""Clean the passed SPDX URI and return the normalised URL it represents.
182+
183+
A SPDX URI has the form: git+https://example.com@refs/heads/main
184+
"""
185+
url, _, _ = uri.lstrip("git+").rpartition("@")
186+
return url
187+
188+
189+
def _extract_from_witness_provenance(payload: InTotoV01Payload) -> tuple[str, str]:
    """Extract the repository and commit metadata from the passed witness provenance payload.

    To successfully return the commit and repository URL, the payload must respectively contain a Git attestation, and
    either a GitHub or GitLab attestation. When several attestations of the same kind are present, the value of the
    last one is used.

    Parameters
    ----------
    payload: InTotoV01Payload
        The payload to extract from.

    Returns
    -------
    tuple[str, str]
        The repository URL and commit hash if found, a pair of empty strings otherwise.
    """
    predicate: dict[str, JsonType] | None = payload.statement.get("predicate")
    if not predicate:
        return "", ""
    attestations = _json_extract(predicate, ["attestations"], list)
    if not attestations:
        return "", ""

    commit: str | None = None
    repo: str | None = None
    for entry in attestations:
        if not isinstance(entry, dict):
            continue
        entry_type = entry.get("type")
        # Skip entries whose type is missing or is not a string, as the prefix checks below need a string.
        if not entry_type or not isinstance(entry_type, str):
            continue
        if entry_type.startswith("https://witness.dev/attestations/git/"):
            commit = _json_extract(entry, ["attestation", "commithash"], str)
        elif entry_type.startswith(
            ("https://witness.dev/attestations/gitlab/", "https://witness.dev/attestations/github/")
        ):
            repo = _json_extract(entry, ["attestation", "projecturl"], str)

    if not commit or not repo:
        logger.debug("Could not extract repo and commit from provenance.")
        return "", ""

    return repo, commit
231+
232+
233+
@overload
def _json_extract(entry: dict[str, JsonType], keys: list[str], type_: type[int]) -> int | None:
    ...


@overload
def _json_extract(entry: dict[str, JsonType], keys: list[str], type_: type[list]) -> list | None:
    ...


@overload
def _json_extract(entry: dict[str, JsonType], keys: list[str], type_: type[dict]) -> dict | None:
    ...


@overload
def _json_extract(entry: dict[str, JsonType], keys: list[str], type_: type[str]) -> str | None:
    ...


def _json_extract(entry: dict[str, JsonType], keys: list[str], type_: type[JsonType]) -> JsonType:
    """Walk the passed dictionary along the sequence of nested keys and return the value that is reached.

    Every key except the last must lead to another dictionary. The value under the last key is returned only
    if it is an instance of the passed type. None is returned when a key is missing, when an intermediate
    value is not a dictionary, when the final value has a different type, or when no keys are given.
    """
    current = entry
    final_position = len(keys) - 1
    for position, key in enumerate(keys):
        if key not in current:
            logger.debug("Key not found in JSON: %s", key)
            return None

        value = current[key]
        if position == final_position:
            # End of the path: the value is only handed back if it has the requested type.
            return value if isinstance(value, type_) else None

        if not isinstance(value, dict):
            logger.debug("Expected dict found: %s", value.__class__)
            return None
        current = value

    return None

0 commit comments

Comments
 (0)