
Dataset Monitor API

API Spec

Project: Dataset Monitor API (Python/FastAPI)

1) Purpose & Scope

A Python API server using the FastAPI framework that:

  • Monitors a local filesystem tree of Parquet-backed datasets.
  • Exposes REST endpoints to discover datasets/versions/files and read metadata.
  • Runs lightweight analyses (row counts, basic stats, column sums/means, distincts).
  • Produces simple plots (histograms, boxplots, timeseries line charts) as images or Vega-Lite JSON.
  • Orchestrates processing with TextBlaster by publishing jobs to a message queue.

Non-goals (for v1)

  • No arbitrary SQL over datasets (limited curated analyses only).
  • No multi-tenant auth/authorization (can be added later).
  • No distributed compute (single-node, but parallelized IO/CPU).

2) Filesystem model

datasets/
├── <dataset_slug>/
│   └── original/
│       └── v<semver>/
│           └── <dataset_slug>.parquet
  • dataset_slug: kebab-case ASCII (e.g., ai-aktindsigt).
  • Collections (future): allow multiple parquet files under a version.
  • The server must be resilient to missing or partial versions.

3) Technology choices (Python/FastAPI)

  • Web framework: FastAPI running on an ASGI server like uvicorn.
  • Parquet/frames: polars (via lazy API) for high-performance data manipulation.
  • Plotting: altair for Vega-Lite JSON emission.
  • FS watching: watchdog.
  • Background jobs: FastAPI BackgroundTasks for simple, in-process tasks.
  • Config: pydantic-settings for deserializing settings from environment variables or .env files.
  • Logging/metrics: structlog for structured logging, opentelemetry-python for tracing, and prometheus-fastapi-instrumentator for Prometheus metrics.
  • Message Queue (TextBlaster): aio-pika for asynchronous communication with RabbitMQ.
  • Caching: In-memory dict with asyncio.Lock or a library like aiocache.

4) Configuration

# .env file
ROOT_DIR="/path/to/datasets"
BIND_ADDR="0.0.0.0"
BIND_PORT=8080
MAX_PARALLEL_SCANS=4
PLOT_BACKEND="vega"
CACHE_TTL_SECONDS=600
WATCH=True
RABBITMQ_URL="amqp://guest:guest@localhost/"
JOB_QUEUE_NAME="textblaster_jobs"

Environment variables or a .env file are used for configuration, loaded via pydantic-settings.
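
A minimal sketch of the corresponding settings class, assuming the field names mirror the variables above (pydantic-settings matches environment variables case-insensitively by default):

# app/core/config.py -- sketch only; field names are assumed to mirror the env vars
from pydantic_settings import BaseSettings, SettingsConfigDict

class Settings(BaseSettings):
    model_config = SettingsConfigDict(env_file=".env")

    root_dir: str = "/path/to/datasets"
    bind_addr: str = "0.0.0.0"
    bind_port: int = 8080
    max_parallel_scans: int = 4
    plot_backend: str = "vega"
    cache_ttl_seconds: int = 600
    watch: bool = True
    rabbitmq_url: str = "amqp://guest:guest@localhost/"
    job_queue_name: str = "textblaster_jobs"

settings = Settings()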

5) Data model (in-API)

Pydantic models are used to define the data structures.

from typing import Dict, List, Optional

from pydantic import BaseModel

class Dataset(BaseModel):
    slug: str
    path: str
    variants: List[str] = ["original"]
    versions: List[str] = []
    files: List[Dict]
    inferred_schema: List[Dict]

class AnalysisRequest(BaseModel):
    dataset: str
    variant: str = "original"
    version: str
    operations: List[Dict]
    filters: Optional[List[Dict]] = None

class AnalysisResult(BaseModel):
    stats: Dict
    plots: Optional[List[Dict]] = None

class TextBlasterJobRequest(BaseModel):
    input_file: str
    output_file: str
    excluded_file: str
    text_column: str
    id_column: str

6) Server responsibilities

  1. Indexing

    • On boot: scan root_dir for dataset structure; build index.
    • Extract light metadata (file size, mtime) and attempt schema inference from Parquet metadata.
  2. Watching

    • If watch=true, subscribe to FS changes (create/remove/modify) and update the index using watchdog; a sketch follows this list.
  3. Analysis

    • Ad-hoc analysis pipeline over Polars lazyframes.
    • Column existence/type validation with clear HTTP 400 errors.
    • Results cached in-memory with a TTL.
  4. Plotting

    • For vega backend: return Vega-Lite JSON spec generated by altair.
  5. TextBlaster Integration

    • Publish job requests to a RabbitMQ queue. The server acts as a producer and does not track job status directly.
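
For the watching responsibility (item 2), a minimal watchdog sketch might look like the following; the index object and its rescan method are assumptions about the in-memory index:

# Sketch: filesystem watching with watchdog; index.rescan() is an assumed helper
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler

class IndexUpdateHandler(FileSystemEventHandler):
    def __init__(self, index):
        self.index = index

    def on_any_event(self, event):
        # React to create/remove/modify of files; directory events are ignored here
        if not event.is_directory:
            self.index.rescan(event.src_path)

def start_watcher(index, root_dir: str) -> Observer:
    observer = Observer()
    observer.schedule(IndexUpdateHandler(index), path=root_dir, recursive=True)
    observer.start()
    return observer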

7) REST API (v1)

Base URL: /api/v1. FastAPI automatically generates an OpenAPI schema (/openapi.json) and interactive documentation (/docs, /redoc).

Discovery

  • GET /datasets → list all datasets.
  • GET /datasets/{slug} → details for one dataset.
  • GET /datasets/{slug}/{variant} → versions.
  • GET /datasets/{slug}/{variant}/{version} → files + inferred schema.

Analysis

  • POST /analysis/preview → lightweight: row_count + column list.
  • POST /analysis/run → accepts AnalysisRequest; returns AnalysisResult.
  • GET /analysis/cache/{hash} → fetch cached result by key.

Plots (direct)

  • POST /plots → accepts a subset of AnalysisRequest with a single plot op; returns Vega-Lite JSON.
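
A sketch of how such a spec could be built with altair; the helper name, the histogram op shape, and the use of to_pandas() are assumptions:

# Sketch: emitting a Vega-Lite histogram spec via altair
import altair as alt
import polars as pl

def histogram_spec(df: pl.DataFrame, column: str, bins: int = 20) -> dict:
    chart = (
        alt.Chart(df.to_pandas())
        .mark_bar()
        .encode(x=alt.X(column, bin=alt.Bin(maxbins=bins)), y="count()")
    )
    return chart.to_dict()  # Vega-Lite JSON, rendered client-side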

TextBlaster jobs

  • POST /textblaster/jobs → publish job to RabbitMQ; body: TextBlasterJobRequest. This is a "fire-and-forget" operation.

Health & Introspection

  • GET /healthz → { "status": "ok" }.
  • GET /readyz → verifies root_dir exists and RabbitMQ is reachable.
  • GET /metrics → Prometheus metrics.

8) Errors

  • Use standard FastAPI exception handling to return descriptive JSON error responses.
{
  "detail": "Column 'amount' not found"
}
  • Common codes: 400 (validation), 404 (dataset/missing version), 500 (internal), 503 (downstream service unavailable).
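
As a sketch, the column validation error shown above could be raised directly via FastAPI's HTTPException (validate_column is a hypothetical helper):

# Sketch: descriptive 400 for an unknown column
from fastapi import HTTPException

def validate_column(column: str, schema_columns: set[str]) -> None:
    if column not in schema_columns:
        raise HTTPException(status_code=400, detail=f"Column '{column}' not found")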

9) Security

  • v1: optional API key via header X-API-Key implemented using FastAPI's dependency injection system.
  • CORS: configurable via FastAPI's CORSMiddleware.
  • Path traversal prevention: ensure all file access is securely contained within root_dir.
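
A sketch of the containment check, assuming a hypothetical resolve_within_root helper (Path.is_relative_to requires Python 3.9+):

# Sketch: reject any resolved path that escapes root_dir
from pathlib import Path

def resolve_within_root(root: Path, *parts: str) -> Path:
    candidate = (root / Path(*parts)).resolve()
    if not candidate.is_relative_to(root.resolve()):
        raise ValueError("path escapes root_dir")
    return candidate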

10) Performance considerations

  • Polars lazy execution + predicate pushdown.
  • Run uvicorn with multiple worker processes for concurrency.
  • Parallel file reads up to max_parallel_scans.
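
A sketch of the concurrency cap; read_parquet_metadata is a hypothetical blocking helper, and settings comes from section 4:

# Sketch: cap concurrent scans at max_parallel_scans
import asyncio

scan_semaphore = asyncio.Semaphore(settings.max_parallel_scans)

async def scan_file(path: str) -> dict:
    async with scan_semaphore:
        # Blocking metadata reads run in a worker thread to keep the event loop free
        return await asyncio.to_thread(read_parquet_metadata, path)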

11) Observability

  • Structured logging with structlog.
  • Tracing with opentelemetry-python.
  • Metrics for request durations, scan times, and job queue publishing.

12) Deployment

  • Python application run via an ASGI server such as uvicorn (optionally managed by gunicorn with uvicorn workers).
  • Systemd unit example (sketched below) + Dockerfile provided.
  • Read-only FS mode supported (cache is in-memory).
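
A sketch of the promised systemd unit; the service user, install paths, and virtualenv location are placeholders:

# dataset-api.service -- example unit; paths and user are placeholders
[Unit]
Description=Dataset Monitor API
After=network.target

[Service]
User=dataset-api
WorkingDirectory=/opt/dataset-api
EnvironmentFile=/opt/dataset-api/.env
ExecStart=/opt/dataset-api/.venv/bin/uvicorn app.main:app --host 0.0.0.0 --port 8080
Restart=on-failure

[Install]
WantedBy=multi-user.target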

13) Testing strategy

  • Unit tests: pytest for business logic (analysis, plotting, etc.).
  • Integration: pytest with httpx.AsyncClient to make requests to the test application.
  • Use temporary directories with sample datasets for testing file operations.

14) Example flows

A) Row count + sum

  1. Client calls POST /analysis/run with {"dataset": "cellar", "variant": "original", "version": "v1.0.0", "operations": [{"op": "row_count"}, {"op": "sum", "column": "amount"}]}.
  2. Server executes the Polars plan and returns {"stats": {"row_count": 123456, "sum_amount": 987654.32}}.
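
A sketch of the Polars plan behind this flow, using the example's dataset path and column:

# Sketch: lazy Polars plan for row_count + sum (projection pushdown applies)
import polars as pl

lf = pl.scan_parquet("datasets/cellar/original/v1.0.0/cellar.parquet")
out = lf.select(
    pl.len().alias("row_count"),        # pl.len() counts rows within the lazy plan
    pl.col("amount").sum().alias("sum_amount"),
).collect()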

B) Launch TextBlaster

  1. POST /textblaster/jobs with job details.
  2. Server validates the request and publishes a JSON message to the configured RabbitMQ queue.
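
A minimal aio-pika publishing sketch under these assumptions: settings fields from section 4, the TextBlasterJobRequest model from section 5, and queue durability as a choice made here:

# Sketch: fire-and-forget publish of a TextBlaster job to RabbitMQ
import aio_pika

async def publish_job(job: TextBlasterJobRequest) -> None:
    connection = await aio_pika.connect_robust(settings.rabbitmq_url)
    async with connection:
        channel = await connection.channel()
        await channel.declare_queue(settings.job_queue_name, durable=True)
        await channel.default_exchange.publish(
            aio_pika.Message(body=job.model_dump_json().encode()),
            routing_key=settings.job_queue_name,
        )

In practice the connection would be created once at startup and reused (Phase 5 calls for robust connection management); connecting per call simply keeps the sketch self-contained.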

15) Implementation outline (modules)

A typical FastAPI project structure:

app/
├── api/
│   ├── __init__.py
│   ├── datasets.py
│   ├── analysis.py
│   └── textblaster.py
├── services/
│   ├── __init__.py
│   ├── indexing.py
│   ├── analysis.py
│   └── queue.py
├── models/
│   ├── __init__.py
│   └── domain.py
├── core/
│   ├── __init__.py
│   └── config.py
├── __init__.py
└── main.py
tests/
pyproject.toml
  • main.py – bootstrap, configuration, and API router setup.
  • core/config.py – load/validate settings using Pydantic.
  • services/indexing.py – scanning, schema inference, and filesystem watching.
  • services/analysis.py – request parsing, polars plans, result formatting, caching, plotting.
  • services/queue.py – RabbitMQ connection and message publishing.
  • api/ – FastAPI routers, request/response models.
  • models/ – Pydantic models for internal data representation.

16) Compatibility & limits

  • Requires datasets with consistent schemas per version.
  • Parquet compression supported by Polars (Snappy, Zstd, Gzip).
  • Time-series ops assume a parseable datetime column.

17) Future extensions

  • Arrow Flight or DataFusion SQL for richer queries.
  • Materialized views of common analyses.
  • User-specified derived columns (safe expressions).
  • gRPC API surface alongside REST.
  • Role-based access control.

Resolved choices (user responses)

  1. Multiple files per version: Always a single Parquet file per version. The server will assume exactly one file at datasets/<slug>/<variant>/<version>/<slug>.parquet and error if missing or multiple files are found.
  2. Plot backend: Vega-Lite JSON preferred — server will emit Vega-Lite specs (via altair); client will render them.
  3. TextBlaster interface: Message queue integration. The server will publish a job request to a RabbitMQ queue.
  4. Filter language: Not required for v1 — analyses will be global over the dataset.
  5. Auth: API key header (X-API-Key) support is desired and will remain configurable in v1.
  6. Result limits: No enforced limits in v1.
  7. Schema drift: The server must validate that the single file in a version conforms to the expected schema and report schema drift as a validation error.
  8. TextBlaster output location: The output location is specified in the message sent to TextBlaster.
  9. Number formatting: Server returns raw numeric values; client is responsible for formatting.
  10. Deployment target: Bare-metal (systemd) — Docker is optional but not required.

Example API Requests

All requests assume the API server is running on http://localhost:8080.

1. Health Check

curl http://localhost:8080/healthz

2. Readiness Check

curl http://localhost:8080/readyz

3. List All Datasets

curl http://localhost:8080/api/v1/datasets

4. Get Dataset Details

curl http://localhost:8080/api/v1/datasets/ai-aktindsigt

5. Get Dataset Version Details

curl http://localhost:8080/api/v1/datasets/ai-aktindsigt/original/v1.0.0

6. Run Analysis (Row Count and Sum)

curl -X POST http://localhost:8080/api/v1/analysis/run \
     -H "Content-Type: application/json" \
     -d '{
           "dataset": "ai-aktindsigt",
           "variant": "original",
           "version": "v1.0.0",
           "operations": [
             { "op": "row_count" },
             { "op": "sum", "column": "amount" }
           ]
         }'

7. Generate Plot (Histogram)

curl -X POST http://localhost:8080/api/v1/plots \
     -H "Content-Type: application/json" \
     -d '{
           "dataset": "ai-aktindsigt",
           "variant": "original",
           "version": "v1.0.0",
           "operations": [
             { "op": "histogram", "column": "token_count", "bins": 20 }
           ]
         }'

8. Queue TextBlaster Job

curl -X POST http://localhost:8080/api/v1/textblaster/jobs \
     -H "Content-Type: application/json" \
     -d '{
           "input_file": "/path/to/datasets/ai-aktindsigt/original/v1.0.0/ai-aktindsigt.parquet",
           "output_file": "/path/to/datasets/ai-aktindsigt/processed/v1.0.0/ai-aktindsigt.parquet",
           "excluded_file": "/path/to/datasets/ai-aktindsigt/excluded/v1.0.0/ai-aktindsigt.parquet",
           "text_column": "text",
           "id_column": "id"
         }'

Updated Implementation Plan:

Phase 1: Project Structure & Core Dependencies

  1. Initialize a Python project (e.g., with poetry or a venv and pip).
  2. Add core dependencies:
    • fastapi for the web framework.
    • uvicorn as the ASGI server.
    • polars for data processing.
    • pydantic and pydantic-settings for data validation and configuration.
    • aio-pika for RabbitMQ integration.
    • watchdog for filesystem monitoring.
    • altair for plotting.
    • structlog, opentelemetry-python, prometheus-fastapi-instrumentator for observability.
  3. Set up the project structure as outlined in section 15.

Phase 2: Configuration and CLI

  1. Implement configuration loading (app/core/config.py) using pydantic-settings.
  2. (Optional) Add a simple CLI using typer for starting the server or running management tasks.

Phase 3: Filesystem Index & Dataset Discovery

  1. Implement dataset indexing (app/services/indexing.py) to scan the ROOT_DIR.
  2. Add the filesystem watcher using watchdog to update the index on changes.
  3. Create the discovery endpoints (app/api/datasets.py) to serve indexed data.

Phase 4: Data Analysis Engine

  1. Build the analysis engine (app/services/analysis.py) using Polars LazyFrames.
  2. Implement in-memory caching with a TTL (see the sketch after this list).
  3. Add Vega-Lite plotting with altair.
  4. Create the analysis and plotting endpoints (app/api/analysis.py).
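
For item 2, a minimal TTL cache sketch using asyncio.Lock as noted in section 3 (a hand-rolled helper, not a library API):

# Sketch: in-memory TTL cache guarded by an asyncio.Lock
import asyncio
import time
from typing import Any, Optional

class TTLCache:
    def __init__(self, ttl_seconds: float):
        self.ttl = ttl_seconds
        self._data: dict[str, tuple[float, Any]] = {}
        self._lock = asyncio.Lock()

    async def get(self, key: str) -> Optional[Any]:
        async with self._lock:
            entry = self._data.get(key)
            if entry is None or time.monotonic() - entry[0] > self.ttl:
                self._data.pop(key, None)  # expired or missing
                return None
            return entry[1]

    async def set(self, key: str, value: Any) -> None:
        async with self._lock:
            self._data[key] = (time.monotonic(), value)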

Phase 5: TextBlaster Integration

  1. Implement a RabbitMQ publisher service (app/services/queue.py) using aio-pika.
  2. Ensure robust connection management (e.g., retries).
  3. Create the TextBlaster endpoint (app/api/textblaster.py) to accept job requests and publish them to the queue.

Phase 6: Final Assembly & API

  1. Wire everything together in app/main.py, including routers, middleware, and application state (like the RabbitMQ connection pool).
  2. Implement health and readiness endpoints (/healthz, /readyz); see the sketch after this list.
  3. Configure the Prometheus middleware.
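
For item 2, a readiness sketch; storing the RabbitMQ connection on app.state is an assumption, and settings comes from section 4:

# Sketch: readiness endpoint checking root_dir and the RabbitMQ connection
from pathlib import Path
from fastapi import FastAPI, Response

app = FastAPI()

@app.get("/readyz")
async def readyz(response: Response):
    rabbit = getattr(app.state, "rabbitmq", None)  # assumed to be set at startup
    ready = Path(settings.root_dir).is_dir() and rabbit is not None and not rabbit.is_closed
    if not ready:
        response.status_code = 503
    return {"status": "ok" if ready else "unavailable"}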

Phase 7: Testing & Documentation

  1. Add unit tests with pytest.
  2. Add integration tests using httpx.AsyncClient.
  3. Ensure the auto-generated OpenAPI documentation at /docs is clean and usable.
  4. Create deployment artifacts (Dockerfile, example systemd service file).
