Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pytest.ini
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[pytest]
addopts = --timeout=60 --cov=src --cov-branch --cov-report=html --capture=fd --tb=native
addopts = --timeout=60 --cov=skvaider --cov=aramaki --cov-branch --cov-report=html --capture=fd --tb=native
junit_family = legacy
consider_namespace_packages = true
asyncio_mode=auto
Expand Down
65 changes: 50 additions & 15 deletions src/skvaider/debug.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,44 @@ async def __call__(self, scope: Scope, receive: Receive, send: Send):
return result


class BodyBuffer:
"""Fixed-size capture buffer that keeps the head and tail of ingested bytes.

When total bytes exceed max_bytes, the middle is replaced with a marker
so both the start and end of long bodies are visible in debug output.
"""

DEFAULT_MAX_BYTES = 500 * 1024

def __init__(self, max_bytes: int = DEFAULT_MAX_BYTES):
self._head_limit = max_bytes // 2
self._tail_limit = max_bytes // 2
self._head: bytearray = bytearray()
self._tail: bytearray = bytearray()
self._total: int = 0

def ingest(self, chunk: bytes) -> None:
self._total += len(chunk)
if len(self._head) < self._head_limit:
space = self._head_limit - len(self._head)
take = min(space, len(chunk))
self._head.extend(chunk[:take])
chunk = chunk[take:]
if chunk:
self._tail.extend(chunk)
excess = len(self._tail) - self._tail_limit
if excess > 0:
del self._tail[:excess]

@property
def data(self) -> bytes:
if self._total <= self._head_limit + self._tail_limit:
return bytes(self._head) + bytes(self._tail)
missing = self._total - self._head_limit - self._tail_limit
marker = f"\n\n[... {missing} bytes omitted ...]\n\n".encode()
return bytes(self._head) + marker + bytes(self._tail)


class DebugRecorder:
# XXX async file io!
temp_file: Path | None
Expand All @@ -111,8 +149,6 @@ class DebugRecorder:
enabled: bool = False
debug_id: str = ""

MAX_REQUEST_BUFFER = 500 * 1024 # 500k max buffer for requests/responses

def __init__(
self,
request: Request,
Expand All @@ -129,7 +165,7 @@ def __init__(
self.slow_threshold = slow_threshold
self.request = request
self.temp_file = None
self.captured_request_body = b""
self._req_buffer = BodyBuffer()

self.time_start = time.time()

Expand All @@ -141,17 +177,20 @@ def __init__(
self.enabled = True
self.triggers.append("header")

self.captured_response_body = b""
self._resp_buffer = BodyBuffer()

@property
def captured_request_body(self) -> bytes:
return self._req_buffer.data

@property
def captured_response_body(self) -> bytes:
return self._resp_buffer.data

async def capture_request(self) -> Message:
message = await self._orig_receive()
if message["type"] == "http.request":
chunk = message.get("body", b"")
remaining_buffer_size = self.MAX_REQUEST_BUFFER - len(
self.captured_request_body
)
self.captured_request_body += chunk[:remaining_buffer_size]

self._req_buffer.ingest(message.get("body", b""))
return message

async def capture_response(self, message: Message):
Expand All @@ -161,11 +200,7 @@ async def capture_response(self, message: Message):
k.decode(): v.decode() for k, v in message.get("headers", [])
}
elif message["type"] == "http.response.body":
chunk = message.get("body", b"")
remaining_buffer_size = self.MAX_REQUEST_BUFFER - len(
self.captured_response_body
)
self.captured_response_body += chunk[:remaining_buffer_size]
self._resp_buffer.ingest(message.get("body", b""))
await self._orig_send(message)

def trigger_flag(self):
Expand Down
43 changes: 0 additions & 43 deletions src/skvaider/test_debug.py

This file was deleted.

132 changes: 132 additions & 0 deletions src/skvaider/tests/test_debug.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
from pathlib import Path
from typing import Any
from unittest.mock import AsyncMock, MagicMock

import pytest
from fastapi.testclient import TestClient

from skvaider.debug import BodyBuffer, DebugRecorder


@pytest.fixture
def recorder(tmp_path: Path) -> DebugRecorder:
request = MagicMock()

def get(key: str, default: Any = None) -> Any:
return default

request.headers.get.side_effect = get
return DebugRecorder(
request=request,
directory=tmp_path,
receive=AsyncMock(),
send=AsyncMock(),
)


def test_no_debug_by_default(
tmp_path: Path, client: TestClient, auth_header: None, llm_model_name: str
):
response = client.post(
"/openai/v1/completions",
json={"model": "gemma"},
)
request_id = response.headers["x-skvaider-request-id"]
assert not (tmp_path / "debug" / f"{request_id}.request").exists()
assert not (tmp_path / "debug" / f"{request_id}.response").exists()


def test_debug_via_header(
tmp_path: Path, client: TestClient, auth_header: None, llm_model_name: str
):
response = client.post(
"/openai/v1/completions",
headers={"x-skvaider-debug": "yes"},
json={"model": "gemma"},
)
request_id = response.headers["x-skvaider-request-id"]
assert (tmp_path / "debug" / f"{request_id}.request").exists()
assert (tmp_path / "debug" / f"{request_id}.response").exists()


def test_no_debug_if_unauthenticated(
tmp_path: Path, client: TestClient, llm_model_name: str
):
response = client.post(
"/openai/v1/completions",
headers={"x-skvaider-debug": "yes"},
json={"model": "gemma"},
)
assert response.status_code == 403
request_id = response.headers["x-skvaider-request-id"]
assert not (tmp_path / "debug" / f"{request_id}.request").exists()
assert not (tmp_path / "debug" / f"{request_id}.response").exists()
assert not (tmp_path / "debug" / f"{request_id}.response").exists()


# BodyBuffer unit tests (use tiny max_bytes so tests stay fast and readable)


def test_body_buffer_small_body_unchanged():
buf = BodyBuffer(max_bytes=20)
buf.ingest(b"hello world")
assert buf.data == b"hello world"


def test_body_buffer_at_limit_unchanged():
buf = BodyBuffer(max_bytes=10)
buf.ingest(b"1234567890")
assert buf.data == b"1234567890"


def test_body_buffer_multi_chunk_under_limit_unchanged():
# Two chunks that straddle the head/tail boundary must still be byte-identical.
buf = BodyBuffer(max_bytes=10) # head=5, tail=5
buf.ingest(b"12345")
buf.ingest(b"6789") # total 9 bytes — just under limit
assert buf.data == b"123456789"


def test_body_buffer_over_limit_cuts_from_middle():
buf = BodyBuffer(max_bytes=10) # head=5, tail=5
data = b"HHHHH" + b"M" * 6 + b"TTTTT" # 16 bytes; 6 in the middle are lost
buf.ingest(data)
result = buf.data
assert result[:5] == b"HHHHH"
assert result[-5:] == b"TTTTT"
assert b"omitted" in result
assert b"M" not in result


def test_body_buffer_over_limit_multi_chunk():
buf = BodyBuffer(max_bytes=10) # head=5, tail=5
buf.ingest(b"HHHHH") # fills head
buf.ingest(b"MMMM") # goes to tail
buf.ingest(b"TTTTT") # pushes out the M bytes
result = buf.data
assert result[:5] == b"HHHHH"
assert result[-5:] == b"TTTTT"
assert b"omitted" in result


# DebugRecorder integration tests — only public API, no private attribute access


async def test_recorder_request_body_captured(recorder: DebugRecorder):
data = b"the request body"
recorder._orig_receive = AsyncMock(
return_value={"type": "http.request", "body": data}
) #
await recorder.capture_request()
assert recorder.captured_request_body == data


async def test_recorder_response_body_captured(recorder: DebugRecorder):
data = b"the response body"
await recorder.capture_response(
{"type": "http.response.start", "status": 200, "headers": []}
)
await recorder.capture_response(
{"type": "http.response.body", "body": data}
)
assert recorder.captured_response_body == data
Loading