Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
127 changes: 127 additions & 0 deletions api_app/analyzers_manager/migrations/0193_analyzer_config_rdap.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
from django.db import migrations
from django.db.models.fields.related_descriptors import (
ForwardManyToOneDescriptor,
ForwardOneToOneDescriptor,
ManyToManyDescriptor,
ReverseManyToOneDescriptor,
ReverseOneToOneDescriptor,
)

plugin = {
"python_module": {
"health_check_schedule": None,
"update_schedule": None,
"module": "rdap.Rdap",
"base_path": "api_app.analyzers_manager.observable_analyzers",
},
"name": "Rdap",
"description": "Query the public [RDAP](https://about.rdap.org/) bootstrap "
"(rdap.org) for registration data (RFC 9082); the free, unauthenticated WHOIS "
"successor. Supports IPs, domains and URLs.",
"disabled": False,
"soft_time_limit": 30,
"routing_key": "default",
"health_check_status": True,
"type": "observable",
"docker_based": False,
"maximum_tlp": "AMBER",
"observable_supported": ["ip", "domain", "url"],
"supported_filetypes": [],
"run_hash": False,
"run_hash_type": "",
"not_supported_filetypes": [],
"mapping_data_model": {},
"model": "analyzers_manager.AnalyzerConfig",
}

params = []

values = []


def _get_real_obj(Model, field, value):
def _get_obj(Model, other_model, value):
if isinstance(value, dict):
real_vals = {}
for key, real_val in value.items():
real_vals[key] = _get_real_obj(other_model, key, real_val)
value = other_model.objects.get_or_create(**real_vals)[0]
# it is just the primary key serialized
else:
if isinstance(value, int):
if Model.__name__ == "PluginConfig":
value = other_model.objects.get(name=plugin["name"])
else:
value = other_model.objects.get(pk=value)
else:
value = other_model.objects.get(name=value)
return value

if (
type(getattr(Model, field))
in [
ForwardManyToOneDescriptor,
ReverseManyToOneDescriptor,
ReverseOneToOneDescriptor,
ForwardOneToOneDescriptor,
]
and value
):
other_model = getattr(Model, field).get_queryset().model
value = _get_obj(Model, other_model, value)
elif type(getattr(Model, field)) in [ManyToManyDescriptor] and value:
other_model = getattr(Model, field).rel.model
value = [_get_obj(Model, other_model, val) for val in value]
return value


def _create_object(Model, data):
mtm, no_mtm = {}, {}
for field, value in data.items():
value = _get_real_obj(Model, field, value)
if type(getattr(Model, field)) is ManyToManyDescriptor:
mtm[field] = value
else:
no_mtm[field] = value
try:
o = Model.objects.get(**no_mtm)
except Model.DoesNotExist:
o = Model(**no_mtm)
o.full_clean()
o.save()
for field, value in mtm.items():
attribute = getattr(o, field)
if value is not None:
attribute.set(value)
return False
return True


def migrate(apps, schema_editor):
Parameter = apps.get_model("api_app", "Parameter")
PluginConfig = apps.get_model("api_app", "PluginConfig")
python_path = plugin.pop("model")
Model = apps.get_model(*python_path.split("."))
Comment on lines +103 to +104
if not Model.objects.filter(name=plugin["name"]).exists():
exists = _create_object(Model, plugin)
if not exists:
for param in params:
_create_object(Parameter, param)
for value in values:
_create_object(PluginConfig, value)


def reverse_migrate(apps, schema_editor):
python_path = plugin.pop("model")
Model = apps.get_model(*python_path.split("."))
Comment on lines +115 to +116
Model.objects.get(name=plugin["name"]).delete()


class Migration(migrations.Migration):
atomic = False
dependencies = [
("api_app", "0073_alter_updatecheckstatus_last_checked_at_and_more"),
("analyzers_manager", "0192_analyzer_config_ipqs_url_file_scanner"),
]

operations = [migrations.RunPython(migrate, reverse_migrate)]
61 changes: 61 additions & 0 deletions api_app/analyzers_manager/observable_analyzers/rdap.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
# This file is a part of IntelOwl https://github.com/intelowlproject/IntelOwl
# See the file 'LICENSE' for copying permission.

from urllib.parse import urlparse

import requests

from api_app.analyzers_manager import classes
from api_app.analyzers_manager.exceptions import AnalyzerRunException
from api_app.choices import Classification


class Rdap(classes.ObservableAnalyzer):
"""Query the public RDAP bootstrap (https://rdap.org) for an observable's
registration data.

RDAP (Registration Data Access Protocol, RFC 9082/9083) is the IETF-standard,
free and unauthenticated successor to WHOIS. It returns structured JSON
describing the registration of IP addresses, domains, and URLs (resolved to
their host). The rdap.org bootstrap redirects each query to the authoritative
RDAP server for the object.
"""

url: str = "https://rdap.org"

def update(self) -> bool:
pass
Comment on lines +26 to +27

def run(self):
if self.observable_classification == Classification.IP:
path = f"ip/{self.observable_name}"
elif self.observable_classification == Classification.DOMAIN:
path = f"domain/{self.observable_name}"
elif self.observable_classification == Classification.URL:
hostname = urlparse(self.observable_name).hostname
if not hostname:
raise AnalyzerRunException(f"unable to extract a hostname from URL {self.observable_name}")
path = f"domain/{hostname}"
else:
raise AnalyzerRunException(
f"{self.observable_classification} is not a supported observable type "
"for RDAP (supported: ip, domain, url)"
)

try:
response = requests.get(
f"{self.url}/{path}",
headers={"Accept": "application/rdap+json"},
timeout=10,
)
# RDAP returns 404 when the registry holds no record for the object;
# treat that as a clean negative result rather than an error.
if response.status_code == 404:
return {"found": False}
response.raise_for_status()
except requests.RequestException as e:
raise AnalyzerRunException(e)
Comment on lines +56 to +57

result = response.json()
result["found"] = True
return result
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
# This file is a part of IntelOwl https://github.com/intelowlproject/IntelOwl
# See the file 'LICENSE' for copying permission.


from django.db import migrations


def migrate(apps, schema_editor):
playbook_config = apps.get_model("playbooks_manager", "PlaybookConfig")
AnalyzerConfig = apps.get_model("analyzers_manager", "AnalyzerConfig")
pc = playbook_config.objects.get(name="FREE_TO_USE_ANALYZERS")
pc.analyzers.add(AnalyzerConfig.objects.get(name="Rdap").id)
pc.full_clean()
pc.save()


def reverse_migrate(apps, schema_editor):
playbook_config = apps.get_model("playbooks_manager", "PlaybookConfig")
AnalyzerConfig = apps.get_model("analyzers_manager", "AnalyzerConfig")
pc = playbook_config.objects.get(name="FREE_TO_USE_ANALYZERS")
pc.analyzers.remove(AnalyzerConfig.objects.get(name="Rdap").id)
pc.full_clean()
pc.save()


class Migration(migrations.Migration):
dependencies = [
("playbooks_manager", "0066_link_crawl_visualizer_to_playbook"),
("analyzers_manager", "0193_analyzer_config_rdap"),
]

operations = [
migrations.RunPython(migrate, reverse_migrate),
]
61 changes: 61 additions & 0 deletions tests/api_app/analyzers_manager/observable_analyzers/test_rdap.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
# This file is a part of IntelOwl https://github.com/intelowlproject/IntelOwl
# See the file 'LICENSE' for copying permission.

from unittest.mock import MagicMock, patch

from api_app.analyzers_manager.exceptions import AnalyzerRunException
from api_app.analyzers_manager.observable_analyzers.rdap import Rdap
from api_app.choices import Classification
from tests import CustomTestCase


class RdapTestCase(CustomTestCase):
"""Unit tests for the RDAP analyzer (mocked HTTP, no live calls)."""

@staticmethod
def _analyzer(observable_name, classification):
analyzer = Rdap(config={})
analyzer.observable_name = observable_name
analyzer.observable_classification = classification
return analyzer

@staticmethod
def _ok(json_data):
response = MagicMock(status_code=200)
response.raise_for_status.return_value = None
response.json.return_value = json_data
return response

@patch("api_app.analyzers_manager.observable_analyzers.rdap.requests.get")
def test_domain_found(self, mock_get):
mock_get.return_value = self._ok({"objectClassName": "domain", "ldhName": "example.com"})
result = self._analyzer("example.com", Classification.DOMAIN).run()
self.assertTrue(result["found"])
self.assertEqual(result["ldhName"], "example.com")
self.assertIn("rdap.org/domain/example.com", mock_get.call_args.args[0])

@patch("api_app.analyzers_manager.observable_analyzers.rdap.requests.get")
def test_ip_uses_ip_endpoint(self, mock_get):
mock_get.return_value = self._ok({"objectClassName": "ip network"})
self._analyzer("1.1.1.1", Classification.IP).run()
self.assertIn("rdap.org/ip/1.1.1.1", mock_get.call_args.args[0])

@patch("api_app.analyzers_manager.observable_analyzers.rdap.requests.get")
def test_url_resolves_to_host_domain(self, mock_get):
mock_get.return_value = self._ok({"objectClassName": "domain"})
self._analyzer("https://sub.example.com/path?q=1", Classification.URL).run()
self.assertIn("rdap.org/domain/sub.example.com", mock_get.call_args.args[0])

@patch("api_app.analyzers_manager.observable_analyzers.rdap.requests.get")
def test_not_found_returns_clean_negative(self, mock_get):
mock_get.return_value = MagicMock(status_code=404)
result = self._analyzer("does-not-exist.invalid", Classification.DOMAIN).run()
self.assertEqual(result, {"found": False})

def test_unsupported_classification_raises(self):
with self.assertRaises(AnalyzerRunException):
self._analyzer("deadbeefdeadbeef", Classification.HASH).run()

def test_url_without_hostname_raises(self):
with self.assertRaises(AnalyzerRunException):
self._analyzer("not-a-url", Classification.URL).run()
Loading