This cookbook provides practical, copy-paste recipes for common integration scenarios with the Katana Manufacturing ERP API.
- Inventory Management
- Order Processing
- Manufacturing Operations
- Error Handling Patterns
- Integration Patterns
- Observability
- Performance Optimization
- Testing Patterns
All recipes assume you have:
# .env file with your API key
KATANA_API_KEY=your-api-key-here
KATANA_BASE_URL=https://api.katanamrp.com/v1 # OptionalKeep Katana inventory synchronized with an external warehouse management system.
import asyncio
from typing import Any
from katana_public_api_client import KatanaClient
from katana_public_api_client.api.inventory import get_all_inventory_point
from katana_public_api_client.api.variant import get_all_variants
from katana_public_api_client.utils import unwrap_data
async def sync_inventory_from_warehouse(warehouse_data: list[dict[str, Any]]) -> dict[str, int]:
"""
Sync inventory levels from external warehouse system.
Args:
warehouse_data: List of dicts with 'sku' and 'quantity' keys
Returns:
Dict with 'updated', 'skipped', 'errors' counts
Example warehouse_data:
[
{"sku": "WDG-001", "quantity": 150},
{"sku": "WDG-002", "quantity": 75},
]
"""
stats = {"updated": 0, "skipped": 0, "errors": 0}
async with KatanaClient() as client:
# Get all variants to build SKU -> variant_id lookup
from katana_public_api_client.api.variant import get_all_variants
variants_response = await get_all_variants.asyncio_detailed(client=client)
variants = unwrap_data(variants_response)
# Build SKU lookup map
sku_to_variant = {v.sku: v for v in variants if v.sku}
# Update inventory for each warehouse item
from katana_public_api_client.api.stock_adjustment import create_stock_adjustment
from katana_public_api_client.models import CreateStockAdjustmentRequest
for item in warehouse_data:
sku = item["sku"]
new_quantity = item["quantity"]
variant = sku_to_variant.get(sku)
if not variant:
print(f"Warning: SKU {sku} not found in Katana")
stats["skipped"] += 1
continue
try:
# Create stock adjustment to set new quantity
adjustment = CreateStockAdjustmentRequest(
variant_id=variant.id,
adjustment_type="set", # Set to absolute value
quantity=new_quantity,
note=f"Synced from warehouse system"
)
response = await create_stock_adjustment.asyncio_detailed(
client=client,
body=adjustment
)
if response.status_code == 201:
stats["updated"] += 1
print(f"✓ Updated {sku}: {new_quantity} units")
else:
stats["errors"] += 1
print(f"✗ Failed to update {sku}: {response.status_code}")
except Exception as e:
stats["errors"] += 1
print(f"✗ Error updating {sku}: {e}")
return stats
# Usage
if __name__ == "__main__":
warehouse_inventory = [
{"sku": "WDG-001", "quantity": 150},
{"sku": "WDG-002", "quantity": 75},
{"sku": "GADGET-A", "quantity": 200},
]
results = asyncio.run(sync_inventory_from_warehouse(warehouse_inventory))
print(f"\nSync complete: {results}")Identify products that need reordering.
import asyncio
from typing import Any
from katana_public_api_client import KatanaClient
from katana_public_api_client.api.inventory import get_all_inventory_point
from katana_public_api_client.utils import unwrap_data
async def get_low_stock_alerts(threshold: int = 10) -> list[dict[str, Any]]:
"""
Get products below stock threshold with supplier information.
Args:
threshold: Minimum stock level before alert
Returns:
List of low stock items with details
"""
low_stock_items = []
async with KatanaClient() as client:
# Get all inventory points
inventory_response = await get_all_inventory_point.asyncio_detailed(
client=client
)
inventory_points = unwrap_data(inventory_response)
# Get variant details for items below threshold
from katana_public_api_client.api.variant import get_variant
for inv_point in inventory_points:
if inv_point.in_stock < threshold:
# Get variant details
variant_response = await get_variant.asyncio_detailed(
client=client,
id=inv_point.variant_id
)
if variant_response.parsed:
variant = variant_response.parsed
low_stock_items.append({
"sku": variant.sku,
"name": variant.name,
"current_stock": inv_point.in_stock,
"location": inv_point.location_name,
"reorder_point": getattr(inv_point, "reorder_point", None),
"variant_id": variant.id,
})
return low_stock_items
# Usage
if __name__ == "__main__":
low_stock = asyncio.run(get_low_stock_alerts(threshold=20))
print(f"Found {len(low_stock)} low stock items:\n")
for item in low_stock:
print(f" {item['sku']}: {item['current_stock']} units at {item['location']}")Detect and report negative inventory situations.
import asyncio
from datetime import datetime
from katana_public_api_client import KatanaClient
from katana_public_api_client.api.inventory import get_all_negative_stock
from katana_public_api_client.utils import unwrap_data
async def monitor_negative_stock() -> list[dict]:
"""
Monitor and report negative stock situations.
Returns:
List of negative stock items with details
"""
async with KatanaClient() as client:
response = await get_all_negative_stock.asyncio_detailed(client=client)
negative_items = unwrap_data(response)
issues = []
for item in negative_items:
issues.append({
"variant_sku": item.variant_sku,
"variant_name": item.variant_name,
"location": item.location_name,
"quantity": item.in_stock, # Negative value
"timestamp": datetime.now().isoformat(),
})
return issues
# Usage
if __name__ == "__main__":
negative_stock = asyncio.run(monitor_negative_stock())
if negative_stock:
print(f"⚠️ WARNING: {len(negative_stock)} items with negative stock!\n")
for item in negative_stock:
print(f" {item['variant_sku']}: {item['quantity']} units at {item['location']}")
else:
print("✓ No negative stock issues")Efficiently create multiple sales orders from an external system.
import asyncio
from typing import Any
from katana_public_api_client import KatanaClient
from katana_public_api_client.api.sales_order import create_sales_order
from katana_public_api_client.models import CreateSalesOrderRequest
async def process_bulk_orders(
orders: list[dict[str, Any]]
) -> tuple[list[int], list[dict]]:
"""
Process a batch of sales orders efficiently.
Args:
orders: List of order dicts with customer_id, items, etc.
Returns:
Tuple of (successful_order_ids, failed_orders)
Example order format:
{
"customer_id": 123,
"items": [
{"variant_id": 456, "quantity": 5, "price": 29.99},
],
"notes": "Rush order"
}
"""
successful = []
failed = []
async with KatanaClient() as client:
for order_data in orders:
try:
# Build sales order request
sales_order = CreateSalesOrderRequest(
customer_id=order_data["customer_id"],
sales_order_rows=[
{
"variant_id": item["variant_id"],
"quantity": item["quantity"],
"price": item["price"],
}
for item in order_data["items"]
],
notes=order_data.get("notes", ""),
)
response = await create_sales_order.asyncio_detailed(
client=client,
body=sales_order
)
if response.status_code == 201 and response.parsed:
successful.append(response.parsed.id)
print(f"✓ Created order {response.parsed.id}")
else:
failed.append({
"order": order_data,
"status": response.status_code,
"error": "Failed to create order"
})
except Exception as e:
failed.append({
"order": order_data,
"error": str(e)
})
print(f"✗ Error creating order: {e}")
return successful, failed
# Usage
if __name__ == "__main__":
orders_to_process = [
{
"customer_id": 123,
"items": [
{"variant_id": 456, "quantity": 5, "price": 29.99},
{"variant_id": 457, "quantity": 2, "price": 49.99},
],
"notes": "Express shipping requested"
},
{
"customer_id": 124,
"items": [
{"variant_id": 458, "quantity": 10, "price": 19.99},
],
},
]
successful, failed = asyncio.run(process_bulk_orders(orders_to_process))
print(f"\nProcessed: {len(successful)} successful, {len(failed)} failed")Find sales orders that are past their expected delivery date.
import asyncio
from datetime import datetime, timedelta
from katana_public_api_client import KatanaClient
from katana_public_api_client.api.sales_order import get_all_sales_orders
from katana_public_api_client.utils import unwrap_data
async def check_overdue_orders(days_overdue: int = 0) -> list[dict]:
"""
Find all overdue sales orders.
Args:
days_overdue: Number of days past due date (0 = due today or earlier)
Returns:
List of overdue orders with details
"""
async with KatanaClient() as client:
# Get all open/in-progress orders
response = await get_all_sales_orders.asyncio_detailed(
client=client,
status="open" # or "in_progress"
)
orders = unwrap_data(response)
today = datetime.now().date()
cutoff_date = today - timedelta(days=days_overdue)
overdue = []
for order in orders:
if hasattr(order, 'expected_delivery_date') and order.expected_delivery_date:
# Parse the delivery date
delivery_date = order.expected_delivery_date
if isinstance(delivery_date, str):
delivery_date = datetime.fromisoformat(delivery_date).date()
if delivery_date <= cutoff_date:
days_late = (today - delivery_date).days
overdue.append({
"order_id": order.id,
"customer_name": getattr(order, "customer_name", "Unknown"),
"expected_date": delivery_date.isoformat(),
"days_late": days_late,
"status": order.status,
})
return sorted(overdue, key=lambda x: x["days_late"], reverse=True)
# Usage
if __name__ == "__main__":
overdue_orders = asyncio.run(check_overdue_orders(days_overdue=0))
if overdue_orders:
print(f"⚠️ {len(overdue_orders)} overdue orders:\n")
for order in overdue_orders:
print(f" Order #{order['order_id']}: {order['days_late']} days late")
print(f" Customer: {order['customer_name']}")
print(f" Expected: {order['expected_date']}\n")
else:
print("✓ No overdue orders")Get real-time view of manufacturing capacity and status.
import asyncio
from collections import defaultdict
from katana_public_api_client import KatanaClient
from katana_public_api_client.api.manufacturing_order import get_all_manufacturing_orders
from katana_public_api_client.utils import unwrap_data
async def check_manufacturing_capacity() -> dict[str, int]:
"""
Get real-time view of manufacturing capacity by status.
Returns:
Dict with counts by status (planned, in_progress, done, etc.)
"""
async with KatanaClient() as client:
response = await get_all_manufacturing_orders.asyncio_detailed(client=client)
manufacturing_orders = unwrap_data(response)
# Count by status
status_counts = defaultdict(int)
for mo in manufacturing_orders:
status_counts[mo.status] += 1
return dict(status_counts)
# Usage
if __name__ == "__main__":
capacity = asyncio.run(check_manufacturing_capacity())
print("Manufacturing Capacity Overview:\n")
for status, count in capacity.items():
print(f" {status}: {count} orders")Automatically create manufacturing orders when sales orders are received.
import asyncio
from katana_public_api_client import KatanaClient
from katana_public_api_client.api.manufacturing_order import make_to_order_manufacturing_order
from katana_public_api_client.models import MakeToOrderManufacturingOrderRequest
async def create_manufacturing_from_sales(sales_order_id: int) -> int | None:
"""
Automatically create manufacturing orders for a sales order.
Args:
sales_order_id: ID of the sales order
Returns:
Manufacturing order ID if created, None if failed
"""
async with KatanaClient() as client:
try:
request = MakeToOrderManufacturingOrderRequest(
sales_order_id=sales_order_id
)
response = await make_to_order_manufacturing_order.asyncio_detailed(
client=client,
body=request
)
if response.status_code == 201 and response.parsed:
mo_id = response.parsed.id
print(f"✓ Created manufacturing order {mo_id} for sales order {sales_order_id}")
return mo_id
else:
print(f"✗ Failed to create MO: {response.status_code}")
return None
except Exception as e:
print(f"✗ Error creating manufacturing order: {e}")
return None
# Usage
if __name__ == "__main__":
mo_id = asyncio.run(create_manufacturing_from_sales(sales_order_id=12345))
if mo_id:
print(f"Manufacturing order {mo_id} ready for production")Implement custom retry logic for application-level errors.
import asyncio
from typing import Any, TypeVar, Callable
import httpx
from katana_public_api_client import KatanaClient
T = TypeVar('T')
async def retry_with_backoff(
operation: Callable[[], T],
max_attempts: int = 3,
backoff_factor: float = 2.0,
) -> T:
"""
Retry an operation with exponential backoff.
Note: Network errors and 429/5xx are already handled by KatanaClient.
This is for application-level retries.
Args:
operation: Async function to retry
max_attempts: Maximum number of attempts
backoff_factor: Multiplier for delay between retries
Returns:
Result from operation
Raises:
Last exception if all retries fail
"""
last_exception = None
delay = 1.0
for attempt in range(max_attempts):
try:
return await operation()
except Exception as e:
last_exception = e
if attempt < max_attempts - 1:
print(f"Attempt {attempt + 1} failed: {e}. Retrying in {delay}s...")
await asyncio.sleep(delay)
delay *= backoff_factor
else:
print(f"All {max_attempts} attempts failed")
raise last_exception
# Usage example
async def create_order_with_retry(order_data: dict[str, Any]):
"""Create order with custom retry logic."""
async with KatanaClient() as client:
from katana_public_api_client.api.sales_order import create_sales_order
from katana_public_api_client.models import CreateSalesOrderRequest
async def create_op():
request = CreateSalesOrderRequest(**order_data)
response = await create_sales_order.asyncio_detailed(
client=client,
body=request
)
if response.status_code != 201:
raise ValueError(f"Failed to create order: {response.status_code}")
return response.parsed
return await retry_with_backoff(create_op, max_attempts=3)
if __name__ == "__main__":
order = {
"customer_id": 123,
"sales_order_rows": [
{"variant_id": 456, "quantity": 5, "price": 29.99}
]
}
result = asyncio.run(create_order_with_retry(order))
print(f"Order created: {result.id}")Handle API failures with fallback to cached data.
import asyncio
import json
from pathlib import Path
from typing import Any
import httpx
from katana_public_api_client import KatanaClient
from katana_public_api_client.api.product import get_all_products
from katana_public_api_client.utils import unwrap_data
CACHE_FILE = Path("product_cache.json")
async def get_products_with_fallback() -> list[Any]:
"""
Get products with fallback to cached data on failure.
Returns:
List of products (from API or cache)
"""
try:
async with KatanaClient(timeout=10.0) as client:
response = await get_all_products.asyncio_detailed(client=client)
products = unwrap_data(response)
# Cache successful response
cache_data = [
{
"id": p.id,
"name": p.name,
"sku": p.sku,
}
for p in products
]
CACHE_FILE.write_text(json.dumps(cache_data, indent=2))
print(f"✓ Retrieved {len(products)} products from API")
return products
except (httpx.TimeoutException, httpx.NetworkError, Exception) as e:
print(f"⚠️ API request failed: {e}")
# Fallback to cache
if CACHE_FILE.exists():
cache_data = json.loads(CACHE_FILE.read_text())
print(f"✓ Using {len(cache_data)} cached products")
return cache_data
else:
print("✗ No cache available")
raise
# Usage
if __name__ == "__main__":
products = asyncio.run(get_products_with_fallback())
print(f"Have {len(products)} products available")Handle incoming webhook events from Katana.
from typing import Any
from flask import Flask, request, jsonify
import hmac
import hashlib
from katana_public_api_client import KatanaClient
from katana_public_api_client.api.sales_order import get_sales_order
app = Flask(__name__)
WEBHOOK_SECRET = "your-webhook-secret" # From Katana webhook settings
def verify_webhook_signature(payload: bytes, signature: str) -> bool:
"""
Verify webhook signature from Katana.
Args:
payload: Raw request body bytes
signature: X-Katana-Signature header value
Returns:
True if signature is valid
"""
expected = hmac.new(
WEBHOOK_SECRET.encode(),
payload,
hashlib.sha256
).hexdigest()
return hmac.compare_digest(expected, signature)
@app.route("/webhooks/katana", methods=["POST"])
def handle_katana_webhook():
"""Handle incoming webhooks from Katana."""
# Verify signature
signature = request.headers.get("X-Katana-Signature", "")
if not verify_webhook_signature(request.data, signature):
return jsonify({"error": "Invalid signature"}), 401
# Parse event
event = request.json
event_type = event.get("event_type")
print(f"Received webhook: {event_type}")
# Handle different event types
if event_type == "sales_order.created":
handle_sales_order_created(event)
elif event_type == "inventory.low_stock":
handle_low_stock_alert(event)
elif event_type == "manufacturing_order.completed":
handle_mo_completed(event)
return jsonify({"status": "received"}), 200
def handle_sales_order_created(event: dict[str, Any]) -> None:
"""Process new sales order event."""
order_id = event["data"]["id"]
# Fetch full order details from API
import asyncio
async def process_order():
async with KatanaClient() as client:
response = await get_sales_order.asyncio_detailed(
client=client,
id=order_id
)
if response.parsed:
order = response.parsed
print(f"Processing new order: {order.id}")
# Add your business logic here:
# - Send order confirmation email
# - Create manufacturing orders
# - Update external systems
asyncio.run(process_order())
def handle_low_stock_alert(event: dict[str, Any]) -> None:
"""Handle low stock alert."""
variant_id = event["data"]["variant_id"]
current_stock = event["data"]["in_stock"]
print(f"Low stock alert for variant {variant_id}: {current_stock} units")
# Add your logic:
# - Send alert to purchasing team
# - Automatically create purchase order
# - Update safety stock levels
def handle_mo_completed(event: dict[str, Any]) -> None:
"""Handle completed manufacturing order."""
mo_id = event["data"]["id"]
print(f"Manufacturing order {mo_id} completed")
# Add your logic:
# - Update production tracking system
# - Trigger quality inspection workflow
# - Send completion notification
if __name__ == "__main__":
app.run(port=8080, debug=True)Periodically sync data between Katana and external systems.
import asyncio
from datetime import datetime, timedelta
from typing import Any
from katana_public_api_client import KatanaClient
from katana_public_api_client.api.sales_order import get_all_sales_orders
from katana_public_api_client.utils import unwrap_data
async def sync_recent_orders_to_external_system(hours_back: int = 24) -> dict[str, int]:
"""
Sync recent orders to external CRM/ERP system.
Args:
hours_back: How many hours back to sync
Returns:
Dict with sync statistics
"""
cutoff_time = datetime.now() - timedelta(hours=hours_back)
stats = {"synced": 0, "failed": 0, "skipped": 0}
async with KatanaClient() as client:
# Get recent orders
response = await get_all_sales_orders.asyncio_detailed(
client=client,
created_after=cutoff_time.isoformat()
)
orders = unwrap_data(response)
for order in orders:
try:
# Check if already synced
if hasattr(order, 'custom_fields'):
synced = any(
cf.get('name') == 'external_sync' and cf.get('value') == 'true'
for cf in (order.custom_fields or [])
)
if synced:
stats["skipped"] += 1
continue
# Sync to external system
await push_to_external_system({
"order_id": order.id,
"customer": order.customer_name if hasattr(order, 'customer_name') else None,
"total": order.total if hasattr(order, 'total') else 0,
"status": order.status,
})
# Mark as synced (you'd use update_sales_order API here)
stats["synced"] += 1
print(f"✓ Synced order {order.id}")
except Exception as e:
stats["failed"] += 1
print(f"✗ Failed to sync order {order.id}: {e}")
return stats
async def push_to_external_system(order_data: dict[str, Any]) -> None:
"""Push order to external system (implement your logic)."""
# Simulate external API call
await asyncio.sleep(0.1)
print(f" → Pushed order {order_data['order_id']} to external system")
async def run_scheduled_sync():
"""Run periodic sync (call this from cron/scheduler)."""
print(f"Starting scheduled sync at {datetime.now()}")
stats = await sync_recent_orders_to_external_system(hours_back=24)
print(f"Sync complete: {stats['synced']} synced, {stats['skipped']} skipped, {stats['failed']} failed")
# Usage with scheduler (e.g., APScheduler)
if __name__ == "__main__":
# Run once
asyncio.run(run_scheduled_sync())
# Or use with APScheduler:
# from apscheduler.schedulers.asyncio import AsyncIOScheduler
#
# scheduler = AsyncIOScheduler()
# scheduler.add_job(run_scheduled_sync, 'interval', hours=1)
# scheduler.start()
#
# asyncio.get_event_loop().run_forever()Configure comprehensive logging for production use.
import asyncio
import logging
import sys
from typing import Any
from katana_public_api_client import KatanaClient
from katana_public_api_client.api.product import get_all_products
from katana_public_api_client.utils import unwrap_data
def setup_logging(level: int = logging.INFO) -> logging.Logger:
"""
Setup structured logging for production.
Args:
level: Logging level (logging.INFO, logging.DEBUG, etc.)
Returns:
Configured logger instance
"""
# Create custom logger
logger = logging.getLogger("katana_integration")
logger.setLevel(level)
# Console handler with detailed format
console_handler = logging.StreamHandler(sys.stdout)
console_handler.setLevel(level)
# Structured format
formatter = logging.Formatter(
'{"timestamp": "%(asctime)s", "level": "%(levelname)s", '
'"module": "%(name)s", "message": "%(message)s"}'
)
console_handler.setFormatter(formatter)
logger.addHandler(console_handler)
# Also configure httpx logging for request details
httpx_logger = logging.getLogger("httpx")
httpx_logger.setLevel(logging.WARNING) # Reduce noise
return logger
async def fetch_products_with_logging() -> list[Any]:
"""Fetch products with comprehensive logging."""
logger = setup_logging(level=logging.INFO)
logger.info("Starting product fetch operation")
try:
async with KatanaClient(logger=logger) as client:
logger.debug("KatanaClient initialized")
response = await get_all_products.asyncio_detailed(client=client)
products = unwrap_data(response)
logger.info(f"Successfully fetched {len(products)} products")
# Log sample data (be careful with PII)
if products:
logger.debug(f"First product: {products[0].name}")
return products
except Exception as e:
logger.error(f"Failed to fetch products: {e}", exc_info=True)
raise
# Usage
if __name__ == "__main__":
products = asyncio.run(fetch_products_with_logging())
print(f"Retrieved {len(products)} products")Track API performance metrics for monitoring.
import asyncio
import time
from collections import defaultdict
from contextlib import asynccontextmanager
from typing import Any
from katana_public_api_client import KatanaClient
from katana_public_api_client.api.sales_order import get_all_sales_orders
from katana_public_api_client.utils import unwrap_data
class MetricsCollector:
"""Simple in-memory metrics collector."""
def __init__(self):
self.request_counts = defaultdict(int)
self.request_durations = defaultdict(list)
self.error_counts = defaultdict(int)
def record_request(self, endpoint: str, duration: float, success: bool):
"""Record API request metrics."""
self.request_counts[endpoint] += 1
self.request_durations[endpoint].append(duration)
if not success:
self.error_counts[endpoint] += 1
def get_stats(self) -> dict[str, Any]:
"""Get aggregated statistics."""
stats = {}
for endpoint, durations in self.request_durations.items():
count = len(durations)
avg_duration = sum(durations) / count if count > 0 else 0
max_duration = max(durations) if count > 0 else 0
errors = self.error_counts[endpoint]
stats[endpoint] = {
"total_requests": count,
"avg_duration_ms": round(avg_duration * 1000, 2),
"max_duration_ms": round(max_duration * 1000, 2),
"errors": errors,
"error_rate": round(errors / count * 100, 2) if count > 0 else 0,
}
return stats
def print_stats(self):
"""Print formatted statistics."""
print("\n" + "=" * 60)
print("API Performance Metrics")
print("=" * 60)
for endpoint, stats in self.get_stats().items():
print(f"\n{endpoint}:")
print(f" Total Requests: {stats['total_requests']}")
print(f" Avg Duration: {stats['avg_duration_ms']}ms")
print(f" Max Duration: {stats['max_duration_ms']}ms")
print(f" Errors: {stats['errors']} ({stats['error_rate']}%)")
# Global metrics instance
metrics = MetricsCollector()
@asynccontextmanager
async def track_request(endpoint: str):
"""Context manager to track request timing."""
start_time = time.time()
success = False
try:
yield
success = True
finally:
duration = time.time() - start_time
metrics.record_request(endpoint, duration, success)
async def fetch_orders_with_metrics():
"""Fetch orders with performance tracking."""
async with KatanaClient() as client:
async with track_request("get_all_sales_orders"):
response = await get_all_sales_orders.asyncio_detailed(client=client)
orders = unwrap_data(response)
print(f"Fetched {len(orders)} orders")
return orders
# Usage
if __name__ == "__main__":
# Make several requests
for _ in range(3):
asyncio.run(fetch_orders_with_metrics())
# Print metrics summary
metrics.print_stats()Integrate with OpenTelemetry for distributed tracing.
import asyncio
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import ConsoleSpanExporter, BatchSpanProcessor
from opentelemetry.instrumentation.httpx import HTTPXClientInstrumentor
from katana_public_api_client import KatanaClient
from katana_public_api_client.api.product import get_all_products
from katana_public_api_client.utils import unwrap_data
# Setup OpenTelemetry
trace.set_tracer_provider(TracerProvider())
tracer = trace.get_tracer(__name__)
# Export spans to console (use OTLP exporter in production)
span_processor = BatchSpanProcessor(ConsoleSpanExporter())
trace.get_tracer_provider().add_span_processor(span_processor)
# Instrument httpx (used by KatanaClient)
HTTPXClientInstrumentor().instrument()
async def fetch_products_with_tracing():
"""Fetch products with OpenTelemetry tracing."""
with tracer.start_as_current_span("fetch_products") as span:
span.set_attribute("operation", "product_sync")
async with KatanaClient() as client:
with tracer.start_as_current_span("api.get_all_products"):
response = await get_all_products.asyncio_detailed(client=client)
products = unwrap_data(response)
span.set_attribute("product.count", len(products))
print(f"Fetched {len(products)} products")
return products
# Usage
if __name__ == "__main__":
products = asyncio.run(fetch_products_with_tracing())
# In production, traces would be sent to your observability backend
# (Jaeger, Zipkin, Honeycomb, etc.)Handle large datasets efficiently with pagination control.
import asyncio
from typing import Any
from katana_public_api_client import KatanaClient
from katana_public_api_client.api.sales_order import get_all_sales_orders
from katana_public_api_client.utils import unwrap_data
async def fetch_all_orders_efficiently(
page_size: int = 100,
max_pages: int | None = None
) -> list[Any]:
"""
Fetch all orders with efficient pagination.
Auto-pagination is ON by default for all GET requests. All pages are
collected automatically. Use `limit` to control page size and `max_pages`
on the client for safety limits.
To get a specific page instead, use an explicit `page` parameter.
Args:
page_size: Number of items per page (default 100, max 100)
max_pages: Maximum pages to fetch (None = unlimited)
Returns:
List of all orders
"""
async with KatanaClient(max_pages=max_pages or 1000) as client:
# Use limit parameter to control page size
response = await get_all_sales_orders.asyncio_detailed(
client=client,
limit=page_size # Fetch 100 items per page
)
orders = unwrap_data(response)
print(f"Fetched {len(orders)} orders total")
# Pagination info is available in response object
if hasattr(response, 'pagination_info'):
info = response.pagination_info
print(f"Pages: {info.get('page', 'N/A')}/{info.get('total_pages', 'N/A')}")
return orders
async def process_orders_in_batches(batch_size: int = 50):
"""
Process large order lists in batches to avoid memory issues.
Args:
batch_size: Number of orders to process at once
"""
async with KatanaClient() as client:
# Fetch all orders
response = await get_all_sales_orders.asyncio_detailed(
client=client,
limit=100
)
all_orders = unwrap_data(response)
# Process in batches
for i in range(0, len(all_orders), batch_size):
batch = all_orders[i:i + batch_size]
print(f"Processing batch {i // batch_size + 1}: {len(batch)} orders")
# Process batch (e.g., update external system)
for order in batch:
# Your processing logic here
pass
# Optional: small delay between batches
await asyncio.sleep(0.1)
# Usage
if __name__ == "__main__":
# Fetch efficiently with pagination control
orders = asyncio.run(fetch_all_orders_efficiently(page_size=100, max_pages=10))
# Or process in batches
asyncio.run(process_orders_in_batches(batch_size=50))Make multiple API requests concurrently for better performance.
import asyncio
from typing import Any
from katana_public_api_client import KatanaClient
from katana_public_api_client.api.product import get_product
from katana_public_api_client.api.variant import get_variant
async def fetch_multiple_products_concurrent(product_ids: list[int]) -> list[Any]:
"""
Fetch multiple products concurrently.
Args:
product_ids: List of product IDs to fetch
Returns:
List of product objects
"""
async with KatanaClient() as client:
# Create tasks for concurrent requests
tasks = [
get_product.asyncio_detailed(client=client, id=product_id)
for product_id in product_ids
]
# Execute all requests concurrently
responses = await asyncio.gather(*tasks, return_exceptions=True)
# Extract successful results
products = []
for i, response in enumerate(responses):
if isinstance(response, Exception):
print(f"✗ Failed to fetch product {product_ids[i]}: {response}")
elif hasattr(response, 'parsed') and response.parsed:
products.append(response.parsed)
print(f"Successfully fetched {len(products)}/{len(product_ids)} products")
return products
async def fetch_variants_for_products(product_ids: list[int]) -> dict[int, list[Any]]:
"""
Fetch all variants for multiple products concurrently.
Args:
product_ids: List of product IDs
Returns:
Dict mapping product_id to list of variants
"""
from katana_public_api_client.api.variant import get_all_variants
from katana_public_api_client.utils import unwrap_data
async with KatanaClient() as client:
# Fetch all variants (this is more efficient than per-product requests)
response = await get_all_variants.asyncio_detailed(client=client)
all_variants = unwrap_data(response)
# Group by product_id
variants_by_product = {pid: [] for pid in product_ids}
for variant in all_variants:
if hasattr(variant, 'product_id') and variant.product_id in product_ids:
variants_by_product[variant.product_id].append(variant)
return variants_by_product
async def parallel_data_enrichment(order_ids: list[int]):
"""
Enrich multiple orders with related data using parallel requests.
Args:
order_ids: List of order IDs to enrich
"""
from katana_public_api_client.api.sales_order import get_sales_order
from katana_public_api_client.api.customer import get_all_customers
async with KatanaClient() as client:
# Fetch orders and customers concurrently
order_tasks = [
get_sales_order.asyncio_detailed(client=client, id=order_id)
for order_id in order_ids
]
# Fetch all customers once
customers_task = get_all_customers.asyncio_detailed(client=client)
# Wait for all to complete
results = await asyncio.gather(*order_tasks, customers_task)
orders = [r.parsed for r in results[:-1] if hasattr(r, 'parsed') and r.parsed]
customers_response = results[-1]
print(f"Fetched {len(orders)} orders with customer data")
# Usage
if __name__ == "__main__":
# Fetch multiple products concurrently (much faster than sequential)
product_ids = [1, 2, 3, 4, 5]
products = asyncio.run(fetch_multiple_products_concurrent(product_ids))
# Fetch variants efficiently
variants_map = asyncio.run(fetch_variants_for_products(product_ids))
for pid, variants in variants_map.items():
print(f"Product {pid}: {len(variants)} variants")Implement caching to reduce API calls for frequently accessed data.
import asyncio
from datetime import datetime, timedelta
from typing import Any, Optional
import json
from pathlib import Path
from katana_public_api_client import KatanaClient
from katana_public_api_client.api.product import get_all_products
from katana_public_api_client.utils import unwrap_data
class SimpleCache:
"""Simple file-based cache with TTL."""
def __init__(self, cache_dir: str = ".cache"):
self.cache_dir = Path(cache_dir)
self.cache_dir.mkdir(exist_ok=True)
def _cache_path(self, key: str) -> Path:
"""Get cache file path for key."""
return self.cache_dir / f"{key}.json"
def get(self, key: str, ttl_seconds: int = 3600) -> Optional[Any]:
"""
Get cached value if not expired.
Args:
key: Cache key
ttl_seconds: Time to live in seconds
Returns:
Cached value or None if expired/missing
"""
cache_file = self._cache_path(key)
if not cache_file.exists():
return None
# Check if expired
cache_age = datetime.now().timestamp() - cache_file.stat().st_mtime
if cache_age > ttl_seconds:
cache_file.unlink() # Delete expired cache
return None
return json.loads(cache_file.read_text())
def set(self, key: str, value: Any):
"""Set cached value."""
cache_file = self._cache_path(key)
cache_file.write_text(json.dumps(value, default=str))
# Global cache instance
cache = SimpleCache()
async def get_products_cached(ttl_seconds: int = 3600) -> list[Any]:
"""
Get products with caching.
Args:
ttl_seconds: Cache TTL in seconds (default 1 hour)
Returns:
List of products (from cache or API)
"""
cache_key = "products_all"
# Try cache first
cached = cache.get(cache_key, ttl_seconds=ttl_seconds)
if cached:
print(f"✓ Using cached products ({len(cached)} items)")
return cached
# Fetch from API
print("Fetching products from API...")
async with KatanaClient() as client:
response = await get_all_products.asyncio_detailed(client=client)
products = unwrap_data(response)
# Serialize for caching
products_data = [
{
"id": p.id,
"name": p.name,
"sku": p.sku,
}
for p in products
]
# Cache the results
cache.set(cache_key, products_data)
print(f"✓ Fetched and cached {len(products)} products")
return products_data
async def get_product_by_sku_cached(sku: str, ttl_seconds: int = 3600) -> Optional[dict]:
"""
Get product by SKU with caching.
Args:
sku: Product SKU
ttl_seconds: Cache TTL in seconds
Returns:
Product data or None
"""
# Get all products (cached)
products = await get_products_cached(ttl_seconds=ttl_seconds)
# Find by SKU
for product in products:
if product.get("sku") == sku:
return product
return None
# Usage
if __name__ == "__main__":
# First call - fetches from API
products1 = asyncio.run(get_products_cached(ttl_seconds=300)) # 5 min cache
# Second call - uses cache (much faster)
products2 = asyncio.run(get_products_cached(ttl_seconds=300))
# Find specific product
product = asyncio.run(get_product_by_sku_cached("WDG-001"))
if product:
print(f"Found product: {product['name']}")Test your integration code without making real API calls.
import pytest
from unittest.mock import AsyncMock, MagicMock, patch
import httpx
from katana_public_api_client import KatanaClient
from katana_public_api_client.api.product import get_all_products
from katana_public_api_client.models import Product
@pytest.mark.asyncio
async def test_product_fetch_with_mock():
"""Test product fetching with mocked API response."""
# Create mock products
mock_product = Product(
id=123,
name="Test Product",
sku="TEST-001",
)
# Mock the API response
mock_response = MagicMock(spec=httpx.Response)
mock_response.status_code = 200
mock_response.parsed = [mock_product]
# Patch the API function
with patch(
'katana_public_api_client.api.product.get_all_products.asyncio_detailed',
new=AsyncMock(return_value=mock_response)
):
async with KatanaClient(api_key="test-key", base_url="https://test.api") as client:
response = await get_all_products.asyncio_detailed(client=client)
assert response.status_code == 200
assert len(response.parsed) == 1
assert response.parsed[0].sku == "TEST-001"
@pytest.mark.asyncio
async def test_error_handling_with_mock():
"""Test error handling with mocked error response."""
# Mock an error response
with patch(
'katana_public_api_client.api.product.get_all_products.asyncio_detailed',
new=AsyncMock(side_effect=httpx.TimeoutException("Request timeout"))
):
async with KatanaClient(api_key="test-key", base_url="https://test.api") as client:
with pytest.raises(httpx.TimeoutException):
await get_all_products.asyncio_detailed(client=client)
@pytest.mark.asyncio
async def test_rate_limit_handling():
"""Test that rate limits are handled correctly."""
# Mock a 429 response followed by success
mock_429 = MagicMock(spec=httpx.Response)
mock_429.status_code = 429
mock_429.headers = {"Retry-After": "1"}
mock_200 = MagicMock(spec=httpx.Response)
mock_200.status_code = 200
mock_200.parsed = []
with patch(
'katana_public_api_client.api.product.get_all_products.asyncio_detailed',
new=AsyncMock(side_effect=[mock_429, mock_200])
):
async with KatanaClient(api_key="test-key", base_url="https://test.api") as client:
# Client should automatically retry after 429
response = await get_all_products.asyncio_detailed(client=client)
assert response.status_code == 200Set up integration tests with real API (for CI/CD).
import asyncio
import os
import pytest
from katana_public_api_client import KatanaClient
from katana_public_api_client.api.product import get_all_products
from katana_public_api_client.utils import unwrap_data
# Skip integration tests if no API key
pytestmark = pytest.mark.skipif(
not os.getenv("KATANA_API_KEY"),
reason="KATANA_API_KEY not set"
)
@pytest.fixture
async def katana_client():
"""Fixture providing KatanaClient for integration tests."""
async with KatanaClient() as client:
yield client
@pytest.mark.integration
@pytest.mark.asyncio
async def test_fetch_products_integration(katana_client):
"""Integration test for fetching products."""
response = await get_all_products.asyncio_detailed(client=katana_client)
# Check response structure
assert response.status_code == 200
products = unwrap_data(response)
assert isinstance(products, list)
# Validate data structure if products exist
if products:
product = products[0]
assert hasattr(product, 'id')
assert hasattr(product, 'name')
@pytest.mark.integration
@pytest.mark.asyncio
async def test_pagination_integration(katana_client):
"""Integration test for pagination behavior."""
# Fetch with small page size
response = await get_all_products.asyncio_detailed(
client=katana_client,
limit=10 # Small page size
)
products = unwrap_data(response)
# If there are more than 10 products, pagination should work
# The client automatically fetches all pages
assert isinstance(products, list)
@pytest.mark.integration
@pytest.mark.asyncio
async def test_retry_on_network_error(katana_client):
"""Test that network errors are retried."""
# This test would need a way to simulate network errors
# For real integration tests, you might use network fault injection
pass
# Run integration tests with:
# pytest -m integration tests/import asyncio
import pytest
from typing import Any
from unittest.mock import AsyncMock, MagicMock
from katana_public_api_client import KatanaClient
class TestHelpers:
"""Helper utilities for testing Katana integrations."""
@staticmethod
def create_mock_client() -> KatanaClient:
"""Create a mock KatanaClient for testing."""
return KatanaClient(
api_key="test-api-key",
base_url="https://test.api.katanamrp.com/v1"
)
@staticmethod
def create_mock_response(status_code: int, data: Any = None) -> MagicMock:
"""
Create a mock API response.
Args:
status_code: HTTP status code
data: Response data
Returns:
Mock response object
"""
mock_response = MagicMock()
mock_response.status_code = status_code
mock_response.parsed = data
return mock_response
@staticmethod
async def assert_async_raises(exception_class, coroutine):
"""Assert that async function raises specific exception."""
with pytest.raises(exception_class):
await coroutine
# Example usage in tests
@pytest.mark.asyncio
async def test_with_helpers():
"""Example test using helper utilities."""
client = TestHelpers.create_mock_client()
assert client._base_url == "https://test.api.katanamrp.com/v1"
# Create mock response
response = TestHelpers.create_mock_response(200, data=[])
assert response.status_code == 200- Check the API Documentation
- Review Client Guide for client features
- See examples directory for runnable code samples
- Open an issue on GitHub for questions or feedback
Last Updated: 2025-01-21