-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathtest_coverage_gap_infrastructure.py
More file actions
157 lines (123 loc) · 5.65 KB
/
test_coverage_gap_infrastructure.py
File metadata and controls
157 lines (123 loc) · 5.65 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
#!/usr/bin/env python3
"""
Infrastructure Coverage Gap Tests
Addresses modules with 0% coverage: recovery, timeout_manager, simulation_monitor,
simulation_utils, visualization_optimizer.
"""
import asyncio
import logging
from pathlib import Path
# Import targets
from utils.recovery import RecoveryArgumentParser, setup_step_logging
from utils.simulation_monitor import SimulationMonitor
from utils.simulation_utils import DiagramAnalyzer, SimulationTracker
from utils.timeout_manager import (
LLMTimeoutManager,
ProcessTimeoutManager,
TimeoutConfig,
TimeoutManager,
)
from utils.visualization_optimizer import (
DataSampler,
VisualizationCache,
VisualizationOptimizer,
)
# 1. Tests for utils/recovery.py
class TestRecoveryUtils:
    """Coverage tests for utils/recovery.py argument parsing and logging setup."""

    def test_recovery_argument_parser(self):
        """Parsed step arguments expose the step name, verbosity flag, and a Path output dir."""
        parsed = RecoveryArgumentParser.parse_step_arguments("test_step")
        assert parsed.step_name == "test_step"
        assert parsed.verbose is False
        assert isinstance(parsed.output_dir, Path)

    def test_setup_step_logging(self):
        """The verbose flag selects DEBUG; otherwise the logger runs at INFO."""
        verbose_logger = setup_step_logging("test_recovery", verbose=True)
        assert verbose_logger.level == logging.DEBUG
        quiet_logger = setup_step_logging("test_recovery_info", verbose=False)
        assert quiet_logger.level == logging.INFO
# 2. Tests for utils/timeout_manager.py
class TestTimeoutManager:
    """Coverage tests for utils/timeout_manager.py sync/async paths and defaults."""

    def test_sync_timeout_success(self):
        """A fast synchronous call finishes inside the timeout and yields its result."""
        mgr = TimeoutManager()
        cfg = TimeoutConfig(base_timeout=1.0, max_retries=0)

        def double(value):
            return value * 2

        with mgr.sync_timeout("test_sync", cfg, double, 5) as outcome:
            assert outcome.success is True
            assert outcome.result == 10

    def test_async_timeout_success(self):
        """Exercise async_timeout without pytest-asyncio (core env may omit dev extras)."""

        async def _exercise() -> None:
            mgr = TimeoutManager()
            cfg = TimeoutConfig(base_timeout=1.0, max_retries=0)

            async def double_async(value: int) -> int:
                return value * 2

            async with mgr.async_timeout("test_async", cfg, double_async, 5) as outcome:
                assert outcome.success is True
                assert outcome.result == 10

        asyncio.run(_exercise())

    def test_llm_timeout_manager(self):
        """LLM-specific manager defaults to a 60-second base timeout."""
        assert LLMTimeoutManager().default_config.base_timeout == 60.0

    def test_process_timeout_manager(self):
        """Process-specific manager defaults to a 120-second base timeout."""
        assert ProcessTimeoutManager().default_config.base_timeout == 120.0
# 3. Tests for utils/simulation_monitor.py
class TestSimulationMonitor:
    """Coverage tests for utils/simulation_monitor.py."""

    def test_monitor_initialization(self, tmp_path):
        """A fresh monitor records its log file and starts with zero attempts."""
        target_log = tmp_path / "sim.log"
        monitor = SimulationMonitor(log_file=target_log)
        assert monitor.log_file == target_log
        assert monitor.execution_data["total_attempted"] == 0

    def test_track_simulation_decorator(self, tmp_path):
        """The decorator forwards the wrapped function's return and counts the success."""
        monitor = SimulationMonitor(log_file=tmp_path / "sim.log")

        @monitor.track_simulation("test_sim")
        def increment(value):
            return value + 1

        assert increment(10) == 11
        assert monitor.execution_data["total_successful"] == 1
        assert "test_sim" in monitor.execution_data["simulations"]

    def test_monitor_data_collection(self, tmp_path):
        """Non-empty collected data passes validation; an empty collection fails."""
        monitor = SimulationMonitor(log_file=tmp_path / "sim.log")
        assert monitor.monitor_data_collection([1, 2, 3], "test") is True
        assert monitor.monitor_data_collection([], "fail") is False
# 4. Tests for utils/simulation_utils.py
class TestSimulationUtils:
    """Coverage tests for utils/simulation_utils.py trackers and analyzers."""

    def test_simulation_tracker(self, tmp_path):
        """Logging one step stores a single reward trace; summary stats sum rewards."""
        tracker = SimulationTracker("model_a", "pymdp", tmp_path)
        tracker.log_step(0, [1, 0], [0], [1], 1.0)
        assert len(tracker.data["traces"]["rewards"]) == 1
        tracker.calculate_summary_stats()
        assert tracker.data["summary_stats"]["total_reward"] == 1.0

    def test_diagram_analyzer(self, tmp_path):
        """A logged diagram is recorded and the generated report file exists on disk."""
        analyzer = DiagramAnalyzer("test_model", tmp_path)
        analyzer.log_diagram("D1", "A", "B", {"prop": 1})
        assert len(analyzer.analysis_data["diagrams"]) == 1
        assert analyzer.generate_diagram_report().exists()
# 5. Tests for utils/visualization_optimizer.py
class TestVisualizationOptimizer:
    """Coverage tests for utils/visualization_optimizer.py cache, sampler, and batching."""

    def test_visualization_cache(self, tmp_path):
        """A key is uncached until a file is stored; then the files round-trip back."""
        cache = VisualizationCache(cache_dir=tmp_path / "cache")
        cache_key = cache.get_cache_key("content", {"p": 1})
        assert cache.is_cached(cache_key) is False

        artifact = tmp_path / "viz.png"
        artifact.touch()
        cache.cache_visualization(cache_key, [str(artifact)])
        assert cache.is_cached(cache_key) is True
        assert cache.get_cached_files(cache_key) == [str(artifact)]

    def test_data_sampler(self):
        """Node sets over the cap are flagged and sampled down, marking the payload."""
        sampler = DataSampler(max_nodes=10)
        payload = {"nodes": [{"id": idx} for idx in range(20)]}
        assert sampler.should_sample(payload) is True
        reduced = sampler.sample_data(payload)
        assert len(reduced["nodes"]) == 10
        assert reduced["_sampling_applied"] is True

    def test_optimizer_batch(self, tmp_path):
        """Every input file is processed and caching is reported enabled."""
        optimizer = VisualizationOptimizer(cache_dir=tmp_path / "cache")

        def fake_processor(file_path, **kwargs):
            return {"success": True, "file": str(file_path)}

        inputs = [tmp_path / f"file_{i}.md" for i in range(3)]
        for source in inputs:
            source.touch()

        results = optimizer.optimize_batch_processing(inputs, tmp_path, fake_processor)
        assert len(results["processed_files"]) == 3
        assert results["optimization_stats"]["caching_enabled"] is True