|
1 | 1 | #! /usr/bin/env python3 |
2 | 2 |
|
3 | 3 | import asyncio |
| 4 | +import json |
4 | 5 | import logging |
5 | 6 | import os |
6 | 7 | import re |
|
9 | 10 | import time |
10 | 11 | from functools import wraps |
11 | 12 | from pathlib import Path |
12 | | -from typing import Any, Dict, List, Optional |
| 13 | +from typing import Any, AsyncGenerator, Dict, List, Optional |
13 | 14 |
|
14 | 15 | from commonwealth.utils.apis import GenericErrorHandlingRoute, PrettyJSONResponse |
15 | 16 | from commonwealth.utils.logs import InterceptHandler, init_logger |
16 | 17 | from commonwealth.utils.sentry_config import init_sentry_async |
| 18 | +from commonwealth.utils.streaming import streamer |
17 | 19 | from fastapi import APIRouter, FastAPI, HTTPException, Query, status |
| 20 | +from fastapi.responses import StreamingResponse |
18 | 21 | from fastapi_versioning import VersionedFastAPI, versioned_api_route |
19 | 22 | from loguru import logger |
20 | 23 | from pydantic import BaseModel, Field |
@@ -56,6 +59,13 @@ class DiskSpeedResult(BaseModel): |
56 | 59 | error: Optional[str] = Field(None, description="Error message if test failed") |
57 | 60 |
|
58 | 61 |
|
| 62 | +class DiskSpeedTestPoint(BaseModel): |
| 63 | + size_mb: int = Field(..., description="Test size in MB") |
| 64 | + write_speed: Optional[float] = Field(None, description="Write speed in MiB/s") |
| 65 | + read_speed: Optional[float] = Field(None, description="Read speed in MiB/s") |
| 66 | + total_tests: Optional[int] = Field(None, description="Total number of tests in the sequence") |
| 67 | + |
| 68 | + |
59 | 69 | DiskNode.update_forward_refs() |
60 | 70 |
|
61 | 71 |
|
@@ -304,19 +314,8 @@ def parse_disktest_speed(output: str) -> tuple[Optional[float], Optional[float], |
304 | 314 |
|
305 | 315 |
|
306 | 316 | # pylint: disable=too-many-locals |
307 | | -@disk_router.get( |
308 | | - "/speed", |
309 | | - response_model=DiskSpeedResult, |
310 | | - summary="Run disk speed test using disktest binary.", |
311 | | -) |
312 | | -@to_http_exception |
313 | | -async def disk_speed( |
314 | | - size_bytes: int = Query( |
315 | | - 1024 * 1024 * 1024, |
316 | | - ge=1024 * 1024, |
317 | | - description="Number of bytes to test (default 1 GiB).", |
318 | | - ), |
319 | | -) -> DiskSpeedResult: |
| 317 | +async def run_single_speed_test(size_bytes: int) -> DiskSpeedResult: |
| 318 | + """Run a single disk speed test and return the result.""" |
320 | 319 | disktest_binary = "disktest" |
321 | 320 | temp_file_path: Optional[Path] = None |
322 | 321 |
|
@@ -408,6 +407,57 @@ async def disk_speed( |
408 | 407 | logger.warning(f"Failed to clean up temporary file {temp_file_path}: {e}") |
409 | 408 |
|
410 | 409 |
|
| 410 | +@disk_router.get( |
| 411 | + "/speed", |
| 412 | + response_model=DiskSpeedResult, |
| 413 | + summary="Run disk speed test using disktest binary.", |
| 414 | +) |
| 415 | +@to_http_exception |
| 416 | +async def disk_speed( |
| 417 | + size_bytes: int = Query( |
| 418 | + 1024 * 1024 * 1024, |
| 419 | + ge=1024 * 1024, |
| 420 | + description="Number of bytes to test (default 1 GiB).", |
| 421 | + ), |
| 422 | +) -> DiskSpeedResult: |
| 423 | + return await run_single_speed_test(size_bytes) |
| 424 | + |
| 425 | + |
| 426 | +async def multi_size_speed_test_generator() -> AsyncGenerator[str, None]: |
| 427 | + """Generator that runs speed tests at multiple sizes and yields JSON results.""" |
| 428 | + test_sizes_mb = [10, 50, 100, 200] |
| 429 | + total_tests = len(test_sizes_mb) |
| 430 | + |
| 431 | + for size_mb in test_sizes_mb: |
| 432 | + size_bytes = size_mb * 1024 * 1024 |
| 433 | + result = await run_single_speed_test(size_bytes) |
| 434 | + |
| 435 | + point = DiskSpeedTestPoint( |
| 436 | + size_mb=size_mb, |
| 437 | + write_speed=result.write_speed_mbps, |
| 438 | + read_speed=result.read_speed_mbps, |
| 439 | + total_tests=total_tests, |
| 440 | + ) |
| 441 | + yield json.dumps(point.dict()) |
| 442 | + |
| 443 | + |
| 444 | +@disk_router.get( |
| 445 | + "/speed/stream", |
| 446 | + summary="Run multi-size disk speed test with streaming results.", |
| 447 | +) |
| 448 | +async def disk_speed_stream() -> StreamingResponse: |
| 449 | + return StreamingResponse( |
| 450 | + streamer(multi_size_speed_test_generator(), heartbeats=1.0), |
| 451 | + media_type="application/x-ndjson", |
| 452 | + headers={ |
| 453 | + "Content-Type": "application/x-ndjson", |
| 454 | + "Cache-Control": "no-cache", |
| 455 | + "Connection": "keep-alive", |
| 456 | + "X-Accel-Buffering": "no", |
| 457 | + }, |
| 458 | + ) |
| 459 | + |
| 460 | + |
411 | 461 | fast_api_app = FastAPI( |
412 | 462 | title="Disk Usage API", |
413 | 463 | description="Inspect disk usage and delete files using du.", |
|
0 commit comments