|
| 1 | +import os |
| 2 | +import tempfile |
| 3 | +import time |
| 4 | +import unittest |
| 5 | +import shutil |
| 6 | + |
| 7 | +from carbontracker.tracker import CarbonTracker |
| 8 | +from carbontracker.report import LogParser, format_duration |
| 9 | + |
| 10 | + |
| 11 | +class TestFormatDuration(unittest.TestCase): |
| 12 | + """Tests for the format_duration helper function.""" |
| 13 | + |
| 14 | + def test_format_duration_seconds_only(self): |
| 15 | + self.assertEqual(format_duration(30), "30s") |
| 16 | + |
| 17 | + def test_format_duration_minutes_and_seconds(self): |
| 18 | + self.assertEqual(format_duration(90), "1min 30s") |
| 19 | + |
| 20 | + def test_format_duration_hours_minutes_seconds(self): |
| 21 | + self.assertEqual(format_duration(3661), "1h 1min 1s") |
| 22 | + |
| 23 | + def test_format_duration_hours_only(self): |
| 24 | + self.assertEqual(format_duration(3600), "1h") |
| 25 | + |
| 26 | + def test_format_duration_zero(self): |
| 27 | + self.assertEqual(format_duration(0), "0s") |
| 28 | + |
| 29 | + |
| 30 | +class TestLogParser(unittest.TestCase): |
| 31 | + """ |
| 32 | + Tests for the LogParser class using dynamically generated logs. |
| 33 | + |
| 34 | + This test class generates real logs from CarbonTracker using simulation mode, |
| 35 | + ensuring the LogParser is always tested against the current log format. |
| 36 | + """ |
| 37 | + |
| 38 | + @classmethod |
| 39 | + def setUpClass(cls): |
| 40 | + """Generate a log file using CarbonTracker's simulation mode.""" |
| 41 | + cls.temp_dir = tempfile.mkdtemp() |
| 42 | + cls.log_prefix = "test_report_parser" |
| 43 | + |
| 44 | + # Known simulation parameters - we use these to verify parsing |
| 45 | + cls.sim_cpu_name = "TestCPU" |
| 46 | + cls.sim_cpu_tdp = 50 # 50W TDP for CPU |
| 47 | + cls.sim_gpu_name = "TestGPU" |
| 48 | + cls.sim_gpu_watts = 100 # 100W for GPU |
| 49 | + cls.num_epochs = 3 |
| 50 | + cls.epoch_duration = 0.3 # seconds of simulated work per epoch |
| 51 | + |
| 52 | + # Use simulation mode to generate deterministic logs |
| 53 | + tracker = CarbonTracker( |
| 54 | + epochs=cls.num_epochs, |
| 55 | + epochs_before_pred=-1, # Skip prediction |
| 56 | + monitor_epochs=-1, # Monitor all epochs |
| 57 | + update_interval=0.1, |
| 58 | + log_dir=cls.temp_dir, |
| 59 | + log_file_prefix=cls.log_prefix, |
| 60 | + verbose=0, # Suppress output |
| 61 | + sim_cpu=cls.sim_cpu_name, |
| 62 | + sim_cpu_tdp=cls.sim_cpu_tdp, |
| 63 | + sim_gpu=cls.sim_gpu_name, |
| 64 | + sim_gpu_watts=cls.sim_gpu_watts, |
| 65 | + ) |
| 66 | + |
| 67 | + # Run a simulated tracking session |
| 68 | + for epoch in range(cls.num_epochs): |
| 69 | + tracker.epoch_start() |
| 70 | + time.sleep(cls.epoch_duration) # Simulate some work |
| 71 | + tracker.epoch_end() |
| 72 | + |
| 73 | + tracker.stop() |
| 74 | + |
| 75 | + # Find the generated log file (the standard log, not output log) |
| 76 | + cls.std_log_path = None |
| 77 | + for filename in os.listdir(cls.temp_dir): |
| 78 | + if (filename.startswith(cls.log_prefix) and |
| 79 | + not filename.endswith("_output.log") and |
| 80 | + not filename.endswith("_err.log") and |
| 81 | + filename.endswith(".log")): |
| 82 | + cls.std_log_path = os.path.join(cls.temp_dir, filename) |
| 83 | + break |
| 84 | + |
| 85 | + if cls.std_log_path is None: |
| 86 | + raise RuntimeError(f"No standard log file found in {cls.temp_dir}") |
| 87 | + |
| 88 | + # Read the generated log content |
| 89 | + with open(cls.std_log_path, 'r') as f: |
| 90 | + cls.log_content = f.read() |
| 91 | + |
| 92 | + # Parse immediately to verify log content is valid |
| 93 | + cls.parser = LogParser(cls.log_content) |
| 94 | + |
| 95 | + @classmethod |
| 96 | + def tearDownClass(cls): |
| 97 | + """Clean up temporary log files.""" |
| 98 | + shutil.rmtree(cls.temp_dir, ignore_errors=True) |
| 99 | + |
| 100 | + def test_parser_parses_version(self): |
| 101 | + """Test that LogParser correctly extracts the carbontracker version.""" |
| 102 | + self.assertIsNotNone(self.parser.version) |
| 103 | + # Version should be a string like "X.Y.Z" or a dev version |
| 104 | + self.assertIsInstance(self.parser.version, str) |
| 105 | + # Version string should contain digits |
| 106 | + self.assertTrue(any(c.isdigit() for c in self.parser.version)) |
| 107 | + |
| 108 | + def test_parser_parses_pue(self): |
| 109 | + """Test that LogParser correctly extracts the PUE coefficient.""" |
| 110 | + self.assertIsNotNone(self.parser.pue) |
| 111 | + # PUE should be a positive float > 1.0 (industry standard) |
| 112 | + self.assertGreater(self.parser.pue, 1.0) |
| 113 | + self.assertLess(self.parser.pue, 3.0) # Reasonable upper bound |
| 114 | + |
| 115 | + def test_parser_parses_components(self): |
| 116 | + """Test that LogParser correctly extracts component information.""" |
| 117 | + self.assertIsNotNone(self.parser.components) |
| 118 | + # Should contain GPU and CPU references (simulated devices) |
| 119 | + self.assertIn("GPU", self.parser.components) |
| 120 | + self.assertIn("CPU", self.parser.components) |
| 121 | + # Should contain our simulated device names |
| 122 | + self.assertIn(self.sim_gpu_name, self.parser.components) |
| 123 | + self.assertIn(self.sim_cpu_name, self.parser.components) |
| 124 | + |
| 125 | + def test_parser_parses_correct_number_of_epochs(self): |
| 126 | + """Test that LogParser extracts the correct number of epochs.""" |
| 127 | + self.assertEqual(len(self.parser.epochs), self.num_epochs) |
| 128 | + |
| 129 | + def test_parser_epochs_have_required_fields(self): |
| 130 | + """Test that each epoch has all required fields.""" |
| 131 | + for i, epoch in enumerate(self.parser.epochs): |
| 132 | + with self.subTest(epoch=i + 1): |
| 133 | + self.assertIn('epoch', epoch) |
| 134 | + self.assertIn('duration', epoch) |
| 135 | + self.assertIn('gpu_power', epoch) |
| 136 | + self.assertIn('cpu_power', epoch) |
| 137 | + self.assertIn('total_power', epoch) |
| 138 | + |
| 139 | + def test_parser_epoch_numbers_sequential(self): |
| 140 | + """Test that epoch numbers are sequential starting from 1.""" |
| 141 | + for i, epoch in enumerate(self.parser.epochs): |
| 142 | + self.assertEqual(epoch['epoch'], i + 1) |
| 143 | + |
| 144 | + def test_parser_epoch_durations_positive(self): |
| 145 | + """Test that epoch durations are positive and reasonable.""" |
| 146 | + for epoch in self.parser.epochs: |
| 147 | + self.assertGreaterEqual(epoch['duration'], 0) |
| 148 | + # Duration should be less than 60 seconds for our test |
| 149 | + self.assertLess(epoch['duration'], 60) |
| 150 | + |
| 151 | + def test_parser_power_values_match_simulation(self): |
| 152 | + """Test that power values approximately match our simulation parameters.""" |
| 153 | + for epoch in self.parser.epochs: |
| 154 | + # GPU power should be approximately sim_gpu_watts * 0.5 (50% utilization default) |
| 155 | + expected_gpu_power = self.sim_gpu_watts * 0.5 |
| 156 | + self.assertAlmostEqual(epoch['gpu_power'], expected_gpu_power, delta=expected_gpu_power * 0.2) |
| 157 | + |
| 158 | + # CPU power should be approximately sim_cpu_tdp * 0.5 (50% utilization default) |
| 159 | + expected_cpu_power = self.sim_cpu_tdp * 0.5 |
| 160 | + self.assertAlmostEqual(epoch['cpu_power'], expected_cpu_power, delta=expected_cpu_power * 0.2) |
| 161 | + |
| 162 | + def test_parser_total_power_is_sum(self): |
| 163 | + """Test that total_power equals gpu_power + cpu_power.""" |
| 164 | + for epoch in self.parser.epochs: |
| 165 | + expected_total = epoch['gpu_power'] + epoch['cpu_power'] |
| 166 | + self.assertAlmostEqual(epoch['total_power'], expected_total, places=5) |
| 167 | + |
| 168 | + def test_parser_parses_timestamps(self): |
| 169 | + """Test that LogParser correctly extracts timestamps.""" |
| 170 | + self.assertIsNotNone(self.parser.start_time) |
| 171 | + self.assertIsNotNone(self.parser.end_time) |
| 172 | + # Timestamps should be in expected format (YYYY-MM-DD HH:MM:SS) |
| 173 | + self.assertRegex(self.parser.start_time, r'\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}') |
| 174 | + self.assertRegex(self.parser.end_time, r'\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}') |
| 175 | + |
| 176 | + def test_calculate_energy_metrics_structure(self): |
| 177 | + """Test that calculate_energy_metrics returns expected structure.""" |
| 178 | + # Ensure carbon_intensity is set for energy metrics calculation |
| 179 | + if self.parser.carbon_intensity is None: |
| 180 | + self.parser.carbon_intensity = 150.0 # Mock value in gCO2eq/kWh |
| 181 | + |
| 182 | + metrics = self.parser.calculate_energy_metrics() |
| 183 | + |
| 184 | + # Check all expected keys are present |
| 185 | + expected_keys = ['total_duration', 'avg_gpu_power', 'avg_cpu_power', |
| 186 | + 'total_power', 'energy_kwh', 'co2_kg'] |
| 187 | + for key in expected_keys: |
| 188 | + self.assertIn(key, metrics) |
| 189 | + |
| 190 | + def test_calculate_energy_metrics_values_non_negative(self): |
| 191 | + """Test that all energy metrics are non-negative.""" |
| 192 | + # Ensure carbon_intensity is set for energy metrics calculation |
| 193 | + if self.parser.carbon_intensity is None: |
| 194 | + self.parser.carbon_intensity = 150.0 # Mock value in gCO2eq/kWh |
| 195 | + |
| 196 | + metrics = self.parser.calculate_energy_metrics() |
| 197 | + |
| 198 | + self.assertGreaterEqual(metrics['total_duration'], 0) |
| 199 | + self.assertGreaterEqual(metrics['avg_gpu_power'], 0) |
| 200 | + self.assertGreaterEqual(metrics['avg_cpu_power'], 0) |
| 201 | + self.assertGreaterEqual(metrics['total_power'], 0) |
| 202 | + self.assertGreaterEqual(metrics['energy_kwh'], 0) |
| 203 | + self.assertGreaterEqual(metrics['co2_kg'], 0) |
| 204 | + |
| 205 | + def test_calculate_energy_metrics_duration_matches_epochs(self): |
| 206 | + """Test that total_duration matches sum of epoch durations.""" |
| 207 | + # Ensure carbon_intensity is set for energy metrics calculation |
| 208 | + if self.parser.carbon_intensity is None: |
| 209 | + self.parser.carbon_intensity = 150.0 # Mock value in gCO2eq/kWh |
| 210 | + |
| 211 | + metrics = self.parser.calculate_energy_metrics() |
| 212 | + expected_duration = sum(epoch['duration'] for epoch in self.parser.epochs) |
| 213 | + self.assertAlmostEqual(metrics['total_duration'], expected_duration, places=5) |
| 214 | + |
| 215 | + def test_generate_plots_returns_buffer(self): |
| 216 | + """Test that generate_plots returns a readable buffer.""" |
| 217 | + # Ensure carbon_intensity is set for CO2 calculations in plots |
| 218 | + if self.parser.carbon_intensity is None: |
| 219 | + self.parser.carbon_intensity = 150.0 # Mock value in gCO2eq/kWh |
| 220 | + |
| 221 | + plots = self.parser.generate_plots() |
| 222 | + |
| 223 | + self.assertIn('combined_plots', plots) |
| 224 | + # The plot should be a BytesIO buffer with read capability |
| 225 | + self.assertTrue(hasattr(plots['combined_plots'], 'read')) |
| 226 | + # Buffer should contain data (PNG header starts with specific bytes) |
| 227 | + data = plots['combined_plots'].read() |
| 228 | + self.assertGreater(len(data), 0) |
| 229 | + # PNG files start with specific bytes: \x89PNG |
| 230 | + self.assertTrue(data.startswith(b'\x89PNG')) |
| 231 | + |
| 232 | + |
| 233 | +class TestLogParserDurationParsing(unittest.TestCase): |
| 234 | + """Tests for LogParser's _parse_duration method.""" |
| 235 | + |
| 236 | + def setUp(self): |
| 237 | + """Create a LogParser instance for testing the duration parser.""" |
| 238 | + self.parser = LogParser("") |
| 239 | + |
| 240 | + def test_parse_duration_hours_minutes_seconds(self): |
| 241 | + """Test parsing HH:MM:SS format.""" |
| 242 | + self.assertEqual(self.parser._parse_duration("1:30:15"), 5415.0) |
| 243 | + |
| 244 | + def test_parse_duration_minutes_seconds(self): |
| 245 | + """Test parsing MM:SS format.""" |
| 246 | + self.assertEqual(self.parser._parse_duration("5:30"), 330.0) |
| 247 | + |
| 248 | + def test_parse_duration_seconds_with_decimals(self): |
| 249 | + """Test parsing HH:MM:SS.ss format with decimals.""" |
| 250 | + self.assertEqual(self.parser._parse_duration("0:00:01.50"), 1.5) |
| 251 | + |
| 252 | + def test_parse_duration_zero(self): |
| 253 | + """Test parsing zero duration.""" |
| 254 | + self.assertEqual(self.parser._parse_duration("0:00:00.00"), 0.0) |
| 255 | + |
| 256 | + def test_parse_duration_large_hours(self): |
| 257 | + """Test parsing large hour values.""" |
| 258 | + # 10 hours, 30 minutes, 45 seconds = 37845 seconds |
| 259 | + self.assertEqual(self.parser._parse_duration("10:30:45"), 37845.0) |
| 260 | + |
| 261 | + |
| 262 | +class TestReportOptionalDependency(unittest.TestCase): |
| 263 | + """Tests for the optional reportlab dependency.""" |
| 264 | + |
| 265 | + def test_generate_report_raises_import_error_without_reportlab(self): |
| 266 | + """Test that generate_report_from_log raises ImportError when reportlab is not installed.""" |
| 267 | + import carbontracker.report as report_module |
| 268 | + from carbontracker.report import generate_report_from_log |
| 269 | + |
| 270 | + # Save original value |
| 271 | + original_value = report_module.REPORTLAB_AVAILABLE |
| 272 | + |
| 273 | + try: |
| 274 | + # Mock reportlab as not available |
| 275 | + report_module.REPORTLAB_AVAILABLE = False |
| 276 | + |
| 277 | + with self.assertRaises(ImportError) as context: |
| 278 | + generate_report_from_log("dummy_log.txt", "dummy_output.pdf") |
| 279 | + |
| 280 | + # Check the error message contains installation instructions |
| 281 | + self.assertIn("pip install carbontracker[pdfreport]", str(context.exception)) |
| 282 | + self.assertIn("reportlab", str(context.exception)) |
| 283 | + finally: |
| 284 | + # Restore original value |
| 285 | + report_module.REPORTLAB_AVAILABLE = original_value |
| 286 | + |
| 287 | + |
| 288 | +if __name__ == "__main__": |
| 289 | + unittest.main() |
0 commit comments