5
5
import celery
6
6
import celery_aio_pool
7
7
8
- from backend .app .task .conf import task_settings
9
8
from backend .core .conf import settings
10
9
11
10
__all__ = ['celery_app' ]
12
11
13
12
14
13
def get_broker_url () -> str :
15
14
"""获取消息代理 URL"""
16
- if task_settings .CELERY_BROKER == 'redis' :
15
+ if settings .CELERY_BROKER == 'redis' :
17
16
return (
18
17
f'redis://:{ settings .REDIS_PASSWORD } @{ settings .REDIS_HOST } :'
19
- f'{ settings .REDIS_PORT } /{ task_settings .CELERY_BROKER_REDIS_DATABASE } '
18
+ f'{ settings .REDIS_PORT } /{ settings .CELERY_BROKER_REDIS_DATABASE } '
20
19
)
21
20
return (
22
- f'amqp://{ task_settings . RABBITMQ_USERNAME } :{ task_settings . RABBITMQ_PASSWORD } @'
23
- f'{ task_settings . RABBITMQ_HOST } :{ task_settings . RABBITMQ_PORT } '
21
+ f'amqp://{ settings . CELERY_RABBITMQ_USERNAME } :{ settings . CELERY_RABBITMQ_PASSWORD } @'
22
+ f'{ settings . CELERY_RABBITMQ_HOST } :{ settings . CELERY_RABBITMQ_PORT } '
24
23
)
25
24
26
25
27
26
def get_result_backend () -> str :
28
27
"""获取结果后端 URL"""
29
28
return (
30
29
f'redis://:{ settings .REDIS_PASSWORD } @{ settings .REDIS_HOST } :'
31
- f'{ settings .REDIS_PORT } /{ task_settings .CELERY_BACKEND_REDIS_DATABASE } '
30
+ f'{ settings .REDIS_PORT } /{ settings .CELERY_BACKEND_REDIS_DATABASE } '
32
31
)
33
32
34
33
35
34
def get_result_backend_transport_options () -> dict [str , Any ]:
36
35
"""获取结果后端传输选项"""
37
36
return {
38
- 'global_keyprefix' : task_settings .CELERY_BACKEND_REDIS_PREFIX ,
37
+ 'global_keyprefix' : settings .CELERY_BACKEND_REDIS_PREFIX ,
39
38
'retry_policy' : {
40
- 'timeout' : task_settings .CELERY_BACKEND_REDIS_TIMEOUT ,
39
+ 'timeout' : settings .CELERY_BACKEND_REDIS_TIMEOUT ,
41
40
},
42
41
}
43
42
@@ -55,7 +54,7 @@ def init_celery() -> celery.Celery:
55
54
'fba_celery' ,
56
55
enable_utc = False ,
57
56
timezone = settings .DATETIME_TIMEZONE ,
58
- beat_schedule = task_settings .CELERY_SCHEDULE ,
57
+ beat_schedule = settings .CELERY_SCHEDULE ,
59
58
broker_url = get_broker_url (),
60
59
broker_connection_retry_on_startup = True ,
61
60
result_backend = get_result_backend (),
@@ -65,7 +64,7 @@ def init_celery() -> celery.Celery:
65
64
)
66
65
67
66
# 自动发现任务
68
- app .autodiscover_tasks (task_settings .CELERY_TASK_PACKAGES )
67
+ app .autodiscover_tasks (settings .CELERY_TASK_PACKAGES )
69
68
70
69
return app
71
70
0 commit comments