Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions modules/tools/whl-can/auto_test/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
# Quickstart

Autonomous driving chassis automatic testing tool

---

## Prerequisites

Before using automated testing tools, it's best to perform a check using the whl-can tool, and then launch the automated testing tool.

---

## How to Run

```bash
python3 cli.py --level 1 --config config/default.yaml
```

This will start the program with non-blocking keyboard listening and send
control commands in real time.
159 changes: 159 additions & 0 deletions modules/tools/whl-can/auto_test/cli.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

"""
A robust test runner for validating Apollo's chassis control interface.
Implements V-Model testing tiers (Static -> Dynamic Low -> Dynamic High).

Key Features:
- 100Hz Command Heartbeat (prevents chassis timeouts)
- Safety Watchdog (monitors thread health & chassis errors)
- Tiered Test Execution (L1/L2/L3)
- Real-time Curses UI
"""

import argparse
import time
import curses
import sys

try:
from cyber.python.cyber_py3 import cyber
except ImportError:
print("FATAL: Apollo Cyber RT python modules not found.")
print("Please ensure you are in the Apollo docker environment.")
sys.exit(1)

from core.runner import TestRunner
from core.ui import UIHandler
from config.manager import ConfigManager

from tests.l1_static import (
test_l1_driving_mode_transition,
test_l1_steering_performance_static,
test_l1_static_max_steering_rate,
test_l1_static_brake_consistency,
test_l1_epb_toggle,
test_l1_static_gear_shift,
test_l1_signal_control,
test_l1_sport_mode_toggle,
)
from tests.l2_dynamic_low import (
test_l2_speed_control_loop,
test_l2_throttle_linearity,
test_l2_brake_linearity,
test_l2_throttle_response_time,
test_l2_brake_response_time,
test_l2_gear_protection,
test_l2_mode_protection,
)
from tests.l3_dynamic_high import (
test_l3_staged_acceleration,
test_l3_staged_braking_performance,
test_l3_emergency_brake,
)


def main(stdscr):
# 1. Parse Arguments
parser = argparse.ArgumentParser(description="Apollo Chassis Auto-Tester")
parser.add_argument(
"--level",
type=int,
default=1,
choices=[1, 2, 3],
help="Test Level (1: Static, 2: Low-Speed, 3: High-Dynamic)",
)
parser.add_argument(
"--config",
type=str,
default="config/default.yaml",
help="Path to test configuration file",
)
# Curses wrapper passes extra args awkwardly, so we parse known args only
# In a real CLI, we might handle argv differently before curses.wrapper
args, _ = parser.parse_known_args()

# 2. Initialize Components
config = ConfigManager(args.config)
ui = UIHandler(stdscr)
runner = TestRunner(ui, config, args.level)

# 3. Register Test Cases
# --- Level 1: Static / Functional ---
runner.register_test_case(
test_l1_driving_mode_transition, "TC-FUNC-01: Mode Transition", 1
)
runner.register_test_case(
test_l1_steering_performance_static, "TC-CTRL-07: Static Steer Step", 1
)
runner.register_test_case(
test_l1_static_max_steering_rate, "TC-CTRL-08: Static Max Steer Test", 1
)
runner.register_test_case(
test_l1_static_brake_consistency, "TC-CTRL-04-S: Static Brake Consistency", 1
)
runner.register_test_case(test_l1_epb_toggle, "TC-FUNC-04: EPB Toggle", 1)
runner.register_test_case(
test_l1_static_gear_shift, "TC-FUNC-02: Static Gear Shift", 1
)
runner.register_test_case(
test_l1_sport_mode_toggle, "TC-FUNC-05: Sport/Mode Toggle", 1
)
runner.register_test_case(test_l1_signal_control, "TC-SIG-ALL: Body Signals", 1)

# --- Level 2: Dynamic Low Speed ---
runner.register_test_case(
test_l2_speed_control_loop, "TC-CTRL-09: Low Speed Loop", 2
)
runner.register_test_case(
test_l2_throttle_linearity, "TC-CTRL-01: Throttle Linearity", 2
)
runner.register_test_case(test_l2_brake_linearity, "TC-CTRL-04: Brake Linearity", 2)
runner.register_test_case(
test_l2_throttle_response_time, "TC-CTRL-02: Throttle Resp.", 2
)
runner.register_test_case(test_l2_brake_response_time, "TC-CTRL-05: Brake Resp.", 2)
runner.register_test_case(test_l2_gear_protection, "TC-FUNC-03: Gear Protection", 2)
runner.register_test_case(test_l2_mode_protection, "TC-FUNC-06: Mode Protection", 2)

# --- Level 3: Dynamic High Performance ---
runner.register_test_case(
test_l3_staged_acceleration, "TC-CTRL-10: Staged Accel", 3
)
runner.register_test_case(
test_l3_staged_braking_performance, "TC-CTRL-06: Staged Brake", 3
)
runner.register_test_case(
test_l3_emergency_brake, "TC-SAFETY-01: Emergency Brake", 3
)

# 4. Execution Loop
try:
runner.start() # Starts Heartbeat & Watchdog
runner.run_sequence()
except KeyboardInterrupt:
ui.log("User interrupted (Ctrl+C). Stopping...", "RED")
except Exception as e:
ui.log(f"CRITICAL ERROR: {str(e)}", "RED")
import traceback

ui.log(traceback.format_exc(), "RED")
time.sleep(3) # Give user time to see the error
finally:
ui.log("Shutting down...", "CYAN")
runner.cleanup()


if __name__ == "__main__":
# Environment Check
if not cyber.ok():
print("Initializing Cyber RT...")
cyber.init()

try:
# Using curses.wrapper to handle terminal setup/teardown automatically
curses.wrapper(main)
except Exception as e:
print(f"Failed to initialize UI: {e}")
cyber.shutdown()
Empty file.
17 changes: 17 additions & 0 deletions modules/tools/whl-can/auto_test/config/default.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
# config/default.yaml
topics:
control: "/apollo/control"
chassis: "/apollo/chassis"

limits:
max_speed_mps: 20.0
max_throttle_pct: 100.0
max_brake_pct: 100.0
max_steering_pct: 100.0
emergency_brake_val: 60.0

thresholds:
latency_ms: 200
steering_steady_error: 2.0
speed_steady_error: 0.5
chassis_wait_sec: 5.0
31 changes: 31 additions & 0 deletions modules/tools/whl-can/auto_test/config/manager.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
import yaml
import os


class ConfigManager:
def __init__(self, path):
self.data = {
"topics": {"control": "/apollo/control", "chassis": "/apollo/chassis"},
"limits": {"emergency_brake_val": 50.0},
"thresholds": {"steady_error": 2.0, "chassis_wait_sec": 5.0},
}
if os.path.exists(path):
try:
with open(path, "r") as f:
user_config = yaml.safe_load(f)
if user_config:
self.data.update(user_config)
except Exception as e:
print(f"Warning: Failed to load config {path}: {e}")

@property
def topics(self):
return self.data["topics"]

@property
def limits(self):
return self.data["limits"]

@property
def thresholds(self):
return self.data["thresholds"]
Empty file.
80 changes: 80 additions & 0 deletions modules/tools/whl-can/auto_test/core/reporter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
import os
import csv
import time
import threading
import matplotlib.pyplot as plt
from datetime import datetime
from typing import List, Tuple


class DataRecorder:
"""
Responsible for high-frequency data collection, CSV storage, and plotting.
Implements decoupling of data and logic.
"""

def __init__(self, test_name: str, output_dir: str = "test_results"):
self.test_name = test_name
self.output_dir = os.path.join(
output_dir, datetime.now().strftime("%Y%m%d_%H%M%S")
)
if not os.path.exists(self.output_dir):
os.makedirs(self.output_dir)

# Data buffer: (timestamp, cmd_val, feedback_val, speed_mps)
self.data: List[Tuple[float, float, float, float]] = []
self.is_recording = False
self._lock = threading.Lock()
self.start_time = 0.0

def start(self):
with self._lock:
self.data = []
self.start_time = time.time()
Copy link

Copilot AI Feb 11, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copilot uses AI. Check for mistakes.
self.is_recording = True

def record(self, cmd_val: float, feedback_val: float, speed_mps: float = 0.0):
"""Call this method in the test loop to record data points."""
if not self.is_recording:
return
with self._lock:
t = time.time() - self.start_time
Copy link

Copilot AI Feb 11, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copilot uses AI. Check for mistakes.
self.data.append((t, cmd_val, feedback_val, speed_mps))

def stop(self):
with self._lock:
self.is_recording = False

def save_and_plot(self, title: str, ylabel: str, save_name: str):
"""Save the CSV file and generate a PNG image."""
if not self.data:
return

# 1. Save CSV
csv_path = os.path.join(self.output_dir, f"{save_name}.csv")
with open(csv_path, "w", newline="") as f:
writer = csv.writer(f)
writer.writerow(["Time(s)", "Command", "Feedback", "Speed(m/s)"])
writer.writerows(self.data)

# 2. Plot (Backend Agg for headless environments)
plt.switch_backend("Agg")
times = [d[0] for d in self.data]
cmds = [d[1] for d in self.data]
fdbks = [d[2] for d in self.data]

plt.figure(figsize=(10, 6))
plt.plot(times, cmds, "r--", label="Command", linewidth=1.5)
plt.plot(times, fdbks, "b-", label="Feedback", linewidth=2.0)

plt.title(f"{self.test_name}: {title}")
plt.xlabel("Time (s)")
plt.ylabel(ylabel)
plt.legend()
plt.grid(True)

png_path = os.path.join(self.output_dir, f"{save_name}.png")
plt.savefig(png_path)
plt.close()

return png_path
Loading
Loading