From 34a69a0b3e387578c11aa83399e0a8c208a0e172 Mon Sep 17 00:00:00 2001 From: Amine Date: Mon, 16 Jun 2025 21:56:27 +0100 Subject: [PATCH 1/4] feat(heuristics): add Fake Email analyzer to validate maintainer email domains Signed-off-by: Amine --- pyproject.toml | 1 + .../pypi_heuristics/heuristics.py | 3 + .../pypi_heuristics/metadata/fake_email.py | 96 ++++++++++ .../checks/detect_malicious_metadata_check.py | 7 + .../malware_analyzer/pypi/test_fake_email.py | 166 ++++++++++++++++++ 5 files changed, 273 insertions(+) create mode 100644 src/macaron/malware_analyzer/pypi_heuristics/metadata/fake_email.py create mode 100644 tests/malware_analyzer/pypi/test_fake_email.py diff --git a/pyproject.toml b/pyproject.toml index 74705364b..4b8cf02ad 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -38,6 +38,7 @@ dependencies = [ "problog >= 2.2.6,<3.0.0", "cryptography >=44.0.0,<45.0.0", "semgrep == 1.113.0", + "dnspython >=2.7.0,<3.0.0", ] keywords = [] # https://pypi.org/classifiers/ diff --git a/src/macaron/malware_analyzer/pypi_heuristics/heuristics.py b/src/macaron/malware_analyzer/pypi_heuristics/heuristics.py index eebce5764..c37f763a5 100644 --- a/src/macaron/malware_analyzer/pypi_heuristics/heuristics.py +++ b/src/macaron/malware_analyzer/pypi_heuristics/heuristics.py @@ -43,6 +43,9 @@ class Heuristics(str, Enum): #: Indicates that the package source code contains suspicious code patterns. SUSPICIOUS_PATTERNS = "suspicious_patterns" + #: Indicates that the package maintainer's email address is suspicious or invalid. 
+ FAKE_EMAIL = "fake_email" + class HeuristicResult(str, Enum): """Result type indicating the outcome of a heuristic.""" diff --git a/src/macaron/malware_analyzer/pypi_heuristics/metadata/fake_email.py b/src/macaron/malware_analyzer/pypi_heuristics/metadata/fake_email.py new file mode 100644 index 000000000..e29fe123e --- /dev/null +++ b/src/macaron/malware_analyzer/pypi_heuristics/metadata/fake_email.py @@ -0,0 +1,96 @@ +# Copyright (c) 2024 - 2025, Oracle and/or its affiliates. All rights reserved. +# Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/. + +"""The heuristic analyzer to check the email address of the package maintainers.""" + +import logging +import re + +import dns.resolver as dns_resolver + +from macaron.errors import HeuristicAnalyzerValueError +from macaron.json_tools import JsonType +from macaron.malware_analyzer.pypi_heuristics.base_analyzer import BaseHeuristicAnalyzer +from macaron.malware_analyzer.pypi_heuristics.heuristics import HeuristicResult, Heuristics +from macaron.slsa_analyzer.package_registry.pypi_registry import PyPIPackageJsonAsset + +logger: logging.Logger = logging.getLogger(__name__) + + +class FakeEmailAnalyzer(BaseHeuristicAnalyzer): + """Analyze the email address of the package maintainers.""" + + def __init__(self) -> None: + super().__init__( + name="fake_email_analyzer", + heuristic=Heuristics.FAKE_EMAIL, + depends_on=None, + ) + + def is_valid_email(self, email: str) -> bool: + """Check if the email format is valid and the domain has MX records. + + Parameters + ---------- + email: str + The email address to check. + + Returns + ------- + bool: + ``True`` if the email address is valid, ``False`` otherwise. + + Raises + ------ + HeuristicAnalyzerValueError + if the failure is due to DNS resolution. 
+ """ + if not re.match(r"[^@]+@[^@]+\.[^@]+", email): + return False + + domain = email.split("@")[1] + try: + records = dns_resolver.resolve(domain, "MX") + if not records: + return False + except Exception as err: + err_message = f"Failed to resolve domain {domain}: {err}" + raise HeuristicAnalyzerValueError(err_message) from err + return True + + def analyze(self, pypi_package_json: PyPIPackageJsonAsset) -> tuple[HeuristicResult, dict[str, JsonType]]: + """Analyze the package. + + Parameters + ---------- + pypi_package_json: PyPIPackageJsonAsset + The PyPI package JSON asset object. + + Returns + ------- + tuple[HeuristicResult, dict[str, JsonType]]: + The result and related information collected during the analysis. + + Raises + ------ + HeuristicAnalyzerValueError + if the analysis fails. + """ + response = pypi_package_json.download("") + if not response: + error_message = "Failed to download package JSON " + return HeuristicResult.FAIL, {"message": error_message} + + data = pypi_package_json.package_json + author_email = data.get("info", {}).get("author_email", None) + maintainer_email = data.get("info", {}).get("maintainer_email", None) + if maintainer_email is None and author_email is None: + message = "No maintainers are available" + return HeuristicResult.SKIP, {"message": message} + + if author_email is not None and not self.is_valid_email(author_email): + return HeuristicResult.FAIL, {"email": author_email} + if maintainer_email is not None and not self.is_valid_email(maintainer_email): + return HeuristicResult.FAIL, {"email": maintainer_email} + + return HeuristicResult.PASS, {} diff --git a/src/macaron/slsa_analyzer/checks/detect_malicious_metadata_check.py b/src/macaron/slsa_analyzer/checks/detect_malicious_metadata_check.py index 8514a458d..646f7acc3 100644 --- a/src/macaron/slsa_analyzer/checks/detect_malicious_metadata_check.py +++ b/src/macaron/slsa_analyzer/checks/detect_malicious_metadata_check.py @@ -20,6 +20,7 @@ from 
macaron.malware_analyzer.pypi_heuristics.metadata.anomalous_version import AnomalousVersionAnalyzer from macaron.malware_analyzer.pypi_heuristics.metadata.closer_release_join_date import CloserReleaseJoinDateAnalyzer from macaron.malware_analyzer.pypi_heuristics.metadata.empty_project_link import EmptyProjectLinkAnalyzer +from macaron.malware_analyzer.pypi_heuristics.metadata.fake_email import FakeEmailAnalyzer from macaron.malware_analyzer.pypi_heuristics.metadata.high_release_frequency import HighReleaseFrequencyAnalyzer from macaron.malware_analyzer.pypi_heuristics.metadata.one_release import OneReleaseAnalyzer from macaron.malware_analyzer.pypi_heuristics.metadata.source_code_repo import SourceCodeRepoAnalyzer @@ -358,6 +359,7 @@ def run_check(self, ctx: AnalyzeContext) -> CheckResultData: WheelAbsenceAnalyzer, AnomalousVersionAnalyzer, TyposquattingPresenceAnalyzer, + FakeEmailAnalyzer, ] # name used to query the result of all problog rules, so it can be accessed outside the model. @@ -425,6 +427,10 @@ def run_check(self, ctx: AnalyzeContext) -> CheckResultData: failed({Heuristics.ONE_RELEASE.value}), failed({Heuristics.ANOMALOUS_VERSION.value}). + % Package released recently with a maintainer email address that is not valid. + {Confidence.MEDIUM.value}::trigger(malware_medium_confidence_3) :- + quickUndetailed, + failed({Heuristics.FAKE_EMAIL.value}). % ----- Evaluation ----- % Aggregate result @@ -432,6 +438,7 @@ def run_check(self, ctx: AnalyzeContext) -> CheckResultData: {problog_result_access} :- trigger(malware_high_confidence_2). {problog_result_access} :- trigger(malware_high_confidence_3). {problog_result_access} :- trigger(malware_high_confidence_4). + {problog_result_access} :- trigger(malware_medium_confidence_3). {problog_result_access} :- trigger(malware_medium_confidence_2). {problog_result_access} :- trigger(malware_medium_confidence_1). query({problog_result_access}). 
diff --git a/tests/malware_analyzer/pypi/test_fake_email.py b/tests/malware_analyzer/pypi/test_fake_email.py new file mode 100644 index 000000000..aa080a6b8 --- /dev/null +++ b/tests/malware_analyzer/pypi/test_fake_email.py @@ -0,0 +1,166 @@ +# Copyright (c) 2024 - 2025, Oracle and/or its affiliates. All rights reserved. +# Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/. + +"""Tests for the FakeEmailAnalyzer heuristic.""" + + +from collections.abc import Generator +from unittest.mock import MagicMock, patch + +import pytest + +from macaron.malware_analyzer.pypi_heuristics.heuristics import HeuristicResult +from macaron.malware_analyzer.pypi_heuristics.metadata.fake_email import FakeEmailAnalyzer +from macaron.slsa_analyzer.package_registry.pypi_registry import PyPIPackageJsonAsset + + +@pytest.fixture(name="analyzer") +def analyzer_fixture() -> FakeEmailAnalyzer: + """Pytest fixture to create a FakeEmailAnalyzer instance.""" + return FakeEmailAnalyzer() + + +@pytest.fixture(name="pypi_package_json_asset_mock") +def pypi_package_json_asset_mock_fixture() -> MagicMock: + """Pytest fixture for a mock PyPIPackageJsonAsset.""" + mock_asset = MagicMock(spec=PyPIPackageJsonAsset) + # Default to successful download, tests can override + mock_asset.download = MagicMock(return_value=True) + # package_json should be set by each test to simulate different PyPI responses + mock_asset.package_json = {} + return mock_asset + + +@pytest.fixture(name="mock_dns_resolve") +def mock_dns_resolve_fixture() -> Generator[MagicMock]: + """General purpose mock for dns.resolver.resolve. + + Patches where dns_resolver is imported in the module under test. + """ + with patch("macaron.malware_analyzer.pypi_heuristics.metadata.fake_email.dns_resolver.resolve") as mock_resolve: + # Default behavior: simulate successful MX record lookup. 
+ mock_mx_record = MagicMock() + mock_mx_record.exchange = "mail.default-domain.com" + mock_resolve.return_value = [mock_mx_record] + yield mock_resolve + + +# Tests for the analyze method +def test_analyze_download_failure(analyzer: FakeEmailAnalyzer, pypi_package_json_asset_mock: MagicMock) -> None: + """Test the analyzer fails if downloading package JSON fails.""" + pypi_package_json_asset_mock.download.return_value = False + result, info = analyzer.analyze(pypi_package_json_asset_mock) + assert result == HeuristicResult.FAIL + assert "message" in info + assert isinstance(info["message"], str) + assert "Failed to download package JSON" in info["message"] + pypi_package_json_asset_mock.download.assert_called_once_with("") + + +def test_analyze_skip_no_emails_present(analyzer: FakeEmailAnalyzer, pypi_package_json_asset_mock: MagicMock) -> None: + """Test the analyzer skips if no author_email or maintainer_email is present.""" + pypi_package_json_asset_mock.package_json = {"info": {"author_email": None, "maintainer_email": None}} + result, info = analyzer.analyze(pypi_package_json_asset_mock) + assert result == HeuristicResult.SKIP + assert info["message"] == "No maintainers are available" + + +def test_analyze_skip_no_info_key(analyzer: FakeEmailAnalyzer, pypi_package_json_asset_mock: MagicMock) -> None: + """Test the analyzer skips if 'info' key is missing in PyPI data.""" + pypi_package_json_asset_mock.package_json = {} # No 'info' key + result, info = analyzer.analyze(pypi_package_json_asset_mock) + assert result == HeuristicResult.SKIP + assert info["message"] == "No maintainers are available" + + +def test_analyze_fail_empty_author_email(analyzer: FakeEmailAnalyzer, pypi_package_json_asset_mock: MagicMock) -> None: + """Test analyzer fails for empty author_email string (maintainer_email is None).""" + pypi_package_json_asset_mock.package_json = {"info": {"author_email": "", "maintainer_email": None}} + result, info = 
analyzer.analyze(pypi_package_json_asset_mock) + assert result == HeuristicResult.FAIL + assert info["email"] == "" + + +def test_analyze_pass_only_maintainer_email_valid( + analyzer: FakeEmailAnalyzer, pypi_package_json_asset_mock: MagicMock, mock_dns_resolve: MagicMock +) -> None: + """Test analyzer passes when only maintainer_email is present and valid.""" + mock_mx_record = MagicMock() + mock_mx_record.exchange = "mail.example.net" + mock_dns_resolve.return_value = [mock_mx_record] + + pypi_package_json_asset_mock.package_json = { + "info": {"author_email": None, "maintainer_email": "maintainer@example.net"} + } + result, info = analyzer.analyze(pypi_package_json_asset_mock) + assert result == HeuristicResult.PASS + assert info == {} + mock_dns_resolve.assert_called_once_with("example.net", "MX") + + +def test_analyze_pass_both_emails_valid( + analyzer: FakeEmailAnalyzer, pypi_package_json_asset_mock: MagicMock, mock_dns_resolve: MagicMock +) -> None: + """Test the analyzer passes when both emails are present and valid.""" + + def side_effect_dns_resolve(domain: str, record_type: str = "MX") -> list[MagicMock]: + mock_mx = MagicMock() + domains = { + "MX": {"example.com", "example.net"}, + } + if domain not in domains.get(record_type, set()): + pytest.fail(f"Unexpected domain for DNS resolve: {domain}") + mock_mx.exchange = f"mail.{domain}" + return [mock_mx] + + mock_dns_resolve.side_effect = side_effect_dns_resolve + + pypi_package_json_asset_mock.package_json = { + "info": {"author_email": "author@example.com", "maintainer_email": "maintainer@example.net"} + } + result, info = analyzer.analyze(pypi_package_json_asset_mock) + assert result == HeuristicResult.PASS + assert info == {} + assert mock_dns_resolve.call_count == 2 + mock_dns_resolve.assert_any_call("example.com", "MX") + mock_dns_resolve.assert_any_call("example.net", "MX") + + +def test_analyze_fail_author_email_invalid_format( + analyzer: FakeEmailAnalyzer, pypi_package_json_asset_mock: MagicMock, 
mock_dns_resolve: MagicMock +) -> None: + """Test analyzer fails when author_email has an invalid format.""" + pypi_package_json_asset_mock.package_json = { + "info": {"author_email": "bad_email_format", "maintainer_email": "maintainer@example.net"} + } + result, info = analyzer.analyze(pypi_package_json_asset_mock) + assert result == HeuristicResult.FAIL + assert info["email"] == "bad_email_format" + mock_dns_resolve.assert_not_called() # Regex check fails before DNS lookup + + +# Tests for the is_valid_email method +def test_is_valid_email_valid_email_with_mx(analyzer: FakeEmailAnalyzer, mock_dns_resolve: MagicMock) -> None: + """Test is_valid_email returns True for a valid email with MX records.""" + mock_mx_record = MagicMock() + mock_mx_record.exchange = "mail.example.com" + mock_dns_resolve.return_value = [mock_mx_record] + assert analyzer.is_valid_email("test@example.com") is True + mock_dns_resolve.assert_called_once_with("example.com", "MX") + + +def test_is_valid_email_invalid_format(analyzer: FakeEmailAnalyzer, mock_dns_resolve: MagicMock) -> None: + """Test is_valid_email method with various invalid email formats.""" + assert not analyzer.is_valid_email("not_an_email") + assert not analyzer.is_valid_email("test@") + assert not analyzer.is_valid_email("@example.com") + assert not analyzer.is_valid_email("test@example") + assert not analyzer.is_valid_email("") + mock_dns_resolve.assert_not_called() + + +def test_is_valid_email_no_mx_records_returned(analyzer: FakeEmailAnalyzer, mock_dns_resolve: MagicMock) -> None: + """Test is_valid_email returns False if DNS resolve returns no MX records.""" + mock_dns_resolve.return_value = [] # Simulate no MX records found + assert analyzer.is_valid_email("test@no-mx-domain.com") is False + mock_dns_resolve.assert_called_once_with("no-mx-domain.com", "MX") From 18715d1a99ca99b0210dc650c3211fae47feebc1 Mon Sep 17 00:00:00 2001 From: Amine Date: Sun, 29 Jun 2025 12:21:30 +0100 Subject: [PATCH 2/4] refactor: remove 
redundant package download Signed-off-by: Amine --- .../pypi_heuristics/metadata/fake_email.py | 5 ----- tests/malware_analyzer/pypi/test_fake_email.py | 11 ----------- 2 files changed, 16 deletions(-) diff --git a/src/macaron/malware_analyzer/pypi_heuristics/metadata/fake_email.py b/src/macaron/malware_analyzer/pypi_heuristics/metadata/fake_email.py index e29fe123e..79ee5f00d 100644 --- a/src/macaron/malware_analyzer/pypi_heuristics/metadata/fake_email.py +++ b/src/macaron/malware_analyzer/pypi_heuristics/metadata/fake_email.py @@ -76,11 +76,6 @@ def analyze(self, pypi_package_json: PyPIPackageJsonAsset) -> tuple[HeuristicRes HeuristicAnalyzerValueError if the analysis fails. """ - response = pypi_package_json.download("") - if not response: - error_message = "Failed to download package JSON " - return HeuristicResult.FAIL, {"message": error_message} - data = pypi_package_json.package_json author_email = data.get("info", {}).get("author_email", None) maintainer_email = data.get("info", {}).get("maintainer_email", None) diff --git a/tests/malware_analyzer/pypi/test_fake_email.py b/tests/malware_analyzer/pypi/test_fake_email.py index aa080a6b8..27867b55d 100644 --- a/tests/malware_analyzer/pypi/test_fake_email.py +++ b/tests/malware_analyzer/pypi/test_fake_email.py @@ -46,17 +46,6 @@ def mock_dns_resolve_fixture() -> Generator[MagicMock]: # Tests for the analyze method -def test_analyze_download_failure(analyzer: FakeEmailAnalyzer, pypi_package_json_asset_mock: MagicMock) -> None: - """Test the analyzer fails if downloading package JSON fails.""" - pypi_package_json_asset_mock.download.return_value = False - result, info = analyzer.analyze(pypi_package_json_asset_mock) - assert result == HeuristicResult.FAIL - assert "message" in info - assert isinstance(info["message"], str) - assert "Failed to download package JSON" in info["message"] - pypi_package_json_asset_mock.download.assert_called_once_with("") - - def test_analyze_skip_no_emails_present(analyzer: 
FakeEmailAnalyzer, pypi_package_json_asset_mock: MagicMock) -> None: """Test the analyzer skips if no author_email or maintainer_email is present.""" pypi_package_json_asset_mock.package_json = {"info": {"author_email": None, "maintainer_email": None}} From 59f4c61b7e2f0a2ab26584644530aa24206056f0 Mon Sep 17 00:00:00 2001 From: Amine Date: Sat, 5 Jul 2025 19:55:48 +0100 Subject: [PATCH 3/4] docs: update documentation Signed-off-by: Amine --- src/macaron/malware_analyzer/README.md | 5 +++++ src/macaron/slsa_analyzer/build_tool/gradle.py | 4 ++-- src/macaron/slsa_analyzer/build_tool/maven.py | 4 ++-- src/macaron/slsa_analyzer/build_tool/pip.py | 4 ++-- src/macaron/slsa_analyzer/build_tool/poetry.py | 4 ++-- 5 files changed, 13 insertions(+), 8 deletions(-) diff --git a/src/macaron/malware_analyzer/README.md b/src/macaron/malware_analyzer/README.md index 7aeda9417..07faa91cf 100644 --- a/src/macaron/malware_analyzer/README.md +++ b/src/macaron/malware_analyzer/README.md @@ -56,6 +56,11 @@ When a heuristic fails, with `HeuristicResult.FAIL`, then that is an indicator b - **Description**: Checks if the package name is suspiciously similar to any package name in a predefined list of popular packages. The similarity check incorporates the Jaro-Winkler distance and considers keyboard layout proximity to identify potential typosquatting. - **Rule**: Return `HeuristicResult.FAIL` if the similarity ratio between the package name and any popular package name meets or exceeds a defined threshold; otherwise, return `HeuristicResult.PASS`. - **Dependency**: None. + +11. **Fake Email** + - **Description**: Checks if the package maintainer or author has a suspicious or invalid email . + - **Rule**: Return `HeuristicResult.FAIL` if the email format is invalid or the email domain has no MX records ; otherwise, return `HeuristicResult.PASS`. + - **Dependency**: None. 
### Source Code Analysis with Semgrep **PyPI Source Code Analyzer** - **Description**: Uses Semgrep, with default rules written in `src/macaron/resources/pypi_malware_rules` and custom rules available by supplying a path to `custom_semgrep_rules` in `defaults.ini`, to scan the package `.tar` source code. diff --git a/src/macaron/slsa_analyzer/build_tool/gradle.py b/src/macaron/slsa_analyzer/build_tool/gradle.py index 2cc491934..607e98579 100644 --- a/src/macaron/slsa_analyzer/build_tool/gradle.py +++ b/src/macaron/slsa_analyzer/build_tool/gradle.py @@ -1,4 +1,4 @@ -# Copyright (c) 2022 - 2024, Oracle and/or its affiliates. All rights reserved. +# Copyright (c) 2022 - 2025, Oracle and/or its affiliates. All rights reserved. # Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/. """This module contains the Gradle class which inherits BaseBuildTool. @@ -122,7 +122,7 @@ def get_dep_analyzer(self) -> CycloneDxGradle: raise DependencyAnalyzerError("No default dependency analyzer is found.") if not DependencyAnalyzer.tool_valid(defaults.get("dependency.resolver", "dep_tool_gradle")): raise DependencyAnalyzerError( - f"Dependency analyzer {defaults.get('dependency.resolver','dep_tool_gradle')} is not valid.", + f"Dependency analyzer {defaults.get('dependency.resolver', 'dep_tool_gradle')} is not valid.", ) tool_name, tool_version = tuple( diff --git a/src/macaron/slsa_analyzer/build_tool/maven.py b/src/macaron/slsa_analyzer/build_tool/maven.py index 69323ad9c..e6c11c13e 100644 --- a/src/macaron/slsa_analyzer/build_tool/maven.py +++ b/src/macaron/slsa_analyzer/build_tool/maven.py @@ -1,4 +1,4 @@ -# Copyright (c) 2022 - 2024, Oracle and/or its affiliates. All rights reserved. +# Copyright (c) 2022 - 2025, Oracle and/or its affiliates. All rights reserved. # Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/. 
"""This module contains the Maven class which inherits BaseBuildTool. @@ -116,7 +116,7 @@ def get_dep_analyzer(self) -> CycloneDxMaven: raise DependencyAnalyzerError("No default dependency analyzer is found.") if not DependencyAnalyzer.tool_valid(defaults.get("dependency.resolver", "dep_tool_maven")): raise DependencyAnalyzerError( - f"Dependency analyzer {defaults.get('dependency.resolver','dep_tool_maven')} is not valid.", + f"Dependency analyzer {defaults.get('dependency.resolver', 'dep_tool_maven')} is not valid.", ) tool_name, tool_version = tuple( diff --git a/src/macaron/slsa_analyzer/build_tool/pip.py b/src/macaron/slsa_analyzer/build_tool/pip.py index 5abf0c0ba..c0e970ab9 100644 --- a/src/macaron/slsa_analyzer/build_tool/pip.py +++ b/src/macaron/slsa_analyzer/build_tool/pip.py @@ -1,4 +1,4 @@ -# Copyright (c) 2023 - 2024, Oracle and/or its affiliates. All rights reserved. +# Copyright (c) 2023 - 2025, Oracle and/or its affiliates. All rights reserved. # Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/. """This module contains the Pip class which inherits BaseBuildTool. @@ -88,7 +88,7 @@ def get_dep_analyzer(self) -> DependencyAnalyzer: tool_name = "cyclonedx_py" if not DependencyAnalyzer.tool_valid(f"{tool_name}:{cyclonedx_version}"): raise DependencyAnalyzerError( - f"Dependency analyzer {defaults.get('dependency.resolver','dep_tool_gradle')} is not valid.", + f"Dependency analyzer {defaults.get('dependency.resolver', 'dep_tool_gradle')} is not valid.", ) return CycloneDxPython( resources_path=global_config.resources_path, diff --git a/src/macaron/slsa_analyzer/build_tool/poetry.py b/src/macaron/slsa_analyzer/build_tool/poetry.py index eeb54216b..54e3899f1 100644 --- a/src/macaron/slsa_analyzer/build_tool/poetry.py +++ b/src/macaron/slsa_analyzer/build_tool/poetry.py @@ -1,4 +1,4 @@ -# Copyright (c) 2023 - 2024, Oracle and/or its affiliates. All rights reserved. 
+# Copyright (c) 2023 - 2025, Oracle and/or its affiliates. All rights reserved. # Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/. """This module contains the Poetry class which inherits BaseBuildTool. @@ -126,7 +126,7 @@ def get_dep_analyzer(self) -> DependencyAnalyzer: tool_name = "cyclonedx_py" if not DependencyAnalyzer.tool_valid(f"{tool_name}:{cyclonedx_version}"): raise DependencyAnalyzerError( - f"Dependency analyzer {defaults.get('dependency.resolver','dep_tool_gradle')} is not valid.", + f"Dependency analyzer {defaults.get('dependency.resolver', 'dep_tool_gradle')} is not valid.", ) return CycloneDxPython( resources_path=global_config.resources_path, From d99495c9761a2fa4db18feb0e9295eebae37cb85 Mon Sep 17 00:00:00 2001 From: Amine Date: Sat, 12 Jul 2025 16:12:40 +0100 Subject: [PATCH 4/4] refactor(fake-email): replace dns_resolver with email-validator for email domain validation Signed-off-by: Amine --- pyproject.toml | 2 +- src/macaron/malware_analyzer/README.md | 4 +- .../pypi_heuristics/metadata/fake_email.py | 61 ++++--- .../malware_analyzer/pypi/test_fake_email.py | 161 ++++++++---------- 4 files changed, 107 insertions(+), 121 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 4b8cf02ad..e3f36d626 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -38,7 +38,7 @@ dependencies = [ "problog >= 2.2.6,<3.0.0", "cryptography >=44.0.0,<45.0.0", "semgrep == 1.113.0", - "dnspython >=2.7.0,<3.0.0", + "email_validator >=2.2.0", ] keywords = [] # https://pypi.org/classifiers/ diff --git a/src/macaron/malware_analyzer/README.md b/src/macaron/malware_analyzer/README.md index 07faa91cf..d3acff68d 100644 --- a/src/macaron/malware_analyzer/README.md +++ b/src/macaron/malware_analyzer/README.md @@ -58,8 +58,8 @@ When a heuristic fails, with `HeuristicResult.FAIL`, then that is an indicator b - **Dependency**: None. 11. 
**Fake Email** - - **Description**: Checks if the package maintainer or author has a suspicious or invalid email . - - **Rule**: Return `HeuristicResult.FAIL` if the email format is invalid or the email domain has no MX records ; otherwise, return `HeuristicResult.PASS`. + - **Description**: Checks if the package maintainer or author has a suspicious or invalid email. + - **Rule**: Return `HeuristicResult.FAIL` if the email is invalid; otherwise, return `HeuristicResult.PASS`. - **Dependency**: None. ### Source Code Analysis with Semgrep **PyPI Source Code Analyzer** diff --git a/src/macaron/malware_analyzer/pypi_heuristics/metadata/fake_email.py b/src/macaron/malware_analyzer/pypi_heuristics/metadata/fake_email.py index 79ee5f00d..8899e24b6 100644 --- a/src/macaron/malware_analyzer/pypi_heuristics/metadata/fake_email.py +++ b/src/macaron/malware_analyzer/pypi_heuristics/metadata/fake_email.py @@ -4,12 +4,10 @@ """The heuristic analyzer to check the email address of the package maintainers.""" import logging -import re -import dns.resolver as dns_resolver +from email_validator import EmailNotValidError, ValidatedEmail, validate_email -from macaron.errors import HeuristicAnalyzerValueError -from macaron.json_tools import JsonType +from macaron.json_tools import JsonType, json_extract from macaron.malware_analyzer.pypi_heuristics.base_analyzer import BaseHeuristicAnalyzer from macaron.malware_analyzer.pypi_heuristics.heuristics import HeuristicResult, Heuristics from macaron.slsa_analyzer.package_registry.pypi_registry import PyPIPackageJsonAsset @@ -27,7 +25,7 @@ def __init__(self) -> None: depends_on=None, ) - def is_valid_email(self, email: str) -> bool: + def is_valid_email(self, email: str) -> ValidatedEmail | None: """Check if the email format is valid and the domain has MX records. Parameters @@ -37,26 +35,21 @@ def is_valid_email(self, email: str) -> bool: Returns ------- - bool: - ``True`` if the email address is valid, ``False`` otherwise. 
+ ValidatedEmail | None + The validated email object if the email is valid, otherwise None. Raises ------ HeuristicAnalyzerValueError if the failure is due to DNS resolution. """ - if not re.match(r"[^@]+@[^@]+\.[^@]+", email): - return False - - domain = email.split("@")[1] + emailinfo = None try: - records = dns_resolver.resolve(domain, "MX") - if not records: - return False - except Exception as err: - err_message = f"Failed to resolve domain {domain}: {err}" - raise HeuristicAnalyzerValueError(err_message) from err - return True + emailinfo = validate_email(email, check_deliverability=True) + except EmailNotValidError as err: + err_message = f"Invalid email address: {email}. Error: {err}" + logger.warning(err_message) + return emailinfo def analyze(self, pypi_package_json: PyPIPackageJsonAsset) -> tuple[HeuristicResult, dict[str, JsonType]]: """Analyze the package. @@ -76,16 +69,22 @@ def analyze(self, pypi_package_json: PyPIPackageJsonAsset) -> tuple[HeuristicRes HeuristicAnalyzerValueError if the analysis fails. 
""" - data = pypi_package_json.package_json - author_email = data.get("info", {}).get("author_email", None) - maintainer_email = data.get("info", {}).get("maintainer_email", None) - if maintainer_email is None and author_email is None: - message = "No maintainers are available" - return HeuristicResult.SKIP, {"message": message} - - if author_email is not None and not self.is_valid_email(author_email): - return HeuristicResult.FAIL, {"email": author_email} - if maintainer_email is not None and not self.is_valid_email(maintainer_email): - return HeuristicResult.FAIL, {"email": maintainer_email} - - return HeuristicResult.PASS, {} + package_json = pypi_package_json.package_json + author_email = json_extract(package_json, ["info", "author_email"], str) + maintainer_email = json_extract(package_json, ["info", "maintainer_email"], str) + + if not author_email and not maintainer_email: + return HeuristicResult.SKIP, {"message": "No author or maintainer email available."} + + validated_emails: list[JsonType] = [] + details = ["normalized", "local_part", "domain"] + + for email in [author_email, maintainer_email]: + if email: + email_info = self.is_valid_email(email) + if not email_info: + return HeuristicResult.FAIL, {"email": email} + + validated_emails.append({key: getattr(email_info, key) for key in details}) + + return HeuristicResult.PASS, {"validated_emails": validated_emails} diff --git a/tests/malware_analyzer/pypi/test_fake_email.py b/tests/malware_analyzer/pypi/test_fake_email.py index 27867b55d..fc12e9d01 100644 --- a/tests/malware_analyzer/pypi/test_fake_email.py +++ b/tests/malware_analyzer/pypi/test_fake_email.py @@ -8,6 +8,7 @@ from unittest.mock import MagicMock, patch import pytest +from email_validator import EmailNotValidError from macaron.malware_analyzer.pypi_heuristics.heuristics import HeuristicResult from macaron.malware_analyzer.pypi_heuristics.metadata.fake_email import FakeEmailAnalyzer @@ -15,7 +16,7 @@ @pytest.fixture(name="analyzer") -def 
analyzer_fixture() -> FakeEmailAnalyzer: +def analyzer_() -> FakeEmailAnalyzer: """Pytest fixture to create a FakeEmailAnalyzer instance.""" return FakeEmailAnalyzer() @@ -24,34 +25,23 @@ def analyzer_fixture() -> FakeEmailAnalyzer: def pypi_package_json_asset_mock_fixture() -> MagicMock: """Pytest fixture for a mock PyPIPackageJsonAsset.""" mock_asset = MagicMock(spec=PyPIPackageJsonAsset) - # Default to successful download, tests can override - mock_asset.download = MagicMock(return_value=True) - # package_json should be set by each test to simulate different PyPI responses mock_asset.package_json = {} return mock_asset -@pytest.fixture(name="mock_dns_resolve") -def mock_dns_resolve_fixture() -> Generator[MagicMock]: - """General purpose mock for dns.resolver.resolve. +@pytest.fixture(name="mock_validate_email") +def mock_validate_email_fixture() -> Generator[MagicMock]: + """Patch validate_email and mock its behavior.""" + with patch("macaron.malware_analyzer.pypi_heuristics.metadata.fake_email.validate_email") as mock: + yield mock - Patches where dns_resolver is imported in the module under test. - """ - with patch("macaron.malware_analyzer.pypi_heuristics.metadata.fake_email.dns_resolver.resolve") as mock_resolve: - # Default behavior: simulate successful MX record lookup. 
- mock_mx_record = MagicMock() - mock_mx_record.exchange = "mail.default-domain.com" - mock_resolve.return_value = [mock_mx_record] - yield mock_resolve - -# Tests for the analyze method def test_analyze_skip_no_emails_present(analyzer: FakeEmailAnalyzer, pypi_package_json_asset_mock: MagicMock) -> None: """Test the analyzer skips if no author_email or maintainer_email is present.""" pypi_package_json_asset_mock.package_json = {"info": {"author_email": None, "maintainer_email": None}} result, info = analyzer.analyze(pypi_package_json_asset_mock) assert result == HeuristicResult.SKIP - assert info["message"] == "No maintainers are available" + assert info["message"] == "No author or maintainer email available." def test_analyze_skip_no_info_key(analyzer: FakeEmailAnalyzer, pypi_package_json_asset_mock: MagicMock) -> None: @@ -59,97 +49,94 @@ def test_analyze_skip_no_info_key(analyzer: FakeEmailAnalyzer, pypi_package_json pypi_package_json_asset_mock.package_json = {} # No 'info' key result, info = analyzer.analyze(pypi_package_json_asset_mock) assert result == HeuristicResult.SKIP - assert info["message"] == "No maintainers are available" + assert info["message"] == "No author or maintainer email available." 
+ +def test_analyze_fail_invalid_email( + analyzer: FakeEmailAnalyzer, pypi_package_json_asset_mock: MagicMock, mock_validate_email: MagicMock +) -> None: + """Test analyzer fails for an invalid email format.""" + invalid_email = "invalid-email" + pypi_package_json_asset_mock.package_json = {"info": {"author_email": invalid_email, "maintainer_email": None}} + mock_validate_email.side_effect = EmailNotValidError("Invalid email.") -def test_analyze_fail_empty_author_email(analyzer: FakeEmailAnalyzer, pypi_package_json_asset_mock: MagicMock) -> None: - """Test analyzer fails for empty author_email string (maintainer_email is None).""" - pypi_package_json_asset_mock.package_json = {"info": {"author_email": "", "maintainer_email": None}} result, info = analyzer.analyze(pypi_package_json_asset_mock) + assert result == HeuristicResult.FAIL - assert info["email"] == "" + assert info == {"email": invalid_email} + mock_validate_email.assert_called_once_with(invalid_email, check_deliverability=True) def test_analyze_pass_only_maintainer_email_valid( - analyzer: FakeEmailAnalyzer, pypi_package_json_asset_mock: MagicMock, mock_dns_resolve: MagicMock + analyzer: FakeEmailAnalyzer, pypi_package_json_asset_mock: MagicMock, mock_validate_email: MagicMock ) -> None: """Test analyzer passes when only maintainer_email is present and valid.""" - mock_mx_record = MagicMock() - mock_mx_record.exchange = "mail.example.net" - mock_dns_resolve.return_value = [mock_mx_record] + email = "maintainer@example.net" + pypi_package_json_asset_mock.package_json = {"info": {"author_email": None, "maintainer_email": email}} + + mock_email_info = MagicMock() + mock_email_info.normalized = "maintainer@example.net" + mock_email_info.local_part = "maintainer" + mock_email_info.domain = "example.net" + mock_validate_email.return_value = mock_email_info - pypi_package_json_asset_mock.package_json = { - "info": {"author_email": None, "maintainer_email": "maintainer@example.net"} - } result, info = 
analyzer.analyze(pypi_package_json_asset_mock) assert result == HeuristicResult.PASS - assert info == {} - mock_dns_resolve.assert_called_once_with("example.net", "MX") + assert info["validated_emails"] == [ + {"normalized": "maintainer@example.net", "local_part": "maintainer", "domain": "example.net"} + ] + mock_validate_email.assert_called_once_with(email, check_deliverability=True) def test_analyze_pass_both_emails_valid( - analyzer: FakeEmailAnalyzer, pypi_package_json_asset_mock: MagicMock, mock_dns_resolve: MagicMock + analyzer: FakeEmailAnalyzer, pypi_package_json_asset_mock: MagicMock, mock_validate_email: MagicMock ) -> None: """Test the analyzer passes when both emails are present and valid.""" - def side_effect_dns_resolve(domain: str, record_type: str = "MX") -> list[MagicMock]: - mock_mx = MagicMock() - domains = { - "MX": {"example.com", "example.net"}, - } - if domain not in domains.get(record_type, set()): - pytest.fail(f"Unexpected domain for DNS resolve: {domain}") - mock_mx.exchange = f"mail.{domain}" - return [mock_mx] + def side_effect(email: str, check_deliverability: bool) -> MagicMock: # pylint: disable=unused-argument + local_part, domain = email.split("@") + mock_email_info = MagicMock() + mock_email_info.normalized = email + mock_email_info.local_part = local_part + mock_email_info.domain = domain + return mock_email_info - mock_dns_resolve.side_effect = side_effect_dns_resolve + mock_validate_email.side_effect = side_effect pypi_package_json_asset_mock.package_json = { "info": {"author_email": "author@example.com", "maintainer_email": "maintainer@example.net"} } result, info = analyzer.analyze(pypi_package_json_asset_mock) assert result == HeuristicResult.PASS - assert info == {} - assert mock_dns_resolve.call_count == 2 - mock_dns_resolve.assert_any_call("example.com", "MX") - mock_dns_resolve.assert_any_call("example.net", "MX") - - -def test_analyze_fail_author_email_invalid_format( - analyzer: FakeEmailAnalyzer, 
pypi_package_json_asset_mock: MagicMock, mock_dns_resolve: MagicMock -) -> None: - """Test analyzer fails when author_email has an invalid format.""" - pypi_package_json_asset_mock.package_json = { - "info": {"author_email": "bad_email_format", "maintainer_email": "maintainer@example.net"} - } - result, info = analyzer.analyze(pypi_package_json_asset_mock) - assert result == HeuristicResult.FAIL - assert info["email"] == "bad_email_format" - mock_dns_resolve.assert_not_called() # Regex check fails before DNS lookup - - -# Tests for the is_valid_email method -def test_is_valid_email_valid_email_with_mx(analyzer: FakeEmailAnalyzer, mock_dns_resolve: MagicMock) -> None: - """Test is_valid_email returns True for a valid email with MX records.""" - mock_mx_record = MagicMock() - mock_mx_record.exchange = "mail.example.com" - mock_dns_resolve.return_value = [mock_mx_record] - assert analyzer.is_valid_email("test@example.com") is True - mock_dns_resolve.assert_called_once_with("example.com", "MX") - - -def test_is_valid_email_invalid_format(analyzer: FakeEmailAnalyzer, mock_dns_resolve: MagicMock) -> None: - """Test is_valid_email method with various invalid email formats.""" - assert not analyzer.is_valid_email("not_an_email") - assert not analyzer.is_valid_email("test@") - assert not analyzer.is_valid_email("@example.com") - assert not analyzer.is_valid_email("test@example") - assert not analyzer.is_valid_email("") - mock_dns_resolve.assert_not_called() - - -def test_is_valid_email_no_mx_records_returned(analyzer: FakeEmailAnalyzer, mock_dns_resolve: MagicMock) -> None: - """Test is_valid_email returns False if DNS resolve returns no MX records.""" - mock_dns_resolve.return_value = [] # Simulate no MX records found - assert analyzer.is_valid_email("test@no-mx-domain.com") is False - mock_dns_resolve.assert_called_once_with("no-mx-domain.com", "MX") + assert mock_validate_email.call_count == 2 + + validated_emails = info.get("validated_emails") + assert 
isinstance(validated_emails, list) + assert len(validated_emails) == 2 + assert {"normalized": "author@example.com", "local_part": "author", "domain": "example.com"} in validated_emails + assert { + "normalized": "maintainer@example.net", + "local_part": "maintainer", + "domain": "example.net", + } in validated_emails + + +def test_is_valid_email_success(analyzer: FakeEmailAnalyzer, mock_validate_email: MagicMock) -> None: + """Test is_valid_email returns the validation object on success.""" + mock_validated_email = MagicMock() + mock_validated_email.normalized = "test@example.com" + mock_validated_email.local_part = "test" + mock_validated_email.domain = "example.com" + + mock_validate_email.return_value = mock_validated_email + result = analyzer.is_valid_email("test@example.com") + assert result == mock_validated_email + mock_validate_email.assert_called_once_with("test@example.com", check_deliverability=True) + + +def test_is_valid_email_failure(analyzer: FakeEmailAnalyzer, mock_validate_email: MagicMock) -> None: + """Test is_valid_email returns None on failure.""" + mock_validate_email.side_effect = EmailNotValidError("The email address is not valid.") + result = analyzer.is_valid_email("invalid-email") + assert result is None + mock_validate_email.assert_called_once_with("invalid-email", check_deliverability=True)