diff --git a/scripts/dev_scripts/integration_tests.sh b/scripts/dev_scripts/integration_tests.sh index c0828fa83..8d85b8b75 100755 --- a/scripts/dev_scripts/integration_tests.sh +++ b/scripts/dev_scripts/integration_tests.sh @@ -99,6 +99,15 @@ if [[ -z "$NO_NPM_TEST" ]]; then $RUN_MACARON analyze -purl pkg:npm/@sigstore/mock@0.1.0 -rp https://github.com/sigstore/sigstore-js -b main -d ebdcfdfbdfeb9c9aeee6df53674ef230613629f5 --skip-deps || log_fail check_or_update_expected_output $COMPARE_JSON_OUT $JSON_RESULT $JSON_EXPECTED || log_fail + + echo -e "\n----------------------------------------------------------------------------------" + echo "semver@7.6.0: Extracting repository URL and commit from provenance while Repo Finder is disabled." + echo -e "----------------------------------------------------------------------------------\n" + JSON_EXPECTED=$WORKSPACE/tests/e2e/expected_results/purl/npm/semver/semver.json + JSON_RESULT=$WORKSPACE/output/reports/npm/semver/semver.json + $RUN_MACARON -dp tests/e2e/defaults/disable_repo_finder.ini analyze -purl pkg:npm/semver@7.6.0 || log_fail + + check_or_update_expected_output $COMPARE_JSON_OUT $JSON_RESULT $JSON_EXPECTED || log_fail fi echo -e "\n----------------------------------------------------------------------------------" diff --git a/src/macaron/__main__.py b/src/macaron/__main__.py index ad70156f3..3323f418e 100644 --- a/src/macaron/__main__.py +++ b/src/macaron/__main__.py @@ -142,7 +142,7 @@ def analyze_slsa_levels_single(analyzer_single_args: argparse.Namespace) -> None run_config, analyzer_single_args.sbom_path, analyzer_single_args.skip_deps, - prov_payload=prov_payload, + provenance_payload=prov_payload, ) sys.exit(status_code) diff --git a/src/macaron/errors.py b/src/macaron/errors.py index 5e892e1a6..f05540b6d 100644 --- a/src/macaron/errors.py +++ b/src/macaron/errors.py @@ -58,3 +58,15 @@ class InvalidHTTPResponseError(MacaronError): class CheckRegistryError(MacaronError): """The Check Registry Error class.""" + + +class ProvenanceError(MacaronError): + """When there is an error while extracting from provenance.""" + + +class JsonError(MacaronError): + """When there is an error while extracting from JSON.""" + + +class InvalidAnalysisTargetError(MacaronError): + """When a valid Analysis Target cannot be constructed.""" diff --git a/src/macaron/json_tools.py b/src/macaron/json_tools.py new file mode 100644 index 000000000..64ad2cfd5 --- /dev/null +++ b/src/macaron/json_tools.py @@ -0,0 +1,50 @@ +# Copyright (c) 2024 - 2024, Oracle and/or its affiliates. All rights reserved. +# Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/. + +"""This module provides utility functions for JSON data.""" + +from typing import TypeVar + +from macaron.errors import JsonError +from macaron.util import JsonType + +T = TypeVar("T", bound=JsonType) + + +def json_extract(entry: JsonType, keys: list[str], type_: type[T]) -> T: + """Return the value found by following the list of depth-sequential keys inside the passed JSON dictionary. + + The value must be of the passed type. + + Parameters + ---------- + entry: JsonType + An entry point into a JSON structure. + keys: list[str] + The list of depth-sequential keys within the JSON. + type: type[T] + The type to check the value against and return it as. + + Returns + ------- + T: + The found value as the type of the type parameter. + + Raises + ------ + JsonError + Raised if an error occurs while searching for or validating the value. + """ + target = entry + + for index, key in enumerate(keys): + if not isinstance(target, dict): + raise JsonError(f"Expect the value .{'.'.join(keys[:index])} to be a dict.") + if key not in target: + raise JsonError(f"JSON key '{key}' not found in .{'.'.join(keys[:index])}.") + target = target[key] + + if isinstance(target, type_): + return target + + raise JsonError(f"Expect the value .{'.'.join(keys)} to be of type '{type_}'.") diff --git a/src/macaron/repo_finder/provenance_extractor.py b/src/macaron/repo_finder/provenance_extractor.py new file mode 100644 index 000000000..c30376a34 --- /dev/null +++ b/src/macaron/repo_finder/provenance_extractor.py @@ -0,0 +1,224 @@ +# Copyright (c) 2024 - 2024, Oracle and/or its affiliates. All rights reserved. +# Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/. + +"""This module contains methods for extracting repository and commit metadata from provenance files.""" +import logging + +from macaron.errors import JsonError, ProvenanceError +from macaron.json_tools import json_extract +from macaron.slsa_analyzer.provenance.intoto import InTotoPayload, InTotoV1Payload, InTotoV01Payload +from macaron.util import JsonType + +logger: logging.Logger = logging.getLogger(__name__) + + +SLSA_V01_DIGEST_SET_GIT_ALGORITHMS = ["sha1"] +SLSA_V02_DIGEST_SET_GIT_ALGORITHMS = ["sha1"] +SLSA_V1_DIGEST_SET_GIT_ALGORITHMS = ["sha1", "gitCommit"] + + +def extract_repo_and_commit_from_provenance(payload: InTotoPayload) -> tuple[str, str]: + """Extract the repository and commit metadata from the passed provenance payload. + + Parameters + ---------- + payload: InTotoPayload + The payload to extract from. + + Returns + ------- + tuple[str, str] + The repository URL and commit hash if found, a pair of empty strings otherwise. + + Raises + ------ + ProvenanceError + If the extraction process fails for any reason. + """ + repo = "" + commit = "" + predicate_type = payload.statement.get("predicateType") + try: + if isinstance(payload, InTotoV1Payload): + if predicate_type == "https://slsa.dev/provenance/v1": + repo, commit = _extract_from_slsa_v1(payload) + elif isinstance(payload, InTotoV01Payload): + if predicate_type == "https://slsa.dev/provenance/v0.2": + repo, commit = _extract_from_slsa_v02(payload) + if predicate_type == "https://slsa.dev/provenance/v0.1": + repo, commit = _extract_from_slsa_v01(payload) + if predicate_type == "https://witness.testifysec.com/attestation-collection/v0.1": + repo, commit = _extract_from_witness_provenance(payload) + except JsonError as error: + logger.debug(error) + raise ProvenanceError("JSON exception while extracting from provenance.") from error + + if not repo or not commit: + msg = ( + f"Extraction from provenance not supported for versions: " + f"predicate_type {predicate_type}, in-toto {str(type(payload))}." + ) + logger.debug(msg) + raise ProvenanceError(msg) + + logger.debug("Extracted repo and commit from provenance: %s, %s", repo, commit) + return repo, commit + + +def _extract_from_slsa_v01(payload: InTotoV01Payload) -> tuple[str, str]: + """Extract the repository and commit metadata from the slsa v01 provenance payload.""" + predicate: dict[str, JsonType] | None = payload.statement.get("predicate") + if not predicate: + raise ProvenanceError("No predicate in payload statement.") + + # The repository URL and commit are stored inside an entry in the list of predicate -> materials. + # In predicate -> recipe -> definedInMaterial we find the list index that points to the correct entry. + list_index = json_extract(predicate, ["recipe", "definedInMaterial"], int) + material_list = json_extract(predicate, ["materials"], list) + if list_index >= len(material_list): + raise ProvenanceError("Material list index outside of material list bounds.") + material = material_list[list_index] + if not material or not isinstance(material, dict): + raise ProvenanceError("Indexed material list entry is invalid.") + + uri = json_extract(material, ["uri"], str) + + repo = _clean_spdx(uri) + + digest_set = json_extract(material, ["digest"], dict) + commit = _extract_commit_from_digest_set(digest_set, SLSA_V01_DIGEST_SET_GIT_ALGORITHMS) + + if not commit: + raise ProvenanceError("Failed to extract commit hash from provenance.") + + return repo, commit + + +def _extract_from_slsa_v02(payload: InTotoV01Payload) -> tuple[str, str]: + """Extract the repository and commit metadata from the slsa v02 provenance payload.""" + predicate: dict[str, JsonType] | None = payload.statement.get("predicate") + if not predicate: + raise ProvenanceError("No predicate in payload statement.") + + # The repository URL and commit are stored within the predicate -> invocation -> configSource object. + # See https://slsa.dev/spec/v0.2/provenance + uri = json_extract(predicate, ["invocation", "configSource", "uri"], str) + if not uri: + raise ProvenanceError("Failed to extract repository URL from provenance.") + repo = _clean_spdx(uri) + + digest_set = json_extract(predicate, ["invocation", "configSource", "digest"], dict) + commit = _extract_commit_from_digest_set(digest_set, SLSA_V02_DIGEST_SET_GIT_ALGORITHMS) + + if not commit: + raise ProvenanceError("Failed to extract commit hash from provenance.") + + return repo, commit + + +def _extract_from_slsa_v1(payload: InTotoV1Payload) -> tuple[str, str]: + """Extract the repository and commit metadata from the slsa v1 provenance payload.""" + predicate: dict[str, JsonType] | None = payload.statement.get("predicate") + if not predicate: + raise ProvenanceError("No predicate in payload statement.") + + build_def = json_extract(predicate, ["buildDefinition"], dict) + build_type = json_extract(build_def, ["buildType"], str) + + # Extract the repository URL. + repo = "" + if build_type == "https://slsa-framework.github.io/gcb-buildtypes/triggered-build/v1": + try: + repo = json_extract(build_def, ["externalParameters", "sourceToBuild", "repository"], str) + except JsonError: + repo = json_extract(build_def, ["externalParameters", "configSource", "repository"], str) + if build_type == "https://slsa-framework.github.io/github-actions-buildtypes/workflow/v1": + repo = json_extract(build_def, ["externalParameters", "workflow", "repository"], str) + + if not repo: + raise ProvenanceError("Failed to extract repository URL from provenance.") + + # Extract the commit hash. + commit = "" + deps = json_extract(build_def, ["resolvedDependencies"], list) + for dep in deps: + if not isinstance(dep, dict): + continue + uri = json_extract(dep, ["uri"], str) + url = _clean_spdx(uri) + if url != repo: + continue + digest_set = json_extract(dep, ["digest"], dict) + commit = _extract_commit_from_digest_set(digest_set, SLSA_V1_DIGEST_SET_GIT_ALGORITHMS) + + if not commit: + raise ProvenanceError("Failed to extract commit hash from provenance.") + + return repo, commit + + +def _extract_from_witness_provenance(payload: InTotoV01Payload) -> tuple[str, str]: + """Extract the repository and commit metadata from the witness provenance file found at the passed path. + + To successfully return the commit and repository URL, the payload must respectively contain a Git attestation, and + either a GitHub or GitLab attestation. + + Parameters + ---------- + payload: InTotoPayload + The payload to extract from. + + Returns + ------- + tuple[str, str] + The repository URL and commit hash if found, a pair of empty strings otherwise. + """ + predicate: dict[str, JsonType] | None = payload.statement.get("predicate") + if not predicate: + raise ProvenanceError("No predicate in payload statement.") + + attestations = json_extract(predicate, ["attestations"], list) + commit = "" + repo = "" + for entry in attestations: + if not isinstance(entry, dict): + continue + entry_type = entry.get("type") + if not entry_type: + continue + if entry_type.startswith("https://witness.dev/attestations/git/"): + commit = json_extract(entry, ["attestation", "commithash"], str) + elif entry_type.startswith("https://witness.dev/attestations/gitlab/") or entry_type.startswith( + "https://witness.dev/attestations/github/" + ): + repo = json_extract(entry, ["attestation", "projecturl"], str) + + if not commit or not repo: + raise ProvenanceError("Could not extract repo and commit from provenance.") + + return repo, commit + + +def _extract_commit_from_digest_set(digest_set: dict[str, JsonType], valid_algorithms: list[str]) -> str: + """Extract the commit from the passed DigestSet. + + The DigestSet is an in-toto object that maps algorithm types to commit hashes (digests). + """ + if len(digest_set.keys()) > 1: + logger.debug("DigestSet contains multiple algorithms: %s", digest_set.keys()) + + for key in digest_set: + if key in valid_algorithms: + value = digest_set.get(key) + if isinstance(value, str): + return value + raise ProvenanceError(f"No valid digest in digest set: {digest_set.keys()} not in {valid_algorithms}") + + +def _clean_spdx(uri: str) -> str: + """Clean the passed SPDX URI and return the normalised URL it represents. + + A SPDX URI has the form: git+https://example.com@refs/heads/main + """ + url, _, _ = uri.lstrip("git+").rpartition("@") + return url diff --git a/src/macaron/repo_finder/provenance_finder.py b/src/macaron/repo_finder/provenance_finder.py new file mode 100644 index 000000000..06018a13a --- /dev/null +++ b/src/macaron/repo_finder/provenance_finder.py @@ -0,0 +1,225 @@ +# Copyright (c) 2024 - 2024, Oracle and/or its affiliates. All rights reserved. +# Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/. + +"""This module contains methods for finding provenance files.""" +import logging +import os +import tempfile + +from packageurl import PackageURL + +from macaron.config.defaults import defaults +from macaron.repo_finder.commit_finder import AbstractPurlType, determine_abstract_purl_type +from macaron.slsa_analyzer.checks.provenance_available_check import ProvenanceAvailableException +from macaron.slsa_analyzer.package_registry import PACKAGE_REGISTRIES, JFrogMavenRegistry, NPMRegistry +from macaron.slsa_analyzer.package_registry.npm_registry import NPMAttestationAsset +from macaron.slsa_analyzer.provenance.intoto import InTotoPayload +from macaron.slsa_analyzer.provenance.intoto.errors import LoadIntotoAttestationError +from macaron.slsa_analyzer.provenance.loader import load_provenance_payload +from macaron.slsa_analyzer.provenance.witness import is_witness_provenance_payload, load_witness_verifier_config + +logger: logging.Logger = logging.getLogger(__name__) + + +class ProvenanceFinder: + """This class is used to find and retrieve provenance files from supported registries.""" + + def __init__(self) -> None: + registries = PACKAGE_REGISTRIES + self.npm_registry: NPMRegistry | None = None + self.jfrog_registry: JFrogMavenRegistry | None = None + if registries: + for registry in registries: + if isinstance(registry, NPMRegistry): + self.npm_registry = registry + elif isinstance(registry, JFrogMavenRegistry): + self.jfrog_registry = registry + + def find_provenance(self, purl: PackageURL) -> InTotoPayload | None: + """Find the provenance files of the passed PURL. + + Parameters + ---------- + purl: PackageURL + The PURL to find provenance for. + + Returns + ------- + InTotoPayload | None + The provenance payload if found, or None. + """ + if determine_abstract_purl_type(purl) == AbstractPurlType.REPOSITORY: + # Do not perform this function for repository type targets. + return None + + if purl.type == "npm": + if self.npm_registry: + return ProvenanceFinder.find_npm_provenance(purl, self.npm_registry) + logger.debug("Missing npm registry to find provenance in.") + elif purl.type in ["gradle", "maven"]: + if self.jfrog_registry: + return ProvenanceFinder.find_gav_provenance(purl, self.jfrog_registry) + logger.debug("Missing JFrog registry to find provenance in.") + else: + logger.debug("Provenance finding not supported for PURL type: %s", purl.type) + + return None + + @staticmethod + def find_npm_provenance(purl: PackageURL, npm_registry: NPMRegistry) -> InTotoPayload | None: + """Find and download the NPM based provenance for the passed PURL. + + Parameters + ---------- + purl: PackageURL + The PURL of the analysis target. + npm_registry: NPMRegistry + The npm registry to find provenance in. + + Returns + ------- + InTotoPayload | None + The provenance payload if found, or None. + """ + if not npm_registry.enabled: + logger.debug("The npm registry is not enabled.") + return None + + namespace = purl.namespace + artifact_id = purl.name + version = purl.version + + if not purl.version: + version = npm_registry.get_latest_version(namespace, artifact_id) + + if not version: + logger.debug("Missing version for NPM package.") + return None + + # The size of the asset (in bytes) is added to match the AssetLocator + # protocol and is not used because npm API registry does not provide it, so it is set to zero. + npm_provenance_asset = NPMAttestationAsset( + namespace=namespace, + artifact_id=artifact_id, + version=version, + npm_registry=npm_registry, + size_in_bytes=0, + ) + try: + with tempfile.TemporaryDirectory() as temp_dir: + download_path = os.path.join(temp_dir, f"{artifact_id}.intoto.jsonl") + if not npm_provenance_asset.download(download_path): + logger.debug("Unable to find an npm provenance for %s@%s", artifact_id, version) + return None + + try: + # Load the provenance file. + provenance_payload = load_provenance_payload(download_path) + except LoadIntotoAttestationError as loadintotoerror: + logger.error("Error while loading provenance %s", loadintotoerror) + return None + + return provenance_payload + except OSError as error: + logger.error("Error while storing provenance in the temporary directory: %s", error) + return None + + @staticmethod + def find_gav_provenance(purl: PackageURL, jfrog_registry: JFrogMavenRegistry) -> InTotoPayload | None: + """Find and download the GAV based provenance for the passed PURL. + + Parameters + ---------- + purl: PackageURL + The PURL of the analysis target. + jfrog_registry: JFrogMavenRegistry + The JFrog registry to find provenance in. + + Returns + ------- + InTotoPayload | None + The provenance payload if found, or None. + + Raises + ------ + ProvenanceAvailableException + If the discovered provenance file size exceeds the configured limit. + """ + if not jfrog_registry.enabled: + logger.debug("JFrog registry not enabled.") + return None + + if not purl.namespace or not purl.version: + logger.debug("Missing purl namespace or version for finding provenance in JFrog registry.") + return None + + provenance_extensions = defaults.get_list( + "slsa.verifier", + "provenance_extensions", + fallback=["intoto.jsonl"], + ) + + provenance_assets = jfrog_registry.fetch_assets( + group_id=purl.namespace, + artifact_id=purl.name, + version=purl.version, + extensions=set(provenance_extensions), + ) + + if not provenance_assets: + return None + + max_valid_provenance_size = defaults.getint( + "slsa.verifier", + "max_download_size", + fallback=1000000, + ) + + for provenance_asset in provenance_assets: + if provenance_asset.size_in_bytes > max_valid_provenance_size: + msg = ( + f"The provenance asset {provenance_asset.name} unexpectedly exceeds the " + f"max valid file size of {max_valid_provenance_size} (bytes). " + "The check will not proceed due to potential security risks." + ) + logger.error(msg) + raise ProvenanceAvailableException(msg) + + provenance_filepaths = [] + try: + with tempfile.TemporaryDirectory() as temp_dir: + for provenance_asset in provenance_assets: + provenance_filepath = os.path.join(temp_dir, provenance_asset.name) + if not provenance_asset.download(provenance_filepath): + logger.debug( + "Could not download the provenance %s. Skip verifying...", + provenance_asset.name, + ) + continue + provenance_filepaths.append(provenance_filepath) + except OSError as error: + logger.error("Error while storing provenance in the temporary directory: %s", error) + + provenances = [] + witness_verifier_config = load_witness_verifier_config() + + for provenance_filepath in provenance_filepaths: + try: + provenance_payload = load_provenance_payload(provenance_filepath) + except LoadIntotoAttestationError as error: + logger.error("Error while loading provenance: %s", error) + continue + + if not is_witness_provenance_payload(provenance_payload, witness_verifier_config.predicate_types): + continue + + provenances.append(provenance_payload) + + if not provenances: + logger.debug("No payloads found in provenance files.") + return None + + # We assume that there is only one provenance per GAV. + provenance = provenances[0] + + return provenance diff --git a/src/macaron/repo_finder/repo_finder.py b/src/macaron/repo_finder/repo_finder.py index 999ce0f87..d365f34d8 100644 --- a/src/macaron/repo_finder/repo_finder.py +++ b/src/macaron/repo_finder/repo_finder.py @@ -74,7 +74,7 @@ def find_repo(purl: PackageURL) -> str: return "" # Call Repo Finder and return first valid URL - logger.debug("Analyzing %s with Repo Finder: %s", purl.to_string(), repo_finder.__class__) + logger.debug("Analyzing %s with Repo Finder: %s", purl.to_string(), str(type(repo_finder))) return repo_finder.find_repo(purl) diff --git a/src/macaron/slsa_analyzer/analyzer.py b/src/macaron/slsa_analyzer/analyzer.py index 7eab59b43..1687045b1 100644 --- a/src/macaron/slsa_analyzer/analyzer.py +++ b/src/macaron/slsa_analyzer/analyzer.py @@ -21,11 +21,21 @@ from macaron.database.database_manager import DatabaseManager, get_db_manager, get_db_session from macaron.database.table_definitions import Analysis, Component, Repository from macaron.dependency_analyzer import DependencyAnalyzer, DependencyInfo -from macaron.errors import CloneError, DuplicateError, InvalidPURLError, PURLNotFoundError, RepoCheckOutError +from macaron.errors import ( + CloneError, + DuplicateError, + InvalidAnalysisTargetError, + InvalidPURLError, + ProvenanceError, + PURLNotFoundError, + RepoCheckOutError, +) from macaron.output_reporter.reporter import FileReporter from macaron.output_reporter.results import Record, Report, SCMStatus from macaron.repo_finder import repo_finder from macaron.repo_finder.commit_finder import find_commit +from macaron.repo_finder.provenance_extractor import extract_repo_and_commit_from_provenance +from macaron.repo_finder.provenance_finder import ProvenanceFinder from macaron.slsa_analyzer import git_url from macaron.slsa_analyzer.analyze_context import AnalyzeContext from macaron.slsa_analyzer.asset import VirtualReleaseAsset @@ -116,7 +126,7 @@ def run( user_config: dict, sbom_path: str = "", skip_deps: bool = False, - prov_payload: InTotoPayload | None = None, + provenance_payload: InTotoPayload | None = None, ) -> int: """Run the analysis and write results to the output path. @@ -131,7 +141,7 @@ def run( The path to the SBOM. skip_deps : bool Flag to skip dependency resolution. - prov_payload : InToToPayload | None + provenance_payload : InToToPayload | None The provenance intoto payload for the main software component. Returns @@ -165,7 +175,7 @@ def run( main_record = self.run_single( main_config, analysis, - prov_payload=prov_payload, + provenance_payload=provenance_payload, ) if main_record.status != SCMStatus.AVAILABLE or not main_record.context: @@ -267,7 +277,7 @@ def run_single( config: Configuration, analysis: Analysis, existing_records: dict[str, Record] | None = None, - prov_payload: InTotoPayload | None = None, + provenance_payload: InTotoPayload | None = None, ) -> Record: """Run the checks for a single repository target. @@ -282,7 +292,7 @@ def run_single( The current analysis instance. existing_records : dict[str, Record] | None The mapping of existing records that the analysis has run successfully. - prov_payload : InToToPayload | None + provenance_payload : InToToPayload | None The provenance intoto payload for the analyzed software component. Returns @@ -290,10 +300,39 @@ def run_single( Record The record of the analysis for this repository. """ + # Parse the PURL. repo_id = config.get_value("id") + try: + parsed_purl = Analyzer.parse_purl(config) + except InvalidPURLError as error: + logger.error(error) + return Record( + record_id=repo_id, + description=str(error), + pre_config=config, + status=SCMStatus.ANALYSIS_FAILED, + ) + + if not provenance_payload and parsed_purl and not config.get_value("path"): + # Try to find the provenance file for the parsed PURL. + provenance_payload = ProvenanceFinder().find_provenance(parsed_purl) + + # Create the analysis target. + available_domains = [git_service.hostname for git_service in GIT_SERVICES if git_service.hostname] + try: + analysis_target = Analyzer.to_analysis_target(config, available_domains, parsed_purl, provenance_payload) + except InvalidAnalysisTargetError as error: + return Record( + record_id=repo_id, + description=str(error), + pre_config=config, + status=SCMStatus.ANALYSIS_FAILED, + ) + + # Create the component. component = None try: - component = self.add_component(config, analysis, existing_records) + component = self.add_component(analysis, analysis_target, existing_records) except PURLNotFoundError as error: logger.error(error) return Record( @@ -321,7 +360,7 @@ def run_single( analyze_ctx.dynamic_data["expectation"] = self.expectations.get_expectation_for_target( analyze_ctx.component.purl.split("@")[0] ) - analyze_ctx.dynamic_data["provenance"] = prov_payload + analyze_ctx.dynamic_data["provenance"] = provenance_payload analyze_ctx.check_results = self.perform_checks(analyze_ctx) return Record( @@ -441,7 +480,10 @@ class AnalysisTarget(NamedTuple): digest: str def add_component( - self, config: Configuration, analysis: Analysis, existing_records: dict[str, Record] | None = None + self, + analysis: Analysis, + analysis_target: AnalysisTarget, + existing_records: dict[str, Record] | None = None, ) -> Component: """Add a software component if it does not exist in the DB already. @@ -450,10 +492,10 @@ def add_component( Parameters ---------- - config: Configuration - The configuration for running Macaron. analysis: Analysis The current analysis instance. + analysis_target: AnalysisTarget + The target of this analysis. existing_records : dict[str, Record] | None The mapping of existing records that the analysis has run successfully. @@ -470,15 +512,6 @@ def add_component( The component is already analyzed in the same session. """ # Note: the component created in this function will be added to the database. - available_domains = [git_service.hostname for git_service in GIT_SERVICES if git_service.hostname] - try: - analysis_target = Analyzer.to_analysis_target(config, available_domains) - except InvalidPURLError as error: - raise PURLNotFoundError("Invalid input PURL.") from error - - if not analysis_target.parsed_purl and not analysis_target.repo_path: - raise PURLNotFoundError("Cannot determine the analysis as PURL and/or repository path is not provided.") - repository = None if analysis_target.repo_path: git_obj = self._prepare_repo( @@ -528,21 +561,18 @@ def add_component( return Component(purl=analysis_target.parsed_purl.to_string(), analysis=analysis, repository=repository) @staticmethod - def to_analysis_target(config: Configuration, available_domains: list[str]) -> AnalysisTarget: - """Resolve the details of a software component from user input. + def parse_purl(config: Configuration) -> PackageURL | None: + """Parse the PURL provided in the input. Parameters ---------- config : Configuration The target configuration that stores the user input values for the software component. - available_domains : list[str] - The list of supported git service host domain. This is used to convert repo-based PURL to a repository path - of the corresponding software component. Returns ------- - AnalysisTarget - The NamedTuple that contains the resolved details for the software component. + PackageURL | None + The parsed PURL, or None if one was not provided as input. Raises ------ @@ -554,26 +584,58 @@ def to_analysis_target(config: Configuration, available_domains: list[str]) -> A # Therefore, their true types are ``str``, and an empty string indicates that the input value is not provided. # The purl might be a PackageURL type, a string, or None, which should be reduced down to an optional # PackageURL type. + purl = config.get_value("purl") + if purl is None or purl == "": + return None + if isinstance(purl, PackageURL): + return purl + try: + # Note that PackageURL.from_string sanitizes the unsafe characters in the purl string, + # which is user-controllable, by calling urllib's `urlsplit` function. + return PackageURL.from_string(purl) + except ValueError as error: + raise InvalidPURLError(f"Invalid input PURL: {purl}") from error + + @staticmethod + def to_analysis_target( + config: Configuration, + available_domains: list[str], + parsed_purl: PackageURL | None, + provenance_payload: InTotoPayload | None = None, + ) -> AnalysisTarget: + """Resolve the details of a software component from user input. + + Parameters + ---------- + config : Configuration + The target configuration that stores the user input values for the software component. + available_domains : list[str] + The list of supported git service host domain. This is used to convert repo-based PURL to a repository path + of the corresponding software component. parsed_purl: PackageURL | None - if config.get_value("purl") is None or config.get_value("purl") == "": - parsed_purl = None - elif isinstance(config.get_value("purl"), PackageURL): - parsed_purl = config.get_value("purl") - else: - try: - # Note that PackageURL.from_string sanitizes the unsafe characters in the purl string, - # which is user-controllable, by calling urllib's `urlsplit` function. - parsed_purl = PackageURL.from_string(config.get_value("purl")) - except ValueError as error: - raise InvalidPURLError(f"Invalid input PURL: {config.get_value('purl')}") from error + The PURL to use for the analysis target, or None if one has not been provided. + provenance_payload : InToToPayload | None + The provenance in-toto payload for the software component. + + Returns + ------- + AnalysisTarget + The NamedTuple that contains the resolved details for the software component. + Raises + ------ + InvalidAnalysisTargetError + Raised if a valid Analysis Target cannot be created. + """ repo_path_input: str = config.get_value("path") input_branch: str = config.get_value("branch") input_digest: str = config.get_value("digest") match (parsed_purl, repo_path_input): case (None, ""): - return Analyzer.AnalysisTarget(parsed_purl=None, repo_path="", branch="", digest="") + raise InvalidAnalysisTargetError( + "Cannot determine the analysis target: PURL and repository path are missing." + ) case (None, _): # If only the repository path is provided, we will use the user-provided repository path to create the @@ -587,10 +649,27 @@ def to_analysis_target(config: Configuration, available_domains: list[str]) -> A case (_, ""): # If a PURL but no repository path is provided, we try to extract the repository path from the PURL. # Note that we can't always extract the repository path from any provided PURL. - repo = "" converted_repo_path = None + repo: str = "" + digest: str = "" # parsed_purl cannot be None here, but mypy cannot detect that without some extra help. if parsed_purl is not None: + if provenance_payload: + # Try to find repository and commit via provenance. + try: + repo, digest = extract_repo_and_commit_from_provenance(provenance_payload) + except ProvenanceError as error: + logger.debug("Failed to extract repo and commit from provenance: %s", error) + + if repo and digest: + return Analyzer.AnalysisTarget( + parsed_purl=parsed_purl, + repo_path=repo, + branch="", + digest=digest, + ) + + # The commit was not found from provenance. Proceed with Repo Finder. converted_repo_path = repo_finder.to_repo_path(parsed_purl, available_domains) if converted_repo_path is None: # Try to find repo from PURL @@ -612,7 +691,9 @@ def to_analysis_target(config: Configuration, available_domains: list[str]) -> A ) case _: - return Analyzer.AnalysisTarget(parsed_purl=None, repo_path="", branch="", digest="") + raise InvalidAnalysisTargetError( + "Cannot determine the analysis target: PURL and repository path are missing." + ) def get_analyze_ctx(self, component: Component) -> AnalyzeContext: """Return the analyze context for a target component. @@ -673,7 +754,6 @@ def _prepare_repo( The pydriller.Git object of the repository or None if error. """ # TODO: separate the logic for handling remote and local repos instead of putting them into this method. - logger.info( "Preparing the repository for the analysis (path=%s, branch=%s, digest=%s)", repo_path, diff --git a/src/macaron/slsa_analyzer/package_registry/npm_registry.py b/src/macaron/slsa_analyzer/package_registry/npm_registry.py index 6ceb01967..7786d0e1b 100644 --- a/src/macaron/slsa_analyzer/package_registry/npm_registry.py +++ b/src/macaron/slsa_analyzer/package_registry/npm_registry.py @@ -185,6 +185,8 @@ def download_attestation_payload(self, url: str, download_path: str) -> bool: logger.debug("dsseEnvelope attribute in the bundle is missing. Skipping...") continue + logger.debug("Found attestation with valid predicateType: %s", att.get("predicateType")) + try: with open(download_path, "w", encoding="utf-8") as file: json.dump(dsse_env, file) @@ -199,6 +201,44 @@ def download_attestation_payload(self, url: str, download_path: str) -> bool: return False + def get_latest_version(self, namespace: str | None, name: str) -> str | None: + """Try to retrieve the latest version of a package from the registry. + + Parameters + ---------- + namespace: str | None + The optional namespace of the package. + name: str + The name of the package. + + Returns + ------- + str | None + The latest version of the package, or None if one cannot be found. + """ + if not name: + return None + + url = f"https://{self.hostname}" + if namespace: + url = f"{url}/{namespace}" + url = f"{url}/{name}/latest" + + response = send_get_http_raw(url, timeout=self.request_timeout) + + if not response or not response.text: + logger.debug("No valid response from NPM server for latest version.") + return None + + json_data = json.loads(response.text) + version: str | None = json_data.get("version") + if not version: + logger.debug("No version found in response from NPM server.") + return None + + logger.debug("Found version for NPM artifact: %s", version) + return version + class NPMAttestationAsset(NamedTuple): """An attestation asset hosted on the npm registry. diff --git a/src/macaron/slsa_analyzer/provenance/intoto/v01/__init__.py b/src/macaron/slsa_analyzer/provenance/intoto/v01/__init__.py index 4e10f3ca8..95fc3b304 100644 --- a/src/macaron/slsa_analyzer/provenance/intoto/v01/__init__.py +++ b/src/macaron/slsa_analyzer/provenance/intoto/v01/__init__.py @@ -161,7 +161,7 @@ def is_valid_digest_set(digest: dict[str, JsonType]) -> TypeGuard[dict[str, str] ``True`` if the digest set is valid according to the spec, in which case its type is narrowed to a ``dict[str, str]``; ``False`` otherwise. """ - for value in digest.values(): - if not isinstance(value, str): + for key in digest: + if not isinstance(digest[key], str): return False return True diff --git a/src/macaron/slsa_analyzer/provenance/intoto/v1/__init__.py b/src/macaron/slsa_analyzer/provenance/intoto/v1/__init__.py index 8133635b4..3ffe08bd6 100644 --- a/src/macaron/slsa_analyzer/provenance/intoto/v1/__init__.py +++ b/src/macaron/slsa_analyzer/provenance/intoto/v1/__init__.py @@ -11,31 +11,6 @@ from macaron.slsa_analyzer.provenance.intoto.errors import ValidateInTotoPayloadError from macaron.util import JsonType -# The full list of cryptographic algorithms supported in SLSA v1 provenance. These are used as keys within the digest -# set of the resource descriptors within the subject. -# See: https://github.com/in-toto/attestation/blob/main/spec/v1/digest_set.md -VALID_ALGORITHMS = [ - "sha256", - "sha224", - "sha384", - "sha512", - "sha512_224", - "sha512_256", - "sha3_224", - "sha3_256", - "sha3_384", - "sha3_512", - "shake128", - "shake256", - "blake2b", - "blake2s", - "ripemd160", - "sm3", - "gost", - "sha1", - "md5", -] - class InTotoV1Statement(TypedDict): """An in-toto version 1 statement. @@ -190,8 +165,6 @@ def is_valid_digest_set(digest: JsonType) -> bool: if not isinstance(digest, dict): return False for key in digest: - if key not in VALID_ALGORITHMS: - return False if not isinstance(digest[key], str): return False return True diff --git a/src/macaron/slsa_analyzer/provenance/witness/__init__.py b/src/macaron/slsa_analyzer/provenance/witness/__init__.py index cbe1afe8e..408fb31ca 100644 --- a/src/macaron/slsa_analyzer/provenance/witness/__init__.py +++ b/src/macaron/slsa_analyzer/provenance/witness/__init__.py @@ -136,8 +136,6 @@ def extract_witness_provenance_subjects(witness_payload: InTotoPayload) -> set[W dict[str, str] A dictionary in which each key is a subject name and each value is the corresponding SHA256 digest. """ - # TODO: add support for in-toto v1 provenances. - if isinstance(witness_payload, InTotoV01Payload): subjects = witness_payload.statement["subject"] subject_digests = set() diff --git a/tests/e2e/defaults/disable_repo_finder.ini b/tests/e2e/defaults/disable_repo_finder.ini new file mode 100644 index 000000000..ec4fd9216 --- /dev/null +++ b/tests/e2e/defaults/disable_repo_finder.ini @@ -0,0 +1,5 @@ +# Copyright (c) 2024 - 2024, Oracle and/or its affiliates. All rights reserved. +# Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/. + +[repofinder] +find_repos = False diff --git a/tests/e2e/expected_results/purl/npm/semver/semver.json b/tests/e2e/expected_results/purl/npm/semver/semver.json new file mode 100644 index 000000000..9fa549cba --- /dev/null +++ b/tests/e2e/expected_results/purl/npm/semver/semver.json @@ -0,0 +1,334 @@ +{ + "metadata": { + "timestamps": "2024-03-22 09:02:56", + "has_passing_check": true, + "run_checks": [ + "mcn_provenance_available_1", + "mcn_provenance_expectation_1", + "mcn_provenance_witness_level_one_1", + "mcn_trusted_builder_level_three_1", + "mcn_build_as_code_1", + "mcn_build_script_1", + "mcn_build_service_1", + "mcn_infer_artifact_pipeline_1", + "mcn_provenance_level_three_1", + "mcn_version_control_system_1" + ], + "check_tree": { + "mcn_provenance_available_1": { + "mcn_provenance_level_three_1": {}, + "mcn_provenance_expectation_1": {}, + "mcn_provenance_witness_level_one_1": {} + }, + "mcn_version_control_system_1": { + "mcn_trusted_builder_level_three_1": { + "mcn_build_as_code_1": { + "mcn_build_service_1": { + "mcn_build_script_1": {} + }, + "mcn_infer_artifact_pipeline_1": {} + } + } + } + } + }, + "target": { + "info": { + "full_name": "pkg:npm/semver@7.6.0", + "local_cloned_path": "git_repos/github.com/npm/node-semver", + "remote_path": "https://github.com/npm/node-semver", + "branch": null, + "commit_hash": "377f709718053a477ed717089c4403c4fec332a1", + "commit_date": "2024-02-05T09:03:38-08:00" + }, + "provenances": { + "is_inferred": false, + "content": { + "github_actions": [ + { + "_type": "https://in-toto.io/Statement/v0.1", + "subject": [], + "predicateType": "https://slsa.dev/provenance/v0.2", + "predicate": { + "builder": { + "id": "" + }, + "buildType": "", + "invocation": { + "configSource": { + "uri": "", + "digest": { + "sha1": "" + }, + "entryPoint": "" + }, + "parameters": {}, + "environment": {} + }, + "buildConfig": { + "jobID": "", + "stepID": "" + }, + "metadata": { + "buildInvocationId": "", + "buildStartedOn": "", + "buildFinishedOn": "", + "completeness": { + "parameters": "false", + "environment": "false", + "materials": "false" + }, + "reproducible": "false" + }, + "materials": [ + { + "uri": "", + "digest": {} + } + ] + } + } + ], + "npm Registry": [ + { + "_type": "https://in-toto.io/Statement/v1", + "subject": [ + { + "name": "pkg:npm/semver@7.6.0", + "digest": { + "sha512": "127c1786b9705cc93d80abb9fdf971e6cbff6a7e7b024469946de14caebc5bb1510cdfa4f8e5818fae4cefbd7d3a403cd972c1c6b717d0a4878fe5f908e84e56" + } + } + ], + "predicateType": "https://slsa.dev/provenance/v1", + "predicate": { + "buildDefinition": { + "buildType": "https://slsa-framework.github.io/github-actions-buildtypes/workflow/v1", + "externalParameters": { + "workflow": { + "ref": "refs/heads/main", + "repository": "https://github.com/npm/node-semver", + "path": ".github/workflows/release.yml" + } + }, + "internalParameters": { + "github": { + "event_name": "push", + "repository_id": "1357199", + "repository_owner_id": "6078720" + } + }, + "resolvedDependencies": [ + { + "uri": "git+https://github.com/npm/node-semver@refs/heads/main", + "digest": { + "gitCommit": "377f709718053a477ed717089c4403c4fec332a1" + } + } + ] + }, + "runDetails": { + "builder": { + "id": "https://github.com/actions/runner/github-hosted" + }, + "metadata": { + "invocationId": "https://github.com/npm/node-semver/actions/runs/7788106733/attempts/1" + } + } + } + } + ] + } + }, + "checks": { + "summary": { + "DISABLED": 0, + "FAILED": 4, + "PASSED": 5, + "SKIPPED": 0, + "UNKNOWN": 1 + }, + "results": [ + { + "check_id": "mcn_provenance_expectation_1", + "check_description": "Check whether the SLSA provenance for the produced artifact conforms to the expected value.", + "slsa_requirements": [ + "Provenance conforms with expectations - SLSA Level 3" + ], + "justification": [ + "Not Available." + ], + "result_type": "UNKNOWN" + }, + { + "check_id": "mcn_build_as_code_1", + "check_description": "The build definition and configuration executed by the build service is verifiably derived from text file definitions stored in a version control system.", + "slsa_requirements": [ + "Build as code - SLSA Level 3" + ], + "justification": [ + "build_tool_name: npm", + "ci_service_name: github_actions", + "deploy_command: [\"npm\", \"publish\", \"--provenance\", \"--tag=\\\"$1\\\"\"]", + { + "build_trigger": "https://github.com/npm/node-semver/blob/377f709718053a477ed717089c4403c4fec332a1/.github/workflows/release-integration.yml" + } + ], + "result_type": "PASSED" + }, + { + "check_id": "mcn_build_script_1", + "check_description": "Check if the target repo has a valid build script.", + "slsa_requirements": [ + "Scripted Build - SLSA Level 1" + ], + "justification": [ + "Not Available." + ], + "result_type": "PASSED" + }, + { + "check_id": "mcn_build_service_1", + "check_description": "Check if the target repo has a valid build service.", + "slsa_requirements": [ + "Build service - SLSA Level 2" + ], + "justification": [ + "Not Available." + ], + "result_type": "PASSED" + }, + { + "check_id": "mcn_provenance_available_1", + "check_description": "Check whether the target has intoto provenance.", + "slsa_requirements": [ + "Provenance - Available - SLSA Level 1", + "Provenance content - Identifies build instructions - SLSA Level 1", + "Provenance content - Identifies artifacts - SLSA Level 1", + "Provenance content - Identifies builder - SLSA Level 1" + ], + "justification": [ + "asset_name: semver", + { + "asset_url": "https://registry.npmjs.org/-/npm/v1/attestations/semver@7.6.0" + } + ], + "result_type": "PASSED" + }, + { + "check_id": "mcn_version_control_system_1", + "check_description": "Check whether the target repo uses a version control system.", + "slsa_requirements": [ + "Version controlled - SLSA Level 2" + ], + "justification": [ + { + "git_repo": "https://github.com/npm/node-semver" + } + ], + "result_type": "PASSED" + }, + { + "check_id": "mcn_infer_artifact_pipeline_1", + "check_description": "Detects potential pipelines from which an artifact is published.", + "slsa_requirements": [ + "Build as code - SLSA Level 3" + ], + "justification": [ + "Not Available." + ], + "result_type": "FAILED" + }, + { + "check_id": "mcn_provenance_level_three_1", + "check_description": "Check whether the target has SLSA provenance level 3.", + "slsa_requirements": [ + "Provenance - Non falsifiable - SLSA Level 3", + "Provenance content - Includes all build parameters - SLSA Level 3", + "Provenance content - Identifies entry point - SLSA Level 3", + "Provenance content - Identifies source code - SLSA Level 2" + ], + "justification": [ + "Not Available." + ], + "result_type": "FAILED" + }, + { + "check_id": "mcn_provenance_witness_level_one_1", + "check_description": "Check whether the target has a level-1 witness provenance.", + "slsa_requirements": [ + "Provenance - Available - SLSA Level 1", + "Provenance content - Identifies build instructions - SLSA Level 1", + "Provenance content - Identifies artifacts - SLSA Level 1", + "Provenance content - Identifies builder - SLSA Level 1" + ], + "justification": [ + "Not Available." + ], + "result_type": "FAILED" + }, + { + "check_id": "mcn_trusted_builder_level_three_1", + "check_description": "Check whether the target uses a trusted SLSA level 3 builder.", + "slsa_requirements": [ + "Hermetic - SLSA Level 4", + "Isolated - SLSA Level 3", + "Parameterless - SLSA Level 4", + "Ephemeral environment - SLSA Level 3" + ], + "justification": [ + "Not Available." + ], + "result_type": "FAILED" + } + ] + } + }, + "dependencies": { + "analyzed_deps": 0, + "unique_dep_repos": 0, + "checks_summary": [ + { + "check_id": "mcn_provenance_available_1", + "num_deps_pass": 0 + }, + { + "check_id": "mcn_provenance_expectation_1", + "num_deps_pass": 0 + }, + { + "check_id": "mcn_provenance_witness_level_one_1", + "num_deps_pass": 0 + }, + { + "check_id": "mcn_trusted_builder_level_three_1", + "num_deps_pass": 0 + }, + { + "check_id": "mcn_build_as_code_1", + "num_deps_pass": 0 + }, + { + "check_id": "mcn_build_script_1", + "num_deps_pass": 0 + }, + { + "check_id": "mcn_build_service_1", + "num_deps_pass": 0 + }, + { + "check_id": "mcn_infer_artifact_pipeline_1", + "num_deps_pass": 0 + }, + { + "check_id": "mcn_provenance_level_three_1", + "num_deps_pass": 0 + }, + { + "check_id": "mcn_version_control_system_1", + "num_deps_pass": 0 + } + ], + "dep_status": [] + } +} diff --git a/tests/repo_finder/test_provenance_extractor.py b/tests/repo_finder/test_provenance_extractor.py new file mode 100644 index 000000000..1ee27aa4e --- /dev/null +++ b/tests/repo_finder/test_provenance_extractor.py @@ -0,0 +1,457 @@ +# Copyright (c) 2024 - 2024, Oracle and/or its affiliates. All rights reserved. +# Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/. + +"""This module tests the provenance extractor on valid example provenances.""" +import json + +import pytest + +from macaron.errors import ProvenanceError +from macaron.json_tools import json_extract +from macaron.repo_finder.provenance_extractor import extract_repo_and_commit_from_provenance +from macaron.slsa_analyzer.provenance.intoto import validate_intoto_payload +from macaron.util import JsonType + + +@pytest.fixture(name="slsa_v1_gcb_1_provenance") +def slsa_v1_gcb_1_provenance_() -> dict[str, JsonType]: + """Return a valid SLSA v1 provenance using build type gcb and sourceToBuild.""" + return _load_and_validate_json( + """ + { + "_type": "https://in-toto.io/Statement/v1", + "subject": [], + "predicateType": "https://slsa.dev/provenance/v1", + "predicate": { + "buildDefinition": { + "buildType": "https://slsa-framework.github.io/gcb-buildtypes/triggered-build/v1", + "externalParameters": { + "sourceToBuild": { + "repository": "https://github.com/oracle/macaron" + } + }, + "resolvedDependencies": [ + { + "uri": "git+https://github.com/oracle/macaron@refs/heads/staging", + "digest": { "sha1": "51aa22a42ec1bffa71518041a6a6d42d40bf50f0" } + } + ] + } + } + } + """ + ) + + +@pytest.fixture(name="slsa_v1_gcb_2_provenance") +def slsa_v1_gcb_2_provenance_() -> dict[str, JsonType]: + """Return a valid SLSA v1 provenance using build type gcb and configSource.""" + return _load_and_validate_json( + """ + { + "_type": "https://in-toto.io/Statement/v1", + "subject": [], + "predicateType": "https://slsa.dev/provenance/v1", + "predicate": { + "buildDefinition": { + "buildType": "https://slsa-framework.github.io/gcb-buildtypes/triggered-build/v1", + "externalParameters": { + "configSource": { + "repository": "https://github.com/oracle/macaron" + } + }, + "resolvedDependencies": [ + { + "uri": "git+https://github.com/oracle/macaron@refs/heads/staging", + "digest": { + "sha1": "51aa22a42ec1bffa71518041a6a6d42d40bf50f0" + } + } + ] + } + } + } + """ + ) + + +@pytest.fixture(name="slsa_v1_github_provenance") +def slsa_v1_github_provenance_() -> dict[str, JsonType]: + """Return a valid SLSA v1 provenance using build type GitHub.""" + return _load_and_validate_json( + """ + { + "_type": "https://in-toto.io/Statement/v1", + "subject": [], + "predicateType": "https://slsa.dev/provenance/v1", + "predicate": { + "buildDefinition": { + "buildType": "https://slsa-framework.github.io/github-actions-buildtypes/workflow/v1", + "externalParameters": { + "workflow": { + "repository": "https://github.com/oracle/macaron" + } + }, + "resolvedDependencies": [ + { + "uri": "git+https://github.com/oracle/macaron@refs/heads/staging", + "digest": { + "gitCommit": "51aa22a42ec1bffa71518041a6a6d42d40bf50f0" + } + }, + { + "uri": "git+https://github.com/oracle-samples/macaron@refs/heads/main" + } + ] + } + } + } + """ + ) + + +@pytest.fixture(name="slsa_v02_provenance") +def slsa_v02_provenance_() -> dict[str, JsonType]: + """Return a valid SLSA v02 provenance.""" + return _load_and_validate_json( + """ + { + "_type": "https://in-toto.io/Statement/v0.1", + "subject": [], + "predicateType": "https://slsa.dev/provenance/v0.2", + "predicate": { + "invocation": { + "configSource": { + "uri": "git+https://github.com/oracle/macaron@refs/heads/staging", + "digest": { + "sha1": "51aa22a42ec1bffa71518041a6a6d42d40bf50f0" + } + } + } + } + } + """ + ) + + +@pytest.fixture(name="slsa_v01_provenance") +def slsa_v01_provenance_() -> dict[str, JsonType]: + """Return a valid SLSA v01 provenance.""" + return _load_and_validate_json( + """ + { + "_type": "https://in-toto.io/Statement/v0.1", + "subject": [], + "predicateType": "https://slsa.dev/provenance/v0.1", + "predicate": { + "recipe": { + "definedInMaterial": 1 + }, + "materials": [ + { + "uri": "git+https://github.com/oracle-samples/macaron@refs/heads/main" + }, + { + "uri": "git+https://github.com/oracle/macaron@refs/heads/main", + "digest": { + "sha1": "51aa22a42ec1bffa71518041a6a6d42d40bf50f0" + } + } + ] + } + } + """ + ) + + +@pytest.fixture(name="witness_gitlab_provenance") +def witness_gitlab_provenance_() -> dict[str, JsonType]: + """Return a Witness v0.1 provenance with a GitLab attestation.""" + return _load_and_validate_json( + """ + { + "_type": "https://in-toto.io/Statement/v0.1", + "subject": [], + "predicateType": "https://witness.testifysec.com/attestation-collection/v0.1", + "predicate": { + "name": "test", + "attestations": [ + { + "type": "https://witness.dev/attestations/gitlab/v0.1", + "attestation": { + "projecturl": "https://gitlab.com/tinyMediaManager/tinyMediaManager" + } + }, + { + "type": "https://witness.dev/attestations/git/v0.1", + "attestation": { + "commithash": "cf6080a92d1c748ba5f05ea16529e05e5c641a49" + } + } + ] + } + } + """ + ) + + +@pytest.fixture(name="witness_github_provenance") +def witness_github_provenance_() -> dict[str, JsonType]: + """Return a Witness v0.1 provenance with a GitHub attestation.""" + return _load_and_validate_json( + """ + { + "_type": "https://in-toto.io/Statement/v0.1", + "subject": [], + "predicateType": "https://witness.testifysec.com/attestation-collection/v0.1", + "predicate": { + "name": "test", + "attestations": [ + { + "type": "https://witness.dev/attestations/github/v0.1", + "attestation": { + "projecturl": "https://github.com/oracle/macaron" + } + }, + { + "type": "https://witness.dev/attestations/git/v0.1", + "attestation": { + "commithash": "51aa22a42ec1bffa71518041a6a6d42d40bf50f0" + } + } + ] + } + } + """ + ) + + +@pytest.fixture(name="target_repository") +def target_repository_() -> str: + """Return the target repository URL.""" + return "https://github.com/oracle/macaron" + + +@pytest.fixture(name="target_commit") +def target_commit_() -> str: + """Return the target commit hash.""" + return "51aa22a42ec1bffa71518041a6a6d42d40bf50f0" + + +def test_slsa_v1_gcb_1_is_valid( + slsa_v1_gcb_1_provenance: dict[str, JsonType], target_repository: str, target_commit: str +) -> None: + """Test valid SLSA v1 provenance with build type gcb and sourceToBuild.""" + _test_extract_repo_and_commit_from_provenance(slsa_v1_gcb_1_provenance, target_repository, target_commit) + + +@pytest.mark.parametrize( + ("keys", "new_value"), + [ + (["predicate", "buildDefinition", "externalParameters", "sourceToBuild", "repository"], ""), + (["predicate", "buildDefinition", "externalParameters", "sourceToBuild", "repository"], None), + (["predicate", "buildDefinition", "externalParameters", "sourceToBuild", "repository"], "bad_url"), + (["predicate", "buildDefinition", "resolvedDependencies"], ""), + (["predicate", "buildDefinition", "resolvedDependencies"], None), + ], +) +def test_slsa_v1_gcb_1_is_invalid( + slsa_v1_gcb_1_provenance: dict[str, JsonType], keys: list[str], new_value: JsonType +) -> None: + """Test invalidly modified SLSA v1 provenance with build type gcb and sourceToBuild.""" + _json_modify(slsa_v1_gcb_1_provenance, keys, new_value) + with pytest.raises(ProvenanceError): + _test_extract_repo_and_commit_from_provenance(slsa_v1_gcb_1_provenance) + + +def test_slsa_v1_gcb_2_is_valid( + slsa_v1_gcb_2_provenance: dict[str, JsonType], target_repository: str, target_commit: str +) -> None: + """Test valid SLSA v1 provenance with build type gcb and configSource.""" + _test_extract_repo_and_commit_from_provenance(slsa_v1_gcb_2_provenance, target_repository, target_commit) + + +@pytest.mark.parametrize( + ("keys", "new_value"), + [ + (["predicate", "buildDefinition", "externalParameters", "configSource", "repository"], ""), + (["predicate", "buildDefinition", "externalParameters", "configSource", "repository"], None), + (["predicate", "buildDefinition", "externalParameters", "configSource", "repository"], "bad_url"), + ], +) +def test_slsa_v1_gcb_2_is_invalid( + slsa_v1_gcb_2_provenance: dict[str, JsonType], keys: list[str], new_value: JsonType +) -> None: + """Test invalidly modified SLSA v1 provenance with build type gcb and configSource.""" + _json_modify(slsa_v1_gcb_2_provenance, keys, new_value) + with pytest.raises(ProvenanceError): + _test_extract_repo_and_commit_from_provenance(slsa_v1_gcb_2_provenance) + + +def test_slsa_v1_github_is_valid( + slsa_v1_github_provenance: dict[str, JsonType], target_repository: str, target_commit: str +) -> None: + """Test valid SLSA v1 provenance with build type GitHub.""" + _test_extract_repo_and_commit_from_provenance(slsa_v1_github_provenance, target_repository, target_commit) + + +@pytest.mark.parametrize( + ("keys", "new_value"), + [ + (["predicate", "buildDefinition", "externalParameters", "workflow", "repository"], ""), + (["predicate", "buildDefinition", "externalParameters", "workflow", "repository"], None), + (["predicate", "buildDefinition", "externalParameters", "workflow", "repository"], "bad_url"), + ], +) +def test_slsa_v1_github_is_invalid( + slsa_v1_github_provenance: dict[str, JsonType], keys: list[str], new_value: JsonType +) -> None: + """Test invalidly modified SLSA v1 provenance with build type GitHub.""" + _json_modify(slsa_v1_github_provenance, keys, new_value) + with pytest.raises(ProvenanceError): + _test_extract_repo_and_commit_from_provenance(slsa_v1_github_provenance) + + +def test_slsa_v02_is_valid( + slsa_v02_provenance: dict[str, JsonType], target_repository: str, target_commit: str +) -> None: + """Test SLSA v0.2 provenance.""" + _test_extract_repo_and_commit_from_provenance(slsa_v02_provenance, target_repository, target_commit) + + +@pytest.mark.parametrize( + ("keys", "new_value"), + [ + (["predicate", "invocation", "configSource", "uri"], ""), + (["predicate", "invocation", "configSource", "uri"], None), + (["predicate", "invocation", "configSource", "uri"], "bad_url"), + (["predicate", "invocation", "configSource", "digest", "sha1"], ""), + (["predicate", "invocation", "configSource", "digest", "sha1"], None), + ], +) +def test_slsa_v02_is_invalid(slsa_v02_provenance: dict[str, JsonType], keys: list[str], new_value: JsonType) -> None: + """Test invalidly modified SLSA v0.2 provenance.""" + _json_modify(slsa_v02_provenance, keys, new_value) + with pytest.raises(ProvenanceError): + _test_extract_repo_and_commit_from_provenance(slsa_v02_provenance) + + +def test_slsa_v01_is_valid( + slsa_v01_provenance: dict[str, JsonType], target_repository: str, target_commit: str +) -> None: + """Test valid SLSA v0.1 provenance.""" + _test_extract_repo_and_commit_from_provenance(slsa_v01_provenance, target_repository, target_commit) + + +@pytest.mark.parametrize( + "new_value", + [ + "", + None, + ], +) +def test_slsa_v01_is_invalid(slsa_v01_provenance: dict[str, JsonType], new_value: JsonType) -> None: + """Test invalidly modified SLSA v0.1 provenance.""" + materials = json_extract(slsa_v01_provenance, ["predicate", "materials"], list) + material_index = json_extract(slsa_v01_provenance, ["predicate", "recipe", "definedInMaterial"], int) + _json_modify(materials[material_index], ["uri"], new_value) + with pytest.raises(ProvenanceError): + _test_extract_repo_and_commit_from_provenance(slsa_v01_provenance) + + +def test_slsa_v01_invalid_material_index(slsa_v01_provenance: dict[str, JsonType]) -> None: + """Test the SLSA v0.1 provenance with an invalid materials index.""" + _json_modify(slsa_v01_provenance, ["predicate", "recipe", "definedInMaterial"], 10) + with pytest.raises(ProvenanceError): + _test_extract_repo_and_commit_from_provenance(slsa_v01_provenance) + + +def test_witness_gitlab_is_valid(witness_gitlab_provenance: dict[str, JsonType]) -> None: + """Test valid Witness v0.1 GitLab provenance.""" + _test_extract_repo_and_commit_from_provenance( + witness_gitlab_provenance, + "https://gitlab.com/tinyMediaManager/tinyMediaManager", + "cf6080a92d1c748ba5f05ea16529e05e5c641a49", + ) + + +def test_witness_github_is_valid( + witness_github_provenance: dict[str, JsonType], target_repository: str, target_commit: str +) -> None: + """Test valid Witness v0.1 GitHub provenance.""" + _test_extract_repo_and_commit_from_provenance(witness_github_provenance, target_repository, target_commit) + + +@pytest.mark.parametrize( + ("keys", "new_value", "attestation_index"), + [ + (["attestation", "projecturl"], "", 0), + (["attestation", "projecturl"], None, 0), + (["attestation", "commithash"], "", 1), + (["attestation", "commithash"], None, 1), + ], +) +def test_witness_github_is_invalid( + witness_github_provenance: dict[str, JsonType], keys: list[str], new_value: JsonType, attestation_index: int +) -> None: + """Test invalidly modified Witness v0.1 GitHub provenance.""" + attestations = json_extract(witness_github_provenance, ["predicate", "attestations"], list) + _json_modify(attestations[attestation_index], keys, new_value) + with pytest.raises(ProvenanceError): + _test_extract_repo_and_commit_from_provenance(witness_github_provenance) + + +def test_witness_github_remove_attestation(witness_github_provenance: dict[str, JsonType]) -> None: + """Test removing Git attestation from Witness V0.1 GitHub provenance.""" + attestations = json_extract(witness_github_provenance, ["predicate", "attestations"], list) + _json_modify(witness_github_provenance, ["predicate", "attestations"], attestations[:1]) + with pytest.raises(ProvenanceError): + _test_extract_repo_and_commit_from_provenance(witness_github_provenance) + + +@pytest.mark.parametrize( + ("type_", "predicate_type"), + [ + ("https://in-toto.io/Statement/v0.1", "https://slsa.dev/provenance/v1"), + ("https://in-toto.io/Statement/v1", "https://slsa.dev/provenance/v0.2"), + ("https://in-toto.io/Statement/v1", "https://slsa.dev/provenance/v0.1"), + ("https://in-toto.io/Statement/v1", "https://witness.testifysec.com/attestation-collection/v0.1"), + ], +) +def test_invalid_type_payloads(type_: str, predicate_type: str) -> None: + """Test payloads with invalid type combinations.""" + payload: dict[str, JsonType] = {"_type": type_, "predicateType": predicate_type, "subject": [], "predicate": {}} + with pytest.raises(ProvenanceError): + _test_extract_repo_and_commit_from_provenance(payload) + + +def _test_extract_repo_and_commit_from_provenance( + payload: dict[str, JsonType], expected_repo: str = "", expected_commit: str = "" +) -> None: + """Accept a provenance and extraction function, assert the extracted values match the expected ones.""" + provenance = validate_intoto_payload(payload) + repo, commit = extract_repo_and_commit_from_provenance(provenance) + assert expected_repo == repo + assert expected_commit == commit + + +def _json_modify(entry: JsonType, keys: list[str], new_value: JsonType) -> None: + """Modify the value found by following the list of depth-sequential keys inside the passed JSON dictionary. + + The found value will be overwritten by the `new_value` parameter. + If `new_value` is `None`, the value will be removed. + If the final key does not exist, it will be created as `new_value`. + """ + target: dict[str, JsonType] = json_extract(entry, keys[:-1], dict) + + if new_value is None: + del target[keys[-1]] + else: + target[keys[-1]] = new_value + + +def _load_and_validate_json(payload: str) -> dict[str, JsonType]: + """Load payload as JSON and validate it is of type dict.""" + json_payload = json.loads(payload) + assert isinstance(json_payload, dict) + return json_payload diff --git a/tests/repo_finder/test_repo_finder.py b/tests/repo_finder/test_repo_finder.py index 6b724d2e2..ba0bc2b20 100644 --- a/tests/repo_finder/test_repo_finder.py +++ b/tests/repo_finder/test_repo_finder.py @@ -6,73 +6,9 @@ from pathlib import Path import pytest -from packageurl import PackageURL from macaron.config.defaults import load_defaults -from macaron.config.target_config import Configuration from macaron.repo_finder.repo_finder_java import JavaRepoFinder -from macaron.slsa_analyzer.analyzer import Analyzer - - -@pytest.mark.parametrize( - ("config", "available_domains", "expect"), - [ - ( - Configuration({"purl": ""}), - ["github.com", "gitlab.com", "bitbucket.org"], - Analyzer.AnalysisTarget(parsed_purl=None, repo_path="", branch="", digest=""), - ), - ( - Configuration({"purl": "pkg:github.com/apache/maven"}), - ["github.com", "gitlab.com", "bitbucket.org"], - Analyzer.AnalysisTarget( - parsed_purl=PackageURL.from_string("pkg:github.com/apache/maven"), - repo_path="https://github.com/apache/maven", - branch="", - digest="", - ), - ), - ( - Configuration({"purl": "", "path": "https://github.com/apache/maven"}), - ["github.com", "gitlab.com", "bitbucket.org"], - Analyzer.AnalysisTarget( - parsed_purl=None, repo_path="https://github.com/apache/maven", branch="", digest="" - ), - ), - ( - Configuration({"purl": "pkg:maven/apache/maven", "path": "https://github.com/apache/maven"}), - ["github.com", "gitlab.com", "bitbucket.org"], - Analyzer.AnalysisTarget( - parsed_purl=PackageURL.from_string("pkg:maven/apache/maven"), - repo_path="https://github.com/apache/maven", - branch="", - digest="", - ), - ), - ( - Configuration( - { - "purl": "pkg:maven/apache/maven", - "path": "https://github.com/apache/maven", - "branch": "master", - "digest": "abcxyz", - } - ), - ["github.com", "gitlab.com", "bitbucket.org"], - Analyzer.AnalysisTarget( - parsed_purl=PackageURL.from_string("pkg:maven/apache/maven"), - repo_path="https://github.com/apache/maven", - branch="master", - digest="abcxyz", - ), - ), - ], -) -def test_resolve_analysis_target( - config: Configuration, available_domains: list[str], expect: Analyzer.AnalysisTarget -) -> None: - """Test the resolve analysis target method with valid inputs.""" - assert Analyzer.to_analysis_target(config, available_domains) == expect @pytest.mark.parametrize( diff --git a/tests/slsa_analyzer/test_analyzer.py b/tests/slsa_analyzer/test_analyzer.py index d82d6676d..f4e68f321 100644 --- a/tests/slsa_analyzer/test_analyzer.py +++ b/tests/slsa_analyzer/test_analyzer.py @@ -1,4 +1,4 @@ -# Copyright (c) 2022 - 2023, Oracle and/or its affiliates. All rights reserved. +# Copyright (c) 2022 - 2024, Oracle and/or its affiliates. All rights reserved. # Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/. """This module tests the slsa_analyzer.Gh module.""" @@ -12,7 +12,7 @@ from packageurl import PackageURL from macaron.config.target_config import Configuration -from macaron.errors import InvalidPURLError +from macaron.errors import InvalidAnalysisTargetError, InvalidPURLError from macaron.slsa_analyzer.analyzer import Analyzer from ..macaron_testcase import MacaronTestCase @@ -48,11 +48,6 @@ def test_resolve_local_path(self) -> None: @pytest.mark.parametrize( ("config", "available_domains", "expect"), [ - ( - Configuration({"purl": ""}), - ["github.com", "gitlab.com", "bitbucket.org"], - Analyzer.AnalysisTarget(parsed_purl=None, repo_path="", branch="", digest=""), - ), ( Configuration({"purl": "pkg:github.com/apache/maven"}), ["github.com", "gitlab.com", "bitbucket.org"], @@ -103,7 +98,8 @@ def test_resolve_analysis_target( config: Configuration, available_domains: list[str], expect: Analyzer.AnalysisTarget ) -> None: """Test the resolve analysis target method with valid inputs.""" - assert Analyzer.to_analysis_target(config, available_domains) == expect + parsed_purl = Analyzer.parse_purl(config) + assert Analyzer.to_analysis_target(config, available_domains, parsed_purl) == expect @given( @@ -136,7 +132,8 @@ def test_invalid_analysis_target( } ) try: - Analyzer.to_analysis_target(config, available_domains) + purl = Analyzer.parse_purl(config) + Analyzer.to_analysis_target(config, available_domains, purl) except InvalidPURLError: pass @@ -151,4 +148,10 @@ def test_invalid_analysis_target( def test_resolve_analysis_target_invalid_purl(config: Configuration) -> None: """Test the resolve analysis target method with invalid inputs.""" with pytest.raises(InvalidPURLError): - Analyzer.to_analysis_target(config, []) + Analyzer.parse_purl(config) + + +def test_resolve_analysis_target_no_purl_or_repository() -> None: + """Test creation of an Analysis Target when no PURL or repository path is provided.""" + with pytest.raises(InvalidAnalysisTargetError): + Analyzer.to_analysis_target(Configuration(), [], None)