diff --git a/README.md b/README.md index 4b0c4a3..6c185c4 100644 --- a/README.md +++ b/README.md @@ -4,7 +4,7 @@ A load testing suite for running benchmarks on self-hosted Braintrust data plane ## Overview -This suite currently supports three types of tests: +This suite currently supports four types of tests: - **Load Test**: Spawns simulated users to bombard the data plane with logs, simulating production traffic @@ -12,6 +12,8 @@ This suite currently supports three types of tests: - **Functional Test**: Exercises core API create/read/delete flows across key Braintrust resources +- **Smoke Test**: Exercises the Topics pipeline on a disposable project with a small synthetic trace set + The suite can be extended to support additional test types in the future, and that is a goal. Each test is highly configurable via the `braintest.yaml` config file. The tests should be configured to simulate a customer's expected load and usage patterns. We want to ensure that the infra Braintrust is hosted on can handle the customer's use case, and size up components accordingly if the tests fail. @@ -63,11 +65,20 @@ To override any config value via environment variable, use `__` (double undersco | `loadtest.processes` | `LOADTEST__PROCESSES` | | `evaltest.trial_count` | `EVALTEST__TRIAL_COUNT` | | `functionaltest.name_prefix` | `FUNCTIONALTEST__NAME_PREFIX` | +| `smoketest.count` | `SMOKETEST__COUNT` | Example: ```bash BRAINTRUST__API_URL=https://my-api.example.com LOADTEST__PROCESSES=8 python main.py ``` +The Topics smoke test is disabled by default because it creates a disposable +project and exercises model-backed Topics processing. Enable it with +`smoketest.run: True`, or run it directly with: + +```bash +uv run python smoke_test/run.py --dry-run +``` + ## Important Notes -- No actual LLM calls are made in any of these tests. Everything is mocked. The purpose is to load test Braintrust infra, not the LLM provider. +- Load, eval, and functional tests do not make actual LLM calls. The Topics smoke test exercises model-backed Topics processing, so keep it disabled unless you intentionally want to test that path. diff --git a/braintest.yaml b/braintest.yaml index adc8622..ed923b5 100644 --- a/braintest.yaml +++ b/braintest.yaml @@ -6,6 +6,26 @@ functionaltest: run: True # Whether or not to run this test name_prefix: functional-test # Prefix used when creating test resources +smoketest: + run: False # Whether or not to run the Topics smoke test. This creates a disposable project. + org: null # Optional org name. If null, the script discovers it from BRAINTRUST_API_KEY. + api_url: null # Optional dataplane API URL override. If null, the script discovers it from BRAINTRUST_API_KEY. + project_prefix: topics-smoke # Created project names are suffixed with a UTC timestamp + count: 105 # Topics generation needs at least 100 facet summaries + idle_seconds: 10 # Minimum idle window before Topics processing runs + topic_window: 1h # Backfill window for Topics automation + generation_cadence: 1h # Topics automation rerun cadence + relabel_overlap: 10m # Overlap window used for relabeling + allow_below_threshold: False # Only set True when intentionally testing pre-generation behavior + skip_facet_preflight: False # Skip direct and async-batch preflight calls before bulk insert + skip_running_check: False # Skip polling for initial running or processed Topics rows + timeout: 60 # Generic API timeout in seconds + facet_preflight_timeout: 30 # Timeout for direct Topics preflight calls + function_visibility_timeout: 60 # Time to wait for saved function refs to resolve + function_visibility_interval: 2 # Poll interval for saved function visibility + running_check_timeout: 90 # Time to wait for Topics rows to start running or processing + running_check_interval: 3 # Poll interval for Topics status checks + evaltest: run: True # Whether or not to run this test project_id: null # if null or blank, will create a new project with name above diff --git a/config.py b/config.py index c7f1cc3..3907ea2 100644 --- a/config.py +++ b/config.py @@ -27,6 +27,27 @@ class EvalTestConfig(BaseModel): dataset: DatasetConfig = DatasetConfig() +class SmokeTestConfig(BaseModel): + run: bool = False + org: str | None = None + api_url: str | None = None + project_prefix: str = "topics-smoke" + count: int = 105 + idle_seconds: int = 10 + topic_window: str = "1h" + generation_cadence: str = "1h" + relabel_overlap: str = "10m" + allow_below_threshold: bool = False + skip_facet_preflight: bool = False + skip_running_check: bool = False + timeout: int = 60 + facet_preflight_timeout: int = 30 + function_visibility_timeout: int = 60 + function_visibility_interval: int = 2 + running_check_timeout: int = 90 + running_check_interval: int = 3 + + class WaitTimeConfig(BaseModel): min: int = 5 max: int = 10 @@ -81,6 +102,7 @@ class Settings(BaseSettings): braintrust: BraintrustConfig = BraintrustConfig() functionaltest: FunctionalTestConfig = FunctionalTestConfig() evaltest: EvalTestConfig = EvalTestConfig() + smoketest: SmokeTestConfig = SmokeTestConfig() loadtest: LoadTestConfig = LoadTestConfig() @classmethod diff --git a/docs/smoke_test.md b/docs/smoke_test.md new file mode 100644 index 0000000..105adec --- /dev/null +++ b/docs/smoke_test.md @@ -0,0 +1,151 @@ +# Topics Smoke Test + +This folder contains a small support-only script for exercising Braintrust +Topics on a fresh disposable project. + +## What It Does + +`smoke_test/run.py`: + +1. Uses `BRAINTRUST_API_KEY` to call the fixed app URL, + `https://www.braintrust.dev/api/apikey/login`. +2. Discovers the org and dataplane `api_url` from that login response. +3. Creates a fresh project named like `topics-smoke-20260515-143022`. +4. Uses the built-in `Task` facet and creates a project-local topic-map function. +5. Inserts one synthetic root LLM trace and runs direct and async-batch gateway + preflights through the normal Topics path. +6. Inserts the remaining traces to reach 105 total. +7. Enables a minimal Topics automation using those saved functions. +8. Queues the Topics automation to run. +9. Polls until the `Task` facet shows the expected 105 running or processed traces. + +The default count is 105 because topic generation needs at least 100 facet +summaries. One facet keeps the model work and Baseten traffic as small as +possible while still exercising the pipeline. + +The generated spans use `span_attributes.type = "llm"` because Topics builds +facet input through Brainstore's thread preprocessor, which only includes LLM +spans or spans with no explicit type. + +The script intentionally does not expose facet or model override flags. It uses +the product's built-in `Task` facet and lets the backend choose the supported +facet model for the dataplane. + +The preflight catches model routing or preprocessor failures before the +remaining 104 traces are inserted and before Topics is queued. It also calls +`/function/invoke-async-batch` with the gateway header because that is the path +used by automation workers. Skip it only when testing the automation path +without extra preflight calls: + +```bash +uv run python smoke_test/run.py --skip-facet-preflight +``` + +## Usage + +Dry run without creating projects, traces, or automations: + +```bash +uv run python smoke_test/run.py --dry-run +``` + +The dry-run output includes the fixed app URL plus the org and dataplane API URL +that would be used. If no API key is present, those fields are only populated +from explicit arguments or environment variables. + +Real run: + +```bash +export BRAINTRUST_API_KEY=... +uv run python smoke_test/run.py +``` + +Run through the full suite by enabling the `smoketest` section in +`braintest.yaml`: + +```yaml +smoketest: + run: True +``` + +The direct CLI and `main.py` entrypoint both use `braintest.yaml` defaults. +CLI flags still override those defaults for one-off runs. + +The real run prints the app URL, org, and dataplane API URL before creating the +project. On success, the final line is a clickable Topics results link for the +created project. + +By default, the script waits up to 90 seconds for the initial facet work to show +the expected running or processed count: + +```bash +uv run python smoke_test/run.py --running-check-timeout 120 --running-check-interval 5 +``` + +The Topics status query can temporarily move while the automation cursor +advances. The script keeps polling until the expected traces are either running +or already processed, and fails if neither count appears before the timeout. + +After the running or processed count appears, full topic results are not +necessarily immediate. With the default `--idle-seconds 10`, the runtime +rechecks active Topics states about every 10 seconds. A healthy tiny project +should usually move from `waiting_for_facets` to topic generation and backfill +within a few minutes, but vendor/model latency and retry behavior can stretch +that. If it remains in `waiting_for_facets` with `ready_topic_maps: 0` after +several checks, the facet summaries are not becoming ready. + +Skip that verification only when you want the old fire-and-forget behavior: + +```bash +uv run python smoke_test/run.py --skip-running-check +``` + +If the API key belongs to multiple orgs: + +```bash +uv run python smoke_test/run.py --org braintrustdata.com +``` + +If discovery fails or you need to force a dataplane: + +```bash +uv run python smoke_test/run.py --org braintrustdata.com --api-url https://example.cloudfront.net +``` + +Check the project after creation: + +```bash +uv run python smoke_test/run.py status --project topics-smoke-20260515-143022 +``` + +## Expected States + +Topics may move through these states: + +- `waiting_for_facets`: facet summaries are still being processed or fewer than + 100 summaries are ready. +- `recomputing_topics`: topic maps are being generated from the summaries. +- `pending_logs_processing`: topics are ready and classifications are being + prepared. +- `processing_logs`: classifications are being written back to logs. +- `idle`: the automation has no immediate work queued. + +## Cost Guardrails + +- Default traces: 105. +- Facet: built-in `Task` only. +- No facet model override is sent; the backend default is used. +- Direct and async-batch preflight calls are run before the remaining traces are + seeded. +- Counts below 100 are rejected unless `--allow-below-threshold` is passed. +- Each run creates a fresh project so results are isolated and easy to inspect. +- The post-queue running check fails at timeout if rows are still attempted but + not completed. + +## Local Checks + +```bash +uv run python -m py_compile smoke_test/run.py +uv run python -m unittest smoke_test/test_topics_smoke.py +uv run python smoke_test/run.py --dry-run +``` diff --git a/main.py b/main.py index 45d369b..c7b4d78 100644 --- a/main.py +++ b/main.py @@ -1,7 +1,7 @@ #!/usr/bin/env python3 """ -Main script to orchestrate functionaltest, evaltest, and loadtest execution -based on braintest.yaml config. +Main script to orchestrate functionaltest, smoketest, evaltest, and loadtest +execution based on braintest.yaml config. """ import os @@ -43,6 +43,21 @@ def run_functionaltest(config): return False +def run_smoketest(config): + try: + subprocess.run( + [sys.executable, "smoke_test/run.py"], + check=True, + capture_output=False, + env={**os.environ, "PYTHONPATH": "."}, + ) + print("Smoke test completed successfully.") + return True + except subprocess.CalledProcessError as e: + print(f"Smoke test failed with error code {e.returncode}") + return False + + def run_loadtest(config): print("Load Test") @@ -211,6 +226,14 @@ def main(): print("Functional test is not enabled. Skipping...") results["functionaltest"] = "SKIPPED" + if config.get("smoketest", {}).get("run", False): + print("\n-----Running Smoke Test-----") + smoketest_success = run_smoketest(config) + results["smoketest"] = "SUCCESS" if smoketest_success else "FAILED" + else: + print("\nSmoke test is not enabled. Skipping...") + results["smoketest"] = "SKIPPED" + if config.get("evaltest", {}).get("run", False): print("\n-----Running Eval Test-----") evaltest_success = run_evaltest(config) diff --git a/smoke_test/run.py b/smoke_test/run.py new file mode 100644 index 0000000..d8eff75 --- /dev/null +++ b/smoke_test/run.py @@ -0,0 +1,1344 @@ +#!/usr/bin/env python3 +# /// script +# requires-python = ">=3.11" +# /// +"""Create a tiny disposable Braintrust project for Topics smoke testing. + +This script discovers the dataplane API URL from the API key when possible, +creates a fresh timestamped project, enables a minimal Topics automation, +inserts enough traces to cross the Topics generation threshold, and queues the +automation. +""" + +from __future__ import annotations + +import argparse +import json +import os +import sys +import time +from dataclasses import dataclass +from datetime import UTC, datetime +from pathlib import Path +from typing import Any +from urllib.error import HTTPError, URLError +from urllib.parse import quote, urlencode +from urllib.request import Request, urlopen + +APP_URL = "https://www.braintrust.dev" +DEFAULT_COUNT = 105 +MIN_TOPIC_SUMMARIES = 100 +DEFAULT_FACET = "Task" +TOPIC_MAP_EMBEDDING_MODEL = "brain-embedding-1" +DEFAULT_PROJECT_PREFIX = "topics-smoke" +DEFAULT_TIMEOUT_SECONDS = 60 +DEFAULT_RUNNING_CHECK_TIMEOUT_SECONDS = 90 +DEFAULT_RUNNING_CHECK_INTERVAL_SECONDS = 3 +DEFAULT_PREFLIGHT_TIMEOUT_SECONDS = 30 +DEFAULT_FUNCTION_VISIBILITY_TIMEOUT_SECONDS = 60 +DEFAULT_FUNCTION_VISIBILITY_INTERVAL_SECONDS = 2 +TASK_FACET_PROMPT = """What is the user's overall request or goal in this conversation? + +Respond with a single sentence starting with "User wants to..." + +Focus on the high-level intent, not specific details. If multiple requests, describe the main one. + +Examples: +- "User wants to debug why their API calls are returning errors" +- "User wants to create a new LLM-based evaluation scorer" +- "User wants to understand how to interpret experiment results" +- "User wants to optimize their prompt for better accuracy" + +If no clear request is present, respond: "NONE" +""" + + +class SmokeError(Exception): + """Raised for expected user-facing failures.""" + + +@dataclass(frozen=True) +class AuthContext: + api_key: str + org_id: str + org_name: str + api_url: str + + +@dataclass(frozen=True) +class Project: + id: str + name: str + + +def json_request( + method: str, + url: str, + *, + api_key: str | None = None, + body: Any | None = None, + org_name: str | None = None, + extra_headers: dict[str, str] | None = None, + timeout: int = DEFAULT_TIMEOUT_SECONDS, +) -> Any: + data = None if body is None else json.dumps(body).encode("utf-8") + headers = { + "Accept": "application/json", + "User-Agent": "braintrust-topics-smoke/1.0", + } + if data is not None: + headers["Content-Type"] = "application/json" + if api_key: + headers["Authorization"] = f"Bearer {api_key}" + if org_name: + headers["x-bt-org-name"] = org_name + if extra_headers: + headers.update(extra_headers) + + request = Request(url, data=data, headers=headers, method=method) + try: + with urlopen(request, timeout=timeout) as response: + text = response.read().decode("utf-8") + except HTTPError as exc: + error_text = exc.read().decode("utf-8", errors="replace") + raise SmokeError(f"{method} {url} failed with HTTP {exc.code}: {error_text}") from exc + except URLError as exc: + raise SmokeError(f"{method} {url} failed: {exc.reason}") from exc + + if not text: + return None + try: + return json.loads(text) + except json.JSONDecodeError as exc: + raise SmokeError(f"{method} {url} returned non-JSON response: {text[:500]}") from exc + + +def require_api_key(args: argparse.Namespace) -> str: + api_key = args.api_key or os.environ.get("BRAINTRUST_API_KEY") + if not api_key: + raise SmokeError("BRAINTRUST_API_KEY or --api-key is required") + return api_key + + +def discover_auth(args: argparse.Namespace) -> AuthContext: + api_key = require_api_key(args) + requested_org = args.org or os.environ.get("BRAINTRUST_ORG_NAME") + forced_api_url = args.api_url or os.environ.get("BRAINTRUST_API_URL") + + org_info: list[dict[str, Any]] = [] + discovery_error: SmokeError | None = None + try: + login = json_request( + "POST", + f"{APP_URL}/api/apikey/login", + api_key=api_key, + timeout=args.timeout, + ) + org_info = list(login.get("org_info") or []) + except SmokeError as exc: + discovery_error = exc + + selected = select_org(org_info, requested_org) if org_info else None + + if selected is None and discovery_error and not (forced_api_url and requested_org): + raise SmokeError( + "Could not discover org/api_url from the API key. Provide --org and --api-url " + "or BRAINTRUST_ORG_NAME and BRAINTRUST_API_URL.\n" + f"Discovery error: {discovery_error}" + ) + + if selected is None: + selected = { + "id": "", + "name": requested_org, + "api_url": forced_api_url, + } + + api_url = (forced_api_url or selected.get("api_url") or "").rstrip("/") + org_name = selected.get("name") or requested_org + if not org_name: + raise SmokeError("Could not determine org. Provide --org or BRAINTRUST_ORG_NAME.") + if not api_url: + raise SmokeError("Could not determine API URL. Provide --api-url or BRAINTRUST_API_URL.") + + return AuthContext( + api_key=api_key, + org_id=str(selected.get("id") or ""), + org_name=str(org_name), + api_url=api_url, + ) + + +def select_org(org_info: list[dict[str, Any]], requested_org: str | None) -> dict[str, Any] | None: + if not org_info: + return None + if requested_org: + for org in org_info: + if org.get("name") == requested_org: + return org + names = ", ".join(str(org.get("name")) for org in org_info) + raise SmokeError(f"Organization {requested_org!r} was not found. Must be one of: {names}") + if len(org_info) == 1: + return org_info[0] + names = ", ".join(str(org.get("name")) for org in org_info) + raise SmokeError( + "API key belongs to multiple organizations. Provide --org or BRAINTRUST_ORG_NAME. " + f"Available orgs: {names}" + ) + + +def api_url(ctx: AuthContext, path: str, query: dict[str, str] | None = None) -> str: + suffix = path if path.startswith("/") else f"/{path}" + url = f"{ctx.api_url}{suffix}" + if query: + url = f"{url}?{urlencode(query)}" + return url + + +def app_project_topics_url(org_name: str, project_name: str) -> str: + return ( + f"{APP_URL}/app/{quote(org_name, safe='')}/p/" + f"{quote(project_name, safe='')}/topics" + ) + + +def terminal_hyperlink(label: str, url: str) -> str: + return f"\033]8;;{url}\a{label}\033]8;;\a ({url})" + + +def api_get(ctx: AuthContext, path: str, query: dict[str, str] | None = None) -> Any: + return json_request( + "GET", + api_url(ctx, path, query), + api_key=ctx.api_key, + org_name=ctx.org_name, + timeout=DEFAULT_TIMEOUT_SECONDS, + ) + + +def api_post( + ctx: AuthContext, + path: str, + body: Any, + *, + extra_headers: dict[str, str] | None = None, +) -> Any: + return json_request( + "POST", + api_url(ctx, path), + api_key=ctx.api_key, + org_name=ctx.org_name, + body=body, + extra_headers=extra_headers, + timeout=DEFAULT_TIMEOUT_SECONDS, + ) + + +def create_project(ctx: AuthContext, name: str) -> Project: + response = api_post(ctx, "/v1/project", {"name": name, "org_name": ctx.org_name}) + return Project(id=str(response["id"]), name=str(response["name"])) + + +def get_project_by_name(ctx: AuthContext, name: str) -> Project | None: + response = api_get( + ctx, + "/v1/project", + {"org_name": ctx.org_name, "project_name": name}, + ) + objects = response.get("objects") or [] + if not objects: + return None + return Project(id=str(objects[0]["id"]), name=str(objects[0]["name"])) + + +def create_fresh_project(ctx: AuthContext, base_name: str) -> Project: + for attempt in range(5): + name = base_name if attempt == 0 else f"{base_name}-{attempt + 1}" + existing = get_project_by_name(ctx, name) + if existing is None: + return create_project(ctx, name) + raise SmokeError(f"Could not find an unused project name based on {base_name!r}") + + +def list_topic_automations(ctx: AuthContext, project_id: str) -> list[dict[str, Any]]: + response = api_get(ctx, "/v1/project_automation", {"project_id": project_id}) + rows = response.get("objects") or [] + return [ + row + for row in rows + if isinstance(row.get("config"), dict) and row["config"].get("event_type") == "topic" + ] + + +def enable_topics(ctx: AuthContext, project: Project, args: argparse.Namespace) -> dict[str, Any]: + existing = list_topic_automations(ctx, project.id) + if existing: + raise SmokeError( + f"Project {project.name!r} already has {len(existing)} Topics automation(s); " + "fresh smoke projects should start empty." + ) + + facet_names, facet_functions, topic_map_functions = create_topic_function_refs( + ctx, + project.id, + ) + return register_topics_automation( + ctx, + project, + args, + facet_functions=facet_functions, + topic_map_functions=topic_map_functions, + ) + + +def create_topic_function_refs( + ctx: AuthContext, + project_id: str, +) -> tuple[list[str], list[dict[str, Any]], list[dict[str, Any]]]: + facet_names = [DEFAULT_FACET] + facet_functions = [ + {"type": "global", "name": DEFAULT_FACET, "function_type": "facet"} + ] + topic_map_functions = create_topic_map_functions( + ctx, + project_id, + facet_names, + ) + return facet_names, facet_functions, topic_map_functions + + +def register_topics_automation( + ctx: AuthContext, + project: Project, + args: argparse.Namespace, + *, + facet_functions: list[dict[str, Any]], + topic_map_functions: list[dict[str, Any]], +) -> dict[str, Any]: + body = { + "project_automation_name": "Topics smoke", + "description": "Minimal disposable Topics smoke-test automation", + "project_id": project.id, + "config": { + "event_type": "topic", + "sampling_rate": 1.0, + "facet_functions": facet_functions, + "topic_map_functions": topic_map_functions, + "scope": {"type": "trace", "idle_seconds": args.idle_seconds}, + "rerun_seconds": args.generation_cadence_seconds, + "relabel_overlap_seconds": args.relabel_overlap_seconds, + "backfill_time_range": format_duration_seconds(args.topic_window_seconds), + }, + "update": True, + } + response = api_post(ctx, "/api/project_automation/register", body) + automation = response.get("project_automation") or response + seed_topic_automation_cursors(ctx, project.id, automation, args.topic_window_seconds) + return automation + + +def create_topic_map_functions( + ctx: AuthContext, + project_id: str, + facet_names: list[str], +) -> list[dict[str, Any]]: + body = { + "functions": [ + { + "project_id": project_id, + "name": facet, + "slug": slugify_topic_map_name(f"{facet}-topic-map"), + "function_type": "classifier", + "function_data": { + "type": "topic_map", + "source_facet": facet, + "embedding_model": TOPIC_MAP_EMBEDDING_MODEL, + }, + "if_exists": "ignore", + } + for facet in facet_names + ] + } + response = api_post(ctx, "/insert-functions", body) + functions = response.get("functions") or [] + if len(functions) != len(facet_names): + raise SmokeError("Failed to create the expected topic map functions") + return [ + {"function": saved_function_ref_from_inserted_function(function_row, response)} + for function_row in functions + ] + + +def saved_function_ref_from_inserted_function( + function_row: dict[str, Any], + response: dict[str, Any], +) -> dict[str, str]: + ref = {"type": "function", "id": str(function_row["id"])} + version = function_row.get("_xact_id") or function_row.get("version") or response.get("xact_id") + if version is not None: + ref["version"] = str(version) + return ref + + +def saved_function_ref_to_use_body(function_ref: dict[str, Any]) -> dict[str, Any]: + if function_ref.get("type") == "function": + function_id = function_ref.get("id") + if not function_id: + raise SmokeError("saved function reference is missing id") + body = {"function_id": str(function_id)} + if function_ref.get("version") is not None: + body["version"] = str(function_ref["version"]) + return body + if function_ref.get("type") == "global": + name = function_ref.get("name") + if not name: + raise SmokeError("global function reference is missing name") + return { + "global_function": str(name), + "function_type": function_ref.get("function_type") or "scorer", + } + raise SmokeError(f"unsupported saved function reference: {function_ref!r}") + + +def verify_function_refs_resolve( + ctx: AuthContext, + project_id: str, + facet_functions: list[dict[str, Any]], + topic_map_functions: list[dict[str, Any]], + *, + timeout_seconds: int, + interval_seconds: int, +) -> None: + refs: list[tuple[str, dict[str, Any], str]] = [] + for ref in facet_functions: + refs.append(("facet", ref, "facet")) + for entry in topic_map_functions: + ref = entry.get("function") + if not isinstance(ref, dict): + raise SmokeError(f"topic map entry is missing function reference: {entry!r}") + refs.append(("topic map", ref, "topic_map")) + + deadline = time.monotonic() + timeout_seconds + last_error: str | None = None + while True: + try: + for label, ref, expected_type in refs: + resolved = api_post( + ctx, + "/function/use", + saved_function_ref_to_use_body(ref), + extra_headers={"x-bt-project-id": project_id}, + ) + function_data = resolved.get("function_data") if isinstance(resolved, dict) else None + actual_type = function_data.get("type") if isinstance(function_data, dict) else None + if actual_type != expected_type: + raise SmokeError( + f"/function/use resolved {label} {saved_function_ref_key(ref)!r} " + f"as function_data.type={actual_type!r}, expected {expected_type!r}" + ) + print(f"Verified /function/use visibility for {len(refs)} saved function reference(s)") + return + except SmokeError as exc: + last_error = str(exc) + if time.monotonic() >= deadline: + raise SmokeError( + "Saved function references did not become visible through /function/use " + f"within {timeout_seconds}s for project {project_id}. Last error: {last_error}" + ) from exc + time.sleep(interval_seconds) + + +def seed_topic_automation_cursors( + ctx: AuthContext, + project_id: str, + automation: dict[str, Any], + window_seconds: int, +) -> None: + automation_id = str(automation["id"]) + config = automation.get("config") or {} + object_id = topic_automation_object_id(project_id, config.get("data_scope")) + start_xact_id = inclusive_start_xact_id_from_epoch_ms( + int(time.time() * 1000) - window_seconds * 1000 + ) + api_post( + ctx, + "/brainstore/automation/reset-cursors", + { + "automation_id": automation_id, + "object_id": object_id, + "start_xact_id": start_xact_id, + }, + ) + poke_topic_automation(ctx, automation_id, object_id) + + +def poke_topic_automation(ctx: AuthContext, automation_id: str, object_id: str) -> None: + api_post( + ctx, + "/brainstore/automation/upsert-object-cursor", + {"automation_id": automation_id, "object_id": object_id}, + ) + + +def insert_smoke_traces(ctx: AuthContext, project: Project, events: list[dict[str, Any]]) -> Any: + return api_post(ctx, f"/v1/project_logs/{quote(project.id, safe='')}/insert", {"events": events}) + + +def format_function_refs(function_refs: list[dict[str, Any]]) -> str: + return ", ".join(str(ref.get("id")) for ref in function_refs) or "(none)" + + +def format_topic_map_refs(topic_map_refs: list[dict[str, Any]]) -> str: + ids = [ + str((ref.get("function") or {}).get("id")) + for ref in topic_map_refs + if isinstance(ref.get("function"), dict) + ] + return ", ".join(ids) or "(none)" + + +def build_trace_events(count: int, *, project_name: str, run_id: str) -> list[dict[str, Any]]: + now = time.time() + events: list[dict[str, Any]] = [] + for index in range(count): + scenario = SCENARIOS[index % len(SCENARIOS)] + variant = index // len(SCENARIOS) + root_id = f"topics-smoke-{run_id}-{index:04d}" + created = datetime.fromtimestamp(now - (count - index), UTC).isoformat() + user_message = scenario["user"].format(n=variant + 1) + assistant_message = scenario["assistant"].format(n=variant + 1) + events.append( + { + "id": root_id, + "span_id": root_id, + "root_span_id": root_id, + "created": created, + "input": [{"role": "user", "content": user_message}], + "output": [{"role": "assistant", "content": assistant_message}], + "metadata": { + "source": "smoke_test/run.py", + "run_id": run_id, + "project_name": project_name, + "scenario": scenario["name"], + "variant": variant + 1, + "synthetic": True, + }, + "tags": ["topics-smoke", scenario["name"]], + "span_attributes": { + "name": f"Topics smoke chat: {scenario['title']}", + "type": "llm", + }, + "metrics": { + "start": now - (count - index) - 0.25, + "end": now - (count - index), + "tokens": 120 + (index % 7), + }, + } + ) + return events + + +SCENARIOS = [ + { + "name": "billing-refund", + "title": "Billing refund", + "user": "I was charged twice for my subscription invoice #{n}. Can you refund the duplicate charge?", + "assistant": "I found the duplicate billing event and opened a refund request for invoice #{n}.", + }, + { + "name": "dataset-import", + "title": "Dataset import", + "user": "Help me import {n} CSV files into a Braintrust dataset and validate the columns.", + "assistant": ( + "I created a dataset import checklist, validated the required columns, and flagged one malformed row." + ), + }, + { + "name": "api-debugging", + "title": "API debugging", + "user": "My API request returns 401 when I run the eval job #{n}. What should I check?", + "assistant": ( + "I checked the auth header, org selection, and project scope, then suggested rotating the API key." + ), + }, + { + "name": "deployment-troubleshooting", + "title": "Deployment troubleshooting", + "user": "The self-hosted dataplane health check is failing after deploy attempt {n}.", + "assistant": "I reviewed the service URL, Redis connectivity, and object-store settings for the dataplane.", + }, + { + "name": "feature-request", + "title": "Feature request", + "user": "Can Braintrust add a dashboard that groups support conversations by product area? Request {n}.", + "assistant": ( + "I captured the product-area dashboard request and suggested using Topics classifications meanwhile." + ), + }, +] + + +def run_create(args: argparse.Namespace) -> int: + validate_args(args) + run_id = datetime.now(UTC).strftime("%Y%m%d-%H%M%S") + project_name = timestamped_project_name(args.project_prefix, run_id) + events = build_trace_events(args.count, project_name=project_name, run_id=run_id) + + if args.dry_run: + auth_preview = preview_auth(args) + print( + json.dumps( + { + "dry_run": True, + "app_url": APP_URL, + "org": auth_preview["org"], + "api_url": auth_preview["api_url"], + "project_name": project_name, + "trace_count": len(events), + "facet": DEFAULT_FACET, + "sample_event": events[0], + "note": auth_preview["note"], + }, + indent=2, + ) + ) + return 0 + + ctx = discover_auth(args) + print(f"App URL: {APP_URL}") + print(f"Org: {ctx.org_name}") + print(f"Dataplane API URL: {ctx.api_url}") + project = create_fresh_project(ctx, project_name) + print(f"Created project: {ctx.org_name} / {project.name} ({project.id})") + + existing = list_topic_automations(ctx, project.id) + if existing: + raise SmokeError( + f"Project {project.name!r} already has {len(existing)} Topics automation(s); " + "fresh smoke projects should start empty." + ) + facet_names, facet_functions, topic_map_functions = create_topic_function_refs( + ctx, + project.id, + ) + print(f"Using built-in Topics facet: {DEFAULT_FACET}") + print(f"Created topic map function(s): {format_topic_map_refs(topic_map_functions)}") + verify_function_refs_resolve( + ctx, + project.id, + facet_functions, + topic_map_functions, + timeout_seconds=args.function_visibility_timeout, + interval_seconds=args.function_visibility_interval, + ) + + if args.skip_facet_preflight: + insert_smoke_traces(ctx, project, events) + print(f"Inserted {len(events)} synthetic trace(s)") + else: + insert_smoke_traces(ctx, project, events[:1]) + print("Inserted 1 synthetic preflight trace") + preflight_topics_pipeline( + ctx, + project, + events[0], + args, + facet_names, + facet_functions, + topic_map_functions, + ) + remaining_events = events[1:] + if remaining_events: + insert_smoke_traces(ctx, project, remaining_events) + print(f"Inserted {len(events)} synthetic trace(s)") + + automation = register_topics_automation( + ctx, + project, + args, + facet_functions=facet_functions, + topic_map_functions=topic_map_functions, + ) + print(f"Enabled Topics automation: {automation.get('name', 'Topics')} ({automation['id']})") + object_id = topic_automation_object_id(project.id, (automation.get("config") or {}).get("data_scope")) + poke_topic_automation(ctx, str(automation["id"]), object_id) + print("Queued Topics processing") + if not args.skip_running_check: + verify_expected_running(ctx, project, automation, [DEFAULT_FACET], args) + print(f"Project name for status checks: {project.name}") + topics_url = app_project_topics_url(ctx.org_name, project.name) + print(f"Open Topics results: {terminal_hyperlink('Topics results', topics_url)}") + return 0 + + +def preview_auth(args: argparse.Namespace) -> dict[str, str | None]: + if args.api_key or os.environ.get("BRAINTRUST_API_KEY"): + try: + ctx = discover_auth(args) + return { + "org": ctx.org_name, + "api_url": ctx.api_url, + "note": "No project, traces, or automations were created.", + } + except SmokeError as exc: + return { + "org": args.org or os.environ.get("BRAINTRUST_ORG_NAME"), + "api_url": args.api_url or os.environ.get("BRAINTRUST_API_URL"), + "note": f"No project, traces, or automations were created. Discovery failed: {exc}", + } + return { + "org": args.org or os.environ.get("BRAINTRUST_ORG_NAME"), + "api_url": args.api_url or os.environ.get("BRAINTRUST_API_URL"), + "note": "No network calls were made because no API key was provided.", + } + + +def run_status(args: argparse.Namespace) -> int: + if not args.project: + raise SmokeError("status requires --project ") + ctx = discover_auth(args) + project = get_project_by_name(ctx, args.project) + if project is None: + raise SmokeError(f"Project {args.project!r} was not found in org {ctx.org_name!r}") + automations = list_topic_automations(ctx, project.id) + summary: dict[str, Any] = { + "org": ctx.org_name, + "project": {"id": project.id, "name": project.name}, + "topics_url": app_project_topics_url(ctx.org_name, project.name), + "automations": [], + } + for automation in automations: + automation_id = str(automation["id"]) + object_id = topic_automation_object_id(project.id, (automation.get("config") or {}).get("data_scope")) + object_cursor = api_post( + ctx, + "/brainstore/automation/get-object-cursors", + {"automation_id": automation_id, "project_id": project.id}, + ) + summary["automations"].append( + { + "id": automation_id, + "name": automation.get("name", "Topics"), + "object_id": object_id, + "next_run_at": object_cursor.get("next_run_at"), + "last_run_at": object_cursor.get("last_run_at"), + "last_error": object_cursor.get("last_error"), + "topic_runtime": object_cursor.get("topic_runtime"), + } + ) + print(json.dumps(summary, indent=2, sort_keys=True)) + return 0 + + +def preflight_topics_pipeline( + ctx: AuthContext, + project: Project, + event: dict[str, Any], + args: argparse.Namespace, + facets: list[str], + facet_functions: list[dict[str, Any]], + topic_map_functions: list[dict[str, Any]], +) -> None: + if len(facets) != 1: + print("Skipping facet preflight because multiple facets were configured") + return + + facet = facets[0] + root_span_id = str(event["root_span_id"]) + saved_body = { + "api_version": 1, + "project_id": project.id, + "org_name": ctx.org_name, + **saved_function_ref_to_use_body(facet_functions[0]), + "input": { + "trace_ref": { + "object_type": "project_logs", + "object_id": project.id, + "root_span_id": root_span_id, + } + }, + "stream": False, + "timeout_ms": args.facet_preflight_timeout * 1000, + } + saved_response = api_post(ctx, "/function/invoke", saved_body) + if saved_response is None: + raise SmokeError(f"Saved facet preflight for {facet!r} returned no response") + print(f"Saved facet preflight succeeded for {facet!r}") + + pipeline_body = { + "api_version": 1, + "project_id": project.id, + "org_name": ctx.org_name, + "inline_function": build_batched_facet_inline_function( + facet, + topic_map_functions[0], + ), + "function_type": "facet", + "name": "Topics smoke pipeline preflight", + "input": { + "trace_ref": { + "object_type": "project_logs", + "object_id": project.id, + "root_span_id": root_span_id, + }, + }, + "stream": False, + "timeout_ms": args.facet_preflight_timeout * 1000, + } + pipeline_response = api_post(ctx, "/function/invoke", pipeline_body) + if not isinstance(pipeline_response, dict) or facet not in pipeline_response: + raise SmokeError( + f"Pipeline preflight for {facet!r} returned unexpected response: {pipeline_response!r}" + ) + print(f"Pipeline preflight succeeded for {facet!r}") + + async_batch_response = api_post( + ctx, + "/function/invoke-async-batch", + [{"request": pipeline_body}], + extra_headers={"x-bt-use-gateway": "true"}, + ) + if not isinstance(async_batch_response, dict) or async_batch_response.get("failed", 0) != 0: + raise SmokeError( + "Async-batch gateway preflight failed before bulk trace insert. " + f"Response: {async_batch_response!r}" + ) + print(f"Async-batch gateway preflight succeeded for {facet!r}") + + +def build_batched_facet_inline_function( + facet: str, + topic_map_function: dict[str, Any], +) -> dict[str, Any]: + topic_map_id = str((topic_map_function.get("function") or {})["id"]) + return { + "type": "batched_facet", + "preprocessor": { + "type": "global", + "name": "thread", + "function_type": "preprocessor", + }, + "facets": [ + { + "name": facet, + "prompt": facet_prompt_for_name(facet), + "embedding_model": TOPIC_MAP_EMBEDDING_MODEL, + "no_match_pattern": "^NONE", + } + ], + "topic_maps": { + facet: [ + { + "function_name": facet, + "topic_map_id": topic_map_id, + "topic_map_data": { + "type": "topic_map", + "source_facet": facet, + "embedding_model": TOPIC_MAP_EMBEDDING_MODEL, + }, + } + ] + }, + } + + +def verify_expected_running( + ctx: AuthContext, + project: Project, + automation: dict[str, Any], + facets: list[str], + args: argparse.Namespace, +) -> None: + if len(facets) != 1: + print("Skipping running-count check because multiple facets were configured") + return + + facet = facets[0] + expected = args.count + deadline = time.monotonic() + args.running_check_timeout + last_counts: dict[str, int] | None = None + function_keys = facet_function_keys_for_running_check(automation, facets) + print( + f"Waiting for Topics facet {facet!r} to show {expected} running or processed trace(s)..." + ) + while True: + counts = get_topic_facet_status_counts( + ctx, + project.id, + str(automation["id"]), + facet, + function_keys, + ) + last_counts = counts + running = counts["running"] + errors = counts["errors"] + processed = counts["processed"] + matched = counts["matched"] + print( + f"Facet {facet}: matched={matched} processed={processed} running={running} errors={errors}" + ) + if errors == 0 and (running >= expected or processed >= expected): + if processed >= expected: + print(f"Verified Topics processed count: {processed}") + else: + print(f"Verified Topics running count: {running}") + return + if errors >= expected: + raise SmokeError( + f"Topics facet {facet!r} reported {errors} error(s), which is at or above " + f"the expected {expected} trace(s). Last counts: {last_counts}" + ) + if time.monotonic() >= deadline: + error_note = ( + f"; last reported errors={errors}" if errors > 0 else "" + ) + raise SmokeError( + f"Timed out waiting for Topics facet {facet!r} to reach {expected} running or processed " + f"trace(s){error_note}. Last counts: {last_counts}" + ) + time.sleep(args.running_check_interval) + + +def get_topic_facet_status_counts( + ctx: AuthContext, + project_id: str, + automation_id: str, + facet: str, + function_keys: list[str] | None = None, +) -> dict[str, int]: + cursor = api_post( + ctx, + "/brainstore/automation/get-cursors", + {"automation_id": automation_id, "project_id": project_id}, + ) + pending_min_executed_xact_id = cursor.get("pending_min_executed_xact_id") + keys = function_keys or [f"global:facet:{facet}"] + inflight_parts = [] + error_parts = [] + completed_parts = [] + for function_key in keys: + base = f'_async_scoring_state.triggered_functions.{quote_btql_path_piece(function_key)}' + attempted = f"{base}.triggered_xact_id IS NOT NULL" + completed = f"{base}.completed_xact_id >= {base}.triggered_xact_id" + inflight = ( + f"{attempted} AND " + f"({base}.completed_xact_id IS NULL OR {base}.completed_xact_id < {base}.triggered_xact_id)" + ) + if pending_min_executed_xact_id is None: + error_condition = f"{base}.attempts > 0" + else: + escaped_xact = escape_btql_literal(str(pending_min_executed_xact_id)) + error_condition = f"{base}.attempts > 0 AND {base}.triggered_xact_id < '{escaped_xact}'" + inflight_parts.append(f"({inflight})") + error_parts.append(f"({inflight} AND {error_condition})") + completed_parts.append(f"({completed})") + inflight_any = " OR ".join(inflight_parts) + error_any = " OR ".join(error_parts) + completed_any = " OR ".join(completed_parts) + facet_path = f"facets.{quote_btql_path_piece(facet)}" + matched = f"{facet_path} != 'NO_MATCH' AND {facet_path} != 'SKIPPED' AND {facet_path} != 'skipped'" + query = ( + f"from: project_logs('{escape_btql_literal(project_id)}') spans | " + "measures: " + f"count(({matched}) ? 1 : null) as matched, " + f"count({facet_path}) as processed, " + f"count(({inflight_any}) ? 1 : null) as inflight, " + f"count(({error_any}) ? 1 : null) as errors, " + f"count(({completed_any}) ? 1 : null) as completed | " + "filter: created >= NOW() - INTERVAL 3600 SECOND" + ) + response = api_post( + ctx, + "/btql", + { + "query": query, + "fmt": "json", + "use_brainstore": True, + "brainstore_realtime": True, + "brainstore_ephemeral_wal": True, + "brainstore_skip_backfill_check": False, + "use_columnstore": True, + "api_version": 1, + "query_source": "topics-smoke-running-check", + }, + ) + row = (response.get("data") or [{}])[0] + errors = read_count(row, "errors") + inflight_count = read_count(row, "inflight") + return { + "matched": read_count(row, "matched"), + "processed": read_count(row, "processed"), + "running": max(0, inflight_count - errors), + "errors": errors, + "completed": read_count(row, "completed"), + } + + +def facet_prompt_for_name(facet: str) -> str: + if facet == DEFAULT_FACET: + return TASK_FACET_PROMPT + return ( + f"Classify this conversation for the {facet} facet. " + "Return a concise label, or NONE if there is no clear match." + ) + + +def saved_function_ref_key(function_ref: dict[str, Any]) -> str: + if function_ref.get("type") == "function": + function_id = function_ref.get("id") + if not function_id: + raise SmokeError("saved function reference is missing id") + return f"function_id:{function_id}" + function_type = function_ref.get("function_type") or "scorer" + name = function_ref.get("name") + if not name: + raise SmokeError("global function reference is missing name") + return f"global:{function_type}:{name}" + + +def facet_function_keys_for_running_check( + automation: dict[str, Any], + facets: list[str], +) -> list[str]: + refs = (automation.get("config") or {}).get("facet_functions") or [] + if len(refs) == len(facets): + return [saved_function_ref_key(ref) for ref in refs] + if len(facets) == 1: + return [f"global:facet:{facets[0]}"] + raise SmokeError("could not determine Topics facet function keys for running check") + + +def validate_args(args: argparse.Namespace) -> None: + if args.count < MIN_TOPIC_SUMMARIES and not args.allow_below_threshold: + raise SmokeError( + f"--count must be at least {MIN_TOPIC_SUMMARIES} for Topics generation. " + "Use --allow-below-threshold only when intentionally testing pre-generation behavior." + ) + if args.idle_seconds < 10: + raise SmokeError("--idle-seconds must be at least 10") + if getattr(args, "running_check_timeout", 0) < 0: + raise SmokeError("--running-check-timeout must be non-negative") + if getattr(args, "running_check_interval", 1) < 1: + raise SmokeError("--running-check-interval must be at least 1") + if getattr(args, "facet_preflight_timeout", 1) < 1: + raise SmokeError("--facet-preflight-timeout must be at least 1") + if getattr(args, "function_visibility_timeout", 0) < 0: + raise SmokeError("--function-visibility-timeout must be non-negative") + if getattr(args, "function_visibility_interval", 1) < 1: + raise SmokeError("--function-visibility-interval must be at least 1") + + +def timestamped_project_name(prefix: str, run_id: str | None = None) -> str: + safe_prefix = slugify_project_prefix(prefix) + timestamp = run_id or datetime.now(UTC).strftime("%Y%m%d-%H%M%S") + return f"{safe_prefix}-{timestamp}" + + +def slugify_project_prefix(value: str) -> str: + out = [] + previous_dash = False + for ch in value.strip().lower(): + if ch.isalnum(): + out.append(ch) + previous_dash = False + elif not previous_dash: + out.append("-") + previous_dash = True + slug = "".join(out).strip("-") + return slug or DEFAULT_PROJECT_PREFIX + + +def slugify_topic_map_name(value: str) -> str: + out = [] + previous_dash = False + for ch in value.strip().lower(): + if ("a" <= ch <= "z") or ch.isdigit(): + out.append(ch) + previous_dash = False + elif not previous_dash: + out.append("-") + previous_dash = True + slug = "".join(out).strip("-") + return slug or "topic-map" + + +def parse_duration_seconds(value: str) -> int: + value = value.strip() + if not value: + raise argparse.ArgumentTypeError("duration cannot be empty") + unit = value[-1].lower() if value[-1].isalpha() else "s" + number = value[:-1] if value[-1].isalpha() else value + try: + amount = int(number) + except ValueError as exc: + raise argparse.ArgumentTypeError(f"invalid duration {value!r}") from exc + multipliers = { + "s": 1, + "m": 60, + "h": 60 * 60, + "d": 24 * 60 * 60, + "w": 7 * 24 * 60 * 60, + } + if unit not in multipliers: + raise argparse.ArgumentTypeError(f"invalid duration unit {unit!r}") + return amount * multipliers[unit] + + +def format_duration_seconds(seconds: int) -> str: + for suffix, scale in ( + ("w", 7 * 24 * 60 * 60), + ("d", 24 * 60 * 60), + ("h", 60 * 60), + ("m", 60), + ("s", 1), + ): + if seconds > 0 and seconds % scale == 0: + return f"{seconds // scale}{suffix}" + return f"{seconds}s" + + +def inclusive_start_xact_id_from_epoch_ms(epoch_ms: int) -> str: + xact_namespace = 0x0DE1 + epoch_seconds = max(0, epoch_ms // 1000) + transaction_id = (xact_namespace << 48) | ((epoch_seconds & 0x0000FFFFFFFF) << 16) + if transaction_id <= 0: + return "0" + return str(transaction_id - 1) + + +def escape_btql_literal(value: str) -> str: + return value.replace("'", "''") + + +def quote_btql_path_piece(value: str) -> str: + if value and value.replace("_", "").isalnum() and not value[0].isdigit(): + return value + return '"' + value.replace('"', '""') + '"' + + +def read_count(row: dict[str, Any], key: str) -> int: + value = row.get(key) + if isinstance(value, int): + return value + if isinstance(value, float): + return int(value) + if isinstance(value, str): + try: + return int(value) + except ValueError: + return 0 + return 0 + + +def topic_automation_object_id(project_id: str, data_scope: Any | None) -> str: + if not isinstance(data_scope, dict): + return f"project_logs:{project_id}" + scope_type = data_scope.get("type") + if scope_type in (None, "project_logs"): + return f"project_logs:{project_id}" + if scope_type == "project_experiments": + return f"project_experiments:{project_id}" + if scope_type == "experiment": + experiment_id = data_scope.get("experiment_id") + if not experiment_id: + raise SmokeError("topic automation experiment data scope is missing experiment_id") + return f"experiment:{experiment_id}" + raise SmokeError(f"unsupported topic automation data scope: {scope_type}") + + +def load_configured_defaults() -> dict[str, Any]: + """Load smoke test CLI defaults from braintest.yaml when available.""" + try: + from config import load_config + except ModuleNotFoundError: + repo_root = Path(__file__).resolve().parents[1] + if str(repo_root) not in sys.path: + sys.path.insert(0, str(repo_root)) + try: + from config import load_config + except ModuleNotFoundError: + return {} + + try: + config = load_config() + except Exception: + return {} + + smoketest = config.get("smoketest", {}) + if not isinstance(smoketest, dict): + return {} + + defaults: dict[str, Any] = {} + direct_keys = [ + "api_url", + "org", + "project_prefix", + "count", + "idle_seconds", + "timeout", + "allow_below_threshold", + "skip_facet_preflight", + "skip_running_check", + "facet_preflight_timeout", + "function_visibility_timeout", + "function_visibility_interval", + "running_check_timeout", + "running_check_interval", + ] + for key in direct_keys: + value = smoketest.get(key) + if value is not None: + defaults[key] = value + + duration_keys = { + "topic_window": "topic_window_seconds", + "generation_cadence": "generation_cadence_seconds", + "relabel_overlap": "relabel_overlap_seconds", + } + for config_key, parser_key in duration_keys.items(): + value = smoketest.get(config_key) + if value is not None: + defaults[parser_key] = parse_duration_seconds(str(value)) + + return defaults + + +def build_parser(defaults: dict[str, Any] | None = None) -> argparse.ArgumentParser: + defaults = defaults or {} + parser = argparse.ArgumentParser( + description="Create a disposable Braintrust project for minimal Topics smoke testing." + ) + subparsers = parser.add_subparsers(dest="command") + + def add_common_auth_flags(p: argparse.ArgumentParser) -> None: + p.add_argument( + "--api-key", + default=defaults.get("api_key"), + help="Braintrust API key. Defaults to BRAINTRUST_API_KEY.", + ) + p.add_argument( + "--org", + default=defaults.get("org"), + help="Braintrust org name. Defaults to BRAINTRUST_ORG_NAME.", + ) + p.add_argument( + "--api-url", + default=defaults.get("api_url"), + help="Override discovered dataplane API URL. Defaults to BRAINTRUST_API_URL.", + ) + p.add_argument( + "--timeout", + type=int, + default=defaults.get("timeout", DEFAULT_TIMEOUT_SECONDS), + ) + + create = subparsers.add_parser("create", help="Create a project, seed traces, and queue Topics.") + add_common_auth_flags(create) + add_create_flags(create, defaults) + + status = subparsers.add_parser("status", help="Show minimal Topics automation status.") + add_common_auth_flags(status) + status.add_argument("--project", required=True, help="Project name created by this script.") + + add_common_auth_flags(parser) + add_create_flags(parser, defaults) + return parser + + +def add_create_flags(parser: argparse.ArgumentParser, defaults: dict[str, Any] | None = None) -> None: + defaults = defaults or {} + parser.add_argument( + "--project-prefix", + default=defaults.get("project_prefix", DEFAULT_PROJECT_PREFIX), + ) + parser.add_argument("--count", type=int, default=defaults.get("count", DEFAULT_COUNT)) + parser.add_argument("--idle-seconds", type=int, default=defaults.get("idle_seconds", 10)) + parser.add_argument( + "--topic-window", + dest="topic_window_seconds", + type=parse_duration_seconds, + default=defaults.get("topic_window_seconds", parse_duration_seconds("1h")), + ) + parser.add_argument( + "--generation-cadence", + dest="generation_cadence_seconds", + type=parse_duration_seconds, + default=defaults.get("generation_cadence_seconds", parse_duration_seconds("1h")), + ) + parser.add_argument( + "--relabel-overlap", + dest="relabel_overlap_seconds", + type=parse_duration_seconds, + default=defaults.get("relabel_overlap_seconds", parse_duration_seconds("10m")), + ) + parser.add_argument( + "--allow-below-threshold", + action="store_true", + default=defaults.get("allow_below_threshold", False), + ) + parser.add_argument("--dry-run", action="store_true") + parser.add_argument( + "--skip-facet-preflight", + action="store_true", + default=defaults.get("skip_facet_preflight", False), + help="Do not run a one-trace direct facet invocation before queueing Topics.", + ) + parser.add_argument( + "--facet-preflight-timeout", + type=int, + default=defaults.get("facet_preflight_timeout", DEFAULT_PREFLIGHT_TIMEOUT_SECONDS), + help="Seconds to allow for the one-trace direct facet preflight.", + ) + parser.add_argument( + "--function-visibility-timeout", + type=int, + default=defaults.get( + "function_visibility_timeout", + DEFAULT_FUNCTION_VISIBILITY_TIMEOUT_SECONDS, + ), + help="Seconds to wait for saved functions to resolve through /function/use.", + ) + parser.add_argument( + "--function-visibility-interval", + type=int, + default=defaults.get( + "function_visibility_interval", + DEFAULT_FUNCTION_VISIBILITY_INTERVAL_SECONDS, + ), + help="Seconds between saved-function /function/use visibility checks.", + ) + parser.add_argument( + "--skip-running-check", + action="store_true", + default=defaults.get("skip_running_check", False), + help="Do not poll for the expected initial Topics running count after queueing.", + ) + parser.add_argument( + "--running-check-timeout", + type=int, + default=defaults.get("running_check_timeout", DEFAULT_RUNNING_CHECK_TIMEOUT_SECONDS), + help="Seconds to wait for the expected Topics running count.", + ) + parser.add_argument( + "--running-check-interval", + type=int, + default=defaults.get("running_check_interval", DEFAULT_RUNNING_CHECK_INTERVAL_SECONDS), + help="Seconds between Topics running-count checks.", + ) + + +def main(argv: list[str] | None = None) -> int: + parser = build_parser(load_configured_defaults()) + args = parser.parse_args(argv) + command = args.command or "create" + try: + if command == "status": + return run_status(args) + return run_create(args) + except SmokeError as exc: + print(f"error: {exc}", file=sys.stderr) + return 1 + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/smoke_test/test_topics_smoke.py b/smoke_test/test_topics_smoke.py new file mode 100644 index 0000000..25180d8 --- /dev/null +++ b/smoke_test/test_topics_smoke.py @@ -0,0 +1,583 @@ +import os +import sys +import unittest +from contextlib import redirect_stderr, redirect_stdout +from io import StringIO +from pathlib import Path +from unittest import mock + +sys.path.insert(0, str(Path(__file__).resolve().parents[1])) +from smoke_test import run as topics_smoke # noqa: E402 + + +class TopicsSmokeTests(unittest.TestCase): + def make_args(self, **overrides): + values = { + "count": 105, + "allow_below_threshold": False, + "idle_seconds": 10, + "facet_preflight_timeout": 30, + "function_visibility_timeout": 60, + "function_visibility_interval": 2, + "running_check_timeout": 90, + "running_check_interval": 3, + } + values.update(overrides) + return mock.Mock(**values) + + def test_count_guardrail_rejects_below_threshold(self): + args = self.make_args(count=99, allow_below_threshold=False) + with self.assertRaises(topics_smoke.SmokeError): + topics_smoke.validate_args(args) + + def test_count_guardrail_can_be_overridden(self): + args = self.make_args(count=99, allow_below_threshold=True) + topics_smoke.validate_args(args) + + def test_running_check_interval_guardrail(self): + args = self.make_args(running_check_interval=0) + with self.assertRaises(topics_smoke.SmokeError): + topics_smoke.validate_args(args) + + def test_timestamped_project_name_uses_prefix_and_timestamp(self): + self.assertEqual( + topics_smoke.timestamped_project_name("Topics Smoke!", "20260515-120102"), + "topics-smoke-20260515-120102", + ) + + def test_app_project_topics_url_encodes_path_pieces(self): + self.assertEqual( + topics_smoke.app_project_topics_url("Acme / Test", "project/name"), + "https://www.braintrust.dev/app/Acme%20%2F%20Test/p/project%2Fname/topics", + ) + + def test_terminal_hyperlink_includes_clickable_label_and_visible_url(self): + link = topics_smoke.terminal_hyperlink("Topics results", "https://example.com/topics") + self.assertIn("\033]8;;https://example.com/topics\aTopics results", link) + self.assertTrue(link.endswith(" (https://example.com/topics)")) + + def test_parser_has_no_model_or_facet_override_flags(self): + parser = topics_smoke.build_parser() + with self.assertRaises(SystemExit), redirect_stderr(StringIO()): + parser.parse_args(["--dry-run", "--facet-model", "brain-facet-2"]) + with self.assertRaises(SystemExit), redirect_stderr(StringIO()): + parser.parse_args(["--dry-run", "--facet", "Task"]) + + def test_build_trace_events_count_and_shape(self): + events = topics_smoke.build_trace_events( + 105, + project_name="topics-smoke-20260515-120102", + run_id="20260515-120102", + ) + self.assertEqual(len(events), 105) + first = events[0] + self.assertEqual(first["span_id"], first["root_span_id"]) + self.assertEqual(first["span_attributes"]["type"], "llm") + self.assertEqual(first["input"][0]["role"], "user") + self.assertEqual(first["output"][0]["role"], "assistant") + self.assertTrue(first["metadata"]["synthetic"]) + + def test_select_org_single_org(self): + org = topics_smoke.select_org( + [{"id": "org_1", "name": "acme", "api_url": "https://api.acme"}], + None, + ) + self.assertEqual(org["name"], "acme") + + def test_select_org_multiple_requires_requested_org(self): + with self.assertRaises(topics_smoke.SmokeError): + topics_smoke.select_org( + [ + {"id": "org_1", "name": "acme", "api_url": "https://api.acme"}, + {"id": "org_2", "name": "other", "api_url": "https://api.other"}, + ], + None, + ) + + def test_select_org_uses_requested_org(self): + org = topics_smoke.select_org( + [ + {"id": "org_1", "name": "acme", "api_url": "https://api.acme"}, + {"id": "org_2", "name": "other", "api_url": "https://api.other"}, + ], + "other", + ) + self.assertEqual(org["api_url"], "https://api.other") + + def test_discover_auth_uses_forced_api_url_with_discovered_org(self): + args = mock.Mock( + api_key="key", + org=None, + api_url="https://forced.example", + timeout=1, + ) + with mock.patch.dict(os.environ, {}, clear=True), mock.patch.object( + topics_smoke, + "json_request", + return_value={ + "org_info": [ + { + "id": "org_1", + "name": "acme", + "api_url": "https://discovered.example", + } + ] + }, + ): + ctx = topics_smoke.discover_auth(args) + self.assertEqual(ctx.org_name, "acme") + self.assertEqual(ctx.api_url, "https://forced.example") + + def test_preview_auth_without_key_uses_explicit_values(self): + args = mock.Mock( + api_key=None, + org="acme", + api_url="https://forced.example", + ) + with mock.patch.dict(os.environ, {}, clear=True): + preview = topics_smoke.preview_auth(args) + self.assertEqual(preview["org"], "acme") + self.assertEqual(preview["api_url"], "https://forced.example") + self.assertIn("No network calls", preview["note"]) + + def test_preview_auth_with_key_discovers_values(self): + args = mock.Mock(api_key="key", org=None, api_url=None) + with mock.patch.object( + topics_smoke, + "discover_auth", + return_value=topics_smoke.AuthContext( + api_key="key", + org_id="org_1", + org_name="acme", + api_url="https://api.acme", + ), + ): + preview = topics_smoke.preview_auth(args) + self.assertEqual(preview["org"], "acme") + self.assertEqual(preview["api_url"], "https://api.acme") + self.assertIn("No project", preview["note"]) + + def test_duration_roundtrip(self): + self.assertEqual(topics_smoke.parse_duration_seconds("1h"), 3600) + self.assertEqual(topics_smoke.format_duration_seconds(3600), "1h") + self.assertEqual(topics_smoke.format_duration_seconds(90), "90s") + + def test_api_helpers_send_org_header(self): + ctx = topics_smoke.AuthContext( + api_key="key", + org_id="org_1", + org_name="acme", + api_url="https://api.acme", + ) + with mock.patch.object(topics_smoke, "json_request", return_value={}) as request: + topics_smoke.api_get(ctx, "/v1/project") + topics_smoke.api_post(ctx, "/btql", {"query": "select 1"}) + + self.assertEqual(request.call_args_list[0].kwargs["org_name"], "acme") + self.assertEqual(request.call_args_list[1].kwargs["org_name"], "acme") + + def test_inclusive_start_xact_id_matches_cli_formula_shape(self): + self.assertEqual( + topics_smoke.inclusive_start_xact_id_from_epoch_ms(0), + str((0x0DE1 << 48) - 1), + ) + + def test_btql_path_quoting_handles_function_keys(self): + self.assertEqual(topics_smoke.quote_btql_path_piece("Task"), "Task") + self.assertEqual( + topics_smoke.quote_btql_path_piece("global:facet:Task"), + '"global:facet:Task"', + ) + self.assertEqual(topics_smoke.quote_btql_path_piece(""), '""') + + def test_read_count_accepts_numeric_shapes(self): + self.assertEqual(topics_smoke.read_count({"count": 3}, "count"), 3) + self.assertEqual(topics_smoke.read_count({"count": 3.0}, "count"), 3) + self.assertEqual(topics_smoke.read_count({"count": "3"}, "count"), 3) + self.assertEqual(topics_smoke.read_count({"count": "nope"}, "count"), 0) + + def test_verify_expected_running_passes_when_expected_count_seen(self): + args = self.make_args(count=105) + project = topics_smoke.Project(id="project_1", name="topics-smoke-test") + with mock.patch.object( + topics_smoke, + "get_topic_facet_status_counts", + return_value={ + "matched": 0, + "processed": 0, + "running": 105, + "errors": 0, + "completed": 0, + }, + ) as counts, redirect_stdout(StringIO()): + topics_smoke.verify_expected_running( + mock.Mock(), + project, + {"id": "automation_1"}, + ["Task"], + args, + ) + counts.assert_called_once_with( + mock.ANY, + "project_1", + "automation_1", + "Task", + ["global:facet:Task"], + ) + + def test_verify_expected_running_passes_when_expected_count_already_processed(self): + args = self.make_args(count=105) + project = topics_smoke.Project(id="project_1", name="topics-smoke-test") + with mock.patch.object( + topics_smoke, + "get_topic_facet_status_counts", + return_value={ + "matched": 105, + "processed": 105, + "running": 0, + "errors": 0, + "completed": 105, + }, + ) as counts, redirect_stdout(StringIO()): + topics_smoke.verify_expected_running( + mock.Mock(), + project, + {"id": "automation_1"}, + ["Task"], + args, + ) + counts.assert_called_once() + + def test_verify_expected_running_keeps_polling_through_transient_errors(self): + args = self.make_args(count=105) + project = topics_smoke.Project(id="project_1", name="topics-smoke-test") + with mock.patch.object( + topics_smoke, + "get_topic_facet_status_counts", + side_effect=[ + { + "matched": 0, + "processed": 0, + "running": 0, + "errors": 0, + "completed": 0, + }, + { + "matched": 0, + "processed": 0, + "running": 105, + "errors": 0, + "completed": 0, + }, + ], + ) as counts, mock.patch.object(topics_smoke.time, "sleep"), redirect_stdout(StringIO()): + topics_smoke.verify_expected_running( + mock.Mock(), + project, + {"id": "automation_1"}, + ["Task"], + args, + ) + self.assertEqual(counts.call_count, 2) + + def test_verify_expected_running_fails_on_persistent_errors_at_timeout(self): + args = self.make_args(count=105, running_check_timeout=0) + project = topics_smoke.Project(id="project_1", name="topics-smoke-test") + with mock.patch.object( + topics_smoke, + "get_topic_facet_status_counts", + return_value={ + "matched": 0, + "processed": 0, + "running": 0, + "errors": 105, + "completed": 0, + }, + ), self.assertRaises(topics_smoke.SmokeError), redirect_stdout(StringIO()): + topics_smoke.verify_expected_running( + mock.Mock(), + project, + {"id": "automation_1"}, + ["Task"], + args, + ) + + def test_verify_expected_running_stops_when_all_expected_rows_error(self): + args = self.make_args(count=105, running_check_timeout=90) + project = topics_smoke.Project(id="project_1", name="topics-smoke-test") + with mock.patch.object( + topics_smoke, + "get_topic_facet_status_counts", + return_value={ + "matched": 0, + "processed": 0, + "running": 0, + "errors": 105, + "completed": 0, + }, + ) as counts, self.assertRaisesRegex(topics_smoke.SmokeError, "reported 105 error"), redirect_stdout(StringIO()): + topics_smoke.verify_expected_running( + mock.Mock(), + project, + {"id": "automation_1"}, + ["Task"], + args, + ) + counts.assert_called_once() + + def test_status_count_query_treats_finished_attempts_as_errors(self): + posted_bodies = [] + + def fake_api_post(_ctx, path, body): + posted_bodies.append((path, body)) + if path == "/brainstore/automation/get-cursors": + return {"pending_min_executed_xact_id": None} + if path == "/btql": + return { + "data": [ + {"matched": 0, "processed": 0, "inflight": 105, "errors": 105} + ] + } + raise AssertionError(path) + + with mock.patch.object(topics_smoke, "api_post", side_effect=fake_api_post): + counts = topics_smoke.get_topic_facet_status_counts( + mock.Mock(), + "project_1", + "automation_1", + "Task", + ) + + self.assertEqual(counts["running"], 0) + self.assertEqual(counts["errors"], 105) + btql_body = posted_bodies[1][1] + self.assertIn("attempts > 0", btql_body["query"]) + + def test_saved_function_ref_key_handles_local_and_global_refs(self): + self.assertEqual( + topics_smoke.saved_function_ref_key({"type": "function", "id": "facet_1"}), + "function_id:facet_1", + ) + self.assertEqual( + topics_smoke.saved_function_ref_key( + {"type": "global", "function_type": "facet", "name": "Task"} + ), + "global:facet:Task", + ) + + def test_saved_function_ref_from_inserted_function_keeps_version(self): + self.assertEqual( + topics_smoke.saved_function_ref_from_inserted_function( + {"id": "function_1"}, + {"xact_id": "123"}, + ), + {"type": "function", "id": "function_1", "version": "123"}, + ) + + def test_saved_function_ref_to_use_body_converts_local_and_global_refs(self): + self.assertEqual( + topics_smoke.saved_function_ref_to_use_body( + {"type": "function", "id": "function_1", "version": "123"} + ), + {"function_id": "function_1", "version": "123"}, + ) + self.assertEqual( + topics_smoke.saved_function_ref_to_use_body( + {"type": "global", "function_type": "facet", "name": "Task"} + ), + {"global_function": "Task", "function_type": "facet"}, + ) + + def test_verify_function_refs_resolve_calls_function_use_with_project_header(self): + calls = [] + + def fake_api_post(_ctx, path, body, **kwargs): + calls.append((path, body, kwargs)) + if body["function_id"] == "facet_function_1": + return {"function_data": {"type": "facet"}} + if body["function_id"] == "topic_map_function_1": + return {"function_data": {"type": "topic_map"}} + raise AssertionError(body) + + with ( + mock.patch.object(topics_smoke, "api_post", side_effect=fake_api_post), + redirect_stdout(StringIO()), + ): + topics_smoke.verify_function_refs_resolve( + mock.Mock(), + "project_1", + [{"type": "function", "id": "facet_function_1", "version": "10"}], + [{"function": {"type": "function", "id": "topic_map_function_1"}}], + timeout_seconds=0, + interval_seconds=1, + ) + + self.assertEqual([call[0] for call in calls], ["/function/use", "/function/use"]) + self.assertEqual(calls[0][1], {"function_id": "facet_function_1", "version": "10"}) + self.assertEqual(calls[0][2]["extra_headers"], {"x-bt-project-id": "project_1"}) + + def test_verify_function_refs_resolve_waits_through_transient_lookup_failure(self): + responses = [ + topics_smoke.SmokeError("not found yet"), + {"function_data": {"type": "facet"}}, + {"function_data": {"type": "topic_map"}}, + ] + + def fake_api_post(_ctx, _path, _body, **_kwargs): + response = responses.pop(0) + if isinstance(response, Exception): + raise response + return response + + with ( + mock.patch.object(topics_smoke, "api_post", side_effect=fake_api_post), + mock.patch.object(topics_smoke.time, "sleep") as sleep, + redirect_stdout(StringIO()), + ): + topics_smoke.verify_function_refs_resolve( + mock.Mock(), + "project_1", + [{"type": "function", "id": "facet_function_1"}], + [{"function": {"type": "function", "id": "topic_map_function_1"}}], + timeout_seconds=30, + interval_seconds=1, + ) + + sleep.assert_called_once_with(1) + + def test_create_topic_function_refs_use_builtin_facet_and_create_topic_map(self): + posted_bodies = [] + + def fake_api_post(_ctx, path, body): + posted_bodies.append((path, body)) + self.assertEqual(path, "/insert-functions") + return {"functions": [{"id": "topic_map_function_1"}]} + + with mock.patch.object(topics_smoke, "api_post", side_effect=fake_api_post): + facet_names, facet_refs, topic_map_refs = topics_smoke.create_topic_function_refs( + mock.Mock(), + "project_1", + ) + + self.assertEqual(facet_names, ["Task"]) + self.assertEqual( + facet_refs, + [{"type": "global", "name": "Task", "function_type": "facet"}], + ) + self.assertEqual( + topic_map_refs, + [{"function": {"type": "function", "id": "topic_map_function_1"}}], + ) + body = posted_bodies[0][1] + function = body["functions"][0] + self.assertEqual(function["function_type"], "classifier") + self.assertEqual(function["slug"], "task-topic-map") + self.assertEqual(function["function_data"]["source_facet"], "Task") + self.assertEqual( + function["function_data"]["embedding_model"], + topics_smoke.TOPIC_MAP_EMBEDDING_MODEL, + ) + + def test_build_batched_facet_inline_function_matches_topics_pipeline_shape(self): + inline_function = topics_smoke.build_batched_facet_inline_function( + "Task", + {"function": {"type": "function", "id": "topic_map_function_1"}}, + ) + + self.assertEqual(inline_function["type"], "batched_facet") + self.assertNotIn("model", inline_function["facets"][0]) + self.assertEqual( + inline_function["facets"][0]["embedding_model"], + topics_smoke.TOPIC_MAP_EMBEDDING_MODEL, + ) + self.assertEqual( + inline_function["topic_maps"]["Task"][0]["topic_map_id"], + "topic_map_function_1", + ) + self.assertEqual( + inline_function["topic_maps"]["Task"][0]["topic_map_data"]["embedding_model"], + topics_smoke.TOPIC_MAP_EMBEDDING_MODEL, + ) + + def test_enable_topics_uses_builtin_facet_without_facet_model_override(self): + posted_bodies = [] + + def fake_api_post(_ctx, path, body): + posted_bodies.append((path, body)) + if path == "/insert-functions": + return {"functions": [{"id": "topic_map_function_1"}]} + if path == "/api/project_automation/register": + return { + "project_automation": { + "id": "automation_1", + "config": body["config"], + } + } + return {} + + args = self.make_args( + generation_cadence_seconds=3600, + relabel_overlap_seconds=600, + topic_window_seconds=3600, + ) + with ( + mock.patch.object(topics_smoke, "list_topic_automations", return_value=[]), + mock.patch.object(topics_smoke, "api_post", side_effect=fake_api_post), + mock.patch.object(topics_smoke, "seed_topic_automation_cursors"), + ): + topics_smoke.enable_topics( + mock.Mock(), + topics_smoke.Project(id="project_1", name="project"), + args, + ) + + register_body = next( + body for path, body in posted_bodies if path == "/api/project_automation/register" + ) + self.assertNotIn("facet_model", register_body["config"]) + self.assertEqual( + register_body["config"]["facet_functions"], + [{"type": "global", "name": "Task", "function_type": "facet"}], + ) + self.assertEqual( + register_body["config"]["topic_map_functions"], + [{"function": {"type": "function", "id": "topic_map_function_1"}}], + ) + + def test_preflight_topics_pipeline_invokes_async_batch_gateway_path(self): + args = self.make_args() + event = {"root_span_id": "root_1"} + calls = [] + + def fake_api_post(_ctx, path, body, **kwargs): + calls.append((path, body, kwargs)) + if path == "/function/invoke" and body.get("global_function") == "Task": + return "User wants to test topics" + if path == "/function/invoke": + return {"Task": "User wants to test topics"} + if path == "/function/invoke-async-batch": + return {"status": "success", "succeeded": 1, "failed": 0, "total": 1} + raise AssertionError(path) + + with ( + mock.patch.object(topics_smoke, "api_post", side_effect=fake_api_post), + redirect_stdout(StringIO()), + ): + topics_smoke.preflight_topics_pipeline( + mock.Mock(), + topics_smoke.Project(id="project_1", name="project"), + event, + args, + ["Task"], + [{"type": "global", "name": "Task", "function_type": "facet"}], + [{"function": {"type": "function", "id": "topic_map_function_1"}}], + ) + + async_call = calls[-1] + self.assertEqual(async_call[0], "/function/invoke-async-batch") + self.assertEqual(async_call[1][0]["request"]["inline_function"]["type"], "batched_facet") + self.assertEqual(async_call[2]["extra_headers"], {"x-bt-use-gateway": "true"}) + + +if __name__ == "__main__": + unittest.main()