-
-
Notifications
You must be signed in to change notification settings - Fork 579
Machofile analyzer #3268
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Machofile analyzer #3268
Changes from 10 commits
6f01a28
a185d08
40a9ebc
b4cd43b
3e6bc3d
a66781c
946e43a
933989b
3da2d8e
8c7f00f
520cf62
94a3c65
395a2a3
964400c
6bce4ff
e1b3822
11d1bab
3907083
c6526c5
5a9b7db
6185f41
f65ca9f
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,158 @@ | ||
| # This file is a part of IntelOwl https://github.com/intelowlproject/IntelOwl | ||
| # See the file 'LICENSE' for copying permission. | ||
|
|
||
| import logging | ||
| from typing import Any, Dict | ||
|
|
||
| try: | ||
| import machofile | ||
| except ImportError: | ||
| machofile = None | ||
|
|
||
| from api_app.analyzers_manager.classes import FileAnalyzer | ||
|
|
||
| logger = logging.getLogger(__name__) | ||
|
|
||
|
|
||
| def _safe_decode(value: Any) -> str: | ||
| """Helper to safely decode bytes to string.""" | ||
| if isinstance(value, bytes): | ||
| return value.decode("utf-8", "ignore") | ||
| return str(value) | ||
|
|
||
|
|
||
class MachoInfo(FileAnalyzer):
    """
    Analyzer for Mach-O binary files (macOS/iOS executables).
    Uses the machofile library to parse and extract information.

    API Validation Strategy:
    This analyzer uses defensive programming with hasattr() checks because:
    - The machofile library API varies between single-arch and Universal (FAT) binaries
    - Different Mach-O file types may expose different methods/properties
    - The behavior is validated through tests and internal documentation

    Library reference: https://github.com/pstirparo/machofile
    """

    @classmethod
    def update(cls) -> bool:
        """Return ``False`` unconditionally; no update step is performed here."""
        return False

    def _load_with(self, parser_cls):
        """Build ``parser_cls`` on ``self.filepath`` and call ``parse()`` if present."""
        macho = parser_cls(self.filepath)
        if hasattr(macho, "parse"):
            macho.parse()
        return macho

    def _parse_macho(self):
        """Attempts to parse the file as Single or Universal Mach-O.

        ``machofile.MachO`` is tried first; if it raises, the file is retried
        with ``machofile.UniversalMachO``.

        Returns:
            The parser object that loaded the file.

        Raises:
            Exception: when both attempts fail. The message carries both
                underlying errors and the universal-parser error is chained
                as the cause.
        """
        try:
            return self._load_with(machofile.MachO)
        except Exception as single_error:
            try:
                return self._load_with(machofile.UniversalMachO)
            except Exception as universal_error:
                error_msg = (
                    "Failed to parse as both single and universal binary. "
                    "Single: {}, Universal: {}".format(single_error, universal_error)
                )
                logger.warning(
                    "job_id:%s analyzer:%s md5:%s %s",
                    self.job_id,
                    self.analyzer_name,
                    self.md5,
                    error_msg,
                )
                # Chain explicitly so the traceback names the direct cause.
                raise Exception(error_msg) from universal_error

    @staticmethod
    def _extract_basic_info(macho, results: Dict[str, Any]):
        """Extracts basic info like header, architectures, uuid, etc.

        Fills ``results`` in place. For ``general_info`` and ``header`` the
        getter method is preferred and the plain attribute is the fallback.
        ``architectures`` is always set (possibly to an empty list); the other
        keys are only set when ``macho`` exposes the matching attribute.
        """
        if hasattr(macho, "get_general_info"):
            results["general_info"] = macho.get_general_info(formatted=True)
        elif hasattr(macho, "general_info"):
            results["general_info"] = macho.general_info

        if hasattr(macho, "get_macho_header"):
            results["header"] = macho.get_macho_header(formatted=True)
        elif hasattr(macho, "header"):
            results["header"] = macho.header

        header = results.get("header")
        if hasattr(macho, "get_architectures"):
            results["architectures"] = macho.get_architectures()
        elif (
            hasattr(macho, "header")
            # Only probe for the key on a real dict: a header object that is
            # not a container would make the ``in`` test raise TypeError.
            and isinstance(header, dict)
            and "cputype" in header
        ):
            results["architectures"] = [header["cputype"]]
        else:
            results["architectures"] = []

        # These values are stored in their str() form.
        if hasattr(macho, "uuid"):
            results["uuid"] = str(macho.uuid)

        if hasattr(macho, "entry_point"):
            results["entrypoint"] = str(macho.entry_point)

        if hasattr(macho, "version_info"):
            results["version_info"] = str(macho.version_info)

    @staticmethod
    def _extract_lists(macho, results: Dict[str, Any]):
        """Extracts list-based info like segments, dylibs, imports, exports.

        Fills ``results`` in place. Each key is only set when ``macho``
        exposes the corresponding attribute or getter; for imports and
        exports the getter method wins over the raw attribute.
        """
        if hasattr(macho, "load_commands"):
            results["load_commands"] = [str(lc) for lc in macho.load_commands]

        if hasattr(macho, "segments"):
            results["segments"] = [str(s) for s in macho.segments]

        if hasattr(macho, "dylib_names"):
            results["dylib_names"] = [_safe_decode(d) for d in macho.dylib_names]

        if hasattr(macho, "get_imported_functions"):
            results["imports"] = macho.get_imported_functions()
        elif hasattr(macho, "imported_functions"):
            # A falsy attribute (None or empty) is reported as an empty list.
            imported = macho.imported_functions
            results["imports"] = [_safe_decode(f) for f in imported] if imported else []

        if hasattr(macho, "get_exported_symbols"):
            results["exports"] = macho.get_exported_symbols()
        elif hasattr(macho, "exported_symbols"):
            exported = macho.exported_symbols
            results["exports"] = [_safe_decode(s) for s in exported] if exported else []

    def _mark_failed(self, message: str) -> None:
        """Record ``message`` on the report, set its status to FAILED and save it."""
        self.report.errors.append(message)
        self.report.status = self.report.STATUSES.FAILED
        self.report.save()

    def run(self) -> Dict[str, Any]:
        """Parse the sample and collect the extracted Mach-O information.

        Returns:
            A dict with the extracted fields. Failures do not propagate: the
            error is logged, the report is marked as failed, and whatever was
            collected before the failure (possibly nothing) is returned.
        """
        results: Dict[str, Any] = {}

        # The module-level import falls back to None when machofile is missing.
        if machofile is None:
            error_msg = "machofile library is not installed"
            logger.error(error_msg)
            self._mark_failed(error_msg)
            return results

        try:
            # _parse_macho either returns a parser object or raises, so no
            # None check is needed on its result.
            macho = self._parse_macho()

            self._extract_basic_info(macho, results)
            self._extract_lists(macho, results)

            if hasattr(macho, "code_signature_info"):
                results["code_signature"] = macho.code_signature_info

            if hasattr(macho, "get_similarity_hashes"):
                results["hashes"] = macho.get_similarity_hashes(formatted=True)

        except Exception as e:
            warning_message = (
                "job_id:{} analyzer:{} md5:{} filename:{} MachoFile parsing error: {}".format(
                    self.job_id, self.analyzer_name, self.md5, self.filename, e
                )
            )
            logger.warning(warning_message, exc_info=True)
            self._mark_failed(warning_message)

        return results
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,104 @@ | ||
| from django.db import migrations | ||
| from django.db.models.fields.related_descriptors import ( | ||
| ForwardManyToOneDescriptor, | ||
| ForwardOneToOneDescriptor, | ||
| ManyToManyDescriptor, | ||
| ) | ||
|
|
||
# Field values for the AnalyzerConfig row created by this migration.
# The "model" entry is not a model field: it names the model to instantiate.
plugin = {
    "name": "MachoFile",
    "python_module": {
        "module": "macho_info.MachoInfo",
        "base_path": "api_app.analyzers_manager.file_analyzers",
    },
    "description": (
        "Parse Mach-O binary files (macOS/iOS executables) using machofile library. "
        "Extracts headers, segments, dylibs, imports, exports, hashes, and code signatures."
    ),
    "disabled": False,
    "soft_time_limit": 60,
    "routing_key": "local",
    "health_check_status": True,
    "type": "file",
    "docker_based": False,
    "maximum_tlp": "RED",
    "observable_supported": [],
    "supported_filetypes": ["application/x-mach-binary"],
    "run_hash": False,
    "run_hash_type": "",
    "not_supported_filetypes": [],
    "health_check_task": None,
    "model": "analyzers_manager.AnalyzerConfig",
}

# This plugin ships with no Parameter rows and no PluginConfig rows.
params = []
values = []
|
|
||
|
|
||
def _get_real_obj(Model, field, value):
    """Turn a serialized forward-relation value into the related object.

    ``value`` is returned untouched unless ``Model.<field>`` is exactly a
    forward many-to-one or one-to-one descriptor and ``value`` is truthy.
    Otherwise a dict is resolved key by key (recursively) and passed to
    ``get_or_create`` on the related model, while any other value is used
    as the primary key for a ``get``.
    """
    descriptor = getattr(Model, field)
    is_forward_relation = type(descriptor) in (
        ForwardManyToOneDescriptor,
        ForwardOneToOneDescriptor,
    )
    if not (is_forward_relation and value):
        return value

    related_model = descriptor.get_queryset().model
    if not isinstance(value, dict):
        # it is just the primary key serialized
        return related_model.objects.get(pk=value)

    # in case is a dictionary, we have to retrieve the object with every key
    lookup = {
        key: _get_real_obj(related_model, key, raw) for key, raw in value.items()
    }
    related_obj, _created = related_model.objects.get_or_create(**lookup)
    return related_obj
|
|
||
|
|
||
def _create_object(Model, data):
    """Create a ``Model`` row from ``data`` unless a matching row exists.

    Fields whose class attribute is a many-to-many descriptor are set aside
    and applied with ``.set()`` after the row is saved; every other field is
    resolved through ``_get_real_obj`` and used both for the existence lookup
    and for construction.

    Returns:
        ``True`` when a matching row already existed, ``False`` when a new
        row was created.
    """
    many_to_many = {}
    plain_fields = {}
    for field, value in data.items():
        if type(getattr(Model, field)) is ManyToManyDescriptor:
            many_to_many[field] = value
            continue
        plain_fields[field] = _get_real_obj(Model, field, value)

    try:
        Model.objects.get(**plain_fields)
    except Model.DoesNotExist:
        instance = Model(**plain_fields)
        instance.full_clean()
        instance.save()
        # Many-to-many values are assigned only after the row has been saved.
        for field, value in many_to_many.items():
            getattr(instance, field).set(value)
        already_present = False
    else:
        already_present = True
    return already_present
|
|
||
|
|
||
def migrate(apps, schema_editor):
    """Create the plugin row, plus its parameters and values, if it is missing.

    Nothing happens when a row with the plugin's name already exists.
    ``schema_editor`` is unused; it is part of the ``RunPython`` callable
    signature.
    """
    Parameter = apps.get_model("api_app", "Parameter")
    PluginConfig = apps.get_model("api_app", "PluginConfig")

    # Work on a copy: popping "model" from the module-level dict would make
    # any later forward or reverse call in the same process fail with KeyError.
    plugin_data = dict(plugin)
    python_path = plugin_data.pop("model")
    Model = apps.get_model(*python_path.split("."))

    if Model.objects.filter(name=plugin_data["name"]).exists():
        return

    exists = _create_object(Model, plugin_data)
    if not exists:
        for param in params:
            _create_object(Parameter, param)
        for value in values:
            _create_object(PluginConfig, value)
|
|
||
|
|
||
def reverse_migrate(apps, schema_editor):
    """Delete the plugin row identified by the plugin's name.

    ``schema_editor`` is unused; it is part of the ``RunPython`` callable
    signature.
    """
    # Read the model path without popping it, so the module-level dict is not
    # mutated. The default equals the "model" value declared in ``plugin`` and
    # covers the case where that key was already removed earlier in this process.
    python_path = plugin.get("model", "analyzers_manager.AnalyzerConfig")
    Model = apps.get_model(*python_path.split("."))
    Model.objects.get(name=plugin["name"]).delete()
|
|
||
|
|
||
class Migration(migrations.Migration):
    """Data migration that runs ``migrate`` forwards and ``reverse_migrate`` backwards."""

    atomic = False

    dependencies = [
        ("api_app", "0001_2_initial_squashed"),
        ("analyzers_manager", "0174_phishstats_url"),
    ]

    operations = [
        migrations.RunPython(migrate, reverse_code=reverse_migrate),
    ]
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,34 @@ | ||
| # This file is a part of IntelOwl https://github.com/intelowlproject/IntelOwl | ||
| # See the file 'LICENSE' for copying permission. | ||
|
|
||
|
|
||
| from django.db import migrations | ||
|
|
||
|
|
||
def migrate(apps, schema_editor):
    """Add the "MachoFile" analyzer to the "FREE_TO_USE_ANALYZERS" playbook.

    The playbook is validated with ``full_clean()`` and saved afterwards.
    ``schema_editor`` is unused.
    """
    PlaybookConfig = apps.get_model("playbooks_manager", "PlaybookConfig")
    AnalyzerConfig = apps.get_model("analyzers_manager", "AnalyzerConfig")

    playbook = PlaybookConfig.objects.get(name="FREE_TO_USE_ANALYZERS")
    analyzer = AnalyzerConfig.objects.get(name="MachoFile")

    playbook.analyzers.add(analyzer.id)
    playbook.full_clean()
    playbook.save()
|
|
||
|
|
||
def reverse_migrate(apps, schema_editor):
    """Remove the "MachoFile" analyzer from the "FREE_TO_USE_ANALYZERS" playbook.

    The playbook is validated with ``full_clean()`` and saved afterwards.
    ``schema_editor`` is unused.
    """
    PlaybookConfig = apps.get_model("playbooks_manager", "PlaybookConfig")
    AnalyzerConfig = apps.get_model("analyzers_manager", "AnalyzerConfig")

    playbook = PlaybookConfig.objects.get(name="FREE_TO_USE_ANALYZERS")
    analyzer = AnalyzerConfig.objects.get(name="MachoFile")

    playbook.analyzers.remove(analyzer.id)
    playbook.full_clean()
    playbook.save()
|
|
||
|
|
||
class Migration(migrations.Migration):
    """Data migration that runs ``migrate`` forwards and ``reverse_migrate`` backwards."""

    dependencies = [
        ("playbooks_manager", "0061_replace_dns0_playbooks"),
        ("analyzers_manager", "0175_analyzer_config_macho_info"),
    ]

    operations = [migrations.RunPython(migrate, reverse_code=reverse_migrate)]
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -96,6 +96,7 @@ jbxapi==3.23.0 | |
| yara-x==1.10.0 | ||
| flare-floss==3.1.1 | ||
| flare-capa==9.3.1 | ||
| machofile @ git+https://github.com/pstirparo/machofile.git@8601bcaf98d23ac068dc4a031827a1bf57b27c9b | ||
|
||
|
|
||
| # httpx required for HTTP/2 support (Mullvad DNS rejects HTTP/1.1 with protocol errors) | ||
| httpx[http2]==0.28.1 | ||
|
|
||
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
| @@ -0,0 +1,47 @@ | ||||||
| from api_app.analyzers_manager.file_analyzers.macho_info import MachoInfo | ||||||
mlodic marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||
| from tests import CustomTestCase | ||||||
|
|
||||||
IshaanXCoder marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||
|
|
||||||
mlodic marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||
| class MachoInfoTestCase(CustomTestCase): | ||||||
| fixtures = [ | ||||||
| "api_app/fixtures/0001_user.json", | ||||||
| ] | ||||||
|
|
||||||
IshaanXCoder marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||
| def test_macho_analysis(self): | ||||||
| """Test MachoFile analyzer with a real sample""" | ||||||
| report = self._analyze_sample( | ||||||
| "macho_sample", | ||||||
| "80cc133a33786ceeacbd3acacd025dfc", | ||||||
| # not hardcoded, MD5 of tested data | ||||||
IshaanXCoder marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||
| "application/x-mach-binary", | ||||||
| "MachoFile", | ||||||
|
||||||
| "MachoFile", | |
| "MachoInfo", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OK, but why should this happen? Remove this and everything related to handling the missing library.