Skip to content

Commit 4743aff

Browse files
authored
Improve Redis/Celery reliability to prevent task loss (#1853)
- Add visibility_timeout (12h) to prevent tasks from being redelivered while still running - Enable task_acks_late and task_reject_on_worker_lost to ensure tasks are requeued if a worker crashes - Set worker_prefetch_multiplier=1 to prevent task hoarding - Add broker_heartbeat for faster connection problem detection - Configure socket timeouts and retry_on_timeout for Redis connections - Add exponential backoff retry (3 retries) for Redis client - Add health_check_interval for periodic connection validation - Add configurable task time limits (24h hard, 23h soft) - Fix task_track_started from tuple (True,) to boolean True New environment variables for configuration: - CELERY_VISIBILITY_TIMEOUT, CELERY_BROKER_HEARTBEAT - CELERY_TASK_TIME_LIMIT, CELERY_TASK_SOFT_TIME_LIMIT - CELERY_BROKER_POOL_LIMIT, CELERY_REDIS_MAX_CONNECTIONS - REDIS_SOCKET_TIMEOUT, REDIS_SOCKET_CONNECT_TIMEOUT - REDIS_HEALTH_CHECK_INTERVAL AI-assisted: Claude Code Signed-off-by: Christian Berendt <[email protected]>
1 parent 6e1e262 commit 4743aff

File tree

3 files changed

+65
-3
lines changed

3 files changed

+65
-3
lines changed

osism/settings.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,9 @@ def read_secret(secret_name):
2121
REDIS_HOST: str = os.getenv("REDIS_HOST", "redis")
2222
REDIS_PORT: int = int(os.getenv("REDIS_PORT", "6379"))
2323
REDIS_DB: int = int(os.getenv("REDIS_DB", "0"))
24+
REDIS_SOCKET_TIMEOUT: int = int(os.getenv("REDIS_SOCKET_TIMEOUT", "30"))
25+
REDIS_SOCKET_CONNECT_TIMEOUT: int = int(os.getenv("REDIS_SOCKET_CONNECT_TIMEOUT", "30"))
26+
REDIS_HEALTH_CHECK_INTERVAL: int = int(os.getenv("REDIS_HEALTH_CHECK_INTERVAL", "10"))
2427

2528

2629
NETBOX_URL = os.getenv("NETBOX_API", os.getenv("NETBOX_URL"))

osism/tasks/__init__.py

Lines changed: 52 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,14 +18,63 @@
1818

1919

2020
class Config:
21+
# Basic settings
2122
broker_connection_retry_on_startup = True
2223
enable_utc = True
2324
enable_ironic = os.environ.get("ENABLE_IRONIC", "True")
24-
broker_url = "redis://redis"
25-
result_backend = "redis://redis"
25+
26+
# Redis connection settings from environment
27+
_redis_host = os.environ.get("REDIS_HOST", "redis")
28+
_redis_port = os.environ.get("REDIS_PORT", "6379")
29+
_redis_db = os.environ.get("REDIS_DB", "0")
30+
31+
broker_url = f"redis://{_redis_host}:{_redis_port}/{_redis_db}"
32+
result_backend = f"redis://{_redis_host}:{_redis_port}/{_redis_db}"
33+
34+
# Connection pool settings
35+
broker_pool_limit = int(os.environ.get("CELERY_BROKER_POOL_LIMIT", "10"))
36+
redis_max_connections = int(os.environ.get("CELERY_REDIS_MAX_CONNECTIONS", "20"))
37+
38+
# Redis transport options for reliability
39+
broker_transport_options = {
40+
"visibility_timeout": int(
41+
os.environ.get("CELERY_VISIBILITY_TIMEOUT", "43200")
42+
), # 12 hours
43+
"socket_timeout": 30,
44+
"socket_connect_timeout": 30,
45+
"retry_on_timeout": True,
46+
"health_check_interval": 10,
47+
}
48+
49+
# Result backend transport options
50+
result_backend_transport_options = {
51+
"socket_timeout": 30,
52+
"socket_connect_timeout": 30,
53+
"retry_on_timeout": True,
54+
}
55+
56+
# Task acknowledgement settings - acknowledge after completion
57+
task_acks_late = True
58+
task_reject_on_worker_lost = True
59+
60+
# Worker settings - prevent task hoarding
61+
worker_prefetch_multiplier = 1
62+
63+
# Heartbeat for connection detection
64+
broker_heartbeat = int(os.environ.get("CELERY_BROKER_HEARTBEAT", "10"))
65+
66+
# Task time limits (can be overridden per-task)
67+
task_time_limit = int(
68+
os.environ.get("CELERY_TASK_TIME_LIMIT", "86400")
69+
) # 24h hard limit
70+
task_soft_time_limit = int(
71+
os.environ.get("CELERY_TASK_SOFT_TIME_LIMIT", "82800")
72+
) # 23h soft limit
73+
74+
# Queue settings
2675
task_create_missing_queues = True
2776
task_default_queue = "default"
28-
task_track_started = (True,)
77+
task_track_started = True
2978
task_routes = {
3079
"osism.tasks.ansible.*": {"queue": "osism-ansible"},
3180
"osism.tasks.ceph.*": {"queue": "ceph-ansible"},

osism/utils/__init__.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@
1313
import pynetbox
1414
from pottery import Redlock
1515
from redis import Redis
16+
from redis.backoff import ExponentialBackoff
17+
from redis.retry import Retry
1618
import requests
1719
from requests.adapters import HTTPAdapter
1820
import urllib3
@@ -188,11 +190,19 @@ def get_netbox_connection(
188190
return nb
189191

190192

193+
# Redis retry configuration with exponential backoff
194+
_redis_retry = Retry(ExponentialBackoff(), retries=3)
195+
191196
redis = Redis(
192197
host=settings.REDIS_HOST,
193198
port=settings.REDIS_PORT,
194199
db=settings.REDIS_DB,
195200
socket_keepalive=True,
201+
socket_timeout=settings.REDIS_SOCKET_TIMEOUT,
202+
socket_connect_timeout=settings.REDIS_SOCKET_CONNECT_TIMEOUT,
203+
retry_on_timeout=True,
204+
retry=_redis_retry,
205+
health_check_interval=settings.REDIS_HEALTH_CHECK_INTERVAL,
196206
)
197207
redis.ping()
198208

0 commit comments

Comments
 (0)