-
Notifications
You must be signed in to change notification settings - Fork 28
feat(heuristics): add Fake Email analyzer to validate maintainer email domain #1106
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -56,6 +56,11 @@ When a heuristic fails, with `HeuristicResult.FAIL`, then that is an indicator b | |||||
- **Description**: Checks if the package name is suspiciously similar to any package name in a predefined list of popular packages. The similarity check incorporates the Jaro-Winkler distance and considers keyboard layout proximity to identify potential typosquatting. | ||||||
- **Rule**: Return `HeuristicResult.FAIL` if the similarity ratio between the package name and any popular package name meets or exceeds a defined threshold; otherwise, return `HeuristicResult.PASS`. | ||||||
- **Dependency**: None. | ||||||
|
||||||
11. **Fake Email** | ||||||
- **Description**: Checks if the package maintainer or author has a suspicious or invalid email. | ||||||
- **Rule**: Return `HeuristicResult.FAIL` if the email format is invalid or the email domain has no MX records; otherwise, return `HeuristicResult.PASS`. | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||
- **Dependency**: None. | ||||||
### Source Code Analysis with Semgrep | ||||||
**PyPI Source Code Analyzer** | ||||||
- **Description**: Uses Semgrep, with default rules written in `src/macaron/resources/pypi_malware_rules` and custom rules available by supplying a path to `custom_semgrep_rules` in `defaults.ini`, to scan the package `.tar` source code. | ||||||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,91 @@ | ||
# Copyright (c) 2024 - 2025, Oracle and/or its affiliates. All rights reserved. | ||
# Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/. | ||
|
||
"""The heuristic analyzer to check the email address of the package maintainers.""" | ||
|
||
import logging | ||
import re | ||
|
||
import dns.resolver as dns_resolver | ||
|
||
from macaron.errors import HeuristicAnalyzerValueError | ||
from macaron.json_tools import JsonType | ||
from macaron.malware_analyzer.pypi_heuristics.base_analyzer import BaseHeuristicAnalyzer | ||
from macaron.malware_analyzer.pypi_heuristics.heuristics import HeuristicResult, Heuristics | ||
from macaron.slsa_analyzer.package_registry.pypi_registry import PyPIPackageJsonAsset | ||
|
||
logger: logging.Logger = logging.getLogger(__name__) | ||
|
||
|
||
class FakeEmailAnalyzer(BaseHeuristicAnalyzer):
    """Analyze the email address of the package maintainers.

    The check is a sanity check, not full email validation: an address is accepted
    when it loosely looks like ``local@domain.tld`` and its domain publishes at
    least one MX record.
    """

    def __init__(self) -> None:
        super().__init__(
            name="fake_email_analyzer",
            heuristic=Heuristics.FAKE_EMAIL,
            depends_on=None,
        )

    def is_valid_email(self, email: str) -> bool:
        """Check if the email loosely has the right format and the domain has MX records.

        Parameters
        ----------
        email: str
            The email address to check.

        Returns
        -------
        bool:
            ``True`` if the address passes the format check and its domain has at least
            one MX record, ``False`` if the format is wrong, the domain does not exist,
            or the domain has no MX records.

        Raises
        ------
        HeuristicAnalyzerValueError
            if the DNS lookup itself fails (any resolver error other than a missing
            domain or a missing MX answer).
        """
        # ``fullmatch`` rather than ``match``: the whole string must have the expected shape,
        # so trailing junk such as a second "@" cannot slip through on a valid-looking prefix.
        if re.fullmatch(r"[^@]+@[^@]+\.[^@]+", email) is None:
            return False

        domain = email.split("@")[1]
        try:
            records = dns_resolver.resolve(domain, "MX")
        except (dns_resolver.NXDOMAIN, dns_resolver.NoAnswer):
            # The resolver reports "domain does not exist" and "no MX records" by raising,
            # not by returning an empty answer. Both mean the address cannot receive mail.
            return False
        except Exception as err:
            err_message = f"Failed to resolve domain {domain}: {err}"
            raise HeuristicAnalyzerValueError(err_message) from err
        return bool(records)

    def analyze(self, pypi_package_json: PyPIPackageJsonAsset) -> tuple[HeuristicResult, dict[str, JsonType]]:
        """Analyze the package.

        The author email is checked first, then the maintainer email; the first
        address that does not pass ``is_valid_email`` is reported.

        Parameters
        ----------
        pypi_package_json: PyPIPackageJsonAsset
            The PyPI package JSON asset object.

        Returns
        -------
        tuple[HeuristicResult, dict[str, JsonType]]:
            ``SKIP`` with a message when neither email field is set, ``FAIL`` with the
            offending address under the ``"email"`` key, or ``PASS`` with an empty dict.

        Raises
        ------
        HeuristicAnalyzerValueError
            if the analysis fails.
        """
        info = pypi_package_json.package_json.get("info", {})
        candidates = (info.get("author_email", None), info.get("maintainer_email", None))
        # Only ``None`` counts as "not provided"; an empty string is still checked (and fails).
        provided = [address for address in candidates if address is not None]
        if not provided:
            message = "No maintainers are available"
            return HeuristicResult.SKIP, {"message": message}

        for address in provided:
            if not self.is_valid_email(address):
                return HeuristicResult.FAIL, {"email": address}

        return HeuristicResult.PASS, {}
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,155 @@ | ||
# Copyright (c) 2024 - 2025, Oracle and/or its affiliates. All rights reserved. | ||
# Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/. | ||
|
||
"""Tests for the FakeEmailAnalyzer heuristic.""" | ||
|
||
|
||
from collections.abc import Generator | ||
from unittest.mock import MagicMock, patch | ||
|
||
import pytest | ||
|
||
from macaron.malware_analyzer.pypi_heuristics.heuristics import HeuristicResult | ||
from macaron.malware_analyzer.pypi_heuristics.metadata.fake_email import FakeEmailAnalyzer | ||
from macaron.slsa_analyzer.package_registry.pypi_registry import PyPIPackageJsonAsset | ||
|
||
|
||
@pytest.fixture(name="analyzer")
def analyzer_fixture() -> FakeEmailAnalyzer:
    """Provide a fresh FakeEmailAnalyzer for each test."""
    fake_email_analyzer = FakeEmailAnalyzer()
    return fake_email_analyzer
|
||
|
||
@pytest.fixture(name="pypi_package_json_asset_mock")
def pypi_package_json_asset_mock_fixture() -> MagicMock:
    """Provide a MagicMock standing in for a PyPIPackageJsonAsset."""
    asset = MagicMock(spec=PyPIPackageJsonAsset)
    # Each test assigns its own package_json to simulate a particular PyPI response.
    asset.package_json = {}
    # Downloads succeed unless a test overrides this.
    asset.download = MagicMock(return_value=True)
    return asset
|
||
|
||
@pytest.fixture(name="mock_dns_resolve")
def mock_dns_resolve_fixture() -> Generator[MagicMock]:
    """Patch dns.resolver.resolve as seen by the module under test.

    The patch target is the ``dns_resolver`` name imported in ``fake_email``. Unless a
    test overrides it, the mock answers every lookup with a single MX record.
    """
    target = "macaron.malware_analyzer.pypi_heuristics.metadata.fake_email.dns_resolver.resolve"
    with patch(target) as resolve_mock:
        default_record = MagicMock()
        default_record.exchange = "mail.default-domain.com"
        resolve_mock.return_value = [default_record]
        yield resolve_mock
|
||
|
||
# Tests for the analyze method | ||
def test_analyze_skip_no_emails_present(analyzer: FakeEmailAnalyzer, pypi_package_json_asset_mock: MagicMock) -> None:
    """Test the analyzer skips when author_email and maintainer_email are both None."""
    package_info = {"author_email": None, "maintainer_email": None}
    pypi_package_json_asset_mock.package_json = {"info": package_info}

    outcome, details = analyzer.analyze(pypi_package_json_asset_mock)

    assert outcome == HeuristicResult.SKIP
    assert details["message"] == "No maintainers are available"
|
||
|
||
def test_analyze_skip_no_info_key(analyzer: FakeEmailAnalyzer, pypi_package_json_asset_mock: MagicMock) -> None:
    """Test the analyzer skips when the PyPI data has no 'info' key at all."""
    # An empty document: there is no 'info' section to read emails from.
    pypi_package_json_asset_mock.package_json = {}

    outcome, details = analyzer.analyze(pypi_package_json_asset_mock)

    assert outcome == HeuristicResult.SKIP
    assert details["message"] == "No maintainers are available"
|
||
|
||
def test_analyze_fail_empty_author_email(analyzer: FakeEmailAnalyzer, pypi_package_json_asset_mock: MagicMock) -> None:
    """Test the analyzer fails on an empty author_email string while maintainer_email is None."""
    package_info = {"author_email": "", "maintainer_email": None}
    pypi_package_json_asset_mock.package_json = {"info": package_info}

    outcome, details = analyzer.analyze(pypi_package_json_asset_mock)

    assert outcome == HeuristicResult.FAIL
    assert details["email"] == ""
|
||
|
||
def test_analyze_pass_only_maintainer_email_valid(
    analyzer: FakeEmailAnalyzer, pypi_package_json_asset_mock: MagicMock, mock_dns_resolve: MagicMock
) -> None:
    """Test analyzer passes when only maintainer_email is present and valid."""
    mock_mx_record = MagicMock()
    mock_mx_record.exchange = "mail.example.net"
    mock_dns_resolve.return_value = [mock_mx_record]

    # The local part is arbitrary; the domain must be the one asserted in the lookup below.
    pypi_package_json_asset_mock.package_json = {
        "info": {"author_email": None, "maintainer_email": "maintainer@example.net"}
    }
    result, info = analyzer.analyze(pypi_package_json_asset_mock)
    assert result == HeuristicResult.PASS
    assert info == {}
    mock_dns_resolve.assert_called_once_with("example.net", "MX")
|
||
|
||
def test_analyze_pass_both_emails_valid(
    analyzer: FakeEmailAnalyzer, pypi_package_json_asset_mock: MagicMock, mock_dns_resolve: MagicMock
) -> None:
    """Test the analyzer passes when both emails are present and valid."""
    # Domains the analyzer is allowed to look up, keyed by DNS record type.
    expected_domains = {
        "MX": {"example.com", "example.net"},
    }

    def side_effect_dns_resolve(domain: str, record_type: str = "MX") -> list[MagicMock]:
        # Any lookup outside the expected set means the analyzer parsed a domain incorrectly.
        if domain not in expected_domains.get(record_type, set()):
            pytest.fail(f"Unexpected domain for DNS resolve: {domain}")
        mock_mx = MagicMock()
        mock_mx.exchange = f"mail.{domain}"
        return [mock_mx]

    mock_dns_resolve.side_effect = side_effect_dns_resolve

    # The local parts are arbitrary; the domains must match the lookups asserted below.
    pypi_package_json_asset_mock.package_json = {
        "info": {"author_email": "author@example.com", "maintainer_email": "maintainer@example.net"}
    }
    result, info = analyzer.analyze(pypi_package_json_asset_mock)
    assert result == HeuristicResult.PASS
    assert info == {}
    assert mock_dns_resolve.call_count == 2
    mock_dns_resolve.assert_any_call("example.com", "MX")
    mock_dns_resolve.assert_any_call("example.net", "MX")
|
||
|
||
def test_analyze_fail_author_email_invalid_format(
    analyzer: FakeEmailAnalyzer, pypi_package_json_asset_mock: MagicMock, mock_dns_resolve: MagicMock
) -> None:
    """Test analyzer fails when author_email has an invalid format."""
    # The maintainer address is well formed, so the failure can only come from the author
    # address. It is never looked up because the author check fails first.
    pypi_package_json_asset_mock.package_json = {
        "info": {"author_email": "bad_email_format", "maintainer_email": "maintainer@example.com"}
    }
    result, info = analyzer.analyze(pypi_package_json_asset_mock)
    assert result == HeuristicResult.FAIL
    assert info["email"] == "bad_email_format"
    mock_dns_resolve.assert_not_called()  # Regex check fails before DNS lookup
|
||
|
||
# Tests for the is_valid_email method | ||
def test_is_valid_email_valid_email_with_mx(analyzer: FakeEmailAnalyzer, mock_dns_resolve: MagicMock) -> None:
    """Test is_valid_email returns True for a valid email with MX records."""
    mock_mx_record = MagicMock()
    mock_mx_record.exchange = "mail.example.com"
    mock_dns_resolve.return_value = [mock_mx_record]
    # The local part is arbitrary; the domain must be the one asserted in the lookup below.
    assert analyzer.is_valid_email("test@example.com") is True
    mock_dns_resolve.assert_called_once_with("example.com", "MX")
|
||
|
||
def test_is_valid_email_invalid_format(analyzer: FakeEmailAnalyzer, mock_dns_resolve: MagicMock) -> None:
    """Test is_valid_email rejects a range of malformed addresses without touching DNS."""
    malformed_addresses = (
        "not_an_email",
        "test@",
        "@example.com",
        "test@example",
        "",
    )
    for address in malformed_addresses:
        assert not analyzer.is_valid_email(address)
    mock_dns_resolve.assert_not_called()
|
||
|
||
def test_is_valid_email_no_mx_records_returned(analyzer: FakeEmailAnalyzer, mock_dns_resolve: MagicMock) -> None:
    """Test is_valid_email returns False if DNS resolve returns no MX records."""
    mock_dns_resolve.return_value = []  # Simulate no MX records found
    # The local part is arbitrary; the domain must be the one asserted in the lookup below.
    assert analyzer.is_valid_email("test@no-mx-domain.com") is False
    mock_dns_resolve.assert_called_once_with("no-mx-domain.com", "MX")
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.