Skip to content
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
158 changes: 158 additions & 0 deletions api_app/analyzers_manager/file_analyzers/macho_info.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
# This file is a part of IntelOwl https://github.com/intelowlproject/IntelOwl
# See the file 'LICENSE' for copying permission.

import logging
from typing import Any, Dict

try:
import machofile
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK, but why would this happen? Remove this and everything related to handling the missing library.

except ImportError:
machofile = None

from api_app.analyzers_manager.classes import FileAnalyzer

logger = logging.getLogger(__name__)


def _safe_decode(value: Any) -> str:
    """Return *value* as text.

    ``bytes`` are decoded as UTF-8 with undecodable bytes dropped; any other
    object is passed through ``str()``.
    """
    if not isinstance(value, bytes):
        return str(value)
    return value.decode("utf-8", "ignore")


class MachoInfo(FileAnalyzer):
"""
Analyzer for Mach-O binary files (macOS/iOS executables).
Uses the machofile library to parse and extract information.

API Validation Strategy:
This analyzer uses defensive programming with hasattr() checks because:
- The machofile library API varies between single-arch and Universal (FAT) binaries
- Different Mach-O file types may expose different methods/properties
- The behavior is validated through tests and internal documentation

Library reference: https://github.com/pstirparo/machofile
"""

@classmethod
def update(cls) -> bool:
    """Return ``False`` unconditionally.

    NOTE(review): presumably this overrides an update hook of the base
    analyzer class and signals that there is nothing to refresh -- confirm
    against ``FileAnalyzer``.
    """
    return False

def _parse_macho(self):
    """Parse the sample as a single-architecture Mach-O, falling back to Universal (FAT).

    Returns:
        The parsed ``machofile.MachO`` object, or a ``machofile.UniversalMachO``
        object when single-architecture parsing raised.

    Raises:
        AnalyzerRunException: when both parsers raise. The message carries both
            underlying errors and the exception is chained to the last one.
    """
    # Function-scope import so the fix is self-contained in this method.
    from api_app.analyzers_manager.exceptions import AnalyzerRunException

    try:
        macho = machofile.MachO(self.filepath)
        # Some parser objects expose an explicit parse() step; call it when present.
        if hasattr(macho, "parse"):
            macho.parse()
        return macho
    except Exception as single_error:
        try:
            macho = machofile.UniversalMachO(self.filepath)
            if hasattr(macho, "parse"):
                macho.parse()
            return macho
        except Exception as universal_error:
            # No logging here: run() catches the exception, logs it and adds it
            # to the report errors, so a warning at this level is a duplicate.
            raise AnalyzerRunException(
                "Failed to parse as both single and universal binary. "
                f"Single: {single_error}, Universal: {universal_error}"
            ) from universal_error
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

AnalyzerRunException must be raised here. Also, this should not be a warning; it is too noisy. It should only be added to the report's errors list. See other analyzers as a reference.


@staticmethod
def _extract_basic_info(macho, results: Dict[str, Any]):
"""Extracts basic info like header, architectures, uuid, etc."""
if hasattr(macho, "get_general_info"):
results["general_info"] = macho.get_general_info(formatted=True)

elif hasattr(macho, "general_info"):
results["general_info"] = macho.general_info

if hasattr(macho, "get_macho_header"):
results["header"] = macho.get_macho_header(formatted=True)
elif hasattr(macho, "header"):
results["header"] = macho.header

if hasattr(macho, "get_architectures"):
results["architectures"] = macho.get_architectures()
elif hasattr(macho, "header") and "cputype" in results.get("header", {}):
results["architectures"] = [results["header"]["cputype"]]
else:
results["architectures"] = []

if hasattr(macho, "uuid"):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You are just converting an object into a dict. Refactor all of this in a more convenient way. If the supported tool changes, we would have to change this here too, which is unfeasible. Also, this code is unreadable. Please find a workaround.

results["uuid"] = str(macho.uuid)

if hasattr(macho, "entry_point"):
results["entrypoint"] = str(macho.entry_point)

if hasattr(macho, "version_info"):
results["version_info"] = str(macho.version_info)

@staticmethod
def _extract_lists(macho, results: Dict[str, Any]):
    """Copy the list-valued fields of *macho* into *results*.

    Keys written (each only when the parsed object exposes the source):
    ``load_commands``, ``segments``, ``dylib_names``, ``imports``, ``exports``.
    An attribute that exists but is ``None`` or empty yields ``[]`` instead of
    raising, so one missing table does not abort the whole extraction.

    Args:
        macho: parsed object returned by the machofile library.
        results: dictionary updated in place.
    """
    # Plain attributes whose items are rendered with str().
    for key in ("load_commands", "segments"):
        if hasattr(macho, key):
            results[key] = [str(item) for item in getattr(macho, key) or []]

    if hasattr(macho, "dylib_names"):
        results["dylib_names"] = [_safe_decode(name) for name in macho.dylib_names or []]

    # Prefer the accessor method; otherwise decode the raw attribute.
    for key, getter_name, attr_name in (
        ("imports", "get_imported_functions", "imported_functions"),
        ("exports", "get_exported_symbols", "exported_symbols"),
    ):
        if hasattr(macho, getter_name):
            results[key] = getattr(macho, getter_name)()
        elif hasattr(macho, attr_name):
            results[key] = [_safe_decode(item) for item in getattr(macho, attr_name) or []]

def run(self) -> Dict[str, Any]:
results = {}

if machofile is None:
error_msg = "machofile library is not installed"
logger.error(error_msg)
self.report.errors.append(error_msg)
self.report.status = self.report.STATUSES.FAILED
self.report.save()
return results

try:
macho = self._parse_macho()
if macho is None:
raise Exception("Failed to create MachO object")
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this check can be removed as _parse_macho() raises only on failure. works?

            if macho is None:
                raise Exception("Failed to create MachO object")


self._extract_basic_info(macho, results)
self._extract_lists(macho, results)

if hasattr(macho, "code_signature_info"):
results["code_signature"] = macho.code_signature_info

if hasattr(macho, "get_similarity_hashes"):
results["hashes"] = macho.get_similarity_hashes(formatted=True)

except Exception as e:
warning_message = "job_id:{} analyzer:{} md5:{} filename:{} MachoFile parsing error: {}".format(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

f-strings everywhere

self.job_id, self.analyzer_name, self.md5, self.filename, e
)
logger.warning(warning_message, exc_info=True)
self.report.errors.append(warning_message)
self.report.status = self.report.STATUSES.FAILED
self.report.save()
Copy link

Copilot AI Feb 4, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The analyzer is directly mutating self.report.status and saving the report inside run() when the machofile library is missing or parsing fails, but since Plugin.start() always calls after_run_success() whenever run() returns without raising, the final status will be overwritten to SUCCESS even for these failure cases. To keep behavior consistent with other analyzers and ensure failed parses are correctly surfaced, the error paths in run() should raise an exception (e.g., an AnalyzerRunException) after logging instead of setting the status manually, and rely on after_run_failed() to manage status and persistence.

Copilot uses AI. Check for mistakes.
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This might be an appropriate fix

(131-133)

-            self.report.status = self.report.STATUSES.FAILED
-            self.report.save()
-            return results

+           raise AnalyzerRunException(error_msg)

(155-156)

-          self.report.status = self.report.STATUSES.FAILED
-          self.report.save()
+         raise AnalyzerRunException(warning_message)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mlodic PTAL


return results
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
from django.db import migrations
from django.db.models.fields.related_descriptors import (
ForwardManyToOneDescriptor,
ForwardOneToOneDescriptor,
ManyToManyDescriptor,
)

plugin = {
"name": "MachoFile",
"python_module": {
"module": "macho_info.MachoInfo",
"base_path": "api_app.analyzers_manager.file_analyzers",
},
"description": "Parse Mach-O binary files (macOS/iOS executables) using machofile library. Extracts headers, segments, dylibs, imports, exports, hashes, and code signatures.",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

markdown link to the project

"disabled": False,
"soft_time_limit": 60,
"routing_key": "local",
"health_check_status": True,
"type": "file",
"docker_based": False,
"maximum_tlp": "RED",
"observable_supported": [],
"supported_filetypes": [
"application/x-mach-binary",
],
Comment on lines +28 to +33
Copy link

Copilot AI Feb 7, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The supported_filetypes list includes very broad MIME types like "application/x-executable" and "application/x-binary", which are commonly used for non–Mach-O binaries (e.g., ELF). This will cause MachoInfo to run (and likely fail) on unrelated files; restrict supported_filetypes to Mach-O-specific MIME types (and/or add explicit signature checks + early exit) to avoid noisy failures and unnecessary work.

Copilot uses AI. Check for mistakes.
Comment on lines +28 to +33
Copy link

Copilot AI Feb 7, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

supported_filetypes includes generic MIME types like application/x-binary and application/x-executable. Since this analyzer is Mach-O specific (macOS/iOS), this configuration will make MachoInfo run on many non–Mach-O samples (e.g., ELF binaries commonly labeled application/x-executable), producing avoidable failures/noise in default playbooks. Consider restricting supported_filetypes to Mach-O specific MIME types (e.g., application/x-mach-binary and/or application/mac-binary) and removing the generic ones, or moving generic types to a separate opt-in config.

Copilot uses AI. Check for mistakes.
"run_hash": False,
"run_hash_type": "",
"not_supported_filetypes": [],
"health_check_task": None,
"model": "analyzers_manager.AnalyzerConfig",
}

# Rows that migrate() creates with the "Parameter" model; empty for this plugin.
params = []
# Rows that migrate() creates with the "PluginConfig" model; empty for this plugin.
values = []


def _get_real_obj(Model, field, value):
    """Turn the serialized *value* of *field* on *Model* into real related object(s).

    For a forward foreign-key or one-to-one field with a truthy value, the
    value is resolved to one related instance. For a many-to-many field with a
    truthy value, each element is resolved and a list is returned. In every
    other case the value is returned unchanged.
    """

    def _get_obj(Model, other_model, value):
        """Resolve one serialized reference to an instance of *other_model*."""
        if isinstance(value, dict):
            # Nested description: resolve each of its fields recursively, then
            # fetch the matching row or create it.
            real_vals = {}
            for key, real_val in value.items():
                real_vals[key] = _get_real_obj(other_model, key, real_val)
            value = other_model.objects.get_or_create(**real_vals)[0]
        # it is just the primary key serialized
        else:
            if isinstance(value, int):
                # For PluginConfig rows the integer is not used as a key: the
                # related object is looked up by this plugin's name instead.
                if Model.__name__ == "PluginConfig":
                    value = other_model.objects.get(name=plugin["name"])
                else:
                    value = other_model.objects.get(pk=value)
            else:
                # Any non-int, non-dict value is used as the related row's name.
                value = other_model.objects.get(name=value)
        return value

    if (
        type(getattr(Model, field))
        in [
            ForwardManyToOneDescriptor,
            ForwardOneToOneDescriptor,
        ]
        and value
    ):
        other_model = getattr(Model, field).get_queryset().model
        value = _get_obj(Model, other_model, value)
    elif type(getattr(Model, field)) in [ManyToManyDescriptor] and value:
        other_model = getattr(Model, field).rel.model
        value = [_get_obj(Model, other_model, val) for val in value]
    return value


def _create_object(Model, data):
mtm, no_mtm = {}, {}
for field, value in data.items():
value = _get_real_obj(Model, field, value)
if type(getattr(Model, field)) is ManyToManyDescriptor:
mtm[field] = value
else:
no_mtm[field] = value
try:
o = Model.objects.get(**no_mtm)
Copy link

Copilot AI Feb 4, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Variable o is not used.

Suggested change
o = Model.objects.get(**no_mtm)
Model.objects.get(**no_mtm)

Copilot uses AI. Check for mistakes.
Copy link

Copilot AI Feb 4, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Variable o is not used.

Suggested change
o = Model.objects.get(**no_mtm)
Model.objects.get(**no_mtm)

Copilot uses AI. Check for mistakes.
except Model.DoesNotExist:
o = Model(**no_mtm)
o.full_clean()
o.save()
for field, value in mtm.items():
attribute = getattr(o, field)
if value is not None:
attribute.set(value)
return False
return True


def migrate(apps, schema_editor):
    """Create the row described by the module-level ``plugin`` dict.

    The target model is taken from ``plugin["model"]``. Nothing is created when
    a row with the same name already exists. When the row is newly created, the
    entries of ``params`` and ``values`` are created as ``Parameter`` and
    ``PluginConfig`` rows.

    Args:
        apps: historical app registry supplied by the migration runner.
        schema_editor: unused.
    """
    Parameter = apps.get_model("api_app", "Parameter")
    PluginConfig = apps.get_model("api_app", "PluginConfig")
    # Work on a copy: popping "model" from the shared module-level dict made a
    # second migrate()/reverse_migrate() call in the same process raise KeyError.
    plugin_data = dict(plugin)
    python_path = plugin_data.pop("model")
    Model = apps.get_model(*python_path.split("."))
    if Model.objects.filter(name=plugin_data["name"]).exists():
        return
    exists = _create_object(Model, plugin_data)
    if not exists:
        for param in params:
            _create_object(Parameter, param)
        for value in values:
            _create_object(PluginConfig, value)


def reverse_migrate(apps, schema_editor):
    """Delete the row named ``plugin["name"]`` from the model in ``plugin["model"]``.

    Args:
        apps: historical app registry supplied by the migration runner.
        schema_editor: unused.
    """
    # Read the key instead of popping it, so the module-level dict stays intact
    # and this function can run more than once in the same process.
    python_path = plugin["model"]
    Model = apps.get_model(*python_path.split("."))
    Model.objects.get(name=plugin["name"]).delete()


class Migration(migrations.Migration):
    # Migrations that must be applied before this one.
    dependencies = [
        ("api_app", "0071_delete_last_elastic_report"),
        ("analyzers_manager", "0175_analyzer_config_cleanbrowsing_malicious_detector"),
    ]

    # Data-only migration: migrate() applies it, reverse_migrate() undoes it.
    operations = [migrations.RunPython(migrate, reverse_migrate)]
    # NOTE(review): per Django's Migration.atomic, this opts out of wrapping the
    # migration in a transaction -- confirm that is intended here.
    atomic = False
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
# This file is a part of IntelOwl https://github.com/intelowlproject/IntelOwl
# See the file 'LICENSE' for copying permission.


from django.db import migrations


def migrate(apps, schema_editor):
    """Add the "MachoFile" analyzer to the "FREE_TO_USE_ANALYZERS" playbook."""
    PlaybookConfig = apps.get_model("playbooks_manager", "PlaybookConfig")
    analyzer_model = apps.get_model("analyzers_manager", "AnalyzerConfig")
    playbook = PlaybookConfig.objects.get(name="FREE_TO_USE_ANALYZERS")
    macho_analyzer = analyzer_model.objects.get(name="MachoFile")
    playbook.analyzers.add(macho_analyzer.id)
    # Validate the playbook before persisting it.
    playbook.full_clean()
    playbook.save()


def reverse_migrate(apps, schema_editor):
    """Remove the "MachoFile" analyzer from the "FREE_TO_USE_ANALYZERS" playbook."""
    PlaybookConfig = apps.get_model("playbooks_manager", "PlaybookConfig")
    analyzer_model = apps.get_model("analyzers_manager", "AnalyzerConfig")
    playbook = PlaybookConfig.objects.get(name="FREE_TO_USE_ANALYZERS")
    macho_analyzer = analyzer_model.objects.get(name="MachoFile")
    playbook.analyzers.remove(macho_analyzer.id)
    # Validate the playbook before persisting it.
    playbook.full_clean()
    playbook.save()


class Migration(migrations.Migration):
    # Migrations that must be applied before this one; the analyzers_manager
    # entry is the migration that creates the "MachoFile" analyzer config.
    dependencies = [
        ("playbooks_manager", "0061_replace_dns0_playbooks"),
        ("analyzers_manager", "0176_analyzer_config_macho_info"),
    ]

    # Data-only migration: migrate() applies it, reverse_migrate() undoes it.
    operations = [
        migrations.RunPython(migrate, reverse_migrate),
    ]
9 changes: 7 additions & 2 deletions intel_owl/settings/_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,13 @@

get_secret = os.environ.get

uid = pwd.getpwnam("www-data").pw_uid
gid = grp.getgrnam("www-data").gr_gid
try:
uid = pwd.getpwnam("www-data").pw_uid
gid = grp.getgrnam("www-data").gr_gid
except (KeyError, AttributeError):
# fallback for environments without www-data (like local macOS or CI envs)
uid = os.getuid()
gid = os.getgid()


def set_permissions(directory: Path, force_create: bool = False):
Expand Down
1 change: 1 addition & 0 deletions requirements/project-requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ jbxapi==3.23.0
yara-x==1.10.0
flare-floss==3.1.1
flare-capa==9.3.1
machofile @ git+https://github.com/pstirparo/machofile.git@8601bcaf98d23ac068dc4a031827a1bf57b27c9b
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need a pinned version; otherwise we can't add it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

aight, checking out which version can be used


# httpx required for HTTP/2 support (Mullvad DNS rejects HTTP/1.1 with protocol errors)
httpx[http2]==0.28.1
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
from api_app.analyzers_manager.file_analyzers.macho_info import MachoInfo
from tests import CustomTestCase


class MachoInfoTestCase(CustomTestCase):
fixtures = [
"api_app/fixtures/0001_user.json",
]

def test_macho_analysis(self):
"""Test MachoFile analyzer with a real sample"""
report = self._analyze_sample(
"macho_sample",
"80cc133a33786ceeacbd3acacd025dfc",
# not hardcoded, MD5 of tested data
"application/x-mach-binary",
"MachoFile",
Copy link

Copilot AI Feb 6, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The analyzer config name in the migration is "MachoInfo" but the integration test is trying to retrieve an AnalyzerConfig with name "MachoFile". This mismatch will cause the test to fail with a DoesNotExist error. Please update line 17 to use "MachoInfo" instead of "MachoFile" to match the analyzer configuration name.

Suggested change
"MachoFile",
"MachoInfo",

Copilot uses AI. Check for mistakes.
MachoInfo,
)

# distinct checks
self.assertIn("header", report)
self.assertIn("magic", report["header"])

# Check architecture (our sample is likely x86_64 or arm64 depending on host build)
self.assertIn("architectures", report)
self.assertIsInstance(report["architectures"], list)

# Check segments
self.assertIn("segments", report)
self.assertGreater(len(report["segments"]), 0)

# Check dylibs (should have at least libSystem)
self.assertIn("dylib_names", report)
self.assertIsInstance(report["dylib_names"], list)

# Check hashes
self.assertIn("hashes", report)
self.assertIn("dylib_hash", report["hashes"])

# Check exports
self.assertIn("exports", report)
# exports might be list or dict depending on result format, analyzing sample suggested dict
self.assertTrue(isinstance(report["exports"], (list, dict)))

# Check code signature
self.assertIn("code_signature", report)
Loading
Loading