Skip to content

chore: semgrep test fixes and minor formatting fixes #1098

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/macaron/database/db_custom_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ class RFC3339DateTime(TypeDecorator): # pylint: disable=W0223
https://docs.sqlalchemy.org/en/20/dialects/sqlite.html#sqlalchemy.dialects.sqlite.DATETIME
"""

# It is stored in the database as a string
# It is stored in the database as a string.
impl = String

# To prevent Sphinx from rendering the docstrings for `cache_ok`, make this docstring private.
Expand Down
2 changes: 1 addition & 1 deletion src/macaron/database/table_definitions.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@

logger: logging.Logger = logging.getLogger(__name__)

# TODO: Use UUIDs as primary keys rather than incremental
# TODO: Use UUIDs as primary keys rather than incremental.

################################################################################
# Analysis
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
import json
import logging
import os
import subprocess # nosec
import subprocess # nosec B404
import tempfile

import yaml
Expand Down Expand Up @@ -120,7 +120,7 @@ def _load_defaults(self, resources_path: str) -> tuple[str, str | None, set[str]

semgrep_commands: list[str] = ["semgrep", "scan", "--validate", "--oss-only", "--config", custom_rule_path]
try:
process = subprocess.run(semgrep_commands, check=True, capture_output=True) # nosec
process = subprocess.run(semgrep_commands, check=True, capture_output=True) # nosec B603
except (subprocess.CalledProcessError, subprocess.TimeoutExpired) as semgrep_error:
error_msg = (
f"Unable to run semgrep validation on {custom_rule_path} with arguments "
Expand Down Expand Up @@ -185,8 +185,8 @@ def _extract_rule_ids(self, path: str, target_files: set[str]) -> set[str]:
If any Semgrep rule file could not be safely loaded, or if their format was not in the expected Semgrep
format, or if there were any files in 'target_files' not found when searching in 'path'.
"""
# We keep a record of any file paths we coulnd't find to provide a more useful error message, rather than raising
# an error on the first missing file we see.
# We keep a record of any file paths we couldn't find to provide a more useful error message, rather than
# raising an error on the first missing file we see.
missing_files: list[str] = []
target_file_paths: list[str] = []
rule_ids: set[str] = set()
Expand All @@ -211,7 +211,7 @@ def _extract_rule_ids(self, path: str, target_files: set[str]) -> set[str]:
logger.debug(error_msg)
raise ConfigurationError(error_msg) from yaml_error

# should be a top-level key "rules", and then a list of rules (dictionaries) with "id" entries
# Should be a top-level key "rules", and then a list of rules (dictionaries) with "id" entries.
try:
for semgrep_rule in semgrep_ruleset["rules"]:
rule_ids.add(semgrep_rule["id"])
Expand Down Expand Up @@ -243,11 +243,11 @@ def analyze(self, pypi_package_json: PyPIPackageJsonAsset) -> tuple[HeuristicRes
if there is no source code available.
"""
analysis_result: dict = {}
# since we have to run them anyway, return disabled rule findings for debug information
# Since we have to run them anyway, return disabled rule findings for debug information.
disabled_results: dict = {}
# Here, we disable 'nosemgrep' ignoring so that this is not an evasion method of our scan (i.e. malware includes
# 'nosemgrep' comments to prevent our scan detecting those code lines). Read more about the 'nosemgrep' feature
# here: https://semgrep.dev/docs/ignoring-files-folders-code
# here: https://semgrep.dev/docs/ignoring-files-folders-code.
semgrep_commands: list[str] = ["semgrep", "scan", "--oss-only", "--disable-nosem"]
result: HeuristicResult = HeuristicResult.PASS

Expand All @@ -266,7 +266,7 @@ def analyze(self, pypi_package_json: PyPIPackageJsonAsset) -> tuple[HeuristicRes
semgrep_commands.append(f"--json-output={output_json_file.name}")
logger.debug("executing: %s.", semgrep_commands)
try:
process = subprocess.run(semgrep_commands, check=True, capture_output=True) # nosec
process = subprocess.run(semgrep_commands, check=True, capture_output=True) # nosec B603
except (subprocess.CalledProcessError, subprocess.TimeoutExpired) as semgrep_error:
error_msg = (
f"Unable to run semgrep on {source_code_path} with arguments {semgrep_commands}: {semgrep_error}"
Expand Down Expand Up @@ -298,6 +298,7 @@ def analyze(self, pypi_package_json: PyPIPackageJsonAsset) -> tuple[HeuristicRes
file = json_extract(finding, ["path"], str)
if not rule_id or not file:
continue
rule_id = rule_id.split(".")[-1]

file = os.path.relpath(file, os.path.dirname(source_code_path))
start = json_extract(finding, ["start", "line"], int)
Expand All @@ -310,7 +311,7 @@ def analyze(self, pypi_package_json: PyPIPackageJsonAsset) -> tuple[HeuristicRes
# final element in that path, so we use that to match our rule IDs.
# e.g. rule_id = src.macaron.resources.pypi_malware_rules.obfuscation_decode-and-execute, which comes from
# the rule ID 'obfuscation_decode-and-execute' inside 'obfuscation.yaml'.
if rule_id.split(".")[-1] in self.disabled_rule_ids:
if rule_id in self.disabled_rule_ids:
if rule_id not in disabled_results:
disabled_results[rule_id] = {"message": message, "detections": []}
disabled_results[rule_id]["detections"].append({"file": file, "start": start, "end": end})
Expand All @@ -320,7 +321,7 @@ def analyze(self, pypi_package_json: PyPIPackageJsonAsset) -> tuple[HeuristicRes
analysis_result[rule_id] = {"message": message, "detections": []}
analysis_result[rule_id]["detections"].append({"file": file, "start": start, "end": end})

# some semgrep rules were triggered, even after removing disabled ones
# Some semgrep rules were triggered, even after removing disabled ones.
if analysis_result:
result = HeuristicResult.FAIL

Expand Down
4 changes: 2 additions & 2 deletions src/macaron/parsers/github_workflow_model.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright (c) 2024 - 2024, Oracle and/or its affiliates. All rights reserved.
# Copyright (c) 2024 - 2025, Oracle and/or its affiliates. All rights reserved.
# Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/.
# pylint: skip-file
# flake8: noqa
Expand All @@ -7,7 +7,7 @@
# generated by datamodel-codegen:
# filename: https://raw.githubusercontent.com/SchemaStore/schemastore/a1689388470d1997f2e5ebd8b430e99587b8d354/src/schemas/json/github-workflow.json
# timestamp: 2024-05-10T03:46:22+00:00
# Some manual modifications made, noted as MODIFIED in comments below
# Some manual modifications made, noted as MODIFIED in comments below.

from __future__ import annotations

Expand Down
4 changes: 2 additions & 2 deletions src/macaron/parsers/pomparser.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
# Copyright (c) 2024 - 2024, Oracle and/or its affiliates. All rights reserved.
# Copyright (c) 2024 - 2025, Oracle and/or its affiliates. All rights reserved.
# Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/.

"""This module contains the parser for POM files."""
import logging
from xml.etree.ElementTree import Element # nosec
from xml.etree.ElementTree import Element # nosec B405

import defusedxml.ElementTree
from defusedxml.ElementTree import fromstring
Expand Down
2 changes: 1 addition & 1 deletion src/macaron/repo_finder/repo_finder_java.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import logging
import re
import urllib.parse
from xml.etree.ElementTree import Element # nosec
from xml.etree.ElementTree import Element # nosec B405

from packageurl import PackageURL

Expand Down
4 changes: 2 additions & 2 deletions tests/artifact/test_local_artifact.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,10 @@ def is_case_sensitive_filesystem() -> bool:

try:
os.mkdir(upper)
# if upper is not treated the same as lower -> case sensitive
# If upper is not treated the same as lower -> case-sensitive.
return True
except FileExistsError:
# upper is treated the same as lower -> case insensitive
# Upper is treated the same as lower -> case-insensitive.
return False


Expand Down
4 changes: 2 additions & 2 deletions tests/macaron_testcase.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright (c) 2022 - 2023, Oracle and/or its affiliates. All rights reserved.
# Copyright (c) 2022 - 2025, Oracle and/or its affiliates. All rights reserved.
# Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/.

"""This module contains the MacaronTestCase class for setup/teardown of test cases."""
Expand All @@ -11,7 +11,7 @@
from macaron.config.defaults import create_defaults, defaults, load_defaults


# TODO: add fixture in the future
# TODO: add fixture in the future.
class MacaronTestCase(TestCase):
"""The TestCase class for Macaron."""

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"enabled_sourcecode_rule_findings": {
"src.macaron.resources.pypi_malware_rules.exfiltration_remote-exfiltration": {
"exfiltration_remote-exfiltration": {
"message": "Detected exfiltration of sensitive data to a remote endpoint",
"detections": [
{
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"enabled_sourcecode_rule_findings": {
"src.macaron.resources.pypi_malware_rules.obfuscation_decode-and-execute": {
"obfuscation_decode-and-execute": {
"message": "Detected the flow of a decoded primitive value to a remote endpoint, process, code evaluation, or file write",
"detections": [
{
Expand Down Expand Up @@ -35,7 +35,7 @@
}
]
},
"src.macaron.resources.pypi_malware_rules.obfuscation_inline-imports": {
"obfuscation_inline-imports": {
"message": "Found an instance of a suspicious API in a hardcoded inline import",
"detections": [
{
Expand Down Expand Up @@ -105,7 +105,7 @@
}
]
},
"src.macaron.resources.pypi_malware_rules.obfuscation_obfuscation-tools": {
"obfuscation_obfuscation-tools": {
"message": "Found an indicator of the use of a python code obfuscation tool",
"detections": [
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ def test_nonexistent_rule_path(mock_defaults: MagicMock) -> None:
@patch("macaron.malware_analyzer.pypi_heuristics.sourcecode.pypi_sourcecode_analyzer.defaults")
def test_invalid_custom_rules(mock_defaults: MagicMock) -> None:
"""Test for when the provided file is not a valid semgrep rule, so error,"""
# use this file as an invalid semgrep rule as it is most definitely not a semgrep rule, and does exist
# Use this file as an invalid semgrep rule as it is most definitely not a semgrep rule, and does exist.
defaults = {
"custom_semgrep_rules_path": os.path.abspath(__file__),
}
Expand Down
8 changes: 4 additions & 4 deletions tests/malware_analyzer/pypi/test_wheel_absence.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ def test_analyze_no_information(pypi_package_json: MagicMock) -> None:


# Note: to patch a function, the way it is imported matters.
# e.g. if it is imported like this: import os; os.listdir() then you patch os.listdir
# if it is imported like this: from os import listdir; listdir() then you patch <module>.listdir
# E.g. if it is imported like this: import os; os.listdir() then you patch os.listdir.
# If it is imported like this: from os import listdir; listdir() then you patch <module>.listdir.
@patch("macaron.malware_analyzer.pypi_heuristics.metadata.wheel_absence.send_head_http_raw")
def test_analyze_tar_present(mock_send_head_http_raw: MagicMock, pypi_package_json: MagicMock) -> None:
"""Test for when only .tar.gz is present, so failed"""
Expand Down Expand Up @@ -72,7 +72,7 @@ def test_analyze_tar_present(mock_send_head_http_raw: MagicMock, pypi_package_js
pypi_package_json.pypi_registry.inspector_url_scheme = "https"
pypi_package_json.pypi_registry.inspector_url_netloc = "inspector.pypi.io"

mock_send_head_http_raw.return_value = MagicMock() # assume valid URL for testing purposes
mock_send_head_http_raw.return_value = MagicMock() # Assume valid URL for testing purposes.

expected_detail_info = {
"inspector_links": [inspector_link_expected],
Expand Down Expand Up @@ -131,7 +131,7 @@ def test_analyze_whl_present(mock_send_head_http_raw: MagicMock, pypi_package_js
pypi_package_json.package_json = {"info": {"name": "ttttttttest_nester"}}
pypi_package_json.pypi_registry.inspector_url_scheme = "https"
pypi_package_json.pypi_registry.inspector_url_netloc = "inspector.pypi.io"
mock_send_head_http_raw.return_value = MagicMock() # assume valid URL for testing purposes
mock_send_head_http_raw.return_value = MagicMock() # Assume valid URL for testing purposes.

expected_detail_info = {
"inspector_links": [inspector_link_expected],
Expand Down
4 changes: 2 additions & 2 deletions tests/parsers/yaml/test_yaml_loader.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright (c) 2022 - 2022, Oracle and/or its affiliates. All rights reserved.
# Copyright (c) 2022 - 2025, Oracle and/or its affiliates. All rights reserved.
# Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/.

"""This module test the yaml loader functions."""
Expand Down Expand Up @@ -39,7 +39,7 @@ def test_load_yaml_content(self) -> None:
def test_validate_yaml_data(self) -> None:
"""Test the validate yaml data method."""
# We are not testing the behavior of yamale methods
# so the schema and data can be empty
# so the schema and data can be empty.
mock_schema = Schema({})
mock_data: list = []

Expand Down
2 changes: 1 addition & 1 deletion tests/repo_finder/test_commit_finder.py
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,7 @@ def test_version_to_tag_matching(_data: DataObject) -> None: # noqa: PT019
This test verifies that a similar version and tag can be matched by the commit finder.
"""
# pylint: disable=protected-access
# Generate the version
# Generate the version.
version = _data.draw(hypothesis.strategies.from_regex(input_pattern, fullmatch=True))
if not version:
return
Expand Down
4 changes: 2 additions & 2 deletions tests/slsa_analyzer/build_tool/test_go.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright (c) 2023 - 2024, Oracle and/or its affiliates. All rights reserved.
# Copyright (c) 2023 - 2025, Oracle and/or its affiliates. All rights reserved.
# Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/.

"""This module tests the Go build functions."""
Expand All @@ -24,7 +24,7 @@
def test_get_build_dirs(snapshot: list, tmp_path: Path, go_tool: Go, folder: str, file: str) -> None:
"""Test discovering build directories."""
# Since there's issues having 2 go.mod files in the same project, we make
# it on the fly for this test
# it on the fly for this test.
proj_dir = tmp_path.joinpath(folder)
proj_dir.mkdir(parents=True)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ def test_detect_malicious_metadata(
[
pytest.param(
{
# similar to rule ID malware_high_confidence_1, but SUSPICIOUS_SETUP is skipped since the file does not
# Similar to rule ID malware_high_confidence_1, but SUSPICIOUS_SETUP is skipped since the file does not
# exist, so the rule should not trigger.
Heuristics.EMPTY_PROJECT_LINK: HeuristicResult.FAIL,
Heuristics.SOURCE_CODE_REPO: HeuristicResult.SKIP,
Expand Down
6 changes: 3 additions & 3 deletions tests/slsa_analyzer/mock_git_utils.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright (c) 2022 - 2024, Oracle and/or its affiliates. All rights reserved.
# Copyright (c) 2022 - 2025, Oracle and/or its affiliates. All rights reserved.
# Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/.

"""
Expand Down Expand Up @@ -44,7 +44,7 @@ def initiate_repo(repo_path: str | os.PathLike, git_init_options: dict | None =
git_wrapper = Git(repo_path)
return git_wrapper
except GitError:
# No git repo at repo_path
# No git repo at repo_path.
git.Repo.init(repo_path, **git_init_options)
return Git(repo_path)

Expand All @@ -65,7 +65,7 @@ def commit_files(git_wrapper: Git, file_names: list) -> bool:
True if succeed else False.
"""
try:
# Store the index object as recommended by the documentation
# Store the index object as recommended by the documentation.
current_index = git_wrapper.repo.index
current_index.add(file_names)
current_index.commit(f"Add files: {str(file_names)}")
Expand Down
22 changes: 11 additions & 11 deletions tests/test_util.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright (c) 2022 - 2024, Oracle and/or its affiliates. All rights reserved.
# Copyright (c) 2022 - 2025, Oracle and/or its affiliates. All rights reserved.
# Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/.

"""
Expand Down Expand Up @@ -35,46 +35,46 @@ def test_construct_query(self) -> None:
assert query == r"q=Some+simple+query+language%3Ajava&sort=stars&order=desc"

# TODO: the copy_file_bulk method is essential, however, this test
# need further works.
# needs further work.
def test_copy_file_bulk(self) -> None:
"""
Test the copy file bulk method
"""
src_path = "/src/path"
target_path = "/target/path"

# Testing making dir to store files
# Testing making dir to store files.
with patch("macaron.util.copy_file") as mock_copy_file:
with patch("os.makedirs") as mock_make_dirs:
# Empty file list, it does nothing
# Empty file list, it does nothing.
assert util.copy_file_bulk([], src_path, target_path)
mock_copy_file.assert_not_called()
mock_make_dirs.assert_not_called()

with patch("os.makedirs") as mock_make_dirs:
# Test creating the dirs for storing the file
# Test creating the dirs for storing the file.
assert util.copy_file_bulk(["foo/file"], src_path, target_path)
mock_make_dirs.assert_called_with("/target/path/foo", exist_ok=True)

# Testing copy behaviors
# Testing copy behaviors.
with patch("os.makedirs") as mock_make_dirs:
# Test ignoring existed files
# Test ignoring existed files.
with patch("os.path.exists", return_value=True):
with patch("macaron.util.copy_file") as mock_copy_file:
assert util.copy_file_bulk(["file"], src_path, target_path)
mock_copy_file.assert_not_called()

# Files not existed, perform the copy operation
# Files do not exist, perform the copy operation.
with patch("os.path.exists", return_value=False):
# Test copying file successful
# Test copying file successful.
with patch("macaron.util.copy_file", return_value=True) as mock_copy_file:
assert util.copy_file_bulk(["file"], src_path, target_path)

# Test copying file unsuccessful
# Test copying file unsuccessful.
with patch("macaron.util.copy_file", return_value=False) as mock_copy_file:
assert not util.copy_file_bulk(["file"], src_path, target_path)

# Test copying multiple files
# Test copying multiple files.
with patch("macaron.util.copy_file", return_value=True) as mock_copy_file:
assert util.copy_file_bulk(["foo/file1", "foo/file2"], src_path, target_path)
mock_copy_file.assert_has_calls(
Expand Down
4 changes: 2 additions & 2 deletions tests/vsa/compare_vsa.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright (c) 2024 - 2024, Oracle and/or its affiliates. All rights reserved.
# Copyright (c) 2024 - 2025, Oracle and/or its affiliates. All rights reserved.
# Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/.

"""Script to compare a generated VSA with an expected payload."""
Expand Down Expand Up @@ -139,7 +139,7 @@ def compare_list(
if len(result) != len(expected):
log_err(f"Expected field '{name}' of length {len(result)} in result to have length {len(expected)}")
log_diff(name, result, expected)
# Nothing else to compare
# Nothing else to compare.
return False

equal = True
Expand Down
Loading