status: Blueprint (design spec)
languages: [python, typescript]
agent_pattern: parallel-calls
agent_role: You are a data enrichment assistant. Enrich each input record by gathering attributes in parallel and returning a consolidated result.
primitives:
runtime_modes:
  default:
    description: Anthropic Claude Haiku for the enrichment fan-out.
    swaps:
    context_budget:
      input_max: 80000
      output_max: 8000
  local_only:
    description: Self-hosted vLLM.
    swaps:
      stack/llm-claude: stack/llm-local-vllm
    context_budget:
      input_max: 32000
      output_max: 4000
smoke_test:
  ready: curl -sf http://localhost:8000/health
  exercise: |
    curl -sf -X POST http://localhost:8000/enrich \
      -H 'content-type: application/json' \
      -d '{"items":[{"id":1,"text":"smoke test"}]}'
  assert_jq: '.results | length > 0'
cost_profile:
  tier: low
  sources: [anthropic]
  typical_run_usd: 0.002
model_recommendation: claude-haiku-4-5
env_overrides:
  APP_PORT: 8000
  CONCURRENCY: 10
est_tokens: 3500
required_files:
  - Dockerfile
  - docker-compose.yml
  - .github/workflows/ci.yml
  - app/main.py
  - app/agent/enricher.py
  - tests/unit/test_enricher.py
  - tests/integration/test_batch.py
  - tests/eval/test_enrichment_quality.py
recipe_dependencies:
  python:
    fastapi: ">=0.110.0"
    pydantic-ai: ">=0.0.13"
    pydantic-settings: ">=2.0.0"
    redis: ">=5.0.0"
    structlog: ">=24.1.0"
    langfuse: ">=2.0.0"
  typescript:
    hono: "^4.0.0"
    "@ai-sdk/anthropic": "^1.0.0"
    ai: "^4.0.0"
    zod: "^3.23.0"
    ioredis: "^5.4.0"
    langfuse: "^3.0.0"
external_services: [redis, langfuse]
capabilities:
  - cache.redis
  - obs.langfuse
  - eval.promptfoo
acceptance_contracts:
  http_endpoints:
    - path: /health
      method: GET
      status: 200
    - path: /enrich
      method: POST
      status: 200
  required_env:
    - name: ANTHROPIC_API_KEY
      source: prompted
  required_compose_services: [redis, langfuse]
  smoke_assertions:
    - jq: '.results | length > 0'
      against: smoke_test.exercise.stdout
topology: parallel
load_list:
  - path: ../../vendored/blueprints/patterns/parallel-calls/overview.md
    required: true
  - path: ../frameworks/pydantic-ai.md
    required: true
    when: "language == 'python'"
  - path: ../frameworks/vercel-ai-sdk.md
    required: true
    when: "language == 'typescript'"
  - path: ../cross-cutting/project-layout.md
    required: true
  - path: ../stack/llm-claude.md
    required: true
  - path: ../stack/api-fastapi.md
    required: false
    when: "language == 'python'"
  - path: ../stack/api-hono.md
    required: false
    when: "language == 'typescript'"
  - path: ../stack/relational-postgres.md
    required: false
    when: "capabilities contains 'relational.postgres'"
  - path: ../stack/cache-redis.md
    required: false
    when: "capabilities contains 'cache.redis'"
  - path: ../stack/tracing-langfuse.md
    required: false
    when: "capabilities contains 'obs.langfuse'"
  - path: ../cross-cutting/auth-jwt.md
    required: false
  - path: ../cross-cutting/logging-structured.md
    required: false
  - path: ../cross-cutting/observability.md
    required: false
  - path: ../cross-cutting/rate-limiting.md
    required: false
  - path: ../cross-cutting/prompt-management.md
    required: false

Recipe: Parallel Enricher

Status: Blueprint (design spec)

Composes

Load list

Feed these files to your AI coding assistant to build this agent:

Core (always load):

  • docs/recipes/parallel-enricher.md — this blueprint
  • vendored/blueprints/patterns/parallel-calls/overview.md — the parallel calls pattern
  • docs/frameworks/pydantic-ai.md (Python) or docs/frameworks/vercel-ai-sdk.md (TypeScript)
  • docs/stack/llm-claude.md — LLM integration and model selection

Stack (load for Tier 2 — API-ready):

  • docs/stack/api-fastapi.md or docs/stack/api-hono.md — API layer
  • docs/stack/cache-redis.md — rate limiting backend

Production concerns (load for Tier 3):

  • docs/cross-cutting/auth-jwt.md · docs/cross-cutting/rate-limiting.md · docs/cross-cutting/logging-structured.md · docs/cross-cutting/observability.md · docs/cross-cutting/testing-strategy.md

Scaffolding: docs/reference/docker-templates.md · docs/reference/docker-compose-template.md

Note: This agent does stateless batch processing, so Postgres is optional (needed only if you want to persist enrichment results).

Generation prompt

Copy-paste this into Claude Code or Cursor to scaffold this recipe before agent-scaffold ships:

You are scaffolding a runnable agent project from a spec at https://github.com/jagguvarma15/agent-deployments.

Step 1 — Fetch:
  - https://raw.githubusercontent.com/jagguvarma15/agent-deployments/main/catalog.yaml
  - https://raw.githubusercontent.com/jagguvarma15/agent-deployments/main/docs/recipes/parallel-enricher.md
  - Every `load_list[].path` with `required: true` and `cache_tier: hot`.

Step 2 — Generate the project at `./parallel-enricher/` matching the recipe's `required_files[]`:
  - model(s): claude-haiku-4-5
  - framework: pydantic-ai (Python) or vercel-ai-sdk (TS)
  - runtime_mode: default
  - env vars: from `catalog.recipes[parallel-enricher].env_contract`

Step 3 — Bring it up: `docker compose up` + bootstrap per `LAYER_ORDER`.

Step 4 — Run the smoke test:

     curl -sf -X POST http://localhost:8000/enrich \
       -H 'content-type: application/json' \
       -d '{"items":[{"id":1,"text":"smoke test"}]}'

Step 5 — Validate against `catalog.recipes[parallel-enricher].acceptance_contracts`.

What it does

A batch enrichment agent. Given a list of records (e.g., company names, contact emails, product URLs), the agent enriches each record in parallel — extracting structured data, classifying, scoring, and augmenting with external information. Results are aggregated into a structured output.

This implements homogeneous fan-out with concurrency control — the same enrichment prompt runs on each item concurrently, with a semaphore limiting parallel LLM calls to avoid rate limits.

Architecture

Input (list of N records)
    |
    v
[Splitter] ──> N individual records
    |
    v
[Concurrency controller: semaphore(10)]
    |
    ├──> [Enrich record 1] ──┐
    ├──> [Enrich record 2] ──┤
    ├──> [Enrich record 3] ──┤
    │    ...                  │
    └──> [Enrich record N] ──┘
                              |
                              v
                     [Aggregator]
                              |
                              v
                     Enriched dataset

Data Models

Python (Pydantic)

from enum import Enum
from pydantic import BaseModel, Field


class CompanySize(str, Enum):
    startup = "startup"
    small = "small"
    mid_market = "mid-market"
    enterprise = "enterprise"
    unknown = "unknown"


class InputRecord(BaseModel):
    """A single record to enrich."""
    name: str = Field(..., min_length=1, description="Company or entity name")
    domain: str | None = Field(default=None, description="Website domain")
    email: str | None = Field(default=None, description="Contact email")
    extra: dict | None = Field(default=None, description="Any additional context")


class EnrichedRecord(BaseModel):
    """Enriched version of an input record."""
    name: str
    domain: str | None = None
    industry: str = Field(..., description="Inferred industry vertical")
    size: CompanySize = Field(..., description="Estimated company size")
    description: str = Field(..., description="One-line company description")
    score: float = Field(..., ge=0.0, le=1.0, description="Relevance/quality score")
    tags: list[str] = Field(default_factory=list, description="Classification tags")
    enrichment_source: str = Field(default="llm", description="How enrichment was performed")


class EnrichmentError(BaseModel):
    """Tracks a failed enrichment."""
    name: str
    error: str


class EnrichBatchRequest(BaseModel):
    records: list[InputRecord] = Field(..., min_length=1, max_length=100)
    concurrency: int = Field(default=10, ge=1, le=50, description="Max parallel LLM calls")


class EnrichBatchResponse(BaseModel):
    enriched: list[EnrichedRecord]
    errors: list[EnrichmentError] = Field(default_factory=list)
    total: int
    succeeded: int
    failed: int
    trace_id: str

TypeScript (Zod)

import { z } from "zod";

export const CompanySize = z.enum(["startup", "small", "mid-market", "enterprise", "unknown"]);
export type CompanySize = z.infer<typeof CompanySize>;

export const InputRecord = z.object({
  name: z.string().min(1),
  domain: z.string().optional(),
  email: z.string().email().optional(),
  extra: z.record(z.unknown()).optional(),
});
export type InputRecord = z.infer<typeof InputRecord>;

export const EnrichedRecord = z.object({
  name: z.string(),
  domain: z.string().optional(),
  industry: z.string(),
  size: CompanySize,
  description: z.string(),
  score: z.number().min(0).max(1),
  tags: z.array(z.string()).default([]),
  enrichment_source: z.string().default("llm"),
});
export type EnrichedRecord = z.infer<typeof EnrichedRecord>;

export const EnrichmentError = z.object({
  name: z.string(),
  error: z.string(),
});

export const EnrichBatchRequest = z.object({
  records: z.array(InputRecord).min(1).max(100),
  concurrency: z.number().int().min(1).max(50).default(10),
});
export type EnrichBatchRequest = z.infer<typeof EnrichBatchRequest>;

export const EnrichBatchResponse = z.object({
  enriched: z.array(EnrichedRecord),
  errors: z.array(EnrichmentError).default([]),
  total: z.number(),
  succeeded: z.number(),
  failed: z.number(),
  trace_id: z.string(),
});
export type EnrichBatchResponse = z.infer<typeof EnrichBatchResponse>;

API Contract

POST /enrich

Enrich a batch of records in parallel.

Request:

{
  "records": [
    {"name": "Acme Corp", "domain": "acme.com"},
    {"name": "Globex Inc", "domain": "globex.io"},
    {"name": "Initech", "email": "info@initech.com"}
  ],
  "concurrency": 10
}

Response (200):

{
  "enriched": [
    {
      "name": "Acme Corp",
      "domain": "acme.com",
      "industry": "Manufacturing",
      "size": "mid-market",
      "description": "Industrial manufacturing company specializing in consumer products",
      "score": 0.72,
      "tags": ["b2b", "manufacturing", "consumer-goods"],
      "enrichment_source": "llm"
    },
    {
      "name": "Globex Inc",
      "domain": "globex.io",
      "industry": "Technology",
      "size": "startup",
      "description": "Developer tools startup focused on API infrastructure",
      "score": 0.85,
      "tags": ["b2b", "saas", "developer-tools"],
      "enrichment_source": "llm"
    }
  ],
  "errors": [
    {"name": "Initech", "error": "Enrichment timed out after 30s"}
  ],
  "total": 3,
  "succeeded": 2,
  "failed": 1,
  "trace_id": "c3d4e5f6-a7b8-9012-cdef-345678901234"
}
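
The "Enrichment timed out after 30s" entry above implies a per-record deadline. The following is a minimal sketch of one way to produce that behavior, assuming `asyncio.wait_for` wraps each call inside the semaphore. The names `enrich_with_timeout`, `slow_enrich`, and `ENRICH_TIMEOUT_S` are hypothetical and not part of the recipe, and the demo uses a short timeout so it finishes quickly.

```python
import asyncio

ENRICH_TIMEOUT_S = 30.0  # assumption: matches the 30s in the example error


async def enrich_with_timeout(name, enrich, semaphore, timeout=ENRICH_TIMEOUT_S):
    """Run one enrichment under the semaphore with a per-record deadline.

    Returns ("ok", result) or ("error", {"name": ..., "error": ...}).
    """
    async with semaphore:
        # The timer starts after a slot is acquired, so time spent waiting
        # in the queue does not count against the record.
        try:
            result = await asyncio.wait_for(enrich(name), timeout=timeout)
        except asyncio.TimeoutError:
            message = f"Enrichment timed out after {timeout:g}s"
            return "error", {"name": name, "error": message}
    return "ok", result


async def slow_enrich(name):
    """Stand-in for the LLM call (hypothetical): 'Initech' is too slow."""
    await asyncio.sleep(1.0 if name == "Initech" else 0.0)
    return {"name": name, "industry": "Technology"}


async def demo():
    semaphore = asyncio.Semaphore(10)
    names = ["Acme Corp", "Globex Inc", "Initech"]
    return await asyncio.gather(
        *(enrich_with_timeout(n, slow_enrich, semaphore, timeout=0.05) for n in names)
    )


outcomes = asyncio.run(demo())
```

Placing the timeout inside the `async with` block is a design choice: a record that waits a long time for a free slot in a large batch is not penalized for the queueing delay.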

Errors:

| Status | Body | When |
|---|---|---|
| 400 | {"error": "Invalid request", "details": [...]} | Empty records list or invalid concurrency |
| 413 | {"error": "Batch too large", "max_records": 100} | More than 100 records |
| 500 | {"error": "Batch processing failed"} | All records failed |
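
The status rules in the table can be sketched as two small checks: one before any LLM call and one after the batch finishes. This is an illustrative sketch only; `preflight_status` and `batch_status` are hypothetical helper names, and the limit of 100 is taken from the request schema.

```python
MAX_BATCH_SIZE = 100  # from the request schema (max 100 records)


def preflight_status(record_count, max_records=MAX_BATCH_SIZE):
    """Return the rejection status for a request, or None if it may proceed."""
    if record_count < 1:
        return 400  # empty records list
    if record_count > max_records:
        return 413  # batch too large
    return None


def batch_status(total, failed):
    """500 only when every record failed; a partial failure is still 200."""
    return 500 if total > 0 and failed == total else 200
```

FastAPI's default response to a Pydantic validation failure is 422, so returning 400 and 413 as specified would need either a custom exception handler or a check like `preflight_status` that runs before model validation.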

GET /health

Returns {"status": "ok"}.

Tool Specifications

This agent has no external tools. Each enrichment is a pure LLM call with structured output (result_type=EnrichedRecord; newer Pydantic AI releases name this parameter output_type). The agent uses the LLM's world knowledge to infer industry, size, and classification.

For production use, you would add tools for external data sources (Clearbit, LinkedIn, Crunchbase), but the core pattern — parallel fan-out with semaphore — remains the same.

Prompt Specifications

Enrichment prompt (per record)

You are a data enrichment specialist. Given a company or entity record,
enrich it with structured information.

Record to enrich:
{record_json}

For this record, determine:
1. Industry vertical (e.g., "Technology", "Healthcare", "Manufacturing")
2. Company size: startup (<50), small (50-200), mid-market (200-1000), enterprise (1000+), or unknown
3. A one-line description of what the company does
4. A relevance/quality score from 0.0 to 1.0 (how confident you are in the enrichment)
5. Classification tags (2-5 tags)

Base your assessment on the company name, domain, and any additional context provided.
If you're uncertain, set score lower and size to "unknown". Do not fabricate specific
revenue figures or employee counts.
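
A minimal sketch of how the `{record_json}` placeholder might be filled per record. The template below is an abbreviated copy of the prompt above, and `render_prompt` is a hypothetical helper; dropping unset fields is an assumption, not something the recipe specifies.

```python
import json

# Abbreviated copy of the prompt above; only the placeholder matters here.
PROMPT_TEMPLATE = (
    "You are a data enrichment specialist. Given a company or entity record,\n"
    "enrich it with structured information.\n\n"
    "Record to enrich:\n{record_json}\n"
)


def render_prompt(record: dict) -> str:
    """Fill {record_json}, omitting fields that are None."""
    present = {key: value for key, value in record.items() if value is not None}
    # str.replace rather than str.format, so literal braces added to the
    # template later (for example a JSON sample) need no escaping.
    return PROMPT_TEMPLATE.replace(
        "{record_json}", json.dumps(present, ensure_ascii=False)
    )


prompt = render_prompt({"name": "Acme Corp", "domain": "acme.com", "email": None})
```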

Design rationale:

  • "Do not fabricate specific revenue figures" — LLMs will confidently hallucinate numbers. The prompt constrains output to categorical assessments (size buckets, industry labels) where the model is more reliable.
  • Score as confidence signal — Downstream consumers can filter enrichments below a threshold. Low-confidence enrichments are still useful but flagged.
  • Categorical size buckets — More reliable than asking for exact employee counts. The model can reasonably distinguish "startup" from "enterprise" but not "487 employees" from "512 employees."
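
The "score as confidence signal" point can be illustrated with a small downstream filter. This is a sketch under assumptions: the 0.5 threshold mirrors the eval assertions later in this document, `split_by_confidence` is a hypothetical helper, and the sample scores are made up for illustration.

```python
def split_by_confidence(enriched, threshold=0.5):
    """Partition enriched records into (accepted, needs_review) by score."""
    accepted = [r for r in enriched if r["score"] >= threshold]
    needs_review = [r for r in enriched if r["score"] < threshold]
    return accepted, needs_review


# Illustrative records; the scores are invented for this example.
records = [
    {"name": "Acme Corp", "score": 0.72},
    {"name": "Globex Inc", "score": 0.85},
    {"name": "XYZZY Corp", "score": 0.3},
]
accepted, needs_review = split_by_confidence(records, threshold=0.5)
```

Low-scoring records are kept in a separate list rather than dropped, which matches the rationale that they are still useful but flagged.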

Key files

Follows the canonical project layout: app/ package for Python, src/ for TypeScript, tests/{unit,integration,eval}/ for both.

Python track

| File | Role |
|---|---|
| app/main.py | FastAPI entrypoint with lifespan, routers, health check |
| app/settings.py | Config: model name, default concurrency, batch size limit |
| app/models/schemas.py | All Pydantic models: InputRecord, EnrichedRecord, EnrichBatchRequest/Response |
| app/agent/enricher.py | Pydantic AI agent with result_type=EnrichedRecord |
| app/agent/pipeline.py | Fan-out orchestrator: split → semaphore → gather → aggregate |
| app/api/enrich.py | /enrich endpoint: accepts batch, returns enriched batch |

TypeScript track

| File | Role |
|---|---|
| src/index.ts | Hono entrypoint with routes and health check |
| src/config.ts | Zod-validated env config |
| src/schemas/index.ts | All Zod schemas |
| src/agent/enricher.ts | generateObject({ schema: EnrichedRecord }) per record |
| src/agent/pipeline.ts | Fan-out: Promise.all() with concurrency limiter |
| src/api/enrich.ts | /enrich route handler |

Key implementation pattern (Python)

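import asyncio

from pydantic_ai import Agent

from app.models.schemas import EnrichedRecord, InputRecord

# result_type and result.data are the names used by early Pydantic AI releases;
# newer releases rename them to output_type and result.output.
agent = Agent("anthropic:claude-haiku-4-5", result_type=EnrichedRecord)


async def enrich_one(record: InputRecord, semaphore: asyncio.Semaphore) -> EnrichedRecord:
    # The semaphore caps how many LLM calls are in flight at once.
    async with semaphore:
        result = await agent.run(f"Enrich this record: {record.model_dump_json()}")
        return result.data


async def enrich_batch(
    records: list[InputRecord], concurrency: int = 10
) -> list[EnrichedRecord | BaseException]:
    semaphore = asyncio.Semaphore(concurrency)
    tasks = [enrich_one(r, semaphore) for r in records]
    # With return_exceptions=True a failed record comes back as an exception
    # object in its list position instead of raising out of gather().
    return await asyncio.gather(*tasks, return_exceptions=True)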
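
The pattern above stops at the raw `gather()` output; the aggregate step that produces the response shape is not shown. The following is a minimal sketch of that step using plain dicts instead of the Pydantic models, so it runs on its own. `aggregate` is a hypothetical helper, and generating the trace ID with `uuid4` is an assumption (in the full recipe it would presumably come from the tracing layer).

```python
import uuid


def aggregate(names, results):
    """Fold gather(..., return_exceptions=True) output into the response shape.

    `names` and `results` are parallel lists: gather() preserves input order,
    so results[i] belongs to names[i].
    """
    enriched, errors = [], []
    for name, result in zip(names, results):
        if isinstance(result, BaseException):
            # Fall back to the exception class name when the message is empty.
            errors.append({"name": name, "error": str(result) or type(result).__name__})
        else:
            enriched.append(result)
    return {
        "enriched": enriched,
        "errors": errors,
        "total": len(results),
        "succeeded": len(enriched),
        "failed": len(errors),
        "trace_id": str(uuid.uuid4()),  # assumption: stand-in for a real trace ID
    }


# Usage with stand-in results (hypothetical values):
response = aggregate(
    ["Acme Corp", "Initech"],
    [
        {"name": "Acme Corp", "industry": "Manufacturing"},
        TimeoutError("Enrichment timed out after 30s"),
    ],
)
```

Relying on positional order is what lets the error entry carry the record name even though the exception object itself knows nothing about the record.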

Key implementation pattern (TypeScript)

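import pLimit from "p-limit";
import { anthropic } from "@ai-sdk/anthropic";
import { generateObject } from "ai";

// Assumed exports of src/config.ts and src/schemas/index.ts.
import { config } from "../config";
import { EnrichedRecord, type InputRecord } from "../schemas";

// At most 10 LLM calls in flight at once.
const limit = pLimit(10);

async function enrichBatch(records: InputRecord[]): Promise<(EnrichedRecord | Error)[]> {
  return Promise.all(
    records.map((record) =>
      limit(async () => {
        try {
          const result = await generateObject({
            model: anthropic(config.enrichModel),
            schema: EnrichedRecord,
            prompt: `Enrich this record: ${JSON.stringify(record)}`,
          });
          return result.object;
        } catch (err) {
          // Return the failure instead of rejecting, so one bad record
          // does not make Promise.all() discard the rest of the batch.
          return err instanceof Error ? err : new Error(String(err));
        }
      })
    )
  );
}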

Implementation Roadmap

| Step | Task | Key deliverables |
|---|---|---|
| 1 | Project scaffolding | FastAPI/Hono app with /health, settings, structured logging |
| 2 | Data models | All Pydantic + Zod schemas for input, enriched, batch request/response |
| 3 | Enricher agent | Pydantic AI agent with result_type=EnrichedRecord, system prompt |
| 4 | Fan-out pipeline | asyncio.gather() / Promise.all() with semaphore concurrency control |
| 5 | Error handling | return_exceptions=True, separate succeeded/failed in response |
| 6 | API endpoint | POST /enrich with batch size validation, trace ID |
| 7 | Cross-cutting | JWT auth, rate limiting, Langfuse tracing (one span per record) |
| 8 | Unit tests | Schema validation, semaphore behavior, error aggregation |
| 9 | Integration + eval | Batch enrichment with real LLM, promptfoo security scan |

Environment & Deployment

Environment variables

| Variable | Required | Default | Description |
|---|---|---|---|
| ANTHROPIC_API_KEY | Yes | | Anthropic API key |
| ENRICH_MODEL | No | claude-haiku-4-5 | Model for enrichment |
| DEFAULT_CONCURRENCY | No | 10 | Default parallel LLM calls |
| MAX_BATCH_SIZE | No | 100 | Maximum records per request |
| DATABASE_URL | No | postgresql+asyncpg://agent:agent@localhost:5432/agent_db | Postgres connection |
| REDIS_URL | No | redis://localhost:6379 | Redis for rate limiting |
| LANGFUSE_PUBLIC_KEY | No | pk-lf-local | Langfuse public key |
| LANGFUSE_SECRET_KEY | No | sk-lf-local | Langfuse secret key |
| LANGFUSE_HOST | No | http://localhost:3000 | Langfuse server URL |
| JWT_SECRET | No | change-me-in-production | JWT signing secret |
| APP_ENV | No | development | Environment name |
| LOG_LEVEL | No | INFO | Log level |
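
A minimal sketch of loading a few of these variables with their documented defaults. The recipe lists pydantic-settings for this job; the version below is a standard-library stand-in so it runs without dependencies. `Settings` and `load_settings` are hypothetical names, and the 1 to 50 range check mirrors the `concurrency` bounds in the request schema.

```python
import os
from dataclasses import dataclass


@dataclass(frozen=True)
class Settings:
    default_concurrency: int
    max_batch_size: int
    log_level: str


def load_settings(env=None) -> Settings:
    """Read settings from a mapping (os.environ by default), applying table defaults."""
    env = os.environ if env is None else env
    settings = Settings(
        default_concurrency=int(env.get("DEFAULT_CONCURRENCY", 10)),
        max_batch_size=int(env.get("MAX_BATCH_SIZE", 100)),
        log_level=env.get("LOG_LEVEL", "INFO"),
    )
    # Fail fast on values the request schema would reject anyway.
    if not 1 <= settings.default_concurrency <= 50:
        raise ValueError("DEFAULT_CONCURRENCY must be between 1 and 50")
    return settings


defaults = load_settings({})
overridden = load_settings({"DEFAULT_CONCURRENCY": "4", "LOG_LEVEL": "DEBUG"})
```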

Docker Compose

See Docker Compose template for base infrastructure. This agent needs: Redis, Langfuse. Postgres is optional.

Infrastructure dependencies

| Component | Required? | Why |
|---|---|---|
| Postgres | Optional | Only if persisting enrichment results (batch processing is stateless) |
| Redis | Yes | Rate limiting backend |
| Qdrant | No | Not needed: this agent enriches records, it does not retrieve documents |
| Langfuse | Recommended | Per-record LLM call tracing (skip for local dev) |

Test Strategy

Unit tests

import pytest
from pydantic import ValidationError

from app.models.schemas import EnrichBatchRequest, EnrichedRecord, InputRecord


def test_enriched_record_score_bounds():
    """Score must be between 0.0 and 1.0."""
    with pytest.raises(ValidationError):
        EnrichedRecord(name="x", industry="tech", size="startup",
                       description="d", score=1.5, tags=[])


def test_batch_request_max_size():
    """Batch requests are limited to 100 records."""
    records = [InputRecord(name=f"Company {i}") for i in range(101)]
    with pytest.raises(ValidationError):
        EnrichBatchRequest(records=records)


@pytest.mark.asyncio  # assumes the pytest-asyncio plugin
async def test_semaphore_limits_concurrency(mock_llm_client):
    """At most N enrichments run concurrently."""
    # Track max concurrent calls with a counter
    # Assert never exceeds semaphore limit


@pytest.mark.asyncio  # assumes the pytest-asyncio plugin
async def test_failed_records_dont_kill_batch(mock_llm_client):
    """One failing record doesn't prevent others from completing."""
    # Mock agent to fail on record 2 of 5
    # Assert 4 succeeded, 1 in errors list
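
The semaphore test is only outlined in comments above. One way it might be filled in is sketched below, using a fake enrichment coroutine instead of the `mock_llm_client` fixture so the example runs on its own; `run_with_limit` and `fake_enrich` are hypothetical names.

```python
import asyncio


async def run_with_limit(n_tasks: int, limit: int) -> int:
    """Run n_tasks fake enrichments under a semaphore; return the peak number in flight."""
    semaphore = asyncio.Semaphore(limit)
    in_flight = 0
    peak = 0

    async def fake_enrich() -> None:
        nonlocal in_flight, peak
        async with semaphore:
            in_flight += 1
            peak = max(peak, in_flight)
            await asyncio.sleep(0.01)  # yield so other tasks can overlap
            in_flight -= 1

    await asyncio.gather(*(fake_enrich() for _ in range(n_tasks)))
    return peak


def test_semaphore_limits_concurrency():
    """With 25 tasks and a limit of 10, no more than 10 run at once."""
    peak = asyncio.run(run_with_limit(n_tasks=25, limit=10))
    assert 1 < peak <= 10
```

The lower bound in the assertion guards against a test that passes trivially because the tasks ran one at a time.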

Integration tests (main branch only)

@pytest.mark.asyncio  # assumes the pytest-asyncio plugin
async def test_batch_enrichment_e2e(client):
    """Enrich 3 records with real LLM, all should succeed."""
    # `client` is assumed to be an async HTTP client fixture bound to the app.
    response = await client.post("/enrich", json={
        "records": [
            {"name": "Stripe", "domain": "stripe.com"},
            {"name": "Anthropic", "domain": "anthropic.com"},
            {"name": "Vercel", "domain": "vercel.com"},
        ]
    })
    assert response.status_code == 200
    data = response.json()
    assert data["succeeded"] == 3
    assert data["failed"] == 0
    assert all(r["industry"] for r in data["enriched"])

Eval assertions

  • Well-known companies (Stripe, Google) get score ≥ 0.8
  • Unknown/ambiguous names get score < 0.5 and size "unknown"
  • Industry classification is reasonable (Stripe → "Technology" or "Fintech")
  • Batch of 10 completes within 30s at concurrency=10

Eval Dataset

{"input": {"records": [{"name": "Stripe", "domain": "stripe.com"}]}, "expected_industry": "Technology", "expected_min_score": 0.8}
{"input": {"records": [{"name": "Mayo Clinic", "domain": "mayoclinic.org"}]}, "expected_industry": "Healthcare", "expected_min_score": 0.7}
{"input": {"records": [{"name": "XYZZY Corp"}]}, "expected_size": "unknown", "expected_max_score": 0.5}
{"input": {"records": [{"name": "Toyota", "domain": "toyota.com"}]}, "expected_industry": "Automotive", "expected_min_score": 0.8}
{"input": {"records": [{"name": "Anthropic", "domain": "anthropic.com"}]}, "expected_industry": "Technology", "expected_min_score": 0.8}
{"input": {"records": [{"name": "a"}, {"name": "b"}, {"name": "c"}]}, "expected_total": 3}
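
A minimal sketch of how one dataset row could be checked against an enriched record. `check_case` is a hypothetical helper, the exact-match comparison on industry is an assumption (the eval assertions above allow some leeway, such as "Technology" or "Fintech"), and batch-level keys like `expected_total` are left out.

```python
import json


def check_case(case: dict, record: dict) -> list[str]:
    """Compare one enriched record with a dataset row; return failure messages."""
    failures = []
    for field in ("industry", "size"):
        expected = case.get(f"expected_{field}")
        if expected is not None and record[field] != expected:
            failures.append(f"{field}: expected {expected!r}, got {record[field]!r}")
    if "expected_min_score" in case and record["score"] < case["expected_min_score"]:
        failures.append(f"score {record['score']} below {case['expected_min_score']}")
    if "expected_max_score" in case and record["score"] > case["expected_max_score"]:
        failures.append(f"score {record['score']} above {case['expected_max_score']}")
    return failures


row = json.loads(
    '{"input": {"records": [{"name": "XYZZY Corp"}]}, '
    '"expected_size": "unknown", "expected_max_score": 0.5}'
)
# Hypothetical model output for illustration only.
record = {"industry": "Unknown", "size": "startup", "score": 0.4}
failures = check_case(row, record)
```

Returning a list of messages rather than raising on the first mismatch lets an eval run report every failed expectation for a row at once.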

See eval-data guide for generation + curation patterns.

Design Decisions

  • Semaphore-based concurrency: asyncio.Semaphore(10) limits parallel LLM calls to 10. Prevents rate-limit exhaustion while maximizing throughput.
  • return_exceptions=True: Individual record failures don't kill the batch. Failed records are reported separately in the errors list.
  • Structured output per record: result_type=EnrichedRecord ensures every enrichment returns validated structured data, not free text.
  • Pydantic AI for simplicity: No graph or workflow needed. Raw asyncio.gather() with Pydantic AI agents is the cleanest pattern for parallel independent work.
  • Configurable concurrency: The caller controls parallelism per request. High-priority small batches can use higher concurrency; large batches can throttle down.