Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
135 changes: 135 additions & 0 deletions api_app/analyzers_manager/file_analyzers/macho_info.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
# This file is a part of IntelOwl https://github.com/intelowlproject/IntelOwl
# See the file 'LICENSE' for copying permission.

import logging
from typing import Any, Dict

# machofile is an optional dependency; when it is absent the analyzer
# fails gracefully inside run() instead of crashing at import time.
try:
    import machofile
except ImportError:
    machofile = None

from api_app.analyzers_manager.classes import FileAnalyzer

logger = logging.getLogger(__name__)


class MachoInfo(FileAnalyzer):
    """
    Analyzer for Mach-O binary files (macOS/iOS executables).
    Uses the machofile library to parse and extract information.

    API Validation Strategy:
    This analyzer uses defensive programming with hasattr() checks because:
    - The machofile library API varies between single-arch and Universal (FAT) binaries
    - Different Mach-O file types may expose different methods/properties
    - The behavior is validated through tests and internal documentation

    Library reference: https://github.com/pstirparo/machofile
    """

    @staticmethod
    def _to_str(value) -> str:
        """Decode bytes as UTF-8 (ignoring errors); stringify anything else."""
        if isinstance(value, bytes):
            return value.decode("utf-8", "ignore")
        return str(value)

    def _parse_macho(self):
        """
        Parse self.filepath, trying the single-architecture parser first
        and falling back to the Universal (FAT) parser.

        Returns:
            The parsed machofile object.

        Raises:
            Exception: if both parsers fail (message includes both errors).
        """
        try:
            macho = machofile.MachO(self.filepath)
            # some machofile versions parse lazily via an explicit parse()
            if hasattr(macho, "parse"):
                macho.parse()
            return macho
        except Exception as single_error:
            # If single-arch fails, try Universal (FAT) binary
            try:
                macho = machofile.UniversalMachO(self.filepath)
                if hasattr(macho, "parse"):
                    macho.parse()
                return macho
            except Exception as universal_error:
                parse_error = (
                    f"Failed to parse as both single and universal binary. "
                    f"Single: {single_error}, Universal: {universal_error}"
                )
                logger.warning(
                    f"job_id:{self.job_id} analyzer:{self.analyzer_name} "
                    f"md5:{self.md5} {parse_error}"
                )
                # Fail the analyzer cleanly instead of retrying
                # construction without parsing.
                raise Exception(parse_error)

    def run(self) -> Dict[str, Any]:
        """
        Parse the sample and collect header, architectures, load commands,
        segments, dylibs, imports/exports, code-signature info and
        similarity hashes.

        Returns:
            Dict with the extracted fields; on failure the report is marked
            FAILED and whatever was collected so far is returned.
        """
        results: Dict[str, Any] = {}

        if machofile is None:
            error_msg = "machofile library is not installed"
            logger.error(error_msg)
            self.report.errors.append(error_msg)
            self.report.status = self.report.STATUSES.FAILED
            self.report.save()
            return results

        try:
            # _parse_macho() raises on failure, so no `macho is None`
            # check is needed afterwards.
            macho = self._parse_macho()

            if hasattr(macho, "get_general_info"):
                results["general_info"] = macho.get_general_info(formatted=True)
            elif hasattr(macho, "general_info"):
                results["general_info"] = macho.general_info

            if hasattr(macho, "get_macho_header"):
                results["header"] = macho.get_macho_header(formatted=True)
            elif hasattr(macho, "header"):
                results["header"] = macho.header

            if hasattr(macho, "get_architectures"):
                results["architectures"] = macho.get_architectures()
            elif hasattr(macho, "header") and "cputype" in results.get("header", {}):
                # fall back to the single cputype from the parsed header
                results["architectures"] = [results["header"]["cputype"]]
            else:
                results["architectures"] = []

            if hasattr(macho, "load_commands"):
                results["load_commands"] = [str(lc) for lc in macho.load_commands]

            if hasattr(macho, "segments"):
                results["segments"] = [str(s) for s in macho.segments]

            if hasattr(macho, "dylib_names"):
                results["dylib_names"] = [
                    self._to_str(d) for d in macho.dylib_names
                ]

            if hasattr(macho, "uuid"):
                results["uuid"] = str(macho.uuid)

            if hasattr(macho, "entry_point"):
                results["entrypoint"] = str(macho.entry_point)

            if hasattr(macho, "version_info"):
                results["version_info"] = str(macho.version_info)

            if hasattr(macho, "code_signature_info"):
                results["code_signature"] = macho.code_signature_info

            if hasattr(macho, "get_imported_functions"):
                results["imports"] = macho.get_imported_functions()
            elif hasattr(macho, "imported_functions"):
                import_funcs = macho.imported_functions
                results["imports"] = (
                    [self._to_str(f) for f in import_funcs] if import_funcs else []
                )

            if hasattr(macho, "get_exported_symbols"):
                results["exports"] = macho.get_exported_symbols()
            elif hasattr(macho, "exported_symbols"):
                results["exports"] = (
                    [self._to_str(s) for s in macho.exported_symbols]
                    if macho.exported_symbols
                    else []
                )

            if hasattr(macho, "get_similarity_hashes"):
                results["hashes"] = macho.get_similarity_hashes(formatted=True)

        except Exception as e:
            warning_message = (
                f"job_id:{self.job_id} analyzer:{self.analyzer_name} "
                f"md5:{self.md5} filename:{self.filename} "
                f"MachoFile parsing error: {e}"
            )
            logger.warning(warning_message, exc_info=True)
            self.report.errors.append(warning_message)
            self.report.status = self.report.STATUSES.FAILED
            self.report.save()

        return results
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
from django.db import migrations
from django.db.models.fields.related_descriptors import (
ForwardManyToOneDescriptor,
ForwardOneToOneDescriptor,
ManyToManyDescriptor,
)

# AnalyzerConfig payload for the MachoFile analyzer. The "model" key holds
# the dotted path of the Django model to create and is consumed (not stored)
# by migrate()/reverse_migrate() below.
plugin = {
    "name": "MachoFile",
    "python_module": {
        "module": "macho_info.MachoInfo",
        "base_path": "api_app.analyzers_manager.file_analyzers",
    },
    "description": "Parse Mach-O binary files (macOS/iOS executables) using machofile library. Extracts headers, segments, dylibs, imports, exports, hashes, and code signatures.",
    "disabled": False,
    "soft_time_limit": 60,
    "routing_key": "local",
    "health_check_status": True,
    "type": "file",
    "docker_based": False,
    "maximum_tlp": "RED",
    "observable_supported": [],
    "supported_filetypes": [
        "application/x-mach-binary",
    ],
    "run_hash": False,
    "run_hash_type": "",
    "not_supported_filetypes": [],
    "health_check_task": None,
    "model": "analyzers_manager.AnalyzerConfig",
}

# This analyzer needs no extra Parameter or PluginConfig rows.
params = []
values = []


def _get_real_obj(Model, field, value):
    """Resolve a serialized FK/O2O value into the referenced model instance."""
    descriptor = getattr(Model, field)
    is_forward_relation = type(descriptor) in [
        ForwardManyToOneDescriptor,
        ForwardOneToOneDescriptor,
    ]
    if is_forward_relation and value:
        other_model = descriptor.get_queryset().model
        # in case is a dictionary, we have to retrieve the object with every key
        if isinstance(value, dict):
            resolved = {
                key: _get_real_obj(other_model, key, raw)
                for key, raw in value.items()
            }
            value = other_model.objects.get_or_create(**resolved)[0]
        # it is just the primary key serialized
        else:
            value = other_model.objects.get(pk=value)
    return value


def _create_object(Model, data):
    """Create a Model row from `data` if missing; return True if it existed."""
    mtm = {}
    no_mtm = {}
    for field, value in data.items():
        # many-to-many values must be set after the instance is saved
        if type(getattr(Model, field)) is ManyToManyDescriptor:
            mtm[field] = value
        else:
            no_mtm[field] = _get_real_obj(Model, field, value)
    try:
        Model.objects.get(**no_mtm)
    except Model.DoesNotExist:
        obj = Model(**no_mtm)
        obj.full_clean()
        obj.save()
        for field, value in mtm.items():
            getattr(obj, field).set(value)
        return False
    return True


def migrate(apps, schema_editor):
    """Create the MachoFile AnalyzerConfig (plus its params/values) if absent."""
    Parameter = apps.get_model("api_app", "Parameter")
    PluginConfig = apps.get_model("api_app", "PluginConfig")
    # Read — do NOT pop — the model path: `plugin` is a shared module-level
    # dict also used by reverse_migrate, so popping here would raise a
    # KeyError on the next access within the same process.
    python_path = plugin["model"]
    Model = apps.get_model(*python_path.split("."))
    if not Model.objects.filter(name=plugin["name"]).exists():
        # pass a copy without the "model" bookkeeping key, leaving the
        # shared dict untouched
        data = {key: val for key, val in plugin.items() if key != "model"}
        exists = _create_object(Model, data)
        if not exists:
            for param in params:
                _create_object(Parameter, param)
            for value in values:
                _create_object(PluginConfig, value)


def reverse_migrate(apps, schema_editor):
    """Delete the AnalyzerConfig row created by migrate()."""
    # Read — do NOT pop — so the shared `plugin` dict stays intact and a
    # forward migration run earlier in the same process cannot cause a
    # KeyError here.
    python_path = plugin["model"]
    Model = apps.get_model(*python_path.split("."))
    Model.objects.get(name=plugin["name"]).delete()


class Migration(migrations.Migration):
    # Requires the squashed base schema and the latest analyzer migration.
    dependencies = [
        ("api_app", "0001_2_initial_squashed"),
        ("analyzers_manager", "0174_phishstats_url"),
    ]

    operations = [migrations.RunPython(migrate, reverse_migrate)]
    # NOTE(review): atomic = False runs this migration outside a single
    # transaction — confirm this matches the other plugin migrations.
    atomic = False
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
# This file is a part of IntelOwl https://github.com/intelowlproject/IntelOwl
# See the file 'LICENSE' for copying permission.


from django.db import migrations


def migrate(apps, schema_editor):
    """Add the MachoFile analyzer to the FREE_TO_USE_ANALYZERS playbook."""
    PlaybookConfig = apps.get_model("playbooks_manager", "PlaybookConfig")
    AnalyzerConfig = apps.get_model("analyzers_manager", "AnalyzerConfig")
    playbook = PlaybookConfig.objects.get(name="FREE_TO_USE_ANALYZERS")
    analyzer = AnalyzerConfig.objects.get(name="MachoFile")
    playbook.analyzers.add(analyzer.id)
    playbook.full_clean()
    playbook.save()


def reverse_migrate(apps, schema_editor):
    """Remove the MachoFile analyzer from the FREE_TO_USE_ANALYZERS playbook."""
    PlaybookConfig = apps.get_model("playbooks_manager", "PlaybookConfig")
    AnalyzerConfig = apps.get_model("analyzers_manager", "AnalyzerConfig")
    playbook = PlaybookConfig.objects.get(name="FREE_TO_USE_ANALYZERS")
    analyzer = AnalyzerConfig.objects.get(name="MachoFile")
    playbook.analyzers.remove(analyzer.id)
    playbook.full_clean()
    playbook.save()


class Migration(migrations.Migration):
    # Must run after the playbook baseline and after the analyzer config
    # for MachoFile has been created.
    dependencies = [
        ("playbooks_manager", "0061_replace_dns0_playbooks"),
        ("analyzers_manager", "0175_analyzer_config_macho_info"),
    ]

    operations = [
        migrations.RunPython(migrate, reverse_migrate),
    ]
1 change: 1 addition & 0 deletions requirements/project-requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ jbxapi==3.23.0
yara-x==1.10.0
flare-floss==3.1.1
flare-capa==9.3.1
machofile @ git+https://github.com/pstirparo/machofile.git@8601bcaf98d23ac068dc4a031827a1bf57b27c9b
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we need pinned version otherwise we can't add it

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

aight, checking out which version can be used


# httpx required for HTTP/2 support (Mullvad DNS rejects HTTP/1.1 with protocol errors)
httpx[http2]==0.28.1
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
from api_app.analyzers_manager.file_analyzers.macho_info import MachoInfo
from tests import CustomTestCase


class MachoInfoTestCase(CustomTestCase):
    """Integration test running the MachoFile analyzer on a real sample."""

    fixtures = [
        "api_app/fixtures/0001_user.json",
    ]

    def test_macho_analysis(self):
        """Test MachoFile analyzer with a real sample."""
        report = self._analyze_sample(
            "macho_sample",
            "80cc133a33786ceeacbd3acacd025dfc",
            # not hardcoded, MD5 of tested data
            "application/x-mach-binary",
            "MachoFile",
            MachoInfo,
        )

        # header must be present and expose the magic number
        self.assertIn("header", report)
        self.assertIn("magic", report["header"])

        # architectures (x86_64 or arm64 depending on the host build)
        self.assertIn("architectures", report)
        self.assertIsInstance(report["architectures"], list)

        # at least one segment expected
        self.assertIn("segments", report)
        self.assertGreater(len(report["segments"]), 0)

        # linked dylibs (should have at least libSystem)
        self.assertIn("dylib_names", report)
        self.assertIsInstance(report["dylib_names"], list)

        # similarity hashes
        self.assertIn("hashes", report)
        self.assertIn("dylib_hash", report["hashes"])

        # exports might be list or dict depending on result format,
        # analyzing sample suggested dict
        self.assertIn("exports", report)
        self.assertTrue(isinstance(report["exports"], (list, dict)))

        # code signature block
        self.assertIn("code_signature", report)
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ class BaseFileAnalyzerTest(TestCase):
"text/xml": "android.xml",
"application/zip": "test.zip",
"application/x-dex": "sample.dex",
"application/x-mach-binary": "macho.sample",
}

@classmethod
Expand Down
Loading
Loading