Status: ✅ Production-Ready
Implementation: Complete worker system with Kubernetes integration
Coverage: Job queues, worker lifecycle, async offloading, scaling
Enable horizontal scaling and asynchronous processing of provider operations. Offload long-running tasks to backend worker pods while the API server returns immediately with job tracking IDs.
The Worker Role system transforms synchronous provider operations into asynchronous, scalable jobs:
Before (synchronous):

```
Client Request
      ↓
API Server → Provider Logic (blocking for 5+ seconds) → Response
      ↓
Client waits for completion
```

After (with the Worker Role system):

```
Client Request
      ↓
API Server → Submit to Queue (instant) → Response with job_id
      ↓
Worker Pod (processes asynchronously)
      ↓
Result stored for polling
```
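The two flows above can be sketched end to end with an in-memory queue. This is a toy model only, with invented names (`submit`, `worker`); the real `JobQueue` is backed by RabbitMQ:

```python
import asyncio
import uuid

results: dict = {}  # stands in for the result store


async def submit(jobs: asyncio.Queue, payload: str) -> str:
    """API side: enqueue the job and return a job_id immediately."""
    job_id = str(uuid.uuid4())
    await jobs.put((job_id, payload))
    return job_id


async def worker(jobs: asyncio.Queue) -> None:
    """Worker side: consume jobs and store results for polling."""
    while True:
        job_id, payload = await jobs.get()
        await asyncio.sleep(0.01)  # pretend this is a slow provider call
        results[job_id] = f"done: {payload}"
        jobs.task_done()


async def main() -> str:
    jobs: asyncio.Queue = asyncio.Queue()  # stands in for the message broker
    consumer = asyncio.create_task(worker(jobs))
    job_id = await submit(jobs, "create ResourceGroup")  # returns instantly
    await jobs.join()  # a real client would poll instead of joining
    consumer.cancel()
    return results[job_id]


outcome = asyncio.run(main())
print(outcome)  # -> done: create ResourceGroup
```

The client-facing call returns as soon as the job is enqueued; only the worker ever blocks on the slow operation.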
- ✅ **Horizontal Scaling**: Scale worker pods independently from the API
- ✅ **Async Processing**: Client gets an immediate response with a job_id
- ✅ **Load Distribution**: Multiple workers share the operation workload
- ✅ **Resilience**: Failed jobs are captured in a dead-letter queue
- ✅ **Kubernetes Native**: Full RBAC, health checks, Prometheus metrics
- ✅ **Production Ready**: Used in ITL ControlPlane core infrastructure
```
┌──────────────────────────────────────────────────────────────┐
│                      API Server (Main)                       │
│  - Receives HTTP requests                                    │
│  - Validates input                                           │
│  - Submits jobs to queue                                     │
│  - Returns job_id to client for tracking                     │
└───────────────────────┬──────────────────────────────────────┘
                        │ Submit Job
                        ▼
┌──────────────────────────────────────────────────────────────┐
│             RabbitMQ Job Queue (Message Broker)              │
│  - provider.jobs: Queue for pending jobs                     │
│  - provider.results: Queue for job results                   │
│  - provider.jobs.dlq: Dead-letter queue for failures         │
└───────────────────────┬──────────────────────────────────────┘
                        │ Consume Job
                        ▼
┌──────────────────────────────────────────────────────────────┐
│   Worker Pod #1        Worker Pod #2        Worker Pod #3    │
│  ┌──────────────────┐ ┌──────────────────┐                   │
│  │ ProviderWorker   │ │ ProviderWorker   │                   │
│  │ - Process jobs   │ │ - Process jobs   │                   │
│  │ - Execute ops    │ │ - Execute ops    │                   │
│  │ - Store results  │ │ - Store results  │                   │
│  └──────────────────┘ └──────────────────┘                   │
└───────────────────────┬──────────────────────────────────────┘
                        │ Publish Result
                        ▼
┌──────────────────────────────────────────────────────────────┐
│                  Result Store (Redis/Cache)                  │
│         (Clients poll this for job completion)               │
└──────────────────────────────────────────────────────────────┘
```
| Component | Purpose | Implementation |
|---|---|---|
| JobQueue | Reliable job distribution | RabbitMQ-based message broker |
| WorkerRole | Abstract base class | Lifecycle + processing interface |
| ProviderWorker | Concrete worker | Consumes jobs, executes operations |
| OffloadingProviderRegistry | Async job submission | Replaces sync registry for offloading |
| WorkerRegistry | Worker fleet management | Tracks active workers and health |
```bash
# Development (1 worker)
helm install core-provider ./charts \
  -f charts/values-dev.yaml

# Production (5 workers with HA)
helm install core-provider ./charts \
  -f charts/values-prod.yaml
```

```python
from itl_controlplane_sdk.workers import OffloadingProviderRegistry, JobQueue
from fastapi import FastAPI

app = FastAPI()
job_queue = JobQueue(rabbitmq_url="amqp://guest:guest@rabbitmq/")
registry = OffloadingProviderRegistry(job_queue)

@app.post("/resources")
async def create_resource(request: ResourceRequest):
    # Submit to workers, get an immediate response
    response = await registry.create_or_update_resource(
        "ITL.Core", "ResourceGroup", request
    )
    # Client gets a job_id to track progress
    return {
        "job_id": response.job_id,
        "status": "pending",
        "message": "Operation submitted to worker queue"
    }

@app.get("/jobs/{job_id}")
async def get_job_status(job_id: str):
    # Poll for the result
    result = await registry.get_job_result(job_id)
    return result or {"status": "pending"}
```

```bash
# Check workers are running
kubectl get pods -l app.kubernetes.io/component=worker

# View worker logs
kubectl logs -f deployment/core-provider-worker

# Check health
kubectl exec <worker-pod> -- curl http://localhost:8001/health

# See the RabbitMQ management UI
kubectl port-forward svc/rabbitmq 15672:15672
# Visit http://localhost:15672
```

Handles message distribution and job lifecycle via RabbitMQ.
```python
job_queue = JobQueue(
    rabbitmq_url="amqp://guest:guest@localhost/"
)
```

```python
job_id = await job_queue.submit_job(
    provider_namespace="ITL.Core",
    resource_type="ResourceGroup",
    operation="create",
    request=request,
    priority=7  # 0-10, higher = more urgent
)
```

Returns: Job ID (string)

```python
result = await job_queue.get_result(
    job_id="abc-123",
    timeout=30.0  # Wait up to 30 seconds
)
# Returns JobResult, or None on timeout
```

```python
stats = await job_queue.get_queue_stats()
# {
#   "connected": true,
#   "queues": {
#     "provider.jobs": {"message_count": 5, "consumer_count": 3},
#     "provider.jobs.dlq": {"message_count": 0}
#   }
# }
```

Base class for implementing worker processes.
```python
class WorkerRole:
    # Lifecycle
    async def start()           # Start the worker
    async def stop()            # Stop gracefully

    # Job processing
    async def process_job(      # Process a single job
        job_id: str,
        provider_namespace: str,
        resource_type: str,
        operation: str,
        request: ResourceRequest
    ) -> Dict[str, Any]

    # Status
    def get_status() -> Dict    # Get worker status
```

Concrete worker implementation for provider operations.

```python
worker = ProviderWorker(
    worker_id="worker-1",
    provider_registry=registry,  # ResourceProviderRegistry
    job_queue=job_queue          # JobQueue instance
)

await worker.start()
await worker.start_consuming_jobs()  # Blocks, consuming jobs
```

Provider registry that submits jobs instead of executing them directly.
```python
registry = OffloadingProviderRegistry(job_queue)

# Create/Update
response = await registry.create_or_update_resource(
    "ITL.Core", "ResourceGroup", request
)
# Returns: ResourceResponse with job_id (status="pending")

# Get
response = await registry.get_resource(
    "ITL.Core", "ResourceGroup", request
)

# Delete
response = await registry.delete_resource(
    "ITL.Core", "ResourceGroup", request
)

# List
response = await registry.list_resources(
    "ITL.Core", "ResourceGroup", request
)

# Action
response = await registry.execute_action(
    "ITL.Core", "ResourceGroup", request
)
```

```python
result = await registry.get_job_result(
    job_id="abc-123",
    timeout=30.0  # Wait timeout in seconds
)
```

Blocking variant that waits for job completion.
```python
registry = SyncOffloadingProviderRegistry(
    job_queue=job_queue,
    default_timeout=30.0
)

# Waits for the job to complete
response = await registry.create_or_update_resource_sync(
    "ITL.Core", "ResourceGroup", request,
    timeout=60  # Override the default
)
# Returns the completed ResourceResponse
```

Manages multiple worker instances as a fleet.

```python
registry = WorkerRegistry()

# Register a worker
registry.register_worker(worker)

# Get status
status = registry.get_registry_status()
# {
#   "total_workers": 3,
#   "active_workers": 3,
#   "total_jobs_processed": 1542,
#   "total_jobs_failed": 3
# }

# Start/stop all workers
await registry.start_all_workers()
await registry.stop_all_workers()
```

```yaml
workers:
  enabled: true
  replicaCount: 1
  logLevel: DEBUG
  resources:
    requests:
      cpu: 100m
      memory: 128Mi
    limits:
      cpu: 250m
      memory: 256Mi
```

```yaml
workers:
  enabled: true
  replicaCount: 5
  logLevel: INFO
  jobTimeout: 600
  maxConcurrentJobs: 10
  resources:
    requests:
      cpu: 500m
      memory: 512Mi
    limits:
      cpu: 1000m
      memory: 1Gi
  podDisruptionBudget:
    enabled: true
    minAvailable: 3
  affinity:
    podAntiAffinity:
      requiredDuringSchedulingIgnoredDuringExecution:
        - topologyKey: kubernetes.io/hostname
```

```yaml
workers:
  enabled: true
  replicaCount: 20
  logLevel: WARNING
  maxConcurrentJobs: 10
  resources:
    limits:
      cpu: 2000m
      memory: 2Gi
  autoscaling:
    enabled: true
    minReplicas: 10
    maxReplicas: 50
    targetCPUUtilizationPercentage: 70
```

Worker Process:

```bash
WORKER_ID=worker-1                          # Auto-generated if not set
RABBITMQ_URL=amqp://guest:guest@localhost/  # RabbitMQ URL
LOG_LEVEL=INFO                              # DEBUG|INFO|WARNING|ERROR
HEALTH_CHECK_PORT=8001                      # Health check endpoint
```

API Server:

```bash
OFFLOADING_ENABLED=true                     # Enable worker offloading
RABBITMQ_URL=amqp://guest:guest@localhost/  # RabbitMQ URL
JOB_TIMEOUT=300                             # Default job timeout (seconds)
```
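Loading the worker-side variables above with their documented defaults might look like this; `WorkerConfig` and `load_config` are illustrative names, not SDK API:

```python
import os
from dataclasses import dataclass


@dataclass
class WorkerConfig:
    worker_id: str
    rabbitmq_url: str
    log_level: str
    health_check_port: int


def load_config() -> WorkerConfig:
    # Fall back to the documented defaults when a variable is unset
    return WorkerConfig(
        worker_id=os.environ.get("WORKER_ID", "worker-auto"),
        rabbitmq_url=os.environ.get(
            "RABBITMQ_URL", "amqp://guest:guest@localhost/"
        ),
        log_level=os.environ.get("LOG_LEVEL", "INFO"),
        health_check_port=int(os.environ.get("HEALTH_CHECK_PORT", "8001")),
    )


cfg = load_config()
```

Parsing ints (and validating URLs) once at startup keeps misconfiguration failures loud and early rather than surfacing mid-job.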
| File | Purpose |
|---|---|
| `worker-deployment.yaml` | Worker pod Deployment |
| `worker-serviceaccount.yaml` | ServiceAccount for RBAC |
| `worker-clusterrole.yaml` | ClusterRole (minimal permissions) |
| `worker-clusterrolebinding.yaml` | ClusterRoleBinding |
| `worker-configmap.yaml` | Worker configuration |
| `worker-pdb.yaml` | PodDisruptionBudget for HA |
```bash
# Development
helm install core-provider ./charts \
  -f charts/values-dev.yaml

# Production
helm install core-provider ./charts \
  -f charts/values-prod.yaml \
  --set workers.replicaCount=5

# Custom configuration
helm install core-provider ./charts \
  --set workers.enabled=true \
  --set workers.replicaCount=10 \
  --set workers.resources.limits.memory=2Gi
```

```bash
# Check workers are running
kubectl get pods -l app.kubernetes.io/component=worker

# Check logs
kubectl logs -f deployment/core-provider-worker

# Check health
kubectl exec <worker-pod> -- curl http://localhost:8001/health
# {"status": "healthy", "worker_id": "worker-1"}
```

Each worker exposes health check endpoints:
| Endpoint | Purpose | Response |
|---|---|---|
| `GET /health` | Liveness probe | `{"status": "healthy"}` |
| `GET /status` | Detailed status | Worker metrics JSON |
| `GET /metrics` | Prometheus metrics | Metrics in Prometheus format |
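As a stand-in for what such a health endpoint does, here is a minimal `/health` server using only the standard library (the real worker's server implementation may differ; `worker-1` is a placeholder):

```python
import json
import threading
import urllib.request
from http.server import BaseHTTPRequestHandler, HTTPServer


class HealthHandler(BaseHTTPRequestHandler):
    def do_GET(self):
        if self.path == "/health":
            body = json.dumps(
                {"status": "healthy", "worker_id": "worker-1"}
            ).encode()
            self.send_response(200)
            self.send_header("Content-Type", "application/json")
            self.end_headers()
            self.wfile.write(body)
        else:
            self.send_response(404)
            self.end_headers()

    def log_message(self, *args):
        pass  # keep the demo quiet


server = HTTPServer(("127.0.0.1", 0), HealthHandler)  # port 0 = any free port
threading.Thread(target=server.serve_forever, daemon=True).start()

url = f"http://127.0.0.1:{server.server_port}/health"
payload = json.loads(urllib.request.urlopen(url).read())
print(payload)  # {'status': 'healthy', 'worker_id': 'worker-1'}
server.shutdown()
```

Liveness probes only need a cheap 200 response; detailed counters belong on `/status` so the kubelet's frequent probes stay inexpensive.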
```
GET /status
```

```json
{
  "worker_id": "worker-1",
  "worker_type": "provider",
  "is_running": true,
  "jobs_processed": 1542,
  "jobs_failed": 3,
  "uptime_seconds": 3600,
  "started_at": "2024-02-10T12:00:00Z"
}
```

```
worker_jobs_processed{worker_id="worker-1"} 1542
worker_jobs_failed{worker_id="worker-1"} 3
worker_uptime_seconds{worker_id="worker-1"} 3600
worker_running{worker_id="worker-1"} 1
```
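The exposition format above is plain text (`name{labels} value` per line), so it is easy to emit by hand. A sketch; a production worker would more likely use a client library such as `prometheus_client`:

```python
def render_metrics(worker_id: str, processed: int, failed: int,
                   uptime_s: int, running: bool) -> str:
    """Render worker counters in the Prometheus text exposition format."""
    label = f'worker_id="{worker_id}"'
    lines = [
        f"worker_jobs_processed{{{label}}} {processed}",
        f"worker_jobs_failed{{{label}}} {failed}",
        f"worker_uptime_seconds{{{label}}} {uptime_s}",
        f"worker_running{{{label}}} {int(running)}",  # gauges are numeric: 1/0
    ]
    return "\n".join(lines) + "\n"


text = render_metrics("worker-1", 1542, 3, 3600, True)
print(text)
```

The doubled braces in the f-strings are literal `{`/`}` characters, which is why each line comes out as `metric{worker_id="..."} value`.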
```yaml
livenessProbe:
  httpGet:
    path: /health
    port: 8001
  initialDelaySeconds: 30
  periodSeconds: 10

readinessProbe:
  httpGet:
    path: /health
    port: 8001
  initialDelaySeconds: 10
  periodSeconds: 5
```

```bash
# Port-forward to the RabbitMQ management UI
kubectl port-forward svc/rabbitmq 15672:15672
# Visit http://localhost:15672
# Username: guest
# Password: guest
# Check queue depths, consumer counts, and message rates
```

```bash
# Scale to 10 workers
kubectl scale deployment core-provider-worker --replicas=10

# Auto-scaling (via HPA)
kubectl autoscale deployment core-provider-worker \
  --min=3 --max=20 --cpu-percent=70
```

```yaml
workers:
  resources:
    requests:
      cpu: 2000m     # Allocate for CPU-bound operations
      memory: 2Gi    # Allocate for memory-intensive operations
    limits:
      cpu: 4000m
      memory: 4Gi
```

```python
# Urgent jobs (priority 9)
await queue.submit_job(..., priority=9)

# Normal jobs (priority 5)
await queue.submit_job(..., priority=5)

# Background jobs (priority 1)
await queue.submit_job(..., priority=1)
```

| Workload | Workers | Max Jobs | Timeout | Resources |
|---|---|---|---|---|
| Light (1K jobs/day) | 1-2 | 5 | 300s | 256Mi mem |
| Medium (10K jobs/day) | 5-10 | 10 | 300s | 512Mi mem |
| Heavy (100K jobs/day) | 20+ | 20 | 600s | 2Gi mem |
| Long-running | 3-5 | 1 | 3600s | 1Gi mem |
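A rough way to sanity-check sizing: workers needed ≈ (jobs per day × average job seconds) / (86,400 × concurrent jobs per worker). The numbers below are illustrative assumptions, not measured figures, and this gives only a steady-state lower bound; fleets are normally sized for peak load plus headroom, which is why the table's recommendations run higher.

```python
import math


def workers_needed(jobs_per_day: int, avg_job_seconds: float,
                   max_concurrent_jobs: int) -> int:
    """Steady-state lower bound on worker count for a given daily load."""
    busy_seconds = jobs_per_day * avg_job_seconds        # total work per day
    capacity_per_worker = 86_400 * max_concurrent_jobs   # seconds of capacity/day
    return max(1, math.ceil(busy_seconds / capacity_per_worker))


# e.g. 100K jobs/day at ~30s each, 20 concurrent jobs per worker
print(workers_needed(100_000, 30.0, 20))  # -> 2 (before any peak-load headroom)
```

Treat the result as a floor: add headroom for traffic spikes, retries, and rolling restarts before settling on a replica count.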
Failed jobs are automatically moved to the DLQ for analysis:

```
provider.jobs → [Processing]
    ├── SUCCESS → ✅ Result stored
    └── FAILURE → provider.jobs.dlq
```
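The routing above as a toy model, with in-memory containers standing in for the real queues (nothing here touches RabbitMQ):

```python
results: dict = {}  # stands in for the result store
dlq: list = []      # stands in for provider.jobs.dlq


def handle(job_id: str, payload: str) -> None:
    """SUCCESS -> result stored; FAILURE -> dead-letter queue."""
    try:
        if not payload:
            raise ValueError("empty payload")
        results[job_id] = f"ok: {payload}"
    except ValueError as exc:
        # Capture the failure with its reason so it can be triaged later
        dlq.append((job_id, str(exc)))


handle("job-1", "create ResourceGroup")
handle("job-2", "")  # fails and lands in the DLQ

print(results)  # {'job-1': 'ok: create ResourceGroup'}
print(dlq)      # [('job-2', 'empty payload')]
```

Keeping the failure reason alongside the job makes the DLQ useful for diagnosis, not just a graveyard.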
```python
# Get queue statistics
stats = await job_queue.get_queue_stats()
dlq_count = stats['queues']['provider.jobs.dlq']['message_count']
if dlq_count > 0:
    # Investigate the failure cause
    logger.error(f"Jobs in DLQ: {dlq_count}")
```

```python
from enum import Enum

class JobStatus(str, Enum):
    PENDING = "pending"        # Waiting to be processed
    PROCESSING = "processing"  # Currently being processed
    COMPLETED = "completed"    # Successfully completed
    FAILED = "failed"          # Failed, moved to DLQ
    CANCELLED = "cancelled"    # Explicitly cancelled
```

```python
from dataclasses import dataclass
from datetime import datetime
from typing import Any, Dict, Optional

@dataclass
class JobResult:
    job_id: str
    status: JobStatus
    result: Optional[Dict[str, Any]] = None
    error: Optional[str] = None
    timestamp: Optional[datetime] = None
```

```python
class CustomWorker(ProviderWorker):
    async def _process_job_impl(self, job_id, ...):
        try:
            result = await self._execute_provider_operation(...)
            return result
        except ValueError as e:
            # Invalid request - don't retry
            logger.warning(f"Invalid request {job_id}: {e}")
            raise
        except Exception as e:
            # Unexpected error - will retry via DLQ
            logger.error(f"Job failed {job_id}: {e}")
            raise
```

```yaml
rules:
  - apiGroups: [""]
    resources: ["configmaps", "secrets"]
    verbs: ["get", "list", "watch"]
  - apiGroups: [""]
    resources: ["pods/log"]
    verbs: ["get"]
```

```bash
# Create a secret with credentials
kubectl create secret generic core-provider-rabbitmq \
  --from-literal=url="amqp://user:password@rabbitmq:5672/"
```

```yaml
# Reference it in Helm values
workers:
  rabbitmq:
    secretName: core-provider-rabbitmq
```

```yaml
workers:
  securityContext:
    runAsNonRoot: true
    runAsUser: 1000
    readOnlyRootFilesystem: true
```

Problem: Workers deployed but not processing jobs
Solution:

```bash
# 1. Check RabbitMQ is running
kubectl get svc rabbitmq

# 2. Check worker logs
kubectl logs deployment/core-provider-worker

# 3. Verify RABBITMQ_URL
kubectl get deployment -o yaml | grep RABBITMQ_URL

# 4. Test RabbitMQ connectivity
kubectl exec <worker-pod> -- \
  python -c "import asyncio, aio_pika; asyncio.run(aio_pika.connect('amqp://...'))"
```

Problem: Jobs submitted but never processed
Solution:

```bash
# 1. Check the worker count
kubectl get pods -l app.kubernetes.io/component=worker

# 2. Check worker health
kubectl exec <pod> -- curl http://localhost:8001/health

# 3. Check queue stats
kubectl exec <pod> -- curl http://localhost:8001/queue-stats

# 4. Scale up workers
kubectl scale deployment core-provider-worker --replicas=10
```

Problem: Many jobs in the dead-letter queue
Solution:

```bash
# 1. Check worker logs
kubectl logs deployment/core-provider-worker | grep failed

# 2. Check resource usage
kubectl top pods -l app.kubernetes.io/component=worker

# 3. Increase resources
helm upgrade core-provider ./charts \
  --set workers.resources.limits.memory=2Gi

# 4. Inspect DLQ messages via the RabbitMQ UI
kubectl port-forward svc/rabbitmq 15672:15672
```

Problem: Worker pods keep restarting
Solution:

```bash
# 1. Check logs
kubectl logs <pod-name>

# 2. Check resource limits
kubectl describe pod <pod-name>

# 3. Check RabbitMQ connection settings
kubectl exec <pod> -- env | grep RABBITMQ

# 4. Increase the memory limit
helm upgrade core-provider ./charts \
  --set workers.resources.limits.memory=1Gi
```

```python
from fastapi import FastAPI
from itl_controlplane_sdk.workers import OffloadingProviderRegistry, JobQueue

app = FastAPI()
job_queue = JobQueue(rabbitmq_url="amqp://guest:guest@rabbitmq/")
registry = OffloadingProviderRegistry(job_queue)

@app.post("/resources")
async def create_resource(request: ResourceRequest):
    response = await registry.create_or_update_resource(
        "ITL.Core", "ResourceGroup", request
    )
    return {
        "job_id": response.job_id,
        "status": "pending",
        "location": f"/jobs/{response.job_id}"
    }

@app.get("/jobs/{job_id}")
async def get_job_status(job_id: str):
    result = await registry.get_job_result(job_id, timeout=0)
    if result:
        return result
    return {"status": "pending", "job_id": job_id}
```

```python
import asyncio

from itl_controlplane_sdk.workers import ProviderWorker, JobQueue, WorkerRegistry
from itl_controlplane_sdk.providers import ResourceProviderRegistry

async def main():
    # Create the queue
    job_queue = JobQueue(rabbitmq_url="amqp://guest:guest@rabbitmq/")

    # Create the provider registry
    provider_registry = ResourceProviderRegistry()
    provider_registry.register_provider("ITL.Core", "ResourceGroup", core_provider)

    # Create the worker registry
    worker_registry = WorkerRegistry()

    # Start 5 workers
    for i in range(5):
        worker = ProviderWorker(
            worker_id=f"worker-{i}",
            provider_registry=provider_registry,
            job_queue=job_queue
        )
        worker_registry.register_worker(worker)
        asyncio.create_task(worker.start_consuming_jobs())

    # Monitor
    status = worker_registry.get_registry_status()
    print(f"Started {status['total_workers']} workers")

    # Keep running
    await asyncio.sleep(float('inf'))

asyncio.run(main())
```

```python
from itl_controlplane_sdk.workers import SyncOffloadingProviderRegistry, JobQueue

registry = SyncOffloadingProviderRegistry(
    job_queue=job_queue,
    default_timeout=30.0
)

# Waits for the job to complete
try:
    response = await registry.create_or_update_resource_sync(
        "ITL.Core", "ResourceGroup", request,
        timeout=60
    )
    print(f"Created: {response.id}")
except asyncio.TimeoutError:
    print("Job timed out after 60 seconds")
```

```python
# Get operations (high priority, low-latency SLA)
await queue.submit_job(..., priority=8)

# Create operations (normal priority)
await queue.submit_job(..., priority=5)

# Background jobs (low priority)
await queue.submit_job(..., priority=2)
```

```python
# Short timeout for synchronous operations
result = await queue.get_result(job_id, timeout=10)

# Longer timeout for background operations
result = await queue.get_result(job_id, timeout=300)
```

```python
# Monitor queue health
stats = await queue.get_queue_stats()
if stats['queues']['provider.jobs.dlq']['message_count'] > 0:
    # Investigate the failures
    logger.error("Jobs in dead-letter queue")
```

```python
try:
    await worker.start()
finally:
    await worker.stop()
    await job_queue.disconnect()
```

- 08-API_ENDPOINTS.md - FastAPI integration
- 06-HANDLER_MIXINS.md - Handler patterns
- 07-LOCATION_VALIDATION.md - Validation
- 23-BEST_PRACTICES.md - Best practices

```bash
pip install aio-pika  # Required for JobQueue
```

```python
from itl_controlplane_sdk.workers import JobQueue

queue = JobQueue(rabbitmq_url="amqp://guest:guest@rabbitmq/")

job_id = await queue.submit_job(
    provider_namespace="ITL.Core",
    resource_type="ResourceGroup",
    operation="create",
    request=request,
    priority=5
)

result = await queue.get_result(job_id, timeout=30)
```

```python
from itl_controlplane_sdk.workers import OffloadingProviderRegistry

registry = OffloadingProviderRegistry(queue)

response = await registry.create_or_update_resource(
    "ITL.Core", "ResourceGroup", request
)
return {"job_id": response.job_id, "status": "pending"}
```

Document Version: 1.0 (Consolidated from 4 docs)
Last Updated: February 14, 2026
Status: ✅ Production-Ready