Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 48 additions & 21 deletions backend/data_wizard/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,17 @@
from rest_framework.views import APIView
from rest_framework.response import Response
from rest_framework.parsers import FileUploadParser

from ebios_rm.models import ElementaryAction
from .serializers import LoadFileSerializer
from core.models import (
Folder,
Perimeter,
RequirementAssessment,
Framework,
RequirementNode,
RiskAssessment,
RiskScenario,
RiskMatrix,
AppliedControl,
ReferenceControl,
Threat,
Asset,
)
from core.serializers import (
AssetWriteSerializer,
Expand All @@ -35,7 +33,7 @@
FolderWriteSerializer,
)
from ebios_rm.serializers import ElementaryActionWriteSerializer
from tprm.models import Entity, Solution, Contract
from tprm.models import Entity
from tprm.serializers import (
EntityWriteSerializer,
SolutionWriteSerializer,
Expand Down Expand Up @@ -180,23 +178,26 @@ def process_data(

def _process_users(self, request, records):
results = {"successful": 0, "failed": 0, "errors": []}

for record in records:
# check if the email is available
if not record.get("email"):
user_email = record.get("email")
if not user_email:
results["failed"] += 1
results["errors"].append(
{"record": record, "error": "email field is mandatory"}
)
continue
# Prepare data for serializer
user_data = {
"email": record.get("email"), # Email is mandatory
"email": user_email, # Email is mandatory
"first_name": record.get("first_name"),
"last_name": record.get("last_name"),
}
# Use the serializer for validation and saving
current_instance = User.objects.filter(email=user_email).first()
serializer = UserWriteSerializer(
data=user_data, context={"request": request}
instance=current_instance, data=user_data, context={"request": request}
)
try:
if serializer.is_valid(raise_exception=True):
Expand Down Expand Up @@ -226,7 +227,8 @@ def _process_assets(self, request, records, folders_map, folder_id):
if record.get("domain") != "":
domain = folders_map.get(str(record.get("domain")).lower(), folder_id)
# Check if name is provided as it's mandatory
if not record.get("name"):
asset_name = record.get("name")
if not asset_name:
results["failed"] += 1
results["errors"].append(
{"record": record, "error": "Name field is mandatory"}
Expand All @@ -236,14 +238,19 @@ def _process_assets(self, request, records, folders_map, folder_id):
# Prepare data for serializer
asset_data = {
"ref_id": record.get("ref_id", ""),
"name": record.get("name"), # Name is mandatory
"name": asset_name, # Name is mandatory
"type": record.get("type", "SP"),
"folder": domain,
"description": record.get("description", ""),
}

current_instance = Asset.objects.filter(
name=asset_name, folder=domain
).first()

# Use the serializer for validation and saving
serializer = AssetWriteSerializer(
data=asset_data, context={"request": request}
instance=current_instance, data=asset_data, context={"request": request}
)
try:
if serializer.is_valid(raise_exception=True):
Expand Down Expand Up @@ -280,7 +287,8 @@ def _process_applied_controls(self, request, records, folders_map, folder_id):
priority = None

# Check if name is provided as it's mandatory
if not record.get("name"):
applied_control_name = record.get("name")
if not applied_control_name:
results["failed"] += 1
results["errors"].append(
{"record": record, "error": "Name field is mandatory"}
Expand All @@ -290,7 +298,7 @@ def _process_applied_controls(self, request, records, folders_map, folder_id):
# Prepare data for serializer
control_data = {
"ref_id": record.get("ref_id", ""),
"name": record.get("name"), # Name is mandatory
"name": applied_control_name, # Name is mandatory
"description": record.get("description", ""),
"category": record.get("category", ""),
"folder": domain,
Expand All @@ -299,9 +307,15 @@ def _process_applied_controls(self, request, records, folders_map, folder_id):
"csf_function": record.get("csf_function", "govern"),
}

current_instance = AppliedControl.objects.filter(
name=applied_control_name, folder=domain
).first()

# Use the serializer for validation and saving
serializer = AppliedControlWriteSerializer(
data=control_data, context={"request": request}
instance=current_instance,
data=control_data,
context={"request": request},
)
try:
if serializer.is_valid(raise_exception=True):
Expand Down Expand Up @@ -334,7 +348,8 @@ def _process_perimeters(self, request, records, folders_map, folder_id):
if record.get("domain") != "":
domain = folders_map.get(str(record.get("domain")).lower(), folder_id)
# Check if name is provided as it's mandatory
if not record.get("name"):
perimeter_name = record.get("name")
if not perimeter_name:
results["failed"] += 1
results["errors"].append(
{"record": record, "error": "Name field is mandatory"}
Expand All @@ -344,14 +359,19 @@ def _process_perimeters(self, request, records, folders_map, folder_id):
# Prepare data for serializer
perimeter_data = {
"ref_id": record.get("ref_id", ""),
"name": record.get("name"), # Name is mandatory
"name": perimeter_name, # Name is mandatory
"folder": domain,
"description": record.get("description", ""),
"status": record.get("status"),
}
# Use the serializer for validation and saving
current_instance = Perimeter.objects.filter(
name=perimeter_name, folder=domain
).first()
serializer = PerimeterWriteSerializer(
data=perimeter_data, context={"request": request}
instance=current_instance,
data=perimeter_data,
context={"request": request},
)
try:
if serializer.is_valid(raise_exception=True):
Expand Down Expand Up @@ -436,7 +456,8 @@ def _process_elementary_actions(self, request, records, folders_map, folder_id):
domain = folders_map.get(str(record.get("domain")).lower(), folder_id)

# Check if name is provided as it's mandatory
if not record.get("name"):
elementary_action_name = record.get("name")
if not elementary_action_name:
results["failed"] += 1
results["errors"].append(
{"record": record, "error": "Name field is mandatory"}
Expand All @@ -458,7 +479,7 @@ def _process_elementary_actions(self, request, records, folders_map, folder_id):
# Prepare data for serializer
elementary_action_data = {
"ref_id": record.get("ref_id", ""),
"name": record.get("name"), # Name is mandatory
"name": elementary_action_name, # Name is mandatory
"description": record.get("description", ""),
"folder": domain,
"attack_stage": attack_stage,
Expand All @@ -468,9 +489,15 @@ def _process_elementary_actions(self, request, records, folders_map, folder_id):
if icon:
elementary_action_data["icon"] = icon

current_instance = ElementaryAction.objects.filter(
name=elementary_action_name, folder=domain
).first()

# Use the serializer for validation and saving
serializer = ElementaryActionWriteSerializer(
data=elementary_action_data, context={"request": request}
instance=current_instance,
data=elementary_action_data,
context={"request": request},
)
try:
if serializer.is_valid(raise_exception=True):
Expand Down
Loading