Skip to content

Commit 966e067

Browse files
committed
Add security and validation pytest suite for response plugin
Add tests/test_response_security.py with: - Elasticat.py security tests (timeout on requests, no verify=False) - Response abilities YAML validation (parseable, required fields, unique IDs) - hook.py module loading and attribute verification
1 parent a32550b commit 966e067

File tree

1 file changed

+144
-0
lines changed

1 file changed

+144
-0
lines changed

tests/test_response_security.py

Lines changed: 144 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,144 @@
1+
import ast
2+
import glob
3+
import os
4+
import re
5+
6+
import pytest
7+
import yaml
8+
9+
10+
PLUGIN_DIR = os.path.join(os.path.dirname(__file__), '..')
11+
ABILITIES_DIR = os.path.join(PLUGIN_DIR, 'data', 'abilities')
12+
PAYLOADS_DIR = os.path.join(PLUGIN_DIR, 'payloads')
13+
14+
REQUIRED_ABILITY_FIELDS = {'id', 'name', 'tactic', 'technique'}
15+
16+
17+
class TestElasticatSecurity:
18+
"""Tests that elasticat.py has timeout on all requests calls."""
19+
20+
def _get_source(self):
21+
path = os.path.join(PAYLOADS_DIR, 'elasticat.py')
22+
with open(path) as f:
23+
return f.read()
24+
25+
def test_elasticat_exists(self):
26+
path = os.path.join(PAYLOADS_DIR, 'elasticat.py')
27+
assert os.path.isfile(path), 'elasticat.py payload not found'
28+
29+
def test_elasticat_is_valid_python(self):
30+
source = self._get_source()
31+
try:
32+
ast.parse(source)
33+
except SyntaxError as e:
34+
pytest.fail(f'elasticat.py has syntax error: {e}')
35+
36+
def test_elasticat_requests_have_timeout(self):
37+
source = self._get_source()
38+
tree = ast.parse(source)
39+
requests_methods = {'get', 'post', 'put', 'delete', 'patch', 'head'}
40+
missing = []
41+
for node in ast.walk(tree):
42+
if isinstance(node, ast.Call):
43+
func = node.func
44+
is_requests_call = False
45+
if isinstance(func, ast.Attribute) and func.attr in requests_methods:
46+
if isinstance(func.value, ast.Name) and func.value.id == 'requests':
47+
is_requests_call = True
48+
if is_requests_call:
49+
keyword_names = [kw.arg for kw in node.keywords]
50+
if 'timeout' not in keyword_names:
51+
line = getattr(node, 'lineno', '?')
52+
missing.append(f'line {line}: requests.{func.attr}()')
53+
if missing:
54+
pytest.fail(
55+
f'elasticat.py has requests calls without timeout: {"; ".join(missing)}'
56+
)
57+
58+
def test_elasticat_no_verify_false(self):
59+
source = self._get_source()
60+
matches = re.findall(r'verify\s*=\s*False', source)
61+
if matches:
62+
pytest.fail(
63+
f'elasticat.py uses verify=False ({len(matches)} occurrence(s)). '
64+
'SSL verification should not be disabled.'
65+
)
66+
67+
68+
class TestResponseAbilitiesYAML:
69+
"""Tests that response abilities YAML files are valid."""
70+
71+
@staticmethod
72+
def _collect_yaml_files():
73+
pattern = os.path.join(ABILITIES_DIR, '**', '*.yml')
74+
return glob.glob(pattern, recursive=True)
75+
76+
def test_abilities_directory_exists(self):
77+
assert os.path.isdir(ABILITIES_DIR), 'abilities directory not found'
78+
79+
def test_at_least_one_ability_exists(self):
80+
files = self._collect_yaml_files()
81+
assert len(files) > 0, 'No ability YAML files found'
82+
83+
def test_all_abilities_are_parseable(self):
84+
for yml_file in self._collect_yaml_files():
85+
with open(yml_file) as f:
86+
try:
87+
data = yaml.safe_load(f)
88+
except yaml.YAMLError as e:
89+
pytest.fail(f'Failed to parse {yml_file}: {e}')
90+
assert data is not None, f'{yml_file} is empty'
91+
92+
def test_all_abilities_have_required_fields(self):
93+
for yml_file in self._collect_yaml_files():
94+
with open(yml_file) as f:
95+
data = yaml.safe_load(f)
96+
if not isinstance(data, list):
97+
data = [data]
98+
for ability in data:
99+
for field in REQUIRED_ABILITY_FIELDS:
100+
assert field in ability, (
101+
f'{yml_file}: ability missing required field "{field}"'
102+
)
103+
104+
def test_ability_ids_are_unique(self):
105+
seen_ids = {}
106+
for yml_file in self._collect_yaml_files():
107+
with open(yml_file) as f:
108+
data = yaml.safe_load(f)
109+
if not isinstance(data, list):
110+
data = [data]
111+
for ability in data:
112+
aid = ability.get('id')
113+
if aid in seen_ids:
114+
pytest.fail(
115+
f'Duplicate ability id {aid} in {yml_file} and {seen_ids[aid]}'
116+
)
117+
seen_ids[aid] = yml_file
118+
119+
120+
class TestResponseHook:
121+
"""Tests that hook.py loads correctly."""
122+
123+
def test_hook_module_loads(self):
124+
hook_path = os.path.join(PLUGIN_DIR, 'hook.py')
125+
assert os.path.isfile(hook_path), 'hook.py not found'
126+
tree = ast.parse(open(hook_path).read())
127+
top_level_names = [
128+
node.name
129+
for node in ast.walk(tree)
130+
if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef))
131+
]
132+
assert 'enable' in top_level_names, 'hook.py must define an enable() function'
133+
134+
def test_hook_has_name_and_description(self):
135+
hook_path = os.path.join(PLUGIN_DIR, 'hook.py')
136+
source = open(hook_path).read()
137+
tree = ast.parse(source)
138+
assigned_names = [
139+
node.targets[0].id
140+
for node in ast.walk(tree)
141+
if isinstance(node, ast.Assign) and isinstance(node.targets[0], ast.Name)
142+
]
143+
assert 'name' in assigned_names, 'hook.py should assign a name variable'
144+
assert 'description' in assigned_names, 'hook.py should assign a description variable'

0 commit comments

Comments
 (0)