-
Notifications
You must be signed in to change notification settings - Fork 28
feat(heuristics): add SimilarProjectAnalyzer to detect structural similarity across packages from same maintainer #1089
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -181,3 +181,4 @@ docs/_build | |
bin/ | ||
requirements.txt | ||
.macaron_env_file | ||
.DS_Store |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,233 @@ | ||
# Copyright (c) 2024 - 2025, Oracle and/or its affiliates. All rights reserved. | ||
# Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/. | ||
|
||
"""This analyzer checks if the package has a similar structure to other packages maintained by the same user.""" | ||
|
||
import hashlib
import logging
import re
import tarfile
import typing

import requests
from bs4 import BeautifulSoup

from macaron.errors import HeuristicAnalyzerValueError
from macaron.json_tools import JsonType
from macaron.malware_analyzer.pypi_heuristics.base_analyzer import BaseHeuristicAnalyzer
from macaron.malware_analyzer.pypi_heuristics.heuristics import HeuristicResult, Heuristics
from macaron.slsa_analyzer.package_registry.pypi_registry import PyPIPackageJsonAsset
|
||
logger: logging.Logger = logging.getLogger(__name__) | ||
|
||
|
||
class SimilarProjectAnalyzer(BaseHeuristicAnalyzer):
    """Check whether the package has a similar structure to other packages maintained by the same user.

    The structure of a package is summarised as a SHA-256 digest over the sorted file paths of its
    sdist, with the top-level directory removed and the package name masked. The heuristic fails when
    another package listed for one of the target's maintainers has the same digest.
    """

    # Timeout, in seconds, applied to every HTTP request issued by this analyzer.
    _REQUEST_TIMEOUT = 10

    # Placeholder substituted for the package name inside archive paths before hashing.
    _NAME_PLACEHOLDER = "<PKG>"

    def __init__(self) -> None:
        super().__init__(
            name="similar_project_analyzer",
            heuristic=Heuristics.SIMILAR_PROJECTS,
            depends_on=None,
        )

    def analyze(self, pypi_package_json: PyPIPackageJsonAsset) -> tuple[HeuristicResult, dict[str, JsonType]]:
        """Analyze the package.

        Parameters
        ----------
        pypi_package_json: PyPIPackageJsonAsset
            The PyPI package JSON asset object.

        Returns
        -------
        tuple[HeuristicResult, dict[str, JsonType]]:
            The result and related information collected during the analysis. The result is SKIP when
            the target has no sdist or no other packages are found for its maintainers, FAIL when
            another package has the same structure hash, and PASS otherwise.

        Raises
        ------
        HeuristicAnalyzerValueError
            if the analysis fails.
        """
        package_name = pypi_package_json.component_name
        target_hash = self.get_structure_hash(package_name)
        if target_hash is None:
            return HeuristicResult.SKIP, {
                "message": f"the package {package_name} does not have a sdist.",
            }

        similar_packages = self.get_packages(package_name)
        if not similar_packages:
            return HeuristicResult.SKIP, {
                "message": f"the maintainers of {package_name} do not maintain any other packages.",
            }

        for package in similar_packages:
            package_hash = self.get_structure_hash(package)
            if package_hash is None:
                logger.info("Package does not have a sdist.")
                continue
            if package_hash == target_hash:
                return HeuristicResult.FAIL, {
                    "similar_package": package,
                }
        return HeuristicResult.PASS, {}

    @staticmethod
    def _normalize_name(name: str) -> str:
        """Return the name lower-cased with every run of ``-``, ``_`` or ``.`` collapsed to ``-``."""
        return re.sub(r"[-_.]+", "-", name).lower()

    @staticmethod
    def _name_pattern(package_name: str) -> "re.Pattern[str] | None":
        """Build a pattern matching the package name regardless of case or ``-``/``_``/``.`` spelling.

        Returns None when the name contains nothing but separators (or is empty), in which case no
        masking should be attempted.
        """
        parts = [re.escape(part) for part in re.split(r"[-_.]+", package_name) if part]
        if not parts:
            return None
        return re.compile("[-_.]+".join(parts), re.IGNORECASE)

    def _fetch_html(self, url: str) -> str | None:
        """Download an HTML page.

        Parameters
        ----------
        url: str
            The URL of the page.

        Returns
        -------
        str | None
            The response body, or None if the server replied with a status code other than 200.

        Raises
        ------
        HeuristicAnalyzerValueError
            If the request itself fails (e.g. connection error or timeout).
        """
        # NOTE(review): the review on this change flagged the direct ``requests`` calls (the inline
        # comments are truncated) — presumably in favour of a shared project HTTP helper; confirm.
        try:
            response = requests.get(url, timeout=self._REQUEST_TIMEOUT)
        except requests.exceptions.RequestException as err:
            err_message = f"Failed to fetch {url}: {err}"
            raise HeuristicAnalyzerValueError(err_message) from err

        if response.status_code != 200:
            return None
        return response.text

    def get_maintainers(self, package_name: str) -> list[str]:
        """Get all maintainers of a package.

        The maintainers are scraped from the sidebar of the package's PyPI project page.

        Parameters
        ----------
        package_name: str
            The name of the package.

        Returns
        -------
        list[str]
            A list of maintainers; empty if the project page is not served with status 200.

        Raises
        ------
        HeuristicAnalyzerValueError
            If the project page cannot be requested.
        """
        # NOTE(review): review feedback asks for this lookup to be folded into
        # PyPIRegistry.get_maintainers_of_package so other heuristics can reuse it — TODO follow up.
        page = self._fetch_html(f"https://pypi.org/project/{package_name}/")
        if page is None:
            return []

        soup = BeautifulSoup(page, "html.parser")
        gravatar_spans = soup.find_all("span", class_="sidebar-section__user-gravatar-text")
        return [span.get_text().strip() for span in gravatar_spans]

    def get_packages_by_user(self, username: str) -> list[str]:
        """Get all packages by a user.

        The package names are scraped from the user's PyPI profile page.

        Parameters
        ----------
        username: str
            The username of the user.

        Returns
        -------
        list[str]
            A list of package names; empty if the profile page is not served with status 200.

        Raises
        ------
        HeuristicAnalyzerValueError
            If the profile page cannot be requested.
        """
        page = self._fetch_html(f"https://pypi.org/user/{username}/")
        if page is None:
            return []

        soup = BeautifulSoup(page, "html.parser")
        headers = soup.find_all("h3", class_="package-snippet__title")
        return [header.get_text().strip() for header in headers]

    def get_packages(self, package_name: str) -> list[str]:
        """Get packages that are maintained by this package's maintainers.

        Parameters
        ----------
        package_name: str
            The name of the package.

        Returns
        -------
        list[str]
            The sorted, de-duplicated names of the other packages listed for the maintainers. The
            target package itself is excluded.

        Raises
        ------
        HeuristicAnalyzerValueError
            If a PyPI page cannot be requested.
        """
        candidates: set[str] = set()
        # dict.fromkeys drops repeated maintainer names so each profile page is fetched only once.
        for user in dict.fromkeys(self.get_maintainers(package_name)):
            candidates.update(self.get_packages_by_user(user))

        # Exclude the target by normalised name: the profile page may spell it with a different case
        # or separator, and comparing the target against itself would always report a match.
        target = self._normalize_name(package_name)
        # Sorted so the package reported by ``analyze`` does not depend on set iteration order.
        return sorted(name for name in candidates if self._normalize_name(name) != target)

    def fetch_sdist_url(self, package_name: str, version: str | None = None) -> str:
        """Fetch the sdist URL for a package.

        Parameters
        ----------
        package_name: str
            The name of the package.
        version: str | None
            The version of the package. If None, the latest version will be used.

        Returns
        -------
        str
            The sdist URL, or an empty string if not found.

        Raises
        ------
        HeuristicAnalyzerValueError
            If the PyPI JSON document cannot be fetched, decoded, or does not have the expected shape.
        """
        # NOTE(review): an inline review comment on this method is truncated ("I wonder if it would
        # be possible to create a ...") — check the review thread for the requested change.
        url = f"https://pypi.org/pypi/{package_name}/json"
        try:
            response = requests.get(url, timeout=self._REQUEST_TIMEOUT)
            response.raise_for_status()
            data = response.json()
        except requests.exceptions.RequestException as err:
            err_message = f"Failed to fetch PyPI JSON for {package_name}: {err}"
            raise HeuristicAnalyzerValueError(err_message) from err
        except ValueError as err:
            err_message = f"Failed to decode PyPI JSON for {package_name}: {err}"
            raise HeuristicAnalyzerValueError(err_message) from err

        actual_version: str
        if version is None:
            try:
                actual_version = typing.cast(str, data["info"]["version"])
            except (KeyError, TypeError) as err:
                err_message = f"Failed to get version for {package_name}: {err}"
                raise HeuristicAnalyzerValueError(err_message) from err
        else:
            actual_version = version

        try:
            for release_file in data.get("releases", {}).get(actual_version, []):
                if isinstance(release_file, dict) and release_file.get("packagetype") == "sdist":
                    sdist_url = release_file.get("url")
                    if isinstance(sdist_url, str):
                        return sdist_url
        except (AttributeError, TypeError) as err:
            # Raised when the document, its "releases" entry or the release list is not of the
            # expected JSON type (e.g. a list where an object is expected, or null).
            err_message = f"Failed to parse releases for {package_name} version {actual_version}: {err}"
            raise HeuristicAnalyzerValueError(err_message) from err

        return ""

    def get_structure_hash(self, package_name: str) -> str | None:
        """Calculate a hash based on the project's file structure.

        Parameters
        ----------
        package_name: str
            The name of the package.

        Returns
        -------
        str | None
            The hex SHA-256 digest of the sdist's normalised, sorted file paths, or None if the
            package has no sdist for its latest version.

        Raises
        ------
        HeuristicAnalyzerValueError
            If the sdist URL cannot be fetched or the package structure cannot be hashed.
        """
        sdist_url = self.fetch_sdist_url(package_name)
        if not sdist_url:
            return None

        name_pattern = self._name_pattern(package_name)
        try:
            # The context manager releases the streamed connection even when hashing fails.
            with requests.get(sdist_url, stream=True, timeout=self._REQUEST_TIMEOUT) as response:
                response.raise_for_status()
                raw_file_obj: typing.IO[bytes] = typing.cast(typing.IO[bytes], response.raw)

                with tarfile.open(fileobj=raw_file_obj, mode="r:gz") as file_archive:
                    paths = []
                    for member in file_archive:
                        if member.isdir():
                            continue
                        # Remove the top-level dir; a member without "/" is kept unchanged.
                        _, separator, remainder = member.name.partition("/")
                        normalized = remainder if separator else member.name
                        # Mask the package name in any spelling (case, "-", "_", ".") so packages
                        # that differ only by name produce identical path lists.
                        if name_pattern is not None:
                            normalized = name_pattern.sub(self._NAME_PLACEHOLDER, normalized)
                        paths.append(normalized)

            paths.sort()
            structure_hash_calculator = hashlib.sha256()
            for path in paths:
                structure_hash_calculator.update(path.encode("utf-8"))
                structure_hash_calculator.update(b"\n")
            return structure_hash_calculator.hexdigest()
        except requests.exceptions.RequestException as err:
            err_message = f"Failed to download sdist for {package_name} from {sdist_url}: {err}"
            raise HeuristicAnalyzerValueError(err_message) from err
        except tarfile.TarError as err:
            err_message = f"Failed to process tarfile for {package_name} from {sdist_url}: {err}"
            raise HeuristicAnalyzerValueError(err_message) from err
        except Exception as err:
            err_message = f"Failed to get structure hash for {package_name}: {err}"
            raise HeuristicAnalyzerValueError(err_message) from err
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -22,6 +22,7 @@ | |
from macaron.malware_analyzer.pypi_heuristics.metadata.empty_project_link import EmptyProjectLinkAnalyzer | ||
from macaron.malware_analyzer.pypi_heuristics.metadata.high_release_frequency import HighReleaseFrequencyAnalyzer | ||
from macaron.malware_analyzer.pypi_heuristics.metadata.one_release import OneReleaseAnalyzer | ||
from macaron.malware_analyzer.pypi_heuristics.metadata.similar_projects import SimilarProjectAnalyzer | ||
from macaron.malware_analyzer.pypi_heuristics.metadata.source_code_repo import SourceCodeRepoAnalyzer | ||
from macaron.malware_analyzer.pypi_heuristics.metadata.unchanged_release import UnchangedReleaseAnalyzer | ||
from macaron.malware_analyzer.pypi_heuristics.metadata.wheel_absence import WheelAbsenceAnalyzer | ||
|
@@ -332,6 +333,7 @@ def run_check(self, ctx: AnalyzeContext) -> CheckResultData: | |
SuspiciousSetupAnalyzer, | ||
WheelAbsenceAnalyzer, | ||
AnomalousVersionAnalyzer, | ||
SimilarProjectAnalyzer, | ||
] | ||
|
||
# name used to query the result of all problog rules, so it can be accessed outside the model. | ||
|
@@ -381,6 +383,10 @@ def run_check(self, ctx: AnalyzeContext) -> CheckResultData: | |
failed({Heuristics.CLOSER_RELEASE_JOIN_DATE.value}), | ||
forceSetup. | ||
|
||
% Package released that is similar to other packages maintained by the same maintainer. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Could you walk me through the rationale of why we should combine |
||
{Confidence.HIGH.value}::trigger(malware_high_confidence_4) :- | ||
quickUndetailed, forceSetup, failed({Heuristics.SIMILAR_PROJECTS.value}). | ||
|
||
% Package released recently with little detail, with multiple releases as a trust marker, but frequent and with | ||
% the same code. | ||
{Confidence.MEDIUM.value}::trigger(malware_medium_confidence_1) :- | ||
|
@@ -401,6 +407,7 @@ def run_check(self, ctx: AnalyzeContext) -> CheckResultData: | |
{problog_result_access} :- trigger(malware_high_confidence_1). | ||
{problog_result_access} :- trigger(malware_high_confidence_2). | ||
{problog_result_access} :- trigger(malware_high_confidence_3). | ||
{problog_result_access} :- trigger(malware_high_confidence_4). | ||
{problog_result_access} :- trigger(malware_medium_confidence_2). | ||
{problog_result_access} :- trigger(malware_medium_confidence_1). | ||
query({problog_result_access}). | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I believe this functionality is offered by `PyPIRegistry.get_maintainers_of_package`.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Correct. That is what I used at first, but @behnazh-w told me that `get_maintainers_of_package` no longer works because PyPI blocks it, so I rewrote it here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If this code can obtain the maintainer info, rather than adding a new function, please update the `PyPIRegistry.get_maintainers_of_package` function so other heuristics can benefit from it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should I change it here, or create a new PR for it?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Either works, although a separate PR is preferable.