Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 9 additions & 8 deletions tests/api_app/analyzers_manager/test_views.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,17 +29,18 @@ def test_pull(self):
from api_app.analyzers_manager.file_analyzers.yara_scan import YaraScan

analyzer = "Yara"
response = self.client.post(f"{self.URL}/{analyzer}/pull")
self.assertEqual(response.status_code, 200)
with patch.object(YaraScan, "update", return_value=True) as mock_update:
response = self.client.post(f"{self.URL}/{analyzer}/pull")
self.assertEqual(response.status_code, 200)

self.client.force_authenticate(self.superuser)
self.client.force_authenticate(self.superuser)

with patch.object(YaraScan, "update", return_value=True):
response = self.client.post(f"{self.URL}/{analyzer}/pull")
self.assertEqual(response.status_code, 200, response.json())
result = response.json()
self.assertIn("status", result)
self.assertTrue(result["status"])
self.assertEqual(response.status_code, 200, response.json())
result = response.json()
self.assertIn("status", result)
self.assertTrue(result["status"])
self.assertEqual(mock_update.call_count, 2)

analyzer = "Doc_Info"
response = self.client.post(f"{self.URL}/{analyzer}/pull")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ def get_mocked_response(self):

return [
patch.object(CapaInfo, "update", return_value=True),
patch.object(CapaInfo, "_download_signatures", return_value=None),
patch("subprocess.run", return_value=response_from_command),
patch(
"api_app.analyzers_manager.file_analyzers.capa_info.requests.get",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@ def get_mocked_response(self):
"requests.Session.post",
return_value=self.MockUpResponse({"task": "123-456-789"}, 201),
),
patch(
"time.sleep",
return_value=None,
),
]

class MockUpResponse:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ def get_mocked_response():
],
),
patch("requests.post", return_value=MockUpResponse({"qid": 1}, 200)),
patch("time.sleep", return_value=None),
]

@classmethod
Expand Down
63 changes: 45 additions & 18 deletions tests/api_app/test_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import hashlib
import os
from typing import Tuple
from unittest.mock import patch

from django.conf import settings
from django.contrib.auth import get_user_model
Expand Down Expand Up @@ -101,7 +102,8 @@ def test_ask_analysis_availability__run_all_analyzers(self):
response = self.client.post("/api/ask_analysis_availability", data, format="json")
self.assertEqual(response.status_code, 200)

def test_analyze_file__pcap(self):
@patch("intel_owl.tasks.job_pipeline.apply_async")
def test_analyze_file__pcap(self, mock_apply_async):
# set a fake API key or YARAify_File_Scan will be skipped as not configured
models.PluginConfig.objects.create(
owner=self.user,
Expand Down Expand Up @@ -144,6 +146,9 @@ def test_analyze_file__pcap(self):
list(job.analyzers_to_execute.all().values_list("name", flat=True)),
)

mock_apply_async.assert_called_once()
self.assertEqual(mock_apply_async.call_args.kwargs["args"], [job_id])

def test_analyze_file__exe(self):
data = self.analyze_file_data.copy()
response = self.client.post("/api/analyze_file", data, format="multipart")
Expand Down Expand Up @@ -232,7 +237,8 @@ def test_analyze_observable__ip(self):
self.assertEqual(data["observable_classification"], job.analyzable.classification, msg=msg)
self.assertEqual(self.observable_md5, job.analyzable.md5, msg=msg)

def test_analyze_observable__guess_optional(self):
@patch("intel_owl.tasks.job_pipeline.apply_async")
def test_analyze_observable__guess_optional(self, mock_apply_async):
data = self.analyze_observable_ip_data.copy()
observable_classification = data.pop("observable_classification") # let the server calc it

Expand All @@ -252,16 +258,20 @@ def test_analyze_observable__guess_optional(self):
self.assertEqual(observable_classification, job.analyzable.classification, msg=msg)
self.assertEqual(self.observable_md5, job.analyzable.md5, msg=msg)

def test_analyze_multiple_observables(self):
mock_apply_async.assert_called_once()
self.assertEqual(mock_apply_async.call_args.kwargs["args"], [job_id])

@patch("intel_owl.tasks.job_pipeline.apply_async")
def test_analyze_multiple_observables(self, mock_apply_async):
data = self.mixed_observable_data.copy()

response = self.client.post("/api/analyze_multiple_observables", data, format="json")
contents = response.json()
msg = (response.status_code, contents)
self.assertEqual(response.status_code, 200, msg=msg)
self.assertEqual(mock_apply_async.call_count, len(data["observables"]))

content = contents["results"][0]

job_id = int(content["job_id"])
job = models.Job.objects.get(pk=job_id)
self.assertEqual(data["observables"][0][1], job.analyzable.name, msg=msg)
Expand All @@ -275,9 +285,9 @@ def test_analyze_multiple_observables(self):
list(job.analyzers_to_execute.all().values_list("name", flat=True)),
msg=msg,
)
self.assertEqual(mock_apply_async.call_args_list[0].kwargs["args"], [job_id])

content = contents["results"][1]

job_id = int(content["job_id"])
job = models.Job.objects.get(pk=job_id)
self.assertEqual(data["observables"][1][1], job.analyzable.name, msg=msg)
Expand All @@ -286,6 +296,7 @@ def test_analyze_multiple_observables(self):
list(job.analyzers_to_execute.all().values_list("name", flat=True)),
msg=msg,
)
self.assertEqual(mock_apply_async.call_args_list[1].kwargs["args"], [job_id])
job.delete()

def test_observable_no_analyzers_only_connector(self):
Expand Down Expand Up @@ -527,7 +538,9 @@ def test_job_rescan__observable_playbook(self):
"visualizers": {},
},
)
response = self.client.post(f"/api/jobs/{job.pk}/rescan", format="json")
# dont actually run the analyzers, they are tested in unit tests
Copy link

Copilot AI Feb 12, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Typo in comment: "dont" → "don't".

Suggested change
# dont actually run the analyzers, they are tested in unit tests
# don't actually run the analyzers, they are tested in unit tests

Copilot uses AI. Check for mistakes.
with patch("intel_owl.tasks.job_pipeline.apply_async") as mock_apply_async:
response = self.client.post(f"/api/jobs/{job.pk}/rescan", format="json")
contents = response.json()
self.assertEqual(response.status_code, 202, contents)
new_job_id = int(contents["id"])
Expand All @@ -543,6 +556,8 @@ def test_job_rescan__observable_playbook(self):
"visualizers": {},
},
)
mock_apply_async.assert_called_once()
self.assertEqual(mock_apply_async.call_args.kwargs["args"], [new_job_id])
an.delete()

def test_job_rescan__sample_analyzers(self):
Expand All @@ -567,7 +582,8 @@ def test_job_rescan__sample_analyzers(self):
)
job.analyzers_requested.set([AnalyzerConfig.objects.get(name="Strings_Info")])

response = self.client.post(f"/api/jobs/{job.pk}/rescan", format="json")
with patch("intel_owl.tasks.job_pipeline.apply_async") as mock_apply_async:
response = self.client.post(f"/api/jobs/{job.pk}/rescan", format="json")
contents = response.json()
self.assertEqual(response.status_code, 202, contents)
new_job_id = int(contents["id"])
Expand All @@ -579,6 +595,8 @@ def test_job_rescan__sample_analyzers(self):
list(new_job.analyzers_requested.all()),
[AnalyzerConfig.objects.get(name="Strings_Info")],
)
mock_apply_async.assert_called_once()
self.assertEqual(mock_apply_async.call_args.kwargs["args"], [new_job_id])
self.assertEqual(
new_job.runtime_configuration,
{
Expand Down Expand Up @@ -617,7 +635,8 @@ def test_job_rescan__sample_playbook(self):
},
)

response = self.client.post(f"/api/jobs/{job.pk}/rescan", format="json")
with patch("intel_owl.tasks.job_pipeline.apply_async") as mock_apply_async:
response = self.client.post(f"/api/jobs/{job.pk}/rescan", format="json")
contents = response.json()
self.assertEqual(response.status_code, 202, contents)
new_job_id = int(contents["id"])
Expand All @@ -642,6 +661,8 @@ def test_job_rescan__sample_playbook(self):
"visualizers": {},
},
)
mock_apply_async.assert_called_once()
self.assertEqual(mock_apply_async.call_args.kwargs["args"], [new_job_id])
job.delete()
an.delete()

Expand All @@ -665,14 +686,20 @@ def test_job_rescan__permission(self):
"visualizers": {},
},
)
# same user
response = self.client.post(f"/api/jobs/{job.pk}/rescan", format="json")
contents = response.json()
self.assertEqual(response.status_code, 202, contents)
# another user
self.client.logout()
self.client.force_login(self.guest)
response = self.client.post(f"/api/jobs/{job.pk}/rescan", format="json")
contents = response.json()
self.assertEqual(response.status_code, 403, contents)
with patch("intel_owl.tasks.job_pipeline.apply_async") as mock_apply_async:
response = self.client.post(f"/api/jobs/{job.pk}/rescan", format="json")
contents = response.json()
self.assertEqual(response.status_code, 202, contents)
new_job_id = int(contents["id"])
mock_apply_async.assert_called_once()
self.assertEqual(mock_apply_async.call_args.kwargs["args"], [new_job_id])

mock_apply_async.reset_mock()

self.client.logout()
self.client.force_login(self.guest)
response = self.client.post(f"/api/jobs/{job.pk}/rescan", format="json")
contents = response.json()
self.assertEqual(response.status_code, 403, contents)
mock_apply_async.assert_not_called()
an.delete()
18 changes: 15 additions & 3 deletions tests/test_crons.py
Original file line number Diff line number Diff line change
Expand Up @@ -242,9 +242,21 @@ def test_quark_updater(self):
quark_engine.QuarkEngine.update()
self.assertTrue(os.path.exists(DIR_PATH))

def test_yara_updater(self):
yara_scan.YaraScan.update()
self.assertTrue(len(os.listdir(settings.YARA_RULES_PATH)))
@if_mock_connections(
patch("git.Repo"),
patch("requests.get", return_value=MockUpResponse({}, 200)),
patch("zipfile.ZipFile"),
)
def test_yara_updater(self, mock_zipfile=None, mock_get=None, mock_repo=None):
# When settings.MOCK_CONNECTIONS is False, @if_mock_connections is a no-op
# and no mocks are injected, so all mock parameters will be None. In that
# case, skip this test to avoid AttributeError in CI.
if mock_zipfile is None or mock_get is None or mock_repo is None:
self.skipTest("MOCK_CONNECTIONS is disabled; yara updater test requires mocked connections.")
mock_repo.clone_from.side_effect = lambda url, path, **kwargs: os.makedirs(path, exist_ok=True)
mock_zipfile.return_value.extractall.side_effect = lambda path: os.makedirs(path, exist_ok=True)
result = yara_scan.YaraScan.update()
self.assertTrue(result)

@if_mock_connections(
patch(
Expand Down
Loading