Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 15 additions & 7 deletions api_app/classes.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,16 +140,16 @@ def __str__(self):
return f"{self.__class__.__name__}"

def config(self, runtime_configuration: typing.Dict):
"""
Configure the plugin with runtime parameters.

Args:
runtime_configuration (dict): Runtime configuration parameters.
"""
self.__parameters = self._config.read_configured_params(self._user, runtime_configuration)
for parameter in self.__parameters:
attribute_name = f"_{parameter.name}" if parameter.is_secret else parameter.name
setattr(self, attribute_name, parameter.value)
value = parameter.value
# decrypt secrets that were stored encrypted
if parameter.is_secret and isinstance(value, str) and value.startswith("gAAAAA"):
from api_app.models import PluginConfig

value = PluginConfig._decrypt_value(value)
setattr(self, attribute_name, value)
logger.debug(
f"Adding to {self.__class__.__name__} param {attribute_name} with value {parameter.value} "
)
Expand Down Expand Up @@ -349,6 +349,14 @@ def _get_health_check_url(self, user: User = None) -> typing.Optional[str]:
if not param.configured or not param.value:
continue
url = param.value
# Decrypt if the value is Fernet-encrypted (secret parameter)
if isinstance(url, str) and url.startswith("gAAAAA"):
try:
from api_app.models import PluginConfig

url = PluginConfig._decrypt_value(url)
except Exception:
pass
logger.info(f"Url retrieved to verify is {param.name} for {self}")
return url
if hasattr(self, "url") and self.url:
Expand Down
65 changes: 65 additions & 0 deletions api_app/migrations/0073_encrypt_plugin_config_secrets.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
# This file is a part of IntelOwl https://github.com/intelowlproject/IntelOwl
# See the file 'LICENSE' for copying permission.

import base64
import hashlib
import json

from django.conf import settings
from django.db import migrations


def _get_fernet():
from cryptography.fernet import Fernet

key = getattr(settings, "PLUGIN_CONFIG_FERNET_KEY", None)
if key is None:
key = base64.urlsafe_b64encode(
hashlib.sha256(settings.SECRET_KEY.encode()).digest()
)
return Fernet(key)


def encrypt_existing_secrets(apps, schema_editor):
PluginConfig = apps.get_model("api_app", "PluginConfig")
fernet = _get_fernet()

for pc in PluginConfig.objects.filter(
parameter__is_secret=True,
value__isnull=False,
):
if isinstance(pc.value, str) and pc.value.startswith("gAAAAA"):
continue
encrypted = fernet.encrypt(json.dumps(pc.value).encode()).decode()
PluginConfig.objects.filter(pk=pc.pk).update(value=encrypted)


def decrypt_existing_secrets(apps, schema_editor):
PluginConfig = apps.get_model("api_app", "PluginConfig")
fernet = _get_fernet()

for pc in PluginConfig.objects.filter(
parameter__is_secret=True,
value__isnull=False,
):
if not (isinstance(pc.value, str) and pc.value.startswith("gAAAAA")):
continue
try:
decrypted = json.loads(fernet.decrypt(pc.value.encode()).decode())
PluginConfig.objects.filter(pk=pc.pk).update(value=decrypted)
except Exception:
pass


class Migration(migrations.Migration):

dependencies = [
("api_app", "0072_update_check_system"),
]

operations = [
migrations.RunPython(
encrypt_existing_secrets,
reverse_code=decrypt_existing_secrets,
),
]
25 changes: 25 additions & 0 deletions api_app/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -1019,6 +1019,31 @@ def is_secret(self):
"""Returns whether the parameter is marked as secret."""
return self.parameter.is_secret

@staticmethod
def _encrypt_value(value):
"""Fernet-encrypt a value (serialized as JSON)."""
from cryptography.fernet import Fernet

f = Fernet(settings.PLUGIN_CONFIG_FERNET_KEY)
return f.encrypt(json.dumps(value).encode()).decode()

@staticmethod
def _decrypt_value(encrypted_value):
"""Fernet-decrypt a value back to its original Python object."""
from cryptography.fernet import Fernet

f = Fernet(settings.PLUGIN_CONFIG_FERNET_KEY)
return json.loads(f.decrypt(encrypted_value.encode()).decode())

def save(self, *args, **kwargs):
if (
self.is_secret()
and self.value is not None
and not (isinstance(self.value, str) and self.value.startswith("gAAAAA"))
):
self.value = self._encrypt_value(self.value)
super().save(*args, **kwargs)

@property
def plugin_name(self):
"""Returns the name of the plugin associated with this configuration."""
Expand Down
8 changes: 8 additions & 0 deletions intel_owl/settings/security.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@
# See the file 'LICENSE' for copying permission.

# Security Stuff
import base64
import hashlib

from django.core.management.utils import get_random_secret_key

from ._util import get_secret
Expand All @@ -26,6 +29,11 @@
CSRF_TRUSTED_ORIGINS = [f"{WEB_CLIENT_URL}:80/"]
ALLOWED_HOSTS = ["*"]

# Fernet key for encrypting plugin secrets at rest.
# Falls back to SECRET_KEY if PLUGIN_CONFIG_SECRET_KEY is not set.
_raw_secret = get_secret("PLUGIN_CONFIG_SECRET_KEY", SECRET_KEY)
PLUGIN_CONFIG_FERNET_KEY = base64.urlsafe_b64encode(hashlib.sha256(_raw_secret.encode()).digest())

# https://docs.djangoproject.com/en/4.2/ref/settings/#data-upload-max-memory-size
DATA_UPLOAD_MAX_MEMORY_SIZE = 100 * (10**6)
FILE_UPLOAD_MAX_MEMORY_SIZE = 100 * (10**6)
1 change: 1 addition & 0 deletions requirements/project-requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -118,3 +118,4 @@ DeepDiff==8.6.1
lxml==6.0.2
Faker==36.1.0
beautifulsoup4==4.14.2
cryptography==46.0.5
102 changes: 102 additions & 0 deletions tests/api_app/test_plugin_config_encryption.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
# This file is a part of IntelOwl https://github.com/intelowlproject/IntelOwl
# See the file 'LICENSE' for copying permission.

from api_app.choices import PythonModuleBasePaths
from api_app.models import Parameter, PluginConfig, PythonModule
from api_app.visualizers_manager.models import VisualizerConfig
from tests import CustomTestCase


class PluginConfigEncryptionTestCase(CustomTestCase):
def setUp(self):
super().setUp()
self.vc, _ = VisualizerConfig.objects.get_or_create(
name="test_encryption",
description="test encryption",
python_module=PythonModule.objects.get(
base_path=PythonModuleBasePaths.Visualizer.value,
module="yara.Yara",
),
disabled=False,
)
self.secret_param = Parameter.objects.create(
python_module=self.vc.python_module,
name="test_api_key",
type="str",
is_secret=True,
required=False,
)
self.non_secret_param = Parameter.objects.create(
python_module=self.vc.python_module,
name="test_max_retries",
type="int",
is_secret=False,
required=False,
)

def tearDown(self):
self.secret_param.delete()
self.non_secret_param.delete()
self.vc.delete()
super().tearDown()

def test_secret_value_encrypted_on_save(self):
pc = PluginConfig.objects.create(
owner=self.user,
for_organization=False,
parameter=self.secret_param,
value="my_super_secret_api_key_12345",
visualizer_config=self.vc,
)
pc.refresh_from_db()
self.assertIsInstance(pc.value, str)
self.assertTrue(pc.value.startswith("gAAAAA"))
pc.delete()

def test_encrypt_decrypt_roundtrip(self):
original = "my_super_secret_api_key_12345"
encrypted = PluginConfig._encrypt_value(original)
self.assertTrue(encrypted.startswith("gAAAAA"))
self.assertEqual(PluginConfig._decrypt_value(encrypted), original)

def test_non_secret_value_unchanged(self):
pc = PluginConfig.objects.create(
owner=self.user,
for_organization=False,
parameter=self.non_secret_param,
value=10,
visualizer_config=self.vc,
)
pc.refresh_from_db()
self.assertEqual(pc.value, 10)
pc.delete()

def test_no_double_encryption(self):
pc = PluginConfig.objects.create(
owner=self.user,
for_organization=False,
parameter=self.secret_param,
value="test_secret_value",
visualizer_config=self.vc,
)
pc.refresh_from_db()
first_encrypted = pc.value

# saving again should not re-encrypt
pc.save()
pc.refresh_from_db()
self.assertEqual(pc.value, first_encrypted)
self.assertEqual(PluginConfig._decrypt_value(pc.value), "test_secret_value")
pc.delete()

def test_encrypt_decrypt_dict(self):
original = {"key": "value", "nested": {"a": 1}}
encrypted = PluginConfig._encrypt_value(original)
self.assertTrue(encrypted.startswith("gAAAAA"))
self.assertEqual(PluginConfig._decrypt_value(encrypted), original)

def test_encrypt_decrypt_list(self):
original = ["secret1", "secret2", "secret3"]
encrypted = PluginConfig._encrypt_value(original)
self.assertTrue(encrypted.startswith("gAAAAA"))
self.assertEqual(PluginConfig._decrypt_value(encrypted), original)
Loading
Loading