Skip to content

Commit 42bf33b

Browse files
dekkersDonnypeammar92Darwinkel
authored
Fix scan profile db event issue by adding an explicit reference field (1.8) (#1094)
Co-authored-by: Donny Peeters <46660228+Donnype@users.noreply.github.com> Co-authored-by: ammar92 <ammar.abdulamir@gmail.com> Co-authored-by: Patrick <Darwinkel@users.noreply.github.com>
1 parent 677b2c0 commit 42bf33b

7 files changed

Lines changed: 373 additions & 132 deletions

File tree

octopoes/octopoes/core/service.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -411,7 +411,7 @@ def _on_delete_origin_parameter(self, event: OriginParameterDBEvent) -> None:
411411
return
412412

413413
def _run_inferences(self, event: ScanProfileDBEvent) -> None:
414-
inference_origins = self.origin_repository.list_by_source(event.new_data.reference, valid_time=event.valid_time)
414+
inference_origins = self.origin_repository.list_by_source(event.reference, valid_time=event.valid_time)
415415
inference_origins = [o for o in inference_origins if o.origin_type == OriginType.INFERENCE]
416416
for inference_origin in inference_origins:
417417
self._run_inference(inference_origin, event.valid_time)

octopoes/octopoes/events/events.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,12 +58,13 @@ def primary_key(self) -> str:
5858

5959
class ScanProfileDBEvent(DBEvent):
6060
entity_type: Literal["scan_profile"] = "scan_profile"
61+
reference: Reference
6162
old_data: Optional[ScanProfile]
6263
new_data: Optional[ScanProfile]
6364

6465
@property
6566
def primary_key(self) -> Reference:
66-
return self.new_data.reference if self.new_data else self.old_data.reference
67+
return self.reference
6768

6869

6970
EVENT_TYPE = Union[OOIDBEvent, OriginDBEvent, OriginParameterDBEvent, ScanProfileDBEvent]

octopoes/octopoes/events/manager.py

Lines changed: 46 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -54,60 +54,58 @@ def publish(self, event: DBEvent) -> None:
5454
event.client,
5555
)
5656

57-
if isinstance(event, ScanProfileDBEvent):
58-
incremented = (event.operation_type == OperationType.CREATE and event.new_data.level > 0) or (
59-
event.operation_type == OperationType.UPDATE and event.new_data.level > event.old_data.level
60-
)
61-
if incremented:
62-
ooi = json.dumps(
63-
{
64-
"primary_key": event.new_data.reference,
65-
"object_type": event.new_data.reference.class_,
66-
"scan_profile": event.new_data.dict(),
67-
}
68-
)
69-
70-
self.channel.basic_publish(
71-
"",
72-
f"{event.client}__scan_profile_increments",
73-
ooi.encode(),
74-
properties=pika.BasicProperties(
75-
delivery_mode=pika.DeliveryMode.Persistent,
76-
),
77-
)
78-
79-
logger.info(
80-
"Published scan_profile_increment [primary_key=%s] [level=%s]",
81-
format_id_short(event.primary_key),
82-
event.new_data.level,
83-
)
84-
85-
# publish mutations
86-
mutation = ScanProfileMutation(
87-
operation=event.operation_type,
88-
primary_key=event.primary_key,
89-
)
57+
if not isinstance(event, ScanProfileDBEvent):
58+
return
9059

91-
if event.operation_type != OperationType.DELETE:
92-
mutation.value = AbstractOOI(
93-
primary_key=event.new_data.reference,
94-
object_type=event.new_data.reference.class_,
95-
scan_profile=event.new_data,
96-
)
60+
incremented = (event.operation_type == OperationType.CREATE and event.new_data.level > 0) or (
61+
event.operation_type == OperationType.UPDATE
62+
and event.old_data
63+
and event.new_data.level > event.old_data.level
64+
)
65+
66+
if incremented:
67+
ooi = json.dumps(
68+
{
69+
"primary_key": event.reference,
70+
"object_type": event.reference.class_,
71+
"scan_profile": event.new_data.dict(),
72+
}
73+
)
9774

9875
self.channel.basic_publish(
9976
"",
100-
f"{event.client}__scan_profile_mutations",
101-
mutation.json().encode(),
102-
properties=pika.BasicProperties(
103-
delivery_mode=pika.DeliveryMode.Persistent,
104-
),
77+
f"{event.client}__scan_profile_increments",
78+
ooi.encode(),
79+
properties=pika.BasicProperties(delivery_mode=pika.DeliveryMode.Persistent),
10580
)
10681

107-
level = mutation.value.scan_profile.level if mutation.value != OperationType.DELETE else None
10882
logger.info(
109-
"Published scan profile mutation [operation_type=%s] [primary_key=%s] [level=%s]",
110-
mutation.operation,
83+
"Published scan_profile_increment [primary_key=%s] [level=%s]",
11184
format_id_short(event.primary_key),
112-
level,
85+
event.new_data.level,
86+
)
87+
88+
# publish mutations
89+
mutation = ScanProfileMutation(operation=event.operation_type, primary_key=event.primary_key)
90+
91+
if event.operation_type != OperationType.DELETE:
92+
mutation.value = AbstractOOI(
93+
primary_key=event.new_data.reference,
94+
object_type=event.new_data.reference.class_,
95+
scan_profile=event.new_data,
11396
)
97+
98+
self.channel.basic_publish(
99+
"",
100+
f"{event.client}__scan_profile_mutations",
101+
mutation.json().encode(),
102+
properties=pika.BasicProperties(delivery_mode=pika.DeliveryMode.Persistent),
103+
)
104+
105+
level = mutation.value.scan_profile.level if mutation.value is not None else None
106+
logger.info(
107+
"Published scan profile mutation [operation_type=%s] [primary_key=%s] [level=%s]",
108+
mutation.operation,
109+
format_id_short(event.primary_key),
110+
level,
111+
)

octopoes/octopoes/repositories/scan_profile_repository.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,7 @@ def save(
108108
event = ScanProfileDBEvent(
109109
operation_type=OperationType.CREATE if old_scan_profile is None else OperationType.UPDATE,
110110
valid_time=valid_time,
111+
reference=new_scan_profile.reference,
111112
old_data=old_scan_profile,
112113
new_data=new_scan_profile,
113114
)
@@ -118,6 +119,7 @@ def delete(self, scan_profile: ScanProfileBase, valid_time: datetime) -> None:
118119

119120
event = ScanProfileDBEvent(
120121
operation_type=OperationType.DELETE,
122+
reference=scan_profile.reference,
121123
valid_time=valid_time,
122124
old_data=scan_profile,
123125
)

octopoes/tests/conftest.py

Lines changed: 41 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,13 @@
44
from unittest.mock import Mock
55

66
import pytest
7+
from bits.runner import BitRunner
78

8-
from octopoes.models import OOI, EmptyScanProfile, Reference, ScanProfileBase
9+
from octopoes.api.api import app
10+
from octopoes.api.router import settings
11+
from octopoes.config.settings import Settings, XTDBType
12+
from octopoes.core.service import OctopoesService
13+
from octopoes.models import OOI, DeclaredScanProfile, EmptyScanProfile, Reference, ScanProfileBase
914
from octopoes.models.path import Direction, Path
1015
from octopoes.models.types import DNSZone, Hostname, IPAddressV4, Network, ResolvedHostname
1116
from octopoes.repositories.ooi_repository import OOIRepository
@@ -138,3 +143,38 @@ def resolved_hostname(hostname, ipaddressv4, ooi_repository, scan_profile_reposi
138143
scan_profile_repository,
139144
valid_time,
140145
)
146+
147+
148+
@pytest.fixture
149+
def empty_scan_profile():
150+
return EmptyScanProfile(reference="test_reference")
151+
152+
153+
@pytest.fixture
154+
def declared_scan_profile():
155+
return DeclaredScanProfile(reference="test_reference", level=2)
156+
157+
158+
@pytest.fixture
159+
def xtdbtype_multinode():
160+
def get_settings_override():
161+
return Settings(xtdb_type=XTDBType.XTDB_MULTINODE)
162+
163+
app.dependency_overrides[settings] = get_settings_override
164+
yield
165+
app.dependency_overrides = {}
166+
167+
168+
@pytest.fixture
169+
def app_settings():
170+
return Settings(xtdb_type=XTDBType.XTDB_MULTINODE)
171+
172+
173+
@pytest.fixture
174+
def octopoes_service() -> OctopoesService:
175+
return OctopoesService(Mock(), Mock(), Mock(), Mock())
176+
177+
178+
@pytest.fixture
179+
def bit_runner(mocker) -> BitRunner:
180+
return mocker.patch("octopoes.core.service.BitRunner")
Lines changed: 173 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,173 @@
1+
import uuid
2+
from datetime import datetime
3+
4+
import pika
5+
6+
from octopoes.events.events import OOIDBEvent, OperationType, ScanProfileDBEvent
7+
from octopoes.events.manager import EventManager
8+
9+
10+
def test_event_manager_create_ooi(mocker, network):
11+
celery_mock = mocker.Mock()
12+
channel_mock = mocker.Mock()
13+
14+
mocker.patch.object(uuid, "uuid4", return_value="1754a4c8-f0b8-42c8-b294-5706ce23a47d")
15+
manager = EventManager("test", celery_mock, "queue", channel_mock)
16+
event = OOIDBEvent(operation_type=OperationType.CREATE, valid_time=datetime(2023, 1, 1), new_data=network)
17+
manager.publish(event)
18+
19+
celery_mock.send_task.assert_called_once_with(
20+
"octopoes.tasks.tasks.handle_event",
21+
(
22+
{
23+
"entity_type": "ooi",
24+
"operation_type": "create",
25+
"valid_time": "2023-01-01T00:00:00",
26+
"client": "test",
27+
"old_data": None,
28+
"new_data": {
29+
"object_type": "Network",
30+
"scan_profile": None,
31+
"primary_key": "Network|internet",
32+
"name": "internet",
33+
},
34+
},
35+
),
36+
queue="queue",
37+
task_id="1754a4c8-f0b8-42c8-b294-5706ce23a47d",
38+
)
39+
40+
channel_mock.basic_publish.assert_not_called()
41+
42+
43+
def test_event_manager_create_empty_scan_profile(mocker, empty_scan_profile):
44+
celery_mock = mocker.Mock()
45+
channel_mock = mocker.Mock()
46+
47+
mocker.patch.object(uuid, "uuid4", return_value="1754a4c8-f0b8-42c8-b294-5706ce23a47d")
48+
manager = EventManager("test", celery_mock, "queue", channel_mock)
49+
event = ScanProfileDBEvent(
50+
operation_type=OperationType.CREATE,
51+
valid_time=datetime(2023, 1, 1),
52+
new_data=empty_scan_profile,
53+
reference="test_reference",
54+
)
55+
manager.publish(event)
56+
57+
celery_mock.send_task.assert_called_once_with(
58+
"octopoes.tasks.tasks.handle_event",
59+
(
60+
{
61+
"entity_type": "scan_profile",
62+
"operation_type": "create",
63+
"valid_time": "2023-01-01T00:00:00",
64+
"client": "test",
65+
"old_data": None,
66+
"new_data": {"scan_profile_type": "empty", "reference": "test_reference", "level": 0},
67+
"reference": "test_reference",
68+
},
69+
),
70+
queue="queue",
71+
task_id="1754a4c8-f0b8-42c8-b294-5706ce23a47d",
72+
)
73+
74+
channel_mock.basic_publish.assert_called_once_with(
75+
"",
76+
"test__scan_profile_mutations",
77+
b'{"operation": "create", "primary_key": "test_reference", '
78+
b'"value": {"primary_key": "test_reference", '
79+
b'"object_type": "test_reference", '
80+
b'"scan_profile": {"scan_profile_type": "empty", "reference": "test_reference", "level": 0}}}',
81+
properties=pika.BasicProperties(delivery_mode=pika.DeliveryMode.Persistent),
82+
)
83+
84+
85+
def test_event_manager_create_declared_scan_profile(mocker, declared_scan_profile):
86+
celery_mock = mocker.Mock()
87+
channel_mock = mocker.Mock()
88+
89+
mocker.patch.object(uuid, "uuid4", return_value="1754a4c8-f0b8-42c8-b294-5706ce23a47d")
90+
manager = EventManager("test", celery_mock, "queue", channel_mock)
91+
event = ScanProfileDBEvent(
92+
operation_type=OperationType.CREATE,
93+
valid_time=datetime(2023, 1, 1),
94+
new_data=declared_scan_profile,
95+
reference="test_reference",
96+
)
97+
manager.publish(event)
98+
99+
celery_mock.send_task.assert_called_once_with(
100+
"octopoes.tasks.tasks.handle_event",
101+
(
102+
{
103+
"entity_type": "scan_profile",
104+
"operation_type": "create",
105+
"valid_time": "2023-01-01T00:00:00",
106+
"client": "test",
107+
"old_data": None,
108+
"new_data": {"scan_profile_type": "declared", "reference": "test_reference", "level": 2},
109+
"reference": "test_reference",
110+
},
111+
),
112+
queue="queue",
113+
task_id="1754a4c8-f0b8-42c8-b294-5706ce23a47d",
114+
)
115+
116+
assert channel_mock.basic_publish.call_count == 2
117+
channel_mock.basic_publish.asset_has_calls(
118+
mocker.call(
119+
"",
120+
"test__scan_profile_increments",
121+
b'{"primary_key": "test_reference", "object_type": "test_reference",'
122+
b'"scan_profile": {"scan_profile_type": "declared", "reference": "test_reference", "level": 2}}',
123+
properties=pika.BasicProperties(delivery_mode=pika.DeliveryMode.Persistent),
124+
),
125+
mocker.call(
126+
"",
127+
"test__scan_profile_mutations",
128+
b'{"operation": "create", "primary_key": "test_reference", '
129+
b'"value": {"primary_key": "test_reference", '
130+
b'"object_type": "test_reference", '
131+
b'"scan_profile": {"scan_profile_type": "declared", "reference": "test_reference", "level": 2}}}',
132+
properties=pika.BasicProperties(delivery_mode=pika.DeliveryMode.Persistent),
133+
),
134+
)
135+
136+
137+
def test_event_manager_delete_empty_scan_profile(mocker, empty_scan_profile):
138+
celery_mock = mocker.Mock()
139+
channel_mock = mocker.Mock()
140+
141+
mocker.patch.object(uuid, "uuid4", return_value="1754a4c8-f0b8-42c8-b294-5706ce23a47d")
142+
manager = EventManager("test", celery_mock, "queue", channel_mock)
143+
event = ScanProfileDBEvent(
144+
operation_type=OperationType.DELETE,
145+
valid_time=datetime(2023, 1, 1),
146+
old_data=empty_scan_profile,
147+
reference="test_reference",
148+
)
149+
manager.publish(event)
150+
151+
celery_mock.send_task.assert_called_once_with(
152+
"octopoes.tasks.tasks.handle_event",
153+
(
154+
{
155+
"entity_type": "scan_profile",
156+
"operation_type": "delete",
157+
"valid_time": "2023-01-01T00:00:00",
158+
"client": "test",
159+
"old_data": {"scan_profile_type": "empty", "reference": "test_reference", "level": 0},
160+
"new_data": None,
161+
"reference": "test_reference",
162+
},
163+
),
164+
queue="queue",
165+
task_id="1754a4c8-f0b8-42c8-b294-5706ce23a47d",
166+
)
167+
168+
channel_mock.basic_publish.assert_called_once_with(
169+
"",
170+
"test__scan_profile_mutations",
171+
b'{"operation": "delete", "primary_key": "test_reference", "value": null}',
172+
properties=pika.BasicProperties(delivery_mode=pika.DeliveryMode.Persistent),
173+
)

0 commit comments

Comments
 (0)