Skip to content

Commit 4f8c9bc

Browse files
committed
feat: 静态阈值策略 access-detect 合并处理优化 #1010158081129865566
1 parent 92b718e commit 4f8c9bc

File tree

3 files changed

+499
-1
lines changed

3 files changed

+499
-1
lines changed

bkmonitor/alarm_backends/service/access/data/processor.py

Lines changed: 189 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -711,13 +711,201 @@ def _limit_records_by_time_points(self, records: list) -> tuple[list, int | None
711711

712712
return limited_records, last_time_point
713713

714+
def _is_all_static_threshold(self, item: Item) -> bool:
715+
"""
716+
判断 Item 的所有检测算法是否均为静态阈值。
717+
718+
静态阈值算法的特点:
719+
- 纯计算:只需要当前数据点的值,不依赖历史数据
720+
- 无外部依赖:不需要调用外部服务
721+
- 计算简单:只是简单的数值比较
722+
723+
Args:
724+
item: 策略项
725+
726+
Returns:
727+
bool: True 表示所有算法都是静态阈值
728+
"""
729+
for algorithm in item.algorithms:
730+
# Threshold 算法类型
731+
if algorithm.get("type") != "Threshold":
732+
return False
733+
return True
734+
735+
def _can_merge_access_detect(self) -> bool:
    """
    Decide whether this strategy group is eligible for merged access-detect processing.

    Eligibility requires all of:
    1. the feature switch ``settings.ACCESS_DETECT_MERGE_ENABLED`` is on;
    2. the group has at least one strategy item;
    3. if a gray-release list is configured, every item's strategy is in it;
    4. every item's detection algorithms are all static thresholds.

    Returns:
        bool: True when merged processing may be used.
    """
    # Feature switch must be enabled.
    if not settings.ACCESS_DETECT_MERGE_ENABLED:
        return False

    # Nothing to merge without strategy items.
    if not self.items:
        return False

    # Gray-release gate: when a non-empty ID list is configured, every
    # strategy in this group must appear in it.
    gray_ids = getattr(settings, "ACCESS_DETECT_MERGE_STRATEGY_IDS", [])
    if gray_ids and any(item.strategy.id not in gray_ids for item in self.items):
        return False

    # All items must use static-threshold algorithms exclusively.
    return all(self._is_all_static_threshold(item) for item in self.items)
769+
770+
def _detect_and_push_abnormal(self, output_client=None):
    """
    Run static-threshold detection directly inside the access module and push
    any anomaly data, skipping the asynchronous detect task.

    Optimisation approach: reuse DetectProcess instead of re-implementing the
    detection logic. By calling ``pull_data(item, inputs=data_points)`` the
    data is injected directly (no Redis pull), and all existing monitoring
    metrics (latency stats, large-latency alerts, double_check, etc.) are
    obtained for free.

    Flow:
    1. De-duplicate records and run the priority check (reuses access logic).
    2. Create a DetectProcess instance.
    3. ``gen_strategy_snapshot`` generates the strategy snapshot.
    4. ``pull_data(item, inputs=data_points)`` injects the data directly.
    5. ``handle_data`` performs the detection.
    6. ``double_check`` second confirmation (inherited automatically).
    7. ``push_data`` pushes anomaly data (with all monitoring metrics).
    8. Push no-data detection records.
    9. Push noise-reduction data.
    10. Report detect-module metrics (DETECT_PROCESS_TIME/COUNT).

    Args:
        output_client: optional Redis client passed through to ``_push``.
    """
    # Local imports avoid a circular dependency between access and detect.
    from alarm_backends.service.detect import DataPoint
    from alarm_backends.service.detect.process import DetectProcess

    # Record the detection start time, used to report DETECT_PROCESS_TIME.
    detect_start_time = time.time()
    exc = None

    # Drop duplicate data points (reuses the original access logic).
    records: list[DataRecord] = [record for record in self.record_list if not record.is_duplicate]

    # Priority check (reuses the original access logic).
    PriorityChecker.check_records(records)

    # All items in a group belong to the same strategy, so the first item's
    # strategy id identifies the whole group.
    strategy_id = self.items[0].strategy.id

    try:
        # Create a DetectProcess instance to reuse the mature detection logic.
        detect_process = DetectProcess(strategy_id)

        # Generate the strategy snapshot (consistent with the detect module).
        detect_process.strategy.gen_strategy_snapshot()

        for item in self.items:
            # Convert DataRecord -> DataPoint.
            # Filter matches the original push logic: retained and not inhibited.
            data_points = []
            valid_records = []
            for record in records:
                if record.is_retains.get(item.id) and not record.inhibitions.get(item.id):
                    try:
                        data_point = DataPoint(record.data, item)
                        data_points.append(data_point)
                        valid_records.append(record)
                    except ValueError as e:
                        # A malformed record only loses itself, not the batch.
                        logger.warning(
                            f"[access-detect-merge] strategy({strategy_id}) item({item.id}) "
                            f"failed to create DataPoint: {e}"
                        )

            # Reuse detection via DetectProcess: pull_data supports direct
            # input injection, so nothing is fetched from Redis here.
            detect_process.pull_data(item, inputs=data_points)
            detect_process.handle_data(item)

            # Second confirmation (inherited automatically). Failures must not
            # break the main alerting flow, so they are logged and swallowed.
            try:
                detect_process.double_check(item)
            except Exception:
                logger.exception(
                    "[access-detect-merge] strategy(%s) 二次确认时发生异常,不影响告警主流程", strategy_id
                )

            # Push no-data detection records (when enabled). No-data detection
            # needs to know which dimensions reported data in order to decide
            # which dimensions are missing.
            if item.no_data_config.get("is_enabled"):
                self._push(item, records, output_client, key.NO_DATA_LIST_KEY)

            # Push noise-reduction data; best-effort, failures are logged only.
            if valid_records:
                try:
                    self._push_noise_data(item, valid_records)
                except Exception as e:
                    logger.exception(f"[access-detect-merge] push noise data of strategy({strategy_id}) error: {e}")

        # Push anomaly data (automatically reports all monitoring metrics:
        # latency stats, large-latency alerts, PROCESS_OVER_FLOW, etc.).
        # NOTE(review): reconstructed nesting — presumed to run once after the
        # item loop; confirm against the original diff indentation.
        detect_process.push_data()

    except Exception as e:
        # Remember the exception for metric labelling, then re-raise.
        exc = e
        raise

    finally:
        # Report detect-module metrics to keep monitoring continuous: even
        # though merged processing skips the async detect task, these metrics
        # must still be emitted.
        detect_end_time = time.time()

        # DETECT_PROCESS_TIME: detection processing duration.
        metrics.DETECT_PROCESS_TIME.labels(strategy_id=metrics.TOTAL_TAG).observe(
            detect_end_time - detect_start_time
        )

        # DETECT_PROCESS_COUNT: detection run count (with success/failure status).
        metrics.DETECT_PROCESS_COUNT.labels(
            strategy_id=metrics.TOTAL_TAG,
            status=metrics.StatusEnum.from_exc(exc),
            exception=exc,
        ).inc()

    # Success-path log (not reached when the exception above propagates).
    logger.info(
        f"[access-detect-merge] strategy_group_key({self.strategy_group_key}) "
        f"strategy({strategy_id}) merged processing completed, "
        f"records: {len(records)}, detect_time: {detect_end_time - detect_start_time:.3f}s"
    )

    # Metric: processed-data count. Reuses ACCESS_PROCESS_PUSH_DATA_COUNT to
    # stay consistent with the original push flow.
    metrics.ACCESS_PROCESS_PUSH_DATA_COUNT.labels(
        strategy_id=metrics.TOTAL_TAG,
        type="data",
    ).inc(len(records))
894+
714895
def push(self, records: list = None, output_client=None):
715896
# 方案 B:限制处理的时间点数量(在处理阶段限制,不影响查询)
716897
# 这样可以控制推送到下游 detect 模块的数据量
717898
limited_records, last_time_point = self._limit_records_by_time_points(self.record_list)
718899
self.record_list = limited_records
719900

720-
super().push(records=records, output_client=output_client)
901+
# 判断是否可以合并处理(access-detect 合并)
902+
# 当策略的所有检测算法均为静态阈值时,直接在 access 模块执行检测
903+
if self._can_merge_access_detect():
904+
# 直接在 access 中执行检测并推送异常数据
905+
self._detect_and_push_abnormal()
906+
else:
907+
# 走原有流程:推送到 Redis 队列,由 detect 异步任务处理
908+
super().push(records=records, output_client=output_client)
721909

722910
checkpoint = Checkpoint(self.strategy_group_key)
723911
checkpoint_timestamp = checkpoint.get()

0 commit comments

Comments
 (0)