A modern, type-safe, composable Python framework for building AI workflows with streaming as the primary interface. Built on Pydantic and pydantic-ai, every node exposes an async stream of progress, with non-streaming results produced by consuming the stream internally.
- Streaming-Native: Every node streams progress items as its primary interface (astream()), with run() as a convenience wrapper
- Type-Safe by Design: Full Python 3.14+ generics support with comprehensive IDE integration
- Pydantic-Powered: Built around BaseModel for schema definition and validation
- Reference-Based Wiring: Connect nodes using .output references, not magic strings
- pydantic-ai Integration: Use user-supplied pydantic-ai agents directly - no wrapping or renaming
- Progress Vocabulary: Small, focused set of progress items (tokens, tool calls, retrievals, partial fields, errors, heartbeats)
- Streaming Parser: Tolerant incremental JSON parsing with partial field extraction
- Automatic DAG Resolution: Intelligent dependency tracking and execution ordering (learn more)
- Human-in-the-Loop (HITL): Comprehensive interruption, approval, and resumption support (learn more)
- Production-Ready Prompt Library: Standalone templating system with multiple engines (learn more)
- Serializable & Inspectable: Full observability and debugging support with OpenTelemetry integration
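The tolerant incremental parsing idea can be sketched in miniature. This is an illustrative stand-in, not the library's parser: it retries json.loads on the growing buffer with a few candidate closing suffixes, so completed fields surface before the whole object has arrived.

```python
import json

def partial_fields(buffer: str) -> dict:
    """Best-effort parse of an incomplete JSON object (toy sketch)."""
    # Try to complete the buffer in a few plausible ways until one parses.
    for suffix in ("", '"}', '": null}', "}"):
        try:
            return json.loads(buffer + suffix)
        except json.JSONDecodeError:
            continue
    return {}  # nothing parseable yet

buf = ""
for chunk in ['{"city": "Par', 'is", "temp', '": 21}']:
    buf += chunk
    print(partial_fields(buf))
# the first chunk already yields {'city': 'Par'};
# the final chunk yields {'city': 'Paris', 'temp': 21}
```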
pip install pydantic-flow

import asyncio
from pydantic import BaseModel
from pydantic_ai import Agent
from pydantic_flow import AgentNode, iter_tokens
class Query(BaseModel):
    question: str

# Create a user-supplied pydantic-ai agent
agent = Agent(
    "openai:gpt-4",
    instructions="Be concise and helpful.",
)

async def main():
    # Create a streaming agent node
    node = AgentNode[Query, str](
        agent=agent,
        prompt_template="{question}",
        name="answer_node",
    )

    query = Query(question="What is the capital of France?")

    # Stream tokens as they arrive
    print("Streaming response:")
    async for token in iter_tokens(node.astream(query)):
        print(token, end="", flush=True)

    # Or get the final result (consumes stream internally)
    result = await node.run(query)
    print(f"\n\nFinal result: {result}")

if __name__ == "__main__":
    asyncio.run(main())

# The run() method consumes the stream and returns the final validated result
result = await node.run(query)
print(result)  # Fully typed and validated

Every node in pydantic-flow implements astream() as its primary interface:
async def astream(self, input_data: InputT) -> AsyncIterator[ProgressItem]:
    """Stream progress items: start, tokens, tools, fields, end."""

The run() method is a convenience wrapper that consumes the stream:

async def run(self, input_data: InputT) -> OutputT:
    """Consume the stream and return the final validated result."""

Nodes yield a small vocabulary of progress items during execution:
- StreamStart: Execution begins
- TokenChunk: Text token from LLM
- PartialFields: Incremental structured field updates
- ToolCall: Tool invocation declared
- ToolArgProgress: Tool arguments forming
- ToolResult: Tool execution completed
- RetrievalItem: Search/retrieval result
- NonFatalError: Recoverable error or warning
- StreamEnd: Execution completes
- Heartbeat: Liveness signal during long operations
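To make the consumption pattern concrete, here is a self-contained sketch using stand-in dataclasses (not the library's actual progress classes): a fake astream() yields start, token, and end items, and a run()-style helper drains the stream, keeping only the token text.

```python
import asyncio
from dataclasses import dataclass

# Stand-in progress items, for illustration only.
@dataclass
class StreamStart:
    node: str

@dataclass
class TokenChunk:
    text: str

@dataclass
class StreamEnd:
    node: str

async def fake_stream():
    """Mimics a node's astream(): start marker, tokens, end marker."""
    yield StreamStart(node="demo")
    for piece in ("Hel", "lo"):
        yield TokenChunk(text=piece)
    yield StreamEnd(node="demo")

async def collect_tokens(stream):
    # run()-style consumption: drain the stream, keep only token text.
    parts = []
    async for item in stream:
        if isinstance(item, TokenChunk):
            parts.append(item.text)
    return "".join(parts)

text = asyncio.run(collect_tokens(fake_stream()))
# text == "Hello"
```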
Nodes are the building blocks of your workflow. Each node is typed with input and output models:
Calls an LLM using a templated prompt. Supports simple string templates or full PromptTemplate/ChatPromptTemplate objects with multiple formats (f-string, Jinja2, Mustache):
# Simple string template (backward compatible)
weather_prompt = PromptNode[WeatherQuery, str](
prompt="Get weather for {location} in {temperature_unit}",
config=PromptConfig(model="openai:gpt-4"),
name="weather_prompt"
)
# Advanced: Full PromptTemplate with Jinja2
from pydantic_flow import PromptTemplate, TemplateFormat
template = PromptTemplate[WeatherQuery, str](
template="""Get weather for {{ location }}.
{% if temperature_unit %}
Use {{ temperature_unit }} for temperature.
{% endif %}""",
format=TemplateFormat.JINJA2,
input_model=WeatherQuery,
)
weather_prompt = PromptNode[WeatherQuery, str](
prompt=template,
config=PromptConfig(model="openai:gpt-4"),
name="weather_prompt"
)
# With structured output parser
from pydantic_flow import JsonModelParser
structured_prompt = PromptNode[WeatherQuery, WeatherInfo](
prompt=template,
config=PromptConfig(model="openai:gpt-4"),
output_parser=JsonModelParser(model=WeatherInfo),
name="structured_weather"
)

Applies a function to transform data:
weather_parser = ParserNode[str, WeatherInfo](
parser_func=parse_weather_text,
input=weather_prompt.output,
name="weather_parser"
)

Calls external tools/APIs:
weather_api = ToolNode[WeatherQuery, WeatherInfo](
tool_func=call_weather_service,
name="weather_api"
)

Wraps nodes with retry logic:
reliable_api = RetryNode(
wrapped_node=weather_api,
max_retries=3,
name="reliable_weather"
)

Enables conditional branching:
weather_branch = IfNode(
predicate=lambda data: data.temperature > 25,
if_true=hot_weather_node,
if_false=normal_weather_node,
input=weather_api.output,
name="weather_condition"
)

Merge nodes enable fan-in patterns where multiple node outputs combine into one:
# Multiple nodes producing different data
analysis_node = ToolNode[Input, Analysis](
tool_func=analyze_data,
name="analysis"
)
facts_node = ToolNode[Input, Facts](
tool_func=gather_facts,
name="facts"
)
metadata_node = ToolNode[Input, Metadata](
tool_func=get_metadata,
name="metadata"
)
# Merge multiple inputs into single output
def combine_all(analysis: Analysis, facts: Facts, metadata: Metadata) -> Report:
    return Report(
        analysis=analysis,
        facts=facts,
        metadata=metadata,
    )
merge_node = MergeToolNode[Analysis, Facts, Metadata, Report](
inputs=(analysis_node.output, facts_node.output, metadata_node.output),
tool_func=combine_all,
name="merge"
)
# MergePromptNode for combining inputs into LLM prompts
merge_prompt = MergePromptNode[Analysis, Facts, str](
inputs=(analysis_node.output, facts_node.output),
prompt="Analyze: {analysis}\nFacts: {facts}\nProvide summary:",
model="openai:gpt-4",
name="llm_merge"
)
# Continue processing merged result
final_node = ToolNode[Report, FinalOutput](
tool_func=finalize_report,
input=merge_node.output,
name="final"
)
# Pattern: A -> B,C,D -> E -> F
flow.add_nodes(analysis_node, facts_node, metadata_node, merge_node, final_node)

The Flow class manages execution with automatic dependency resolution:
flow = Flow()
flow.add_nodes(node1, node2, node3)
# Automatic topological sorting
execution_order = flow.get_execution_order()
# Type-safe execution
results = await flow.run(input_data)

pydantic-flow supports cycles and conditional control flow using a stepper-based execution engine. This enables patterns like while-loops, retry logic, and dynamic routing based on state.
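In miniature, a stepper engine is just a loop: run the current node, ask its router where to go next, and stop on a terminal route or when max_steps trips. A self-contained sketch with stand-in names (END here plays the role of Route.END; none of this is the library's actual engine):

```python
END = object()  # sentinel standing in for Route.END

def step_loop(nodes, routers, entry, state, max_steps=25):
    """Run nodes one step at a time, following router decisions."""
    current = entry
    for _ in range(max_steps):
        state = nodes[current](state)                       # run the node
        route = routers.get(current, lambda s: END)(state)  # ask the router
        if route is END:
            return state
        current = route                                     # follow the edge
    raise RuntimeError("max_steps exceeded (possible infinite loop)")

nodes = {"tick": lambda n: n + 1}
routers = {"tick": lambda n: END if n >= 5 else "tick"}
result = step_loop(nodes, routers, "tick", 0)
# result == 5
```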
A node that loops back to itself until a condition is met:
from pydantic import BaseModel
from pydantic_flow import Flow, Route, RunConfig
from pydantic_flow.core.routing import T_Route
from pydantic_flow.nodes import BaseNode
class CounterState(BaseModel):
    n: int

class OutputState(BaseModel):
    tick: CounterState

class TickNode(BaseNode[CounterState, CounterState]):
    async def run(self, input_data: CounterState) -> CounterState:
        return CounterState(n=input_data.n + 1)
# Create flow with loop
flow = Flow(input_type=CounterState, output_type=OutputState)
tick_node = TickNode(name="tick")
flow.add_nodes(tick_node)
flow.set_entry_nodes("tick")
# Add conditional routing
def router(state: BaseModel) -> T_Route:
    tick_state = getattr(state, "tick", None)
    if tick_state and tick_state.n >= 5:
        return Route.END  # Terminate
    return "tick"  # Loop back
flow.add_conditional_edges("tick", router)
# Compile and execute
compiled = flow.compile()
config = RunConfig(max_steps=50)
result = await compiled.invoke(CounterState(n=0), config)
# result.tick.n == 5

Plan-execute pattern with conditional routing:
from pydantic import BaseModel
from pydantic_flow import Flow, Route, RunConfig
from pydantic_flow.nodes import BaseNode
class WorkState(BaseModel):
    iterations: int
    total: int

class FullOutput(BaseModel):
    plan: WorkState
    execute: WorkState

class PlanNode(BaseNode[WorkState, WorkState]):
    async def run(self, input_data: WorkState) -> WorkState:
        return WorkState(
            iterations=input_data.iterations + 1,
            total=input_data.total,
        )

class ExecuteNode(BaseNode[WorkState, WorkState]):
    async def run(self, input_data: WorkState) -> WorkState:
        new_total = input_data.total + (input_data.iterations * 10)
        return WorkState(iterations=input_data.iterations, total=new_total)
# Create flow with two-node loop
flow = Flow(input_type=WorkState, output_type=FullOutput)
plan_node = PlanNode(name="plan")
execute_node = ExecuteNode(name="execute")
flow.add_nodes(plan_node, execute_node)
flow.set_entry_nodes("plan")
flow.add_edge("plan", "execute")
# Router decides: loop back to plan or END
def router(state: BaseModel) -> T_Route:
    execute_state = getattr(state, "execute", None)
    if execute_state and execute_state.iterations >= 5:
        return Route.END
    return "plan"
flow.add_conditional_edges("execute", router)
# Execute with safety limits
compiled = flow.compile()
config = RunConfig(max_steps=50, trace_iterations=True)
result = await compiled.invoke(WorkState(iterations=0, total=0), config)

Map router outcomes to target nodes:
def boolean_router(state: BaseModel) -> str:
    # Return a key that maps to a target node
    should_proceed = getattr(state, "ready", False)
    return "proceed" if should_proceed else "retry"
flow.add_conditional_edges(
"checker",
boolean_router,
mapping={"proceed": "next_step", "retry": "checker"}
)

Production flows should handle potential errors gracefully. Here's a comprehensive error handling pattern:
from pydantic_flow import (
Flow, Route, RunConfig,
RecursionLimitError, RoutingError, FlowTimeoutError, FlowError
)
import logging
logger = logging.getLogger(__name__)
async def run_with_error_handling():
    """Example of proper error handling for loop-capable flows."""
    flow = create_my_flow()
    compiled = flow.compile()
    config = RunConfig(
        max_steps=100,
        timeout_seconds=60,
        trace_iterations=True,
    )
    try:
        result = await compiled.invoke(initial_state, config)
        logger.info(f"Flow completed successfully: {result}")
        return result
    except RecursionLimitError as e:
        # Flow exceeded max_steps - likely an infinite loop
        logger.error(f"Flow hit iteration limit: {e}")
        # Consider: save partial state, alert monitoring, etc.
        raise
    except FlowTimeoutError as e:
        # Flow took too long - may need more time or optimization
        logger.error(f"Flow execution timed out: {e}")
        # Consider: increase timeout, optimize nodes, add caching
        raise
    except RoutingError as e:
        # Invalid router configuration - this is a programming error
        logger.error(f"Router configuration error: {e}")
        # This should be caught in testing - fix your routers!
        raise
    except FlowError as e:
        # Other flow execution errors
        logger.error(f"Flow execution failed: {e}")
        raise
    except Exception as e:
        # Unexpected errors (should be rare)
        logger.exception(f"Unexpected error in flow execution: {e}")
        raise

Common Error Scenarios:
- RecursionLimitError: Usually indicates a router that never returns Route.END. Check your termination conditions.
- FlowTimeoutError: Either increase timeout_seconds or optimize slow nodes. Check node execution times.
- RoutingError: Programming error - the router returned an invalid node name. Validate router logic and use mapping dicts for type safety.
Quick Examples:
# RecursionLimitError: Raised when max_steps is exceeded
config = RunConfig(max_steps=25) # Default limit
try:
    result = await compiled.invoke(input_data, config)
except RecursionLimitError as e:
    print(f"Loop exceeded limit: {e}")
    # e includes recent iteration trace
# RoutingError: Raised when router returns an invalid target
def bad_router(state: BaseModel) -> str:
    return "nonexistent_node"  # RoutingError!
flow.add_conditional_edges("start", bad_router)
# FlowTimeoutError: Raised when execution exceeds time limit
config = RunConfig(timeout_seconds=30)
try:
    result = await compiled.invoke(input_data, config)
except FlowTimeoutError:
    print("Flow execution timed out")

- Route.END: Special sentinel to terminate flow execution
- Conditional Edges: Dynamic routing based on current state
- RunConfig: Control max_steps, timeout_seconds, and trace_iterations
- Stepper Engine: Automatically selected for flows with cycles or conditional edges
- Type Safety: Router functions receive and return typed values
See examples/loops/ for complete runnable examples.
pydantic-flow supports two patterns for building flows, each suited to different use cases:
Best for acyclic workflows where dependencies are clear and linear:
from pydantic_flow.nodes import NodeWithInput
class ProcessNode(NodeWithInput[DataType, ResultType]):
    """Node with explicit input dependency."""

    def __init__(self, input_node: BaseNode[Any, DataType]):
        self.input = input_node.output
        super().__init__(name="process")

    async def run(self, input_data: DataType) -> ResultType:
        # Process the data from input_node
        return process(input_data)
# Build flow - execution order determined automatically
flow = Flow(input_type=InputType, output_type=OutputType)
fetch = FetchNode(name="fetch")
process = ProcessNode(input_node=fetch) # Dependency declared in constructor
transform = TransformNode(input_node=process)
flow.add_nodes(fetch, process, transform)
# No need to call set_entry_nodes() - inferred from dependencies
# No explicit edges needed - determined via topological sort
result = await flow.run(input_data)

When to use:
- Linear or tree-like data pipelines
- No loops or conditional routing needed
- Dependencies naturally expressed through node constructors
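The automatic ordering step can be sketched with Kahn's algorithm, assuming each node records the names of the nodes it reads from. This illustrates the idea of dependency-driven topological sorting; it is not pydantic-flow's actual implementation:

```python
from collections import deque

def execution_order(deps):
    """deps maps node name -> set of prerequisite node names."""
    indegree = {node: len(pre) for node, pre in deps.items()}
    ready = deque(node for node, count in indegree.items() if count == 0)
    order = []
    while ready:
        node = ready.popleft()
        order.append(node)
        # Any node waiting on `node` gets one step closer to ready.
        for other, pre in deps.items():
            if node in pre:
                indegree[other] -= 1
                if indegree[other] == 0:
                    ready.append(other)
    if len(order) != len(deps):
        raise ValueError("cycle detected - use the stepper engine instead")
    return order

order = execution_order({"fetch": set(), "process": {"fetch"}, "transform": {"process"}})
# order == ["fetch", "process", "transform"]
```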
Required for loops, conditional routing, or complex control flow:
flow = Flow(input_type=StateType, output_type=StateType)
# Add all nodes first
plan = PlanNode(name="plan")
execute = ExecuteNode(name="execute")
flow.add_nodes(plan, execute)
# Explicitly define entry point (required for stepper engine)
flow.set_entry_nodes("plan")
# Add static edges
flow.add_edge("plan", "execute")
# Add conditional routing
def should_continue(state: BaseModel) -> T_Route:
    execute_state = getattr(state, "execute")
    if execute_state.iterations >= 5:
        return Route.END
    return "plan"  # Loop back to start
flow.add_conditional_edges("execute", should_continue)
# Compile (automatically uses stepper engine due to conditional edges)
compiled = flow.compile()
result = await compiled.invoke(initial_state, config)

When to use:
- Loops or cycles in the workflow
- Conditional routing based on state
- Complex control flow patterns
- Multiple entry points
You can also explicitly control which engine to use:
from pydantic_flow import ExecutionMode
# Force DAG mode (will error if cycles/conditional edges exist)
compiled = flow.compile(mode=ExecutionMode.DAG)
# Force stepper engine (works with any flow structure)
compiled = flow.compile(mode=ExecutionMode.STEPPER)
# Auto-detect (default) - uses stepper if needed, otherwise DAG
compiled = flow.compile(mode=ExecutionMode.AUTO)

Don't mix implicit and explicit construction in the same flow:
# ❌ BAD: Mixing patterns causes confusion
flow = Flow(...)
node_a = NodeWithInput(input_node=start) # Implicit dependency
flow.add_nodes(start, node_a)
flow.add_edge("start", "node_b") # Explicit edge
flow.set_entry_nodes("start") # Explicit entry
# Result: Unclear execution order, potential conflicts

Instead, choose one pattern consistently:

# ✅ GOOD: Pure explicit pattern
flow = Flow(...)
flow.add_nodes(start, node_a, node_b)
flow.set_entry_nodes("start")
flow.add_edge("start", "node_a")
flow.add_edge("node_a", "node_b")

If you need both patterns, use sub-flows (FlowNode) to compose them separately.
The FlowNode enables hierarchical composition by wrapping complete flows as nodes:
# Create a reusable sub-flow
research_flow = Flow(input_type=UserQuery, output_type=ResearchResults)
research_node = ToolNode[UserQuery, ResearchData](
tool_func=research_api,
name="research"
)
research_flow.add_nodes(research_node)
# Wrap the sub-flow as a node
research_flow_node = FlowNode[UserQuery, ResearchResults](
flow=research_flow,
name="research_sub_flow"
)
# Use in a parent flow
parent_flow = Flow(input_type=UserQuery, output_type=FinalResults)
summary_node = ToolNode[ResearchResults, SummaryData](
tool_func=generate_summary,
input=research_flow_node.output, # Chain sub-flows
name="summary"
)
parent_flow.add_nodes(research_flow_node, summary_node)
# Execute hierarchical workflow
results = await parent_flow.run(query)
research_data = results.research_sub_flow.research
summary_data = results.summary

Key benefits of sub-flow composition:
- Hierarchical Architecture: Build complex workflows from simpler, reusable components
- Encapsulation: Sub-flows hide internal complexity behind clean interfaces
- Reusability: Same sub-flow can be used in multiple parent flows
- Type Safety: Full type checking across sub-flow boundaries
- Testing: Sub-flows can be tested in isolation
FlowNode supports three memory modes to control how conversation history is shared between parent and sub-flows:
from pydantic_flow.memory import MemoryMode
# SHARED: Sub-flow uses parent's memory directly (default)
shared_node = FlowNode[Query, Result](
flow=sub_flow,
name="shared_sub_flow",
memory_mode=MemoryMode.SHARED
)
# ISOLATED: Sub-flow gets separate memory
isolated_node = FlowNode[Query, Result](
flow=sub_flow,
name="isolated_sub_flow",
memory_mode=MemoryMode.ISOLATED,
seed_isolated_memory=True # Optional: copy parent's messages for context
)
# READONLY: Sub-flow has read-only access to parent memory
readonly_node = FlowNode[Query, Result](
flow=sub_flow,
name="readonly_sub_flow",
memory_mode=MemoryMode.READONLY
)

Memory Mode Behaviors:
- SHARED (default): Changes to memory in the sub-flow propagate to the parent flow
- ISOLATED: Sub-flow gets a separate memory; changes don't affect the parent
  - seed_isolated_memory=False: Sub-flow starts with empty memory
  - seed_isolated_memory=True: Sub-flow gets a copy of the parent's message history
- READONLY: Sub-flow can read parent memory but cannot modify it; attempts to append/extend/clear memory raise ReadOnlyMemoryError
Use cases:
- SHARED: Default behavior for most scenarios where full context is needed
- ISOLATED: Parallel sub-flows that shouldn't see each other's messages
- READONLY: Sub-flows that need context but shouldn't pollute parent memory
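The three modes can be illustrated with a toy sketch where "memory" is a plain Python list. The names and mechanics below are stand-ins for illustration, not the library's implementation:

```python
class ReadOnlyMemoryError(Exception):
    pass

class ReadOnlyMemory:
    """Read-only view over a parent memory (a plain list here)."""
    def __init__(self, inner):
        self._inner = inner
    def __iter__(self):
        return iter(self._inner)  # reads are allowed
    def append(self, item):
        raise ReadOnlyMemoryError("memory is read-only")

def memory_for_subflow(parent, mode, seed=False):
    if mode == "shared":
        return parent                        # same object: changes propagate
    if mode == "isolated":
        return list(parent) if seed else []  # copy of history, or empty
    if mode == "readonly":
        return ReadOnlyMemory(parent)        # reads ok, writes raise
    raise ValueError(f"unknown mode: {mode}")

parent = ["hi"]
memory_for_subflow(parent, "shared").append("from sub-flow")         # parent sees it
memory_for_subflow(parent, "isolated", seed=True).append("private")  # parent does not
# parent == ["hi", "from sub-flow"]
```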
pydantic-flow provides comprehensive support for workflows that require human intervention, review, or approval:
from pydantic_flow.nodes.human import HumanNode, ApprovalNode, HumanResponse
from pydantic_flow.core.errors import InterruptionRequested
# Create nodes that require human input
review_node = HumanNode[ContentInput, HumanResponse](
prompt="Please review this content"
)
approval_node = ApprovalNode[ProcessedData](
prompt="Approve for publication?"
)
# Build flow with human checkpoints
flow = Flow(input_type=ContentInput, output_type=HumanResponse)
processor = PromptNode[ContentInput, str](prompt="Process content...")
flow.add_node(processor)
flow.add_node(review_node, dependencies=[processor])
# Execute - will interrupt when human input needed
try:
    result = await flow.run(input_data)
except InterruptionRequested as exc:
    checkpoint = exc.checkpoint
    # Present to human, get response
    response = get_human_input(checkpoint.interrupt_reason)
    # Resume execution
    result = await flow.resume(checkpoint, inputs=input_data)

Key Features:
- Three-layer interruption: Event-level, node-level, and flow-level handlers
- Priority-based handlers: Control execution order (0-100, lower executes first)
- Checkpoints: Serialize workflow state for resumption
- HumanNode: Always interrupts for human input with dynamic prompts and options
- ApprovalNode: Specialized node for yes/no decisions
- Conditional interruption: Only interrupt when specific criteria are met
See docs/hitl.md for complete HITL documentation.
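The interrupt-and-resume shape can be sketched standalone. The classes and fields here are simplified stand-ins for illustration (the real API is described in docs/hitl.md): a flow raises an exception carrying checkpoint-like state, and the caller re-runs with the human's answer supplied.

```python
class InterruptionRequested(Exception):
    """Carries why and where the flow paused (a stand-in 'checkpoint')."""
    def __init__(self, reason, state):
        super().__init__(reason)
        self.reason = reason
        self.state = state

def run_flow(content, approval=None):
    processed = content.upper()          # upstream processing node ran
    if approval is None:                 # human checkpoint reached
        raise InterruptionRequested("Approve for publication?", processed)
    return processed if approval == "yes" else None

try:
    run_flow("draft")
except InterruptionRequested as exc:
    answer = "yes"                       # pretend a human answered exc.reason
    result = run_flow("draft", approval=answer)
# result == "DRAFT"
```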
# Full IDE support with autocomplete
weather_node: PromptNode[WeatherQuery, str] = ...
parsed_node: ParserNode[str, WeatherInfo] = ...
# Compile-time type checking
weather_info = results["weather_api"] # Type: WeatherInfo
temperature = weather_info.temperature # Type: float

# ✅ Type-safe references
parser_node = ParserNode(
parser_func=parse_data,
input=prompt_node.output # NodeOutput[str]
)
# ❌ No magic strings
parser_node = ParserNode(
parser_func=parse_data,
input="prompt_node.output" # Fragile!
)

class MyInput(BaseModel):
    query: str
    options: list[str] = []

class MyOutput(BaseModel):
    result: str
    confidence: float
# Automatic validation and serialization
node = ToolNode[MyInput, MyOutput](...)

pydantic-flow includes a production-ready prompt templating system that can be used standalone or integrated with workflows:
from pydantic import BaseModel
from pydantic_flow.prompt import (
PromptTemplate,
ChatPromptTemplate,
TemplateFormat,
ChatMessage,
ChatRole,
JsonModelParser
)
# Define typed models
class WeatherQuery(BaseModel):
    location: str
    unit: str = "celsius"

class WeatherResponse(BaseModel):
    temperature: float
    condition: str
# Create type-safe template with parser
template = PromptTemplate[WeatherQuery, WeatherResponse](
template="What's the weather in {location} ({unit})?",
format=TemplateFormat.F_STRING,
input_model=WeatherQuery,
output_parser=JsonModelParser(WeatherResponse)
)
# Render with validation
query = WeatherQuery(location="Paris")
prompt_text = template.render(query)
# Or render and parse in one step
result: WeatherResponse = await template.render_and_parse(query)
# Chat templates with multiple messages
chat_template = ChatPromptTemplate[WeatherQuery, str](
messages=[
ChatMessage(role=ChatRole.SYSTEM, content="You are a weather assistant."),
ChatMessage(role=ChatRole.USER, content="Weather in {location}?")
],
format=TemplateFormat.JINJA2,
input_model=WeatherQuery
)
messages = chat_template.render_messages(query)

Key Features:
- Multiple template engines: f-string, Jinja2, Mustache
- Type-safe input/output with Pydantic models
- Built-in output parsers (JSON, delimited, custom)
- OpenTelemetry observability integration
- Chat message support with role-based formatting
- Variable validation and extraction
# Multi-path workflow with branching
flow = Flow()
# Initial processing
data_node = ToolNode[Input, Data](...)
# Conditional branching
branch_node = IfNode(
predicate=lambda x: x.value > threshold,
if_true=high_value_processor,
if_false=low_value_processor,
input=data_node.output
)
# Merge results
final_node = ParserNode[ProcessedData, Output](
input=branch_node.output,
...
)
flow.add_nodes(data_node, branch_node, final_node)

# Automatic retries with backoff
retry_node = RetryNode(
wrapped_node=unreliable_api,
max_retries=5
)
# Flow-level error handling
try:
    results = await flow.run(input_data)
except FlowError as e:
    print(f"Workflow failed: {e}")

# Inspect execution order
print(flow.get_execution_order())
# Access intermediate results
results = await flow.run(input_data)
intermediate = results["parser_node"]
final = results["summary_node"]

Built on modern Python foundations:
- Python 3.14+: Latest type system features
- Pydantic: Data validation and serialization
- Async-First: All operations are async by default
- Modern Generics: Node[Input, Output] syntax
- Entry Points: Plugin system for extensibility
We welcome contributions! See CONTRIBUTING.md for guidelines.
MIT License - see LICENSE for details.
- pydantic-ai - The foundational AI framework we build upon
- Pydantic - Data validation using Python type annotations
Made with ❤️ for the Python AI community
- Tools: HTTP, filesystem, vector operations out of the box
- Memory: Persistent conversation history and context management
- Observability: Built-in tracing with OpenTelemetry
- Durability: State persistence and recovery mechanisms
- CLI: Complete command-line interface for agent management
- Full type annotations with comprehensive IDE support
- Modern Python 3.14+ syntax (A | B, dict/list/tuple)
- Pydantic models for all public interfaces
- Runtime validation with detailed error messages
- Async-first design with sync wrappers where appropriate
- Auto-discovery of plugins and tools
- Fast-running comprehensive test suite
- Rich terminal output and debugging
- Plugin architecture inspired by pytest ecosystem
- External libraries can create pydantic-flow-* extensions
- Type-safe protocol-based interfaces
- Dependency injection with automatic wiring
pip install pydantic-flow

git clone https://github.com/joshSzep/pydantic-flow.git
cd pydantic-flow
uv sync

# Coming soon!

pydantic-flow follows core principles that ensure scalability and maintainability:
- Long-Lived Agents: Persistent objects with integrated memory and tools
- Immutable State: Idempotent APIs over stateful operations
- Fail Fast: Rich exception context with custom error types
- Dependency Injection: Auto-discovery with explicit configuration
For detailed architectural decisions and patterns, see AGENTS.md.
- Python 3.14+
- uv for dependency management
# Install dependencies
uv sync
# Install pre-commit hooks
uv run pre-commit install
# Run tests
uv run pytest
# Run linting
uv run ruff check
uv run ruff format

pydantic-flow/
├── src/pydantic_flow/   # Core framework
├── tests/               # Test suite
├── docs/                # Documentation
├── AGENTS.md            # Architecture guide
└── pyproject.toml       # Project configuration
We welcome contributions! Please see our Contributing Guide for details.
- Fork the repository
- Create a feature branch (git checkout -b feature/amazing-feature)
- Make your changes
- Run tests and linting (uv run pytest && uv run ruff check)
- Commit your changes (git commit -m 'Add amazing feature')
- Push to the branch (git push origin feature/amazing-feature)
- Open a Pull Request
This project is licensed under the MIT License - see the LICENSE file for details.
- Inspired by LangChain - the AI agent framework we aim to rival
- Built on pydantic-ai - the foundation for type-safe AI agents
- Inspired by the Pydantic ecosystem's commitment to developer experience
- Plugin architecture inspired by pytest's extensible design
Status: Early development (v0.1.0) - APIs may change