forked from ZhuLinsen/daily_stock_analysis
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathscheduler.py
More file actions
184 lines (144 loc) · 5.35 KB
/
Copy pathscheduler.py
File metadata and controls
184 lines (144 loc) · 5.35 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
# -*- coding: utf-8 -*-
"""
===================================
定时调度模块
===================================
职责:
1. 支持每日定时执行股票分析
2. 支持定时执行大盘复盘
3. 优雅处理信号,确保可靠退出
依赖:
- schedule: 轻量级定时任务库
"""
import logging
import signal
import sys
import time
import threading
from datetime import datetime
from typing import Callable, Optional
logger = logging.getLogger(__name__)
class GracefulShutdown:
"""
优雅退出处理器
捕获 SIGTERM/SIGINT 信号,确保任务完成后再退出
"""
def __init__(self):
self.shutdown_requested = False
self._lock = threading.Lock()
# 注册信号处理器
signal.signal(signal.SIGINT, self._signal_handler)
signal.signal(signal.SIGTERM, self._signal_handler)
def _signal_handler(self, signum, frame):
"""信号处理函数"""
with self._lock:
if not self.shutdown_requested:
logger.info(f"收到退出信号 ({signum}),等待当前任务完成...")
self.shutdown_requested = True
@property
def should_shutdown(self) -> bool:
"""检查是否应该退出"""
with self._lock:
return self.shutdown_requested
class Scheduler:
"""
定时任务调度器
基于 schedule 库实现,支持:
- 每日定时执行
- 启动时立即执行
- 优雅退出
"""
def __init__(self, schedule_time: str = "18:00"):
"""
初始化调度器
Args:
schedule_time: 每日执行时间,格式 "HH:MM"
"""
try:
import schedule
self.schedule = schedule
except ImportError:
logger.error("schedule 库未安装,请执行: pip install schedule")
raise ImportError("请安装 schedule 库: pip install schedule")
self.schedule_time = schedule_time
self.shutdown_handler = GracefulShutdown()
self._task_callback: Optional[Callable] = None
self._running = False
def set_daily_task(self, task: Callable, run_immediately: bool = True):
"""
设置每日定时任务
Args:
task: 要执行的任务函数(无参数)
run_immediately: 是否在设置后立即执行一次
"""
self._task_callback = task
# 设置每日定时任务
self.schedule.every().day.at(self.schedule_time).do(self._safe_run_task)
logger.info(f"已设置每日定时任务,执行时间: {self.schedule_time}")
if run_immediately:
logger.info("立即执行一次任务...")
self._safe_run_task()
def _safe_run_task(self):
"""安全执行任务(带异常捕获)"""
if self._task_callback is None:
return
try:
logger.info("=" * 50)
logger.info(f"定时任务开始执行 - {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
logger.info("=" * 50)
self._task_callback()
logger.info(f"定时任务执行完成 - {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
except Exception as e:
logger.exception(f"定时任务执行失败: {e}")
def run(self):
"""
运行调度器主循环
阻塞运行,直到收到退出信号
"""
self._running = True
logger.info("调度器开始运行...")
logger.info(f"下次执行时间: {self._get_next_run_time()}")
while self._running and not self.shutdown_handler.should_shutdown:
self.schedule.run_pending()
time.sleep(30) # 每30秒检查一次
# 每小时打印一次心跳
if datetime.now().minute == 0 and datetime.now().second < 30:
logger.info(f"调度器运行中... 下次执行: {self._get_next_run_time()}")
logger.info("调度器已停止")
def _get_next_run_time(self) -> str:
"""获取下次执行时间"""
jobs = self.schedule.get_jobs()
if jobs:
next_run = min(job.next_run for job in jobs)
return next_run.strftime('%Y-%m-%d %H:%M:%S')
return "未设置"
def stop(self):
"""停止调度器"""
self._running = False
def run_with_schedule(
task: Callable,
schedule_time: str = "18:00",
run_immediately: bool = True
):
"""
便捷函数:使用定时调度运行任务
Args:
task: 要执行的任务函数
schedule_time: 每日执行时间
run_immediately: 是否立即执行一次
"""
scheduler = Scheduler(schedule_time=schedule_time)
scheduler.set_daily_task(task, run_immediately=run_immediately)
scheduler.run()
if __name__ == "__main__":
# 测试定时调度
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s | %(levelname)-8s | %(name)-20s | %(message)s',
)
def test_task():
print(f"任务执行中... {datetime.now()}")
time.sleep(2)
print("任务完成!")
print("启动测试调度器(按 Ctrl+C 退出)")
run_with_schedule(test_task, schedule_time="23:59", run_immediately=True)