Skip to content

feat(heuristics): add Fake Email analyzer to validate maintainer email domain #1106

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ dependencies = [
"problog >= 2.2.6,<3.0.0",
"cryptography >=44.0.0,<45.0.0",
"semgrep == 1.113.0",
"email_validator >=2.2.0",
]
keywords = []
# https://pypi.org/classifiers/
Expand Down
5 changes: 5 additions & 0 deletions src/macaron/malware_analyzer/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,11 @@ When a heuristic fails, with `HeuristicResult.FAIL`, then that is an indicator b
- **Description**: Checks if the package name is suspiciously similar to any package name in a predefined list of popular packages. The similarity check incorporates the Jaro-Winkler distance and considers keyboard layout proximity to identify potential typosquatting.
- **Rule**: Return `HeuristicResult.FAIL` if the similarity ratio between the package name and any popular package name meets or exceeds a defined threshold; otherwise, return `HeuristicResult.PASS`.
- **Dependency**: None.

11. **Fake Email**
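- **Description**: Checks whether the email address of the package author or maintainer is syntactically invalid or belongs to a domain that cannot receive email.
- **Rule**: Return `HeuristicResult.FAIL` if any provided email address fails validation; return `HeuristicResult.SKIP` if neither an author nor a maintainer email is available; otherwise, return `HeuristicResult.PASS`.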
- **Dependency**: None.
### Source Code Analysis with Semgrep
**PyPI Source Code Analyzer**
- **Description**: Uses Semgrep, with default rules written in `src/macaron/resources/pypi_malware_rules` and custom rules available by supplying a path to `custom_semgrep_rules` in `defaults.ini`, to scan the package `.tar` source code.
Expand Down
3 changes: 3 additions & 0 deletions src/macaron/malware_analyzer/pypi_heuristics/heuristics.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ class Heuristics(str, Enum):
#: Indicates that the package source code contains suspicious code patterns.
SUSPICIOUS_PATTERNS = "suspicious_patterns"

#: Indicates that the package maintainer's email address is suspicious or invalid.
FAKE_EMAIL = "fake_email"


class HeuristicResult(str, Enum):
"""Result type indicating the outcome of a heuristic."""
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
# Copyright (c) 2024 - 2025, Oracle and/or its affiliates. All rights reserved.
# Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/.

"""The heuristic analyzer to check the email address of the package maintainers."""

import logging
from email.utils import getaddresses

from email_validator import EmailNotValidError, ValidatedEmail, validate_email

from macaron.json_tools import JsonType, json_extract
from macaron.malware_analyzer.pypi_heuristics.base_analyzer import BaseHeuristicAnalyzer
from macaron.malware_analyzer.pypi_heuristics.heuristics import HeuristicResult, Heuristics
from macaron.slsa_analyzer.package_registry.pypi_registry import PyPIPackageJsonAsset

logger: logging.Logger = logging.getLogger(__name__)


class FakeEmailAnalyzer(BaseHeuristicAnalyzer):
    """Analyze the email addresses of the package author and maintainer.

    The ``author_email`` and ``maintainer_email`` fields of the PyPI package JSON are
    split into individual addresses, and every address is validated with
    ``email_validator`` with the deliverability check enabled.
    """

    def __init__(self) -> None:
        super().__init__(
            name="fake_email_analyzer",
            heuristic=Heuristics.FAKE_EMAIL,
            depends_on=None,
        )

    @staticmethod
    def _extract_addresses(email_field: str) -> list[str]:
        """Split a metadata email field into the bare addresses it contains.

        The field may hold display names (``Jane Doe <jane@example.com>``) and several
        comma-separated entries, so the raw value cannot be validated as one address.

        Parameters
        ----------
        email_field: str
            The raw ``author_email`` or ``maintainer_email`` value.

        Returns
        -------
        list[str]
            The addresses found in the field. If nothing can be parsed, the raw field
            is returned as the only element so that it still goes through validation.
        """
        addresses = [address for _, address in getaddresses([email_field]) if address]
        return addresses or [email_field]

    def is_valid_email(self, email: str) -> ValidatedEmail | None:
        """Check if the email format is valid and the address passes the deliverability check.

        A validation failure is logged as a warning and reported through the return
        value; ``EmailNotValidError`` is not propagated to the caller.

        Parameters
        ----------
        email: str
            The email address to check.

        Returns
        -------
        ValidatedEmail | None
            The validated email object if the email is valid, otherwise None.
        """
        try:
            return validate_email(email, check_deliverability=True)
        except EmailNotValidError as err:
            err_message = f"Invalid email address: {email}. Error: {err}"
            logger.warning(err_message)
            return None

    def analyze(self, pypi_package_json: PyPIPackageJsonAsset) -> tuple[HeuristicResult, dict[str, JsonType]]:
        """Analyze the package.

        Parameters
        ----------
        pypi_package_json: PyPIPackageJsonAsset
            The PyPI package JSON asset object.

        Returns
        -------
        tuple[HeuristicResult, dict[str, JsonType]]:
            ``SKIP`` with a message when neither field is available, ``FAIL`` with the
            first address that does not validate, or ``PASS`` with the details of every
            validated address.
        """
        package_json = pypi_package_json.package_json
        author_email = json_extract(package_json, ["info", "author_email"], str)
        maintainer_email = json_extract(package_json, ["info", "maintainer_email"], str)

        if not author_email and not maintainer_email:
            return HeuristicResult.SKIP, {"message": "No author or maintainer email available."}

        validated_emails: list[JsonType] = []
        details = ["normalized", "local_part", "domain"]

        for email_field in (author_email, maintainer_email):
            if not email_field:
                continue
            # A single field can list several people; every address must be valid.
            for address in self._extract_addresses(email_field):
                email_info = self.is_valid_email(address)
                if email_info is None:
                    return HeuristicResult.FAIL, {"email": address}

                validated_emails.append({key: getattr(email_info, key) for key in details})

        return HeuristicResult.PASS, {"validated_emails": validated_emails}
4 changes: 2 additions & 2 deletions src/macaron/slsa_analyzer/build_tool/gradle.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright (c) 2022 - 2024, Oracle and/or its affiliates. All rights reserved.
# Copyright (c) 2022 - 2025, Oracle and/or its affiliates. All rights reserved.
# Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/.

"""This module contains the Gradle class which inherits BaseBuildTool.
Expand Down Expand Up @@ -122,7 +122,7 @@ def get_dep_analyzer(self) -> CycloneDxGradle:
raise DependencyAnalyzerError("No default dependency analyzer is found.")
if not DependencyAnalyzer.tool_valid(defaults.get("dependency.resolver", "dep_tool_gradle")):
raise DependencyAnalyzerError(
f"Dependency analyzer {defaults.get('dependency.resolver','dep_tool_gradle')} is not valid.",
f"Dependency analyzer {defaults.get('dependency.resolver', 'dep_tool_gradle')} is not valid.",
)

tool_name, tool_version = tuple(
Expand Down
4 changes: 2 additions & 2 deletions src/macaron/slsa_analyzer/build_tool/maven.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright (c) 2022 - 2024, Oracle and/or its affiliates. All rights reserved.
# Copyright (c) 2022 - 2025, Oracle and/or its affiliates. All rights reserved.
# Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/.

"""This module contains the Maven class which inherits BaseBuildTool.
Expand Down Expand Up @@ -116,7 +116,7 @@ def get_dep_analyzer(self) -> CycloneDxMaven:
raise DependencyAnalyzerError("No default dependency analyzer is found.")
if not DependencyAnalyzer.tool_valid(defaults.get("dependency.resolver", "dep_tool_maven")):
raise DependencyAnalyzerError(
f"Dependency analyzer {defaults.get('dependency.resolver','dep_tool_maven')} is not valid.",
f"Dependency analyzer {defaults.get('dependency.resolver', 'dep_tool_maven')} is not valid.",
)

tool_name, tool_version = tuple(
Expand Down
4 changes: 2 additions & 2 deletions src/macaron/slsa_analyzer/build_tool/pip.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright (c) 2023 - 2024, Oracle and/or its affiliates. All rights reserved.
# Copyright (c) 2023 - 2025, Oracle and/or its affiliates. All rights reserved.
# Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/.

"""This module contains the Pip class which inherits BaseBuildTool.
Expand Down Expand Up @@ -88,7 +88,7 @@ def get_dep_analyzer(self) -> DependencyAnalyzer:
tool_name = "cyclonedx_py"
if not DependencyAnalyzer.tool_valid(f"{tool_name}:{cyclonedx_version}"):
raise DependencyAnalyzerError(
f"Dependency analyzer {defaults.get('dependency.resolver','dep_tool_gradle')} is not valid.",
f"Dependency analyzer {defaults.get('dependency.resolver', 'dep_tool_gradle')} is not valid.",
)
return CycloneDxPython(
resources_path=global_config.resources_path,
Expand Down
4 changes: 2 additions & 2 deletions src/macaron/slsa_analyzer/build_tool/poetry.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright (c) 2023 - 2024, Oracle and/or its affiliates. All rights reserved.
# Copyright (c) 2023 - 2025, Oracle and/or its affiliates. All rights reserved.
# Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/.

"""This module contains the Poetry class which inherits BaseBuildTool.
Expand Down Expand Up @@ -126,7 +126,7 @@ def get_dep_analyzer(self) -> DependencyAnalyzer:
tool_name = "cyclonedx_py"
if not DependencyAnalyzer.tool_valid(f"{tool_name}:{cyclonedx_version}"):
raise DependencyAnalyzerError(
f"Dependency analyzer {defaults.get('dependency.resolver','dep_tool_gradle')} is not valid.",
f"Dependency analyzer {defaults.get('dependency.resolver', 'dep_tool_gradle')} is not valid.",
)
return CycloneDxPython(
resources_path=global_config.resources_path,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from macaron.malware_analyzer.pypi_heuristics.metadata.anomalous_version import AnomalousVersionAnalyzer
from macaron.malware_analyzer.pypi_heuristics.metadata.closer_release_join_date import CloserReleaseJoinDateAnalyzer
from macaron.malware_analyzer.pypi_heuristics.metadata.empty_project_link import EmptyProjectLinkAnalyzer
from macaron.malware_analyzer.pypi_heuristics.metadata.fake_email import FakeEmailAnalyzer
from macaron.malware_analyzer.pypi_heuristics.metadata.high_release_frequency import HighReleaseFrequencyAnalyzer
from macaron.malware_analyzer.pypi_heuristics.metadata.one_release import OneReleaseAnalyzer
from macaron.malware_analyzer.pypi_heuristics.metadata.source_code_repo import SourceCodeRepoAnalyzer
Expand Down Expand Up @@ -358,6 +359,7 @@ def run_check(self, ctx: AnalyzeContext) -> CheckResultData:
WheelAbsenceAnalyzer,
AnomalousVersionAnalyzer,
TyposquattingPresenceAnalyzer,
FakeEmailAnalyzer,
]

# name used to query the result of all problog rules, so it can be accessed outside the model.
Expand Down Expand Up @@ -425,13 +427,18 @@ def run_check(self, ctx: AnalyzeContext) -> CheckResultData:
failed({Heuristics.ONE_RELEASE.value}),
failed({Heuristics.ANOMALOUS_VERSION.value}).

% Package released recently with a maintainer email address that is not valid.
{Confidence.MEDIUM.value}::trigger(malware_medium_confidence_3) :-
quickUndetailed,
failed({Heuristics.FAKE_EMAIL.value}).
% ----- Evaluation -----

% Aggregate result
{problog_result_access} :- trigger(malware_high_confidence_1).
{problog_result_access} :- trigger(malware_high_confidence_2).
{problog_result_access} :- trigger(malware_high_confidence_3).
{problog_result_access} :- trigger(malware_high_confidence_4).
{problog_result_access} :- trigger(malware_medium_confidence_3).
{problog_result_access} :- trigger(malware_medium_confidence_2).
{problog_result_access} :- trigger(malware_medium_confidence_1).
query({problog_result_access}).
Expand Down
142 changes: 142 additions & 0 deletions tests/malware_analyzer/pypi/test_fake_email.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
# Copyright (c) 2024 - 2025, Oracle and/or its affiliates. All rights reserved.
# Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/.

"""Tests for the FakeEmailAnalyzer heuristic."""


from collections.abc import Generator
from unittest.mock import MagicMock, patch

import pytest
from email_validator import EmailNotValidError

from macaron.malware_analyzer.pypi_heuristics.heuristics import HeuristicResult
from macaron.malware_analyzer.pypi_heuristics.metadata.fake_email import FakeEmailAnalyzer
from macaron.slsa_analyzer.package_registry.pypi_registry import PyPIPackageJsonAsset


@pytest.fixture(name="analyzer")
def analyzer_() -> FakeEmailAnalyzer:
    """Provide a newly constructed FakeEmailAnalyzer to the requesting test."""
    fake_email_analyzer = FakeEmailAnalyzer()
    return fake_email_analyzer


@pytest.fixture(name="pypi_package_json_asset_mock")
def pypi_package_json_asset_mock_fixture() -> MagicMock:
    """Provide a PyPIPackageJsonAsset mock whose package JSON starts out empty."""
    # Keyword arguments to MagicMock are set as attributes on the created mock.
    return MagicMock(spec=PyPIPackageJsonAsset, package_json={})


@pytest.fixture(name="mock_validate_email")
def mock_validate_email_fixture() -> Generator[MagicMock]:
    """Replace validate_email in the fake_email module with a mock for the duration of a test."""
    # Patch the name where the analyzer looks it up, not where it is defined.
    target = "macaron.malware_analyzer.pypi_heuristics.metadata.fake_email.validate_email"
    with patch(target) as patched_validate_email:
        yield patched_validate_email


def test_analyze_skip_no_emails_present(analyzer: FakeEmailAnalyzer, pypi_package_json_asset_mock: MagicMock) -> None:
    """Check that the heuristic is skipped when both email fields are None."""
    empty_info = {"author_email": None, "maintainer_email": None}
    pypi_package_json_asset_mock.package_json = {"info": empty_info}

    outcome = analyzer.analyze(pypi_package_json_asset_mock)

    assert outcome[0] == HeuristicResult.SKIP
    assert outcome[1]["message"] == "No author or maintainer email available."


def test_analyze_skip_no_info_key(analyzer: FakeEmailAnalyzer, pypi_package_json_asset_mock: MagicMock) -> None:
    """Check that the heuristic is skipped when the package JSON has no 'info' section."""
    # An empty document: there is no 'info' key to read the email fields from.
    pypi_package_json_asset_mock.package_json = {}

    outcome = analyzer.analyze(pypi_package_json_asset_mock)

    assert outcome[0] == HeuristicResult.SKIP
    assert outcome[1]["message"] == "No author or maintainer email available."


def test_analyze_fail_invalid_email(
    analyzer: FakeEmailAnalyzer, pypi_package_json_asset_mock: MagicMock, mock_validate_email: MagicMock
) -> None:
    """Check that the heuristic fails and reports the address when validation rejects it."""
    bad_address = "invalid-email"
    # Make the patched validator reject whatever it is given.
    mock_validate_email.side_effect = EmailNotValidError("Invalid email.")
    pypi_package_json_asset_mock.package_json = {"info": {"author_email": bad_address, "maintainer_email": None}}

    outcome, details = analyzer.analyze(pypi_package_json_asset_mock)

    assert outcome == HeuristicResult.FAIL
    assert details == {"email": bad_address}
    mock_validate_email.assert_called_once_with(bad_address, check_deliverability=True)


def test_analyze_pass_only_maintainer_email_valid(
    analyzer: FakeEmailAnalyzer, pypi_package_json_asset_mock: MagicMock, mock_validate_email: MagicMock
) -> None:
    """Test analyzer passes when only maintainer_email is present and valid."""
    # A real-looking address whose parts match the local_part/domain asserted below.
    email = "maintainer@example.net"
    pypi_package_json_asset_mock.package_json = {"info": {"author_email": None, "maintainer_email": email}}

    # Stand-in for the ValidatedEmail object returned by the patched validator.
    mock_email_info = MagicMock()
    mock_email_info.normalized = email
    mock_email_info.local_part = "maintainer"
    mock_email_info.domain = "example.net"
    mock_validate_email.return_value = mock_email_info

    result, info = analyzer.analyze(pypi_package_json_asset_mock)

    assert result == HeuristicResult.PASS
    assert info["validated_emails"] == [
        {"normalized": "maintainer@example.net", "local_part": "maintainer", "domain": "example.net"}
    ]
    mock_validate_email.assert_called_once_with(email, check_deliverability=True)


def test_analyze_pass_both_emails_valid(
    analyzer: FakeEmailAnalyzer, pypi_package_json_asset_mock: MagicMock, mock_validate_email: MagicMock
) -> None:
    """Test the analyzer passes when both emails are present and valid."""
    # Both literals must contain exactly one "@" for the side effect below to split them.
    author_email = "author@example.com"
    maintainer_email = "maintainer@example.net"

    def side_effect(email: str, check_deliverability: bool) -> MagicMock:  # pylint: disable=unused-argument
        # Build a stand-in for ValidatedEmail from the address being validated.
        local_part, domain = email.split("@")
        mock_email_info = MagicMock()
        mock_email_info.normalized = email
        mock_email_info.local_part = local_part
        mock_email_info.domain = domain
        return mock_email_info

    mock_validate_email.side_effect = side_effect

    pypi_package_json_asset_mock.package_json = {
        "info": {"author_email": author_email, "maintainer_email": maintainer_email}
    }
    result, info = analyzer.analyze(pypi_package_json_asset_mock)
    assert result == HeuristicResult.PASS
    assert mock_validate_email.call_count == 2

    validated_emails = info.get("validated_emails")
    assert isinstance(validated_emails, list)
    assert len(validated_emails) == 2
    assert {"normalized": "author@example.com", "local_part": "author", "domain": "example.com"} in validated_emails
    assert {
        "normalized": "maintainer@example.net",
        "local_part": "maintainer",
        "domain": "example.net",
    } in validated_emails


def test_is_valid_email_success(analyzer: FakeEmailAnalyzer, mock_validate_email: MagicMock) -> None:
    """Test is_valid_email returns the validation object on success."""
    # A real-looking address whose parts match the mocked local_part/domain.
    email = "test@example.com"

    mock_validated_email = MagicMock()
    mock_validated_email.normalized = email
    mock_validated_email.local_part = "test"
    mock_validated_email.domain = "example.com"
    mock_validate_email.return_value = mock_validated_email

    result = analyzer.is_valid_email(email)

    # The object produced by the validator is handed back unchanged.
    assert result == mock_validated_email
    mock_validate_email.assert_called_once_with(email, check_deliverability=True)


def test_is_valid_email_failure(analyzer: FakeEmailAnalyzer, mock_validate_email: MagicMock) -> None:
    """Check that is_valid_email yields None when the validator raises EmailNotValidError."""
    rejected_address = "invalid-email"
    mock_validate_email.side_effect = EmailNotValidError("The email address is not valid.")

    outcome = analyzer.is_valid_email(rejected_address)

    assert outcome is None
    mock_validate_email.assert_called_once_with(rejected_address, check_deliverability=True)
Loading