Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions hostpart/email_sender.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,21 @@ def send_key(
if not os.path.isfile(key_path):
raise EmailSendError(f"Key file not found: {key_path}")

# Security: Validate path to prevent path traversal attacks
# 보안: 경로 탐색 공격을 방지하기 위한 경로 검증
abs_key_path = os.path.abspath(key_path)
# Ensure the file is within the current working directory or explicitly allowed paths
# 파일이 현재 작업 디렉토리 또는 명시적으로 허용된 경로 내에 있는지 확인
cwd = os.path.abspath(os.getcwd())
if not abs_key_path.startswith(cwd + os.sep) and not abs_key_path == cwd:
# Check if it's a direct file in cwd
if os.path.dirname(abs_key_path) != cwd:
self.logger.warning(f"Potential path traversal attempt: {key_path}")
raise EmailSendError(
f"Invalid key path: file must be within the working directory. "
f"잘못된 키 경로: 파일은 작업 디렉토리 내에 있어야 합니다."
)
Comment on lines +101 to +112
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

경로 탐색(Path Traversal) 방지 로직이 올바르게 작동하지만, 중첩된 if 문으로 인해 다소 복잡하고 이해하기 어렵습니다. 또한, 심볼릭 링크를 처리하기 위해 os.path.abspath 대신 os.path.realpath를 사용하는 것이 더 안전합니다. 가독성을 높이고 코드를 더 명확하게 만들기 위해 로직을 단순화하는 것을 제안합니다.

Suggested change
abs_key_path = os.path.abspath(key_path)
# Ensure the file is within the current working directory or explicitly allowed paths
# 파일이 현재 작업 디렉토리 또는 명시적으로 허용된 경로 내에 있는지 확인
cwd = os.path.abspath(os.getcwd())
if not abs_key_path.startswith(cwd + os.sep) and not abs_key_path == cwd:
# Check if it's a direct file in cwd
if os.path.dirname(abs_key_path) != cwd:
self.logger.warning(f"Potential path traversal attempt: {key_path}")
raise EmailSendError(
f"Invalid key path: file must be within the working directory. "
f"잘못된 키 경로: 파일은 작업 디렉토리 내에 있어야 합니다."
)
abs_key_path = os.path.realpath(key_path)
# Ensure the file is within the current working directory or explicitly allowed paths
# 파일이 현재 작업 디렉토리 또는 명시적으로 허용된 경로 내에 있는지 확인
cwd = os.path.realpath(os.getcwd())
# Ensure the resolved file path is within the current working directory.
if not (abs_key_path.startswith(cwd + os.sep) or abs_key_path == cwd):
self.logger.warning(f"Potential path traversal attempt: {key_path}")
raise EmailSendError(
f"Invalid key path: file must be within the working directory. "
f"잘못된 키 경로: 파일은 작업 디렉토리 내에 있어야 합니다."
)


subject = subject or self.config.subject
body = body or self.config.body

Expand Down
15 changes: 15 additions & 0 deletions hostpart/key_generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,22 @@ def create_qr_image(

Returns:
str: Path to generated QR code image / 생성된 QR 코드 이미지 경로

Raises:
ValueError: If output_dir is invalid / output_dir가 유효하지 않을 경우
"""
# Validate and create output directory if it doesn't exist
# 출력 디렉토리가 존재하지 않으면 검증 및 생성
if not output_dir:
raise ValueError("output_dir cannot be empty / output_dir는 비워둘 수 없습니다")

if not os.path.isdir(output_dir):
try:
os.makedirs(output_dir, exist_ok=True)
self.logger.info(f"Created output directory: {output_dir}")
except OSError as e:
raise ValueError(f"Failed to create output directory: {output_dir}. Error: {e}")
Comment on lines +208 to +213
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

os.makedirs 함수에 exist_ok=True를 사용하고 있으므로, 사전에 os.path.isdir(output_dir)로 디렉토리 존재 여부를 확인할 필요가 없습니다. 이 확인 로직은 불필요하며, 아주 드물지만 TOCTOU(Time-of-check-to-time-of-use) 경합 조건을 유발할 수 있습니다. 예를 들어, isdir 확인 후 makedirs 호출 전에 다른 프로세스가 해당 경로에 파일을 생성할 수 있습니다. os.makedirs 호출을 try-except 블록으로 감싸서 코드를 단순화하고 더 안전하게 만드는 것이 좋습니다.

        try:
            os.makedirs(output_dir, exist_ok=True)
            self.logger.info(f"Created output directory: {output_dir}")
        except OSError as e:
            raise ValueError(f"Failed to create output directory: {output_dir}. Error: {e}")


# Create QR code
# QR 코드 생성
qr_data = key.to_json(include_utc=include_utc)
Expand Down
100 changes: 100 additions & 0 deletions hostpart/test_enhanced.py
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,57 @@ def test_create_key_returns_tuple(self):
self.assertEqual(data['doorID'], 'test-door')


class TestKeyGeneratorSecurity(unittest.TestCase):
"""Test cases for KeyGenerator security improvements."""

def setUp(self):
"""Set up test fixtures."""
self.test_dir = tempfile.mkdtemp()
self.original_dir = os.getcwd()
os.chdir(self.test_dir)

def tearDown(self):
"""Clean up test fixtures."""
os.chdir(self.original_dir)
shutil.rmtree(self.test_dir)

def test_create_qr_image_creates_missing_directory(self):
"""Test that create_qr_image creates missing output directory."""
config = KeyGeneratorConfig(door_id="test-door")
generator = KeyGenerator(config)

key = generator.generate_key()
nested_dir = os.path.join(self.test_dir, "nested", "output", "dir")

qr_path = generator.create_qr_image(key, output_dir=nested_dir)

self.assertTrue(os.path.exists(nested_dir))
self.assertTrue(os.path.exists(qr_path))

def test_create_qr_image_empty_output_dir_raises_error(self):
"""Test that empty output_dir raises ValueError."""
config = KeyGeneratorConfig(door_id="test-door")
generator = KeyGenerator(config)

key = generator.generate_key()

with self.assertRaises(ValueError) as context:
generator.create_qr_image(key, output_dir="")

self.assertIn("empty", str(context.exception).lower())

def test_create_qr_image_invalid_path_raises_error(self):
"""Test that invalid path raises ValueError."""
config = KeyGeneratorConfig(door_id="test-door")
generator = KeyGenerator(config)

key = generator.generate_key()

# Use a path that cannot be created (null byte in path)
with self.assertRaises(ValueError):
generator.create_qr_image(key, output_dir="/nonexistent\x00path")


class TestEmailSender(unittest.TestCase):
"""Test cases for EmailSender class."""

Expand Down Expand Up @@ -271,6 +322,55 @@ def test_send_key_missing_file(self):
with self.assertRaises(EmailSendError):
sender.send_key("nonexistent.png", "recipient@example.com")

def test_send_key_path_traversal_prevention(self):
"""Test that path traversal attacks are blocked."""
config = EmailConfig(
sender_id="test",
sender_password="password"
)
sender = EmailSender(config)

# Create a file outside cwd to simulate path traversal attack
with tempfile.NamedTemporaryFile(delete=False, suffix='.png', dir='/tmp') as f:
test_file = f.name
f.write(b'test image data')

try:
# This should raise EmailSendError due to path traversal protection
with self.assertRaises(EmailSendError) as context:
sender.send_key(test_file, "recipient@example.com")

self.assertIn("working directory", str(context.exception).lower())

finally:
if os.path.exists(test_file):
os.remove(test_file)

@patch('email_sender.smtplib.SMTP')
def test_send_key_within_cwd_succeeds(self, mock_smtp):
"""Test that files within current working directory are allowed."""
mock_server = MagicMock()
mock_smtp.return_value = mock_server

config = EmailConfig(
sender_id="test",
sender_password="password"
)
sender = EmailSender(config)

# Create a file in current working directory
with tempfile.NamedTemporaryFile(delete=False, suffix='.png', dir=os.getcwd()) as f:
test_file = f.name
f.write(b'test image data')

try:
result = sender.send_key(test_file, "recipient@example.com")
self.assertTrue(result)

finally:
if os.path.exists(test_file):
os.remove(test_file)

@patch('email_sender.smtplib.SMTP')
def test_send_key_auth_failure(self, mock_smtp):
"""Test handling of authentication failure."""
Expand Down
12 changes: 12 additions & 0 deletions raspart/door_controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ def __init__(
# 속도 제한 상태
self._last_unlock_time: Optional[datetime] = None
self._initialized = False
self._cleanup_done = False # Flag to prevent double cleanup / 중복 정리 방지 플래그

# Initialize GPIO
# GPIO 초기화
Expand Down Expand Up @@ -211,7 +212,18 @@ def cleanup(self) -> None:
"""
Clean up GPIO resources.
GPIO 리소스를 정리합니다.

Thread-safe cleanup that prevents race conditions when called
from signal handlers.
신호 처리기에서 호출될 때 경쟁 조건을 방지하는 스레드 안전 정리입니다.
"""
# Prevent double cleanup (race condition protection)
# 중복 정리 방지 (경쟁 조건 보호)
if self._cleanup_done:
return

self._cleanup_done = True

if not self._initialized:
return

Expand Down
100 changes: 97 additions & 3 deletions raspart/subscriber.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
import subprocess
import sys
import logging
import threading
import time
from typing import Optional, Callable
from datetime import datetime
from dataclasses import dataclass
Expand All @@ -21,6 +23,12 @@
from constants import DEFAULT_KEY_FILE, DEFAULT_LOG_FILE


# Constants for process monitoring
# 프로세스 모니터링 상수
PROCESS_CHECK_INTERVAL_SECONDS = 5
PROCESS_TERMINATION_TIMEOUT = 5


@dataclass
class SubscriberConfig:
"""
Expand All @@ -32,6 +40,7 @@ class SubscriberConfig:
key_file: str = DEFAULT_KEY_FILE
log_file: str = DEFAULT_LOG_FILE
scanner_script: str = "qr_scanner.py"
monitor_scanner: bool = True # Enable scanner process monitoring / 스캐너 프로세스 모니터링 활성화


class DoorLensSubscriber:
Expand All @@ -41,15 +50,17 @@ class DoorLensSubscriber:

Features:
- Secure subprocess execution (no shell injection)
- Process lifecycle management
- Process lifecycle management with health monitoring
- Graceful shutdown handling
- Message deduplication
- Scanner process crash detection and logging

기능:
- 안전한 서브프로세스 실행 (쉘 인젝션 없음)
- 프로세스 수명 관리
- 상태 모니터링을 포함한 프로세스 수명 관리
- 정상적인 종료 처리
- 메시지 중복 제거
- 스캐너 프로세스 충돌 감지 및 로깅
"""

def __init__(
Expand All @@ -76,6 +87,8 @@ def __init__(
self._scanner_process: Optional[subprocess.Popen] = None
self._running = False
self._last_message_id: Optional[str] = None
self._monitor_thread: Optional[threading.Thread] = None
self._monitor_running = False

self._register_signal_handlers()

Expand Down Expand Up @@ -108,15 +121,87 @@ def signal_handler(signum, frame):
signal.signal(signal.SIGTERM, signal_handler)
signal.signal(signal.SIGINT, signal_handler)

def _stop_process_monitor(self) -> None:
"""
Stop the process monitor thread.
프로세스 모니터 스레드를 중지합니다.
"""
self._monitor_running = False
if self._monitor_thread is not None and self._monitor_thread.is_alive():
self._monitor_thread.join(timeout=2)
self._monitor_thread = None
Comment on lines +130 to +132
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

프로세스 모니터 스레드를 중지할 때 join의 타임아웃이 2초로 설정되어 있습니다. 하지만 모니터링 루프의 sleep 간격(PROCESS_CHECK_INTERVAL_SECONDS)은 5초입니다. 이로 인해 join이 거의 항상 타임아웃되어 이전 모니터 스레드가 아직 실행 중인 상태에서 새 스레드가 시작될 수 있는 경합 조건이 발생합니다. 이로 인해 여러 모니터 스레드가 동시에 실행될 수 있습니다. join의 타임아웃을 PROCESS_CHECK_INTERVAL_SECONDS보다 길게 설정하여 스레드가 확실히 종료될 때까지 기다리도록 수정하는 것이 좋습니다.

Suggested change
if self._monitor_thread is not None and self._monitor_thread.is_alive():
self._monitor_thread.join(timeout=2)
self._monitor_thread = None
if self._monitor_thread is not None and self._monitor_thread.is_alive():
self._monitor_thread.join(timeout=PROCESS_CHECK_INTERVAL_SECONDS + 1)
self._monitor_thread = None


def _start_process_monitor(self) -> None:
"""
Start a background thread to monitor the scanner process health.
스캐너 프로세스 상태를 모니터링하는 백그라운드 스레드를 시작합니다.
"""
if not self.config.monitor_scanner:
return

self._stop_process_monitor()
self._monitor_running = True
self._monitor_thread = threading.Thread(
target=self._monitor_scanner_process,
daemon=True,
name="ScannerProcessMonitor"
)
self._monitor_thread.start()
self.logger.debug("Process monitor thread started")

def _monitor_scanner_process(self) -> None:
"""
Monitor scanner process and log if it crashes.
스캐너 프로세스를 모니터링하고 충돌 시 로그를 기록합니다.

This runs in a background thread and checks process status periodically.
백그라운드 스레드에서 실행되며 주기적으로 프로세스 상태를 확인합니다.
"""
while self._monitor_running and self._scanner_process is not None:
time.sleep(PROCESS_CHECK_INTERVAL_SECONDS)

if self._scanner_process is None:
break

# Check if process has terminated
# 프로세스가 종료되었는지 확인
return_code = self._scanner_process.poll()
if return_code is not None:
# Process has terminated
# 프로세스가 종료됨
if return_code == 0:
self.logger.info("Scanner process exited normally (code 0)")
else:
self.logger.error(
f"Scanner process crashed unexpectedly (exit code: {return_code})"
)
# Try to capture stderr for debugging
# 디버깅을 위해 stderr 캡처 시도
try:
if self._scanner_process.stderr:
stderr_output = self._scanner_process.stderr.read()
if stderr_output:
stderr_text = stderr_output.decode('utf-8', errors='replace')
self.logger.error(f"Scanner stderr: {stderr_text[:500]}")
except Exception as e:
self.logger.debug(f"Could not read scanner stderr: {e}")

self._scanner_process = None
break

def _stop_scanner_process(self) -> None:
"""
Stop any running scanner process.
실행 중인 스캐너 프로세스를 중지합니다.
"""
# Stop the monitor first
# 먼저 모니터 중지
self._stop_process_monitor()

if self._scanner_process is not None:
try:
self._scanner_process.terminate()
self._scanner_process.wait(timeout=5)
self._scanner_process.wait(timeout=PROCESS_TERMINATION_TIMEOUT)
self.logger.info("Scanner process terminated")
except subprocess.TimeoutExpired:
self._scanner_process.kill()
Expand Down Expand Up @@ -158,6 +243,11 @@ def _start_scanner_process(self) -> bool:
)

self.logger.info(f"Scanner process started (PID: {self._scanner_process.pid})")

# Start process monitor
# 프로세스 모니터 시작
self._start_process_monitor()

return True

except Exception as e:
Expand All @@ -183,6 +273,10 @@ def _save_key(self, key_data: str) -> bool:
with open(self.config.key_file, 'w') as f:
f.write(key_data)

# Set restrictive file permissions (owner read/write only)
# 제한적 파일 권한 설정 (소유자 읽기/쓰기만 허용)
os.chmod(self.config.key_file, 0o600)

self.logger.info(f"Key saved to {self.config.key_file}")
return True

Expand Down
Loading