1616_celery_app = None
1717
1818
19+ SOCKET_TIMEOUT = 5
20+ SOCKET_CONNECT_TIMEOUT = 5
21+
22+
1923def create_celery_app (_app = None ):
2024 """Create the Celery app, using the Flask app in _app"""
2125 global task , _celery_app
@@ -67,6 +71,9 @@ def on_failure(self, exc, task_id, args, kwargs, einfo):
6771 app = Celery (
6872 "ibutsu_server" ,
6973 broker = _app .config .get ("CELERY_BROKER_URL" ),
74+ broker_connection_retry = True ,
75+ broker_connection_retry_on_startup = True ,
76+ worker_cancel_long_running_tasks_on_connection_loss = True ,
7077 include = [
7178 "ibutsu_server.tasks.db" ,
7279 "ibutsu_server.tasks.importers" ,
@@ -76,6 +83,13 @@ def on_failure(self, exc, task_id, args, kwargs, einfo):
7683 "ibutsu_server.tasks.runs" ,
7784 ],
7885 )
86+ app .conf .redis_socket_timeout = SOCKET_TIMEOUT
87+ app .conf .redis_socket_connect_timeout = SOCKET_CONNECT_TIMEOUT
88+ app .conf .redis_retry_on_timeout = True
89+ app .conf .broker_transport_options = app .conf .result_backend_transport_options = {
90+ "socket_timeout" : SOCKET_TIMEOUT ,
91+ "socket_connect_timeout" : SOCKET_CONNECT_TIMEOUT ,
92+ }
7993 app .conf .result_backend = _app .config .get ("CELERY_RESULT_BACKEND" )
8094 app .Task = IbutsuTask
8195 # Shortcut for the decorator
@@ -120,7 +134,11 @@ def retry_task_on_exception(*args, **kwargs):
120134def lock (name , timeout = LOCK_EXPIRE , app = None ):
121135 if not app :
122136 app = current_app
123- redis_client = Redis .from_url (app .config ["CELERY_BROKER_URL" ])
137+ redis_client = Redis .from_url (
138+ app .config ["CELERY_BROKER_URL" ],
139+ socket_timeout = SOCKET_TIMEOUT ,
140+ socket_connect_timeout = SOCKET_CONNECT_TIMEOUT ,
141+ )
124142 try :
125143 # Get a lock so that we don't run this task concurrently
126144 logging .info (f"Trying to get a lock for { name } " )
0 commit comments