Skip to content

Commit 56bed7b

Browse files
hyangminjHyangMin Jeongclaude
authored
Fix security vulnerabilities identified in PR review (#5)
* Fix security vulnerabilities identified in PR review Security improvements: - Add restrictive file permissions (0o600) for keyinfo.json to prevent unauthorized read access - Add path traversal protection in email_sender to block attacks like ../../../etc/passwd - Add race condition protection in door_controller cleanup to prevent double cleanup issues from signal handlers - Add output directory validation in key_generator to handle missing directories gracefully - Add scanner process monitoring in subscriber to detect and log crashes Test coverage: - Add tests for directory creation in key_generator - Add tests for path traversal prevention in email_sender - Add tests for cleanup race condition protection in door_controller - Add tests for file permission setting in subscriber - Add tests for process monitoring feature in subscriber Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com> * Fix test cases to work with path traversal protection Update test_send_key_success and test_send_key_auth_failure to create temporary files in current working directory instead of system temp directory to comply with path traversal security check. Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com> --------- Co-authored-by: HyangMin Jeong <hyangmin@Studio.local> Co-authored-by: Claude Opus 4.5 <noreply@anthropic.com>
1 parent c5e5e83 commit 56bed7b

File tree

6 files changed

+380
-5
lines changed

6 files changed

+380
-5
lines changed

hostpart/email_sender.py

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,21 @@ def send_key(
9696
if not os.path.isfile(key_path):
9797
raise EmailSendError(f"Key file not found: {key_path}")
9898

99+
# Security: Validate path to prevent path traversal attacks
100+
# 보안: 경로 탐색 공격을 방지하기 위한 경로 검증
101+
abs_key_path = os.path.abspath(key_path)
102+
# Ensure the file is within the current working directory or explicitly allowed paths
103+
# 파일이 현재 작업 디렉토리 또는 명시적으로 허용된 경로 내에 있는지 확인
104+
cwd = os.path.abspath(os.getcwd())
105+
if not abs_key_path.startswith(cwd + os.sep) and not abs_key_path == cwd:
106+
# Check if it's a direct file in cwd
107+
if os.path.dirname(abs_key_path) != cwd:
108+
self.logger.warning(f"Potential path traversal attempt: {key_path}")
109+
raise EmailSendError(
110+
f"Invalid key path: file must be within the working directory. "
111+
f"잘못된 키 경로: 파일은 작업 디렉토리 내에 있어야 합니다."
112+
)
113+
99114
subject = subject or self.config.subject
100115
body = body or self.config.body
101116

hostpart/key_generator.py

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -196,7 +196,22 @@ def create_qr_image(
196196
197197
Returns:
198198
str: Path to generated QR code image / 생성된 QR 코드 이미지 경로
199+
200+
Raises:
201+
ValueError: If output_dir is invalid / output_dir가 유효하지 않을 경우
199202
"""
203+
# Validate and create output directory if it doesn't exist
204+
# 출력 디렉토리가 존재하지 않으면 검증 및 생성
205+
if not output_dir:
206+
raise ValueError("output_dir cannot be empty / output_dir는 비워둘 수 없습니다")
207+
208+
if not os.path.isdir(output_dir):
209+
try:
210+
os.makedirs(output_dir, exist_ok=True)
211+
self.logger.info(f"Created output directory: {output_dir}")
212+
except OSError as e:
213+
raise ValueError(f"Failed to create output directory: {output_dir}. Error: {e}")
214+
200215
# Create QR code
201216
# QR 코드 생성
202217
qr_data = key.to_json(include_utc=include_utc)

hostpart/test_enhanced.py

Lines changed: 106 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -208,6 +208,57 @@ def test_create_key_returns_tuple(self):
208208
self.assertEqual(data['doorID'], 'test-door')
209209

210210

211+
class TestKeyGeneratorSecurity(unittest.TestCase):
212+
"""Test cases for KeyGenerator security improvements."""
213+
214+
def setUp(self):
215+
"""Set up test fixtures."""
216+
self.test_dir = tempfile.mkdtemp()
217+
self.original_dir = os.getcwd()
218+
os.chdir(self.test_dir)
219+
220+
def tearDown(self):
221+
"""Clean up test fixtures."""
222+
os.chdir(self.original_dir)
223+
shutil.rmtree(self.test_dir)
224+
225+
def test_create_qr_image_creates_missing_directory(self):
226+
"""Test that create_qr_image creates missing output directory."""
227+
config = KeyGeneratorConfig(door_id="test-door")
228+
generator = KeyGenerator(config)
229+
230+
key = generator.generate_key()
231+
nested_dir = os.path.join(self.test_dir, "nested", "output", "dir")
232+
233+
qr_path = generator.create_qr_image(key, output_dir=nested_dir)
234+
235+
self.assertTrue(os.path.exists(nested_dir))
236+
self.assertTrue(os.path.exists(qr_path))
237+
238+
def test_create_qr_image_empty_output_dir_raises_error(self):
239+
"""Test that empty output_dir raises ValueError."""
240+
config = KeyGeneratorConfig(door_id="test-door")
241+
generator = KeyGenerator(config)
242+
243+
key = generator.generate_key()
244+
245+
with self.assertRaises(ValueError) as context:
246+
generator.create_qr_image(key, output_dir="")
247+
248+
self.assertIn("empty", str(context.exception).lower())
249+
250+
def test_create_qr_image_invalid_path_raises_error(self):
251+
"""Test that invalid path raises ValueError."""
252+
config = KeyGeneratorConfig(door_id="test-door")
253+
generator = KeyGenerator(config)
254+
255+
key = generator.generate_key()
256+
257+
# Use a path that cannot be created (null byte in path)
258+
with self.assertRaises(ValueError):
259+
generator.create_qr_image(key, output_dir="/nonexistent\x00path")
260+
261+
211262
class TestEmailSender(unittest.TestCase):
212263
"""Test cases for EmailSender class."""
213264

@@ -243,7 +294,9 @@ def test_send_key_success(self, mock_smtp):
243294
)
244295
sender = EmailSender(config)
245296

246-
with tempfile.NamedTemporaryFile(delete=False, suffix='.png') as f:
297+
# Create file in current working directory to pass path traversal check
298+
# 경로 탐색 검사를 통과하기 위해 현재 작업 디렉토리에 파일 생성
299+
with tempfile.NamedTemporaryFile(delete=False, suffix='.png', dir=os.getcwd()) as f:
247300
test_file = f.name
248301
f.write(b'test image data')
249302

@@ -271,6 +324,55 @@ def test_send_key_missing_file(self):
271324
with self.assertRaises(EmailSendError):
272325
sender.send_key("nonexistent.png", "recipient@example.com")
273326

327+
def test_send_key_path_traversal_prevention(self):
328+
"""Test that path traversal attacks are blocked."""
329+
config = EmailConfig(
330+
sender_id="test",
331+
sender_password="password"
332+
)
333+
sender = EmailSender(config)
334+
335+
# Create a file outside cwd to simulate path traversal attack
336+
with tempfile.NamedTemporaryFile(delete=False, suffix='.png', dir='/tmp') as f:
337+
test_file = f.name
338+
f.write(b'test image data')
339+
340+
try:
341+
# This should raise EmailSendError due to path traversal protection
342+
with self.assertRaises(EmailSendError) as context:
343+
sender.send_key(test_file, "recipient@example.com")
344+
345+
self.assertIn("working directory", str(context.exception).lower())
346+
347+
finally:
348+
if os.path.exists(test_file):
349+
os.remove(test_file)
350+
351+
@patch('email_sender.smtplib.SMTP')
352+
def test_send_key_within_cwd_succeeds(self, mock_smtp):
353+
"""Test that files within current working directory are allowed."""
354+
mock_server = MagicMock()
355+
mock_smtp.return_value = mock_server
356+
357+
config = EmailConfig(
358+
sender_id="test",
359+
sender_password="password"
360+
)
361+
sender = EmailSender(config)
362+
363+
# Create a file in current working directory
364+
with tempfile.NamedTemporaryFile(delete=False, suffix='.png', dir=os.getcwd()) as f:
365+
test_file = f.name
366+
f.write(b'test image data')
367+
368+
try:
369+
result = sender.send_key(test_file, "recipient@example.com")
370+
self.assertTrue(result)
371+
372+
finally:
373+
if os.path.exists(test_file):
374+
os.remove(test_file)
375+
274376
@patch('email_sender.smtplib.SMTP')
275377
def test_send_key_auth_failure(self, mock_smtp):
276378
"""Test handling of authentication failure."""
@@ -288,7 +390,9 @@ def test_send_key_auth_failure(self, mock_smtp):
288390
)
289391
sender = EmailSender(config)
290392

291-
with tempfile.NamedTemporaryFile(delete=False, suffix='.png') as f:
393+
# Create file in current working directory to pass path traversal check
394+
# 경로 탐색 검사를 통과하기 위해 현재 작업 디렉토리에 파일 생성
395+
with tempfile.NamedTemporaryFile(delete=False, suffix='.png', dir=os.getcwd()) as f:
292396
test_file = f.name
293397
f.write(b'test data')
294398

raspart/door_controller.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,7 @@ def __init__(
8787
# 속도 제한 상태
8888
self._last_unlock_time: Optional[datetime] = None
8989
self._initialized = False
90+
self._cleanup_done = False # Flag to prevent double cleanup / 중복 정리 방지 플래그
9091

9192
# Initialize GPIO
9293
# GPIO 초기화
@@ -211,7 +212,18 @@ def cleanup(self) -> None:
211212
"""
212213
Clean up GPIO resources.
213214
GPIO 리소스를 정리합니다.
215+
216+
Thread-safe cleanup that prevents race conditions when called
217+
from signal handlers.
218+
신호 처리기에서 호출될 때 경쟁 조건을 방지하는 스레드 안전 정리입니다.
214219
"""
220+
# Prevent double cleanup (race condition protection)
221+
# 중복 정리 방지 (경쟁 조건 보호)
222+
if self._cleanup_done:
223+
return
224+
225+
self._cleanup_done = True
226+
215227
if not self._initialized:
216228
return
217229

raspart/subscriber.py

Lines changed: 97 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@
1212
import subprocess
1313
import sys
1414
import logging
15+
import threading
16+
import time
1517
from typing import Optional, Callable
1618
from datetime import datetime
1719
from dataclasses import dataclass
@@ -21,6 +23,12 @@
2123
from constants import DEFAULT_KEY_FILE, DEFAULT_LOG_FILE
2224

2325

26+
# Constants for process monitoring
27+
# 프로세스 모니터링 상수
28+
PROCESS_CHECK_INTERVAL_SECONDS = 5
29+
PROCESS_TERMINATION_TIMEOUT = 5
30+
31+
2432
@dataclass
2533
class SubscriberConfig:
2634
"""
@@ -32,6 +40,7 @@ class SubscriberConfig:
3240
key_file: str = DEFAULT_KEY_FILE
3341
log_file: str = DEFAULT_LOG_FILE
3442
scanner_script: str = "qr_scanner.py"
43+
monitor_scanner: bool = True # Enable scanner process monitoring / 스캐너 프로세스 모니터링 활성화
3544

3645

3746
class DoorLensSubscriber:
@@ -41,15 +50,17 @@ class DoorLensSubscriber:
4150
4251
Features:
4352
- Secure subprocess execution (no shell injection)
44-
- Process lifecycle management
53+
- Process lifecycle management with health monitoring
4554
- Graceful shutdown handling
4655
- Message deduplication
56+
- Scanner process crash detection and logging
4757
4858
기능:
4959
- 안전한 서브프로세스 실행 (쉘 인젝션 없음)
50-
- 프로세스 수명 관리
60+
- 상태 모니터링을 포함한 프로세스 수명 관리
5161
- 정상적인 종료 처리
5262
- 메시지 중복 제거
63+
- 스캐너 프로세스 충돌 감지 및 로깅
5364
"""
5465

5566
def __init__(
@@ -76,6 +87,8 @@ def __init__(
7687
self._scanner_process: Optional[subprocess.Popen] = None
7788
self._running = False
7889
self._last_message_id: Optional[str] = None
90+
self._monitor_thread: Optional[threading.Thread] = None
91+
self._monitor_running = False
7992

8093
self._register_signal_handlers()
8194

@@ -108,15 +121,87 @@ def signal_handler(signum, frame):
108121
signal.signal(signal.SIGTERM, signal_handler)
109122
signal.signal(signal.SIGINT, signal_handler)
110123

124+
def _stop_process_monitor(self) -> None:
125+
"""
126+
Stop the process monitor thread.
127+
프로세스 모니터 스레드를 중지합니다.
128+
"""
129+
self._monitor_running = False
130+
if self._monitor_thread is not None and self._monitor_thread.is_alive():
131+
self._monitor_thread.join(timeout=2)
132+
self._monitor_thread = None
133+
134+
def _start_process_monitor(self) -> None:
135+
"""
136+
Start a background thread to monitor the scanner process health.
137+
스캐너 프로세스 상태를 모니터링하는 백그라운드 스레드를 시작합니다.
138+
"""
139+
if not self.config.monitor_scanner:
140+
return
141+
142+
self._stop_process_monitor()
143+
self._monitor_running = True
144+
self._monitor_thread = threading.Thread(
145+
target=self._monitor_scanner_process,
146+
daemon=True,
147+
name="ScannerProcessMonitor"
148+
)
149+
self._monitor_thread.start()
150+
self.logger.debug("Process monitor thread started")
151+
152+
def _monitor_scanner_process(self) -> None:
153+
"""
154+
Monitor scanner process and log if it crashes.
155+
스캐너 프로세스를 모니터링하고 충돌 시 로그를 기록합니다.
156+
157+
This runs in a background thread and checks process status periodically.
158+
백그라운드 스레드에서 실행되며 주기적으로 프로세스 상태를 확인합니다.
159+
"""
160+
while self._monitor_running and self._scanner_process is not None:
161+
time.sleep(PROCESS_CHECK_INTERVAL_SECONDS)
162+
163+
if self._scanner_process is None:
164+
break
165+
166+
# Check if process has terminated
167+
# 프로세스가 종료되었는지 확인
168+
return_code = self._scanner_process.poll()
169+
if return_code is not None:
170+
# Process has terminated
171+
# 프로세스가 종료됨
172+
if return_code == 0:
173+
self.logger.info("Scanner process exited normally (code 0)")
174+
else:
175+
self.logger.error(
176+
f"Scanner process crashed unexpectedly (exit code: {return_code})"
177+
)
178+
# Try to capture stderr for debugging
179+
# 디버깅을 위해 stderr 캡처 시도
180+
try:
181+
if self._scanner_process.stderr:
182+
stderr_output = self._scanner_process.stderr.read()
183+
if stderr_output:
184+
stderr_text = stderr_output.decode('utf-8', errors='replace')
185+
self.logger.error(f"Scanner stderr: {stderr_text[:500]}")
186+
except Exception as e:
187+
self.logger.debug(f"Could not read scanner stderr: {e}")
188+
189+
self._scanner_process = None
190+
break
191+
111192
def _stop_scanner_process(self) -> None:
112193
"""
113194
Stop any running scanner process.
114195
실행 중인 스캐너 프로세스를 중지합니다.
115196
"""
197+
# Stop the monitor first
198+
# 먼저 모니터 중지
199+
self._stop_process_monitor()
200+
116201
if self._scanner_process is not None:
117202
try:
118203
self._scanner_process.terminate()
119-
self._scanner_process.wait(timeout=5)
204+
self._scanner_process.wait(timeout=PROCESS_TERMINATION_TIMEOUT)
120205
self.logger.info("Scanner process terminated")
121206
except subprocess.TimeoutExpired:
122207
self._scanner_process.kill()
@@ -158,6 +243,11 @@ def _start_scanner_process(self) -> bool:
158243
)
159244

160245
self.logger.info(f"Scanner process started (PID: {self._scanner_process.pid})")
246+
247+
# Start process monitor
248+
# 프로세스 모니터 시작
249+
self._start_process_monitor()
250+
161251
return True
162252

163253
except Exception as e:
@@ -183,6 +273,10 @@ def _save_key(self, key_data: str) -> bool:
183273
with open(self.config.key_file, 'w') as f:
184274
f.write(key_data)
185275

276+
# Set restrictive file permissions (owner read/write only)
277+
# 제한적 파일 권한 설정 (소유자 읽기/쓰기만 허용)
278+
os.chmod(self.config.key_file, 0o600)
279+
186280
self.logger.info(f"Key saved to {self.config.key_file}")
187281
return True
188282

0 commit comments

Comments
 (0)