Skip to content

Commit 14e8b9c

Browse files
committed
fix(security): remove shell usage from user password setup
Replace shell=True password generation and usermod calls with argv-based subprocess calls, generate temporary passwords with Python secrets, and enable Ruff S602 as a static security gate.
1 parent de80084 commit 14e8b9c

3 files changed

Lines changed: 104 additions & 15 deletions

File tree

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ include = ["api/**/*.py", "utils/**/*.py", "tests/**/*.py", "scripts/**/*.py"]
5858
[tool.ruff.lint]
5959
# Start with fatal correctness checks that are feasible on the existing tree.
6060
# Expand this set as legacy style/import/typing issues are cleaned up.
61-
select = ["E9", "F63", "F7", "F82", "S113"]
61+
select = ["E9", "F63", "F7", "F82", "S113", "S602"]
6262

6363
[tool.ruff.lint.per-file-ignores]
6464
"tests/**/*.py" = ["E402"]

tests/test_user_management.py

Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
import importlib
2+
import sys
3+
import types
4+
import unittest
5+
6+
7+
def _install_stubs():
8+
config_loader = types.ModuleType("utils.config_loader")
9+
config_loader.CONFIG_MANAGER = types.SimpleNamespace(
10+
get=lambda key, default=None: {"puid": 1000, "pgid": 1000}.get(key, default)
11+
)
12+
sys.modules["utils.config_loader"] = config_loader
13+
14+
global_logger = types.ModuleType("utils.global_logger")
15+
global_logger.logger = types.SimpleNamespace(
16+
debug=lambda *args, **kwargs: None,
17+
error=lambda *args, **kwargs: None,
18+
info=lambda *args, **kwargs: None,
19+
warning=lambda *args, **kwargs: None,
20+
)
21+
sys.modules["utils.global_logger"] = global_logger
22+
23+
24+
_install_stubs()
25+
sys.modules.pop("utils.user_management", None)
26+
user_management = importlib.import_module("utils.user_management")
27+
28+
29+
class UserManagementSecurityTests(unittest.TestCase):
30+
def test_hash_user_password_uses_stdin_without_shell(self):
31+
calls = []
32+
33+
def fake_run(*args, **kwargs):
34+
calls.append((args, kwargs))
35+
return types.SimpleNamespace(stdout="$6$hashed\n")
36+
37+
original_run = user_management.subprocess.run
38+
user_management.subprocess.run = fake_run
39+
try:
40+
hashed = user_management._hash_user_password("raw-password")
41+
finally:
42+
user_management.subprocess.run = original_run
43+
44+
self.assertEqual(hashed, "$6$hashed")
45+
self.assertEqual(calls[0][0][0], ["openssl", "passwd", "-6", "-stdin"])
46+
self.assertEqual(calls[0][1]["input"], "raw-password")
47+
self.assertTrue(calls[0][1]["capture_output"])
48+
self.assertTrue(calls[0][1]["text"])
49+
self.assertTrue(calls[0][1]["check"])
50+
self.assertNotIn("shell", calls[0][1])
51+
52+
def test_set_user_password_uses_argument_list_without_shell(self):
53+
calls = []
54+
55+
def fake_run(*args, **kwargs):
56+
calls.append((args, kwargs))
57+
return types.SimpleNamespace(returncode=0)
58+
59+
original_run = user_management.subprocess.run
60+
user_management.subprocess.run = fake_run
61+
try:
62+
user_management._set_user_password("dumb", "$6$hashed")
63+
finally:
64+
user_management.subprocess.run = original_run
65+
66+
self.assertEqual(calls[0][0][0], ["usermod", "-p", "$6$hashed", "dumb"])
67+
self.assertTrue(calls[0][1]["check"])
68+
self.assertNotIn("shell", calls[0][1])
69+
70+
def test_generate_user_password_returns_nonempty_random_string(self):
71+
first = user_management._generate_user_password()
72+
second = user_management._generate_user_password()
73+
74+
self.assertIsInstance(first, str)
75+
self.assertGreaterEqual(len(first), 16)
76+
self.assertNotEqual(first, second)
77+
78+
79+
if __name__ == "__main__":
80+
unittest.main()

utils/user_management.py

Lines changed: 23 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,31 @@
11
from utils.config_loader import CONFIG_MANAGER as config
22
from utils.global_logger import logger
33
from concurrent.futures import ThreadPoolExecutor
4-
import multiprocessing, os, time, grp, pwd, subprocess, shutil
4+
import multiprocessing, os, time, grp, pwd, subprocess, shutil, secrets
55

66
user_id = config.get("puid")
77
group_id = config.get("pgid")
88

99

10+
def _generate_user_password():
11+
return secrets.token_urlsafe(18)
12+
13+
14+
def _hash_user_password(user_password):
15+
result = subprocess.run(
16+
["openssl", "passwd", "-6", "-stdin"],
17+
input=user_password,
18+
capture_output=True,
19+
text=True,
20+
check=True,
21+
)
22+
return result.stdout.strip()
23+
24+
25+
def _set_user_password(username, hashed_password):
26+
subprocess.run(["usermod", "-p", hashed_password, username], check=True)
27+
28+
1029
def chown_single(path, user_id, group_id):
1130
try:
1231
stat_info = os.stat(path)
@@ -244,19 +263,9 @@ def create_system_user(username="DUMB"):
244263
f"Writing to /etc/passwd took {passwd_write_end - passwd_write_start:.2f} seconds"
245264
)
246265

247-
user_password = (
248-
subprocess.check_output("openssl rand -base64 12", shell=True)
249-
.decode()
250-
.strip()
251-
)
252-
hashed_password = (
253-
subprocess.check_output(f"openssl passwd -6 {user_password}", shell=True)
254-
.decode()
255-
.strip()
256-
)
257-
subprocess.run(
258-
f"usermod -p '{hashed_password}' {username}", shell=True, check=True
259-
)
266+
user_password = _generate_user_password()
267+
hashed_password = _hash_user_password(user_password)
268+
_set_user_password(username, hashed_password)
260269
logger.info(f"Password set for user '{username}'. Stored securely in memory.")
261270

262271
zurg_dir = "/zurg"

0 commit comments

Comments
 (0)