- 事件驱动模式(Event-Driven Architecture / Event-Driven Pattern)是一种常见的软件设计模式,核心思想是系统中的各个组件通过“事件(Event)”进行解耦通信。一个组件(事件源,Event Producer)产生事件,另一个或多个组件(事件消费者,Event Consumer)监听并处理这些事件,彼此之间无需直接调用或依赖。
- 事件(Event) 一种状态变化或行为发生的消息,例如“用户点击按钮”“订单已创建”。
- 事件生产者(Producer) 负责发布事件,不关心事件由谁处理。
- 事件通道(Event Bus / Event Queue) 用于传输事件,可以是内存消息队列、消息中间件(如 Kafka、RabbitMQ)、甚至简单的回调列表。
- 事件消费者(Consumer) 监听事件并做出响应,可能有一个或多个。
| 类型 | 特点 | 示例场景 |
|---|---|---|
| 1. 同步事件驱动(观察者模式) | 事件立即触发回调函数 | GUI 中按钮点击监听 |
| 2. 异步事件驱动(消息队列架构) | 事件发送后进入队列,消费者异步处理 | 微服务架构中订单创建后异步发消息到库存系统 |
[Producer] --(Event)--> [Event Bus] --(Dispatch)--> [Consumer A]
└--> [Consumer B]
| 实现模式 | 属于哪一层 | 示例 |
|---|---|---|
| 回调函数 | 代码语法层 | JS 中 button.onClick = fn |
| 观察者模式 | 设计模式层 | Java Swing、C#事件委托 |
| 发布-订阅(Pub/Sub) | 应用架构层 | Node.js EventEmitter |
| 消息队列(MQ) | 分布式系统层 | Kafka / RabbitMQ |
// 类图内容
- UI 交互:用户界面中的按钮点击、表单提交等,天然是事件。
- 微服务架构:服务之间通过发布事件进行通信,实现最终一致性,避免紧耦合。
- IoT (物联网):传感器数据(如温度变化)作为事件上报,触发相应处理。
- 复杂的业务流程:一个业务动作(如“订单支付成功”)需要触发多个下游动作(通知仓库、更新积分、发送邮件),使用事件可以轻松解耦和扩展。
✅ 高解耦:生产者不需要知道消费者是谁,反之亦然。 ✅ 可扩展性强:可以随时新增消费者来增加新功能,而无需修改现有生产者。 ✅ 异步处理:对于耗时操作,可以将其放入队列异步处理,提高系统响应速度和吞吐量。 ✅ 灵活性和弹性:系统各部分可以独立部署和伸缩。
❌ 调试困难:事件链条很长时,很难追踪一个请求的完整处理过程。 ❌ 一致性挑战:在分布式系统中,保证最终一致性比强一致性更复杂,需要处理失败、重试、补偿等问题。 ❌ 顺序与幂等性:事件可能重复或乱序抵达,消费者需要设计成幂等的,并能处理乱序情况。
好的,我结合你熟悉的 Odoo 18 销售订单流程,用一个**“发货中订单 → 客户申请换货 → 自动触发处理逻辑”** 的场景,演示一个事件驱动模式在 Odoo 中的实现方式。
- 销售订单(sale.order)处于 "发货中" 状态(state='delivery')
- 客户提交 "换货申请"(例如通过网页表单、API 回调、客服按钮)
- 我们不在提交处直接写业务逻辑,而是发布一个事件
- 系统中可以有多个 "事件消费者"(handler) 监听这个事件,比如:
- 监听器1:创建售后单(
after.sale.service) - 监听器2:通知仓库暂缓发货
- 监听器3:若 VIP 用户,直接自动批准
- 监听器1:创建售后单(
| 角色 | 行为 | 示例 |
|---|---|---|
| 1. 事件生产者 (Producer) | 触发动作但不处理业务 | apply_exchange_request() |
| 2. 事件总线 (Event Bus) | 转发事件 | publish() |
| 3. 事件消费者 (Consumer) | 监听并处理 | handle_exchange_request() |
Producer (SaleOrder.apply_exchange_request) → EventEngine.with_delay() → queue_job (入队) → Worker 执行作业 → EventEngine.process_event() → Strategy Rule 匹配规则 → 调用 Handler (同步或再次 with_delay()) → 记录执行日志 → 结束/重试
一个完整的事件处理流程,其实是由两部分共同定义的:
1. 处理器 (Handler):定义了“做什么”(What/How)
这正是你所指出的 handle_* 方法。它们是具体的“动作实现”。你可以把 SaleOrderEventHandler
这个类想象成一个“工具箱”,里面放着各种能解决特定问题的工具(handle_exchange_vip 是一个工具,handle_exchange_standard 是另一个工具)。
2. 规则 (Rules):定义了“何时”与“是否”做(When/If)
这部分是由我们在数据库中配置的 strategy.rule 记录定义的。它扮演着“决策大脑”的角色。它会规定:
* “当” 听到 'exchange_requested' 这个事件时...
* “并且如果” 满足 order.partner_id.is_vip == True 这个条件...
* “那么” 就去工具箱里拿出 handle_exchange_vip 这个工具来使用。
所以,一个更完整的描述是:
EventEngine 引擎就像一个总指挥,它不自己干活。当事件发生时,它首先去查阅“决策大脑”(strategy.rule),“大脑”告诉它在当前情况下应该使用哪个“工具”。然后,总
指挥才会命令那个“工具”(handle_* 方法)去执行具体的任务。
结论:
* 处理器 (Handler) 定义了业务逻辑的“执行”。
* 规则 (Rules) 定义了业务逻辑的“决策”。
这个“决策与执行相分离”的设计,正是我们这个框架灵活性的根源。它允许我们在不修改“工具”的情况下,仅仅通过改变“决策”方式,就能适应新的业务需求。
flowchart TD
A1["客户UI操作或API调用"] --> B1["SaleOrder.apply_exchange_request()"]
B1 --> C1["调用 EventEngine.with_delay().process_event()"]
C1 --> D1["创建 queue_job 并入队"]
D1 --> E1["后台 Worker 执行作业"]
E1 --> F1["EventEngine.process_event()"]
F1 --> G1["StrategyRule 匹配规则"]
G1 --> H1{"同步 or 异步?"}
H1 -->|同步| I1["直接执行 Handler 方法"]
H1 -->|异步| J1["调用 Handler.with_delay()._run_handler()"]
I1 --> K1["执行最终业务逻辑"]
J1 --> L1["创建另一个 queue_job"]
L1 --> K1
K1 --> M1["记录日志并结束"]
flowchart TD
R1["EventEngine.process_event()"] --> R2["加载所有 strategy.rule (按 event_name + priority 排序)"]
R2 --> R3{"遍历每一条 Rule"}
R3 -->|condition_type = domain| R4["safe_eval(domain) → 比对 order 是否命中"]
R3 -->|condition_type = py| R5["safe_eval(expression) → 返回 True/False"]
R4 --> R6{"匹配成功?"}
R5 --> R6
R6 -->|是| R7["加入命中规则列表"]
R6 -->|否| R3
R7 --> R8{"rule.stop_on_success = True ?"}
R8 -->|是| R9["立即终止匹配流程"]
R8 -->|否| R3
R9 --> R10{"执行 Action"}
R10 -->|async_mode=True| R11["调度后台作业 (queue_job)"]
R10 -->|async_mode=False| R12["直接调用 handler 方法"]
R11 --> R13["执行日志 rule.execution.log.create(...)"]
R12 --> R13
R13 --> R14["返回处理结果"]
在 sale.order 模型中添加入口方法,它负责发布事件,并使用 queue_job 实现异步处理。
import json
import logging
from odoo import api, fields, models, _
from odoo.tools.safe_eval import safe_eval
_logger = logging.getLogger(__name__)
class SaleOrder(models.Model):
_inherit = 'sale.order'
need_manual_review = fields.Boolean(string="Need Manual Review", copy=False)
def apply_exchange_request(self, reason=None, by_uid=None):
"""Public method to trigger the exchange request event.
This will trigger the Event Engine asynchronously using queue_job.
"""
self.ensure_one()
payload = {
'order_id': self.id,
'reason': reason or '',
'channel': self.channel if hasattr(self, 'channel') else False,
'user_id': by_uid or self.env.uid,
}
# Use queue_job to process the event asynchronously
self.env['event.engine'].with_delay().process_event('exchange_requested', payload)
return True这个模型现在被大大简化,主要用于同步场景的演示,核心的异步分发逻辑已移至 EventEngine。
class EventBus(models.AbstractModel):
_name = 'event.bus'
_description = 'Simple In-Memory Event Bus'
def publish(self, event_name, payload):
"""
This method is now simplified. The main logic is in EventEngine.
It evaluates rules and dispatches to the correct handler.
This could be used for simple, synchronous, non-rule-based events if needed.
"""
rule_engine = self.env['strategy.rule']
handler_path = rule_engine.evaluate(event_name, payload)
if handler_path:
model_name, method = handler_path.rsplit('.', 1)
handler = self.env[model_name]
getattr(handler, method)(payload)
else:
# A fallback or logging could be implemented here
_logger.info(f"No handler path found for event {event_name}")这些是实际执行业务逻辑的方法。我们还添加了一个 _run_handler 的辅助方法,以便 queue_job 可以动态调用。
class SaleOrderEventHandler(models.AbstractModel):
_name = 'sale.order.event.handler'
_description = 'Handler for sale order events'
def _get_order(self, payload):
order = None
if payload.get('order'):
maybe = payload.get('order')
if isinstance(maybe, int):
order = self.env['sale.order'].browse(maybe)
elif isinstance(maybe, dict) and maybe.get('order_id'):
order = self.env['sale.order'].browse(maybe.get('order_id'))
else:
order = maybe if isinstance(maybe, models.BaseModel) else None
elif payload.get('order_id'):
order = self.env['sale.order'].browse(payload.get('order_id'))
return order
def handle_exchange_standard(self, payload):
order = self._get_order(payload)
if not order:
_logger.warning("handle_exchange_standard: order not found in payload %s", payload)
return
# ensure idempotency by checking existing after sale
exist = self.env['after.sale.service'].search([('sale_order_id', '=', order.id), ('type', '=', 'exchange')], limit=1)
if exist:
order.message_post(body=f"换货申请已存在 by rule. skip. exist id={exist.id}")
return
after = self.env['after.sale.service'].create({
'sale_order_id': order.id,
'type': 'exchange',
'reason': payload.get('reason', '由规则自动创建'),
})
order.message_post(body=f"📦 标准换货:已创建售后单 {after.id}")
def handle_exchange_vip(self, payload):
order = self._get_order(payload)
if not order:
_logger.warning("handle_exchange_vip: order not found")
return
exist = self.env['after.sale.service'].search([('sale_order_id', '=', order.id), ('type', '=', 'exchange')], limit=1)
if exist:
order.message_post(body=f"VIP 换货已存在 {exist.id}")
return
after = self.env['after.sale.service'].create({
'sale_order_id': order.id,
'type': 'exchange',
'state': 'approved', # depends on your after.sale.service model
'reason': payload.get('reason', 'VIP 自动批准'),
})
order.message_post(body="🌟 VIP 用户:已自动批准并创建售后单")
def handle_exchange_manual(self, payload):
order = self._get_order(payload)
if not order:
_logger.warning("handle_exchange_manual: order not found")
return
order.write({'need_manual_review': True})
order.message_post(body="⚠️ 订单已转人工审核(特殊渠道)")
def _run_handler(self, method_name, payload):
"""Helper to be called by queue_job to run a dynamic method."""
handler_method = getattr(self, method_name, None)
if handler_method:
handler_method(payload)
else:
_logger.error(f"_run_handler could not find method {method_name} on {self._name}")这是我们架构的核心。它负责接收事件,匹配规则,并根据规则是同步还是异步(async_mode)来直接执行或通过 queue_job 调度执行。
class EventEngine(models.Model):
_name = "event.engine"
_description = "Event Engine - evaluate rules and dispatch handler"
def process_event(self, event_name, payload):
"""
Receives an event and its payload, finds matching rules,
and executes their actions either synchronously or asynchronously via queue_job.
This method is designed to be called by `with_delay()`.
"""
if isinstance(payload, str):
payload = json.loads(payload)
rules = self.env['strategy.rule']._get_rules_for_event(event_name, set_code='AFTER_SALE')
executed_rules = []
for rule in rules:
try:
matched = rule.evaluate_conditions(payload)
self.env['rule.execution.log'].create({
'event_name': event_name,
'rule_id': rule.id,
'payload': json.dumps(payload, default=str),
'matched': bool(matched),
})
if not matched:
continue
for action in rule.action_ids:
try:
model, method = action.get_handler()
if action.async_mode:
# Asynchronous call using queue_job
handler_model = self.env[model.name]
handler_model.with_delay()._run_handler(method.__name__, payload)
else:
# Synchronous call
getattr(model, method)(payload)
self.env['rule.execution.log'].create({
'event_name': event_name,
'rule_id': rule.id,
'action_id': action.id,
'payload': json.dumps(payload, default=str),
'action_executed': True,
})
except Exception as e:
_logger.exception("Handler execution failed for action %s: %s", action.id, e)
# To ensure queue_job retries, we should re-raise the exception
raise
executed_rules.append(rule.id)
if rule.stop_on_success:
break
except Exception as e:
_logger.exception("Rule processing error for rule %s: %s", rule.id, e)
# Continue to the next rule
continue
return executed_rules
- 代码说明:
逻辑非常纯粹和固定,就像一个工作流程的“总调度官”。我们可以把它分解为以下几个步骤:
1. 接收任务:方法被调用,接收两个核心参数:event_name (事件名,一个字符串) 和 payload (事件的详细信息,一个字典)。
2. 查找操作手册:它根据传入的 event_name,去数据库的 strategy.rule 模型中,查找所有订阅了这个事件的“规则”,并按照 priority(优先级)排序。
3. 逐条评估规则:它开始遍历这些规则。对每一条规则,它会执行该规则的 condition(条件),检查当前的 payload 是否满足此规则的触发条件。
4. 决策与分发:
* 如果不满足条件,就跳过这条规则,继续评估下一条。
* 如果满足条件,它就会获取这条规则关联的 action(动作)。
5. 执行动作:对于每一个满足条件的动作,它会检查 async_mode 标志:
* 如果 async_mode 为 False(同步),它就立即、直接调用 handler_path 指向的那个 Python 方法。
* 如果 async_mode 为 True(异步),它就不直接调用,而是再次利用 queue_job 的 with_delay(),把这个 handler_path
指向的方法作为一个新的、独立的后台作业放入队列,然后就立刻返回,不等待其执行结果。
6. 记录与控制:在整个过程中,它会向 rule.execution.log 中写入详细的日志(哪条规则被评估了、是否命中、动作是否被执行等)。同时,它也会遵守规则上的
stop_on_success 标志,如果某条规则执行成功后需要停止,它就不会再评估后续的低优先级规则。
2. 它解决了哪些问题?
这个“总调度官”的角色,完美地解决了几个软件设计中的经典难题:
* 问题一:紧耦合。它解决了“事件发布者”和“事件消费者”之间的紧耦合问题。SaleOrder 模型只管发布事件,它完全不知道谁会处理、如何处理。EventEngine
就像一个中间的“中介”,隔离了双方。
* 问题二:硬编码的业务逻辑。如果没有 EventEngine,我们可能就得在 apply_exchange_request 方法里写一长串 if/else 来判断:if 用户是VIP then 调用A方法 else
调用B方法。这种代码非常僵化,每次需求变更都要修改代码并重新部署。EventEngine 通过把这些 if/else 逻辑变成了数据库里的“规则数据”,解决了这个问题。
* 问题三:动态扩展性。因为业务逻辑变成了数据库里的数据,我们就可以在系统运行时,通过修改数据来动态地改变系统行为,而无需修改代码。这就是我们之前讨论的,
增加一个“VIP自动批准”功能,只需要增加一条新规则即可。
3. 是否可以处理所有事件?后续有新事件还需要重写吗?
这是最关键的一点:EventEngine.process_event() 是完全通用的,它可以处理任何事件,未来有任何新事件,都完全不需要修改这个方法。
这正是我们称它为“引擎”或“运行器”的原因。它的逻辑是业务无关的,它只关心“根据事件名查找规则、评估条件、执行动作”这个固定的流程。
你可以把 EventEngine 想象成一个万能的自动化助理。你不需要为每个新任务都雇一个新助理。你只需要给同一个助理一本新的“操作手册”(规则),告诉他:“当听到这个
新的暗号(event_name)时,按照手册里的这些新步骤(handler_path)去执行”。
所以,当我们未来要增加一个新的“退货”事件时,我们要做的是:
1. 定义新的入口:在 sale.order 中增加 apply_return_request() 方法,发布新的事件名 'return_requested'。
2. 定义新的处理器:在 sale.order.event.handler 中增加 handle_return_standard() 等具体处理退货业务的方法。
3. 编写新的操作手册:在数据库中(通过XML或界面)创建新的 strategy.rule 记录,把 'return_requested' 事件和 handle_return_standard 方法关联起来。
你看,整个过程中,EventEngine 作为我们最核心的引擎,一行代码都不需要动。它天然地支持了未来的所有新事件。
class RuleSet(models.Model):
_name = "rule.set"
_description = "Rule Set / Policy"
name = fields.Char(required=True)
code = fields.Char()
description = fields.Text()class RuleCondition(models.Model):
_name = "rule.condition"
condition_type = fields.Selection([('domain', 'Domain on Record'),
('python', 'Python Expression'),
], string="Condition Type",
default='domain', required=True )
rule_id = fields.Many2one('strategy.rule')
field_path = fields.Char() # 'order.partner_id.is_vip'
operator = fields.Selection([('=', '='), ('!=', '!='), ('>', '>'), ('in', 'in'), ...])
value = fields.Char() # 'True', 'special', '1000'
logic = fields.Selection([('AND','AND'),('OR','OR')]) # for multiple condition grouping
python_expression = fields.Text(string="Python Expression",
help="An expression to be evaluated. Use 'record' to access the main record and 'payload' to access the event data.")
- RuleCondition 功能解析
真实场景:
- 一个事件的处理可能有:店铺、平台、产品、客户、金额、营销活动... 等规则配置对象。
- 每个对象都会有:会有各种各样的判断条件,这些都是决定业务逻辑的关键因素。
解决方案:
- 结构化、模块化的条件定义
- RuleCondition 模型设计意图就是为了应对这个挑战。
这样有什么好处?
一个复杂的判断条件,现在被拆解成了一组结构化的数据记录。例如,“订单金额大于1000元,且客户是VIP”这个规则,就变成了 RuleEngine 关联的两条 RuleCondition
记录:
* 记录1: field_path='order.amount_total', operator='>', value='1000'
* 记录2: field_path='order.partner_id.is_vip', operator='=', value='True'
RuleEngine 的 evaluate_conditions 方法也需要重写,不再是简单地 safe_eval 一个字符串,而是遍历 condition_ids,动态地构建出一个 Odoo Domain
来进行查询和判断。
最终,这实现了最重要的一点:规则的界面化配置。 业务人员可以通过下拉框选择字段、选择操作符、填写数值,来自己组合和创建业务规则,而完全不需要写代码。
class RuleAction(models.Model):
_name = "rule.action"
_description = "Rule Action / Handler"
rule_id = fields.Many2one('strategy.rule', required=True, ondelete='cascade')
name = fields.Char()
handler_path = fields.Char(required=True, help="model_path.method_name, e.g. sale.order.event.handler.handle_exchange_vip")
async_mode = fields.Boolean(default=False, help="If checked, the handler will be executed asynchronously in a background job.")
params = fields.Text(help="JSON params for handler", default='{}')
def get_handler(self):
"""Return (model, method) for handler_path or raise ValueError."""
if not self.handler_path or '.' not in self.handler_path:
raise ValueError("Bad handler_path for RuleAction %s" % self.id)
model_name, method_name = self.handler_path.rsplit('.', 1)
model = self.env[model_name]
method = getattr(model, method_name, None)
if not method:
raise ValueError(f"Method {method_name} not found on model {model_name}")
return model, method- RuleAction 解释
RuleAction 的核心目的就是为了配置,它在“决策大脑”(Rule)和“工具箱”(Handler)之间建立了一座清晰的桥梁。这个设计让我们的“决策”和“执行”分离得更加彻底,也更加强大。
RuleAction 是映射 SaleOrderEventHandler 中具体的方法,方便配置
这背后有两个关键的设计考量,也正是 RuleAction 的另外两个重要作用:
1. 实现“一个规则触发多个动作” (一对多关系)
这是最主要的原因。在真实的业务场景中,满足一个条件后,我们往往需要做一连串的事情,而不是仅仅一件事。
举个例子:
当一个“VIP客户”申请换货时(rule_vip_exchange 规则被命中),我们希望系统自动完成三个动作:
1. 动作一:自动批准售后单 (handle_exchange_vip)。
2. 动作二:给这位客户的专属客户经理发送一条通知 (handle_notify_account_manager)。
3. 动作三:给客户的账户增加50个积分,作为VIP关怀 (handle_add_bonus_points)。
如果 handler_path 是直接定义在 StrategyRule 上的,那么一条规则就只能对应一个动作。而通过引入 RuleAction,我们就建立了一个“一对多”的关系:
* 一个 StrategyRule 记录,可以关联多个 RuleAction 记录。
* 每个 RuleAction 记录代表一个具体的动作。
这样,我们就可以灵活地编排一个动作序列,实现复杂的组合业务,而这一切都只是通过配置完成的。
2. 实现对“每个动作”的精细化控制
RuleAction 不仅仅是指定了要调用哪个方法,它还携带了这个动作本身的“属性”,比如我们设计的 async_mode 字段。
这让我们可以对上面例子中的三个动作进行差异化处理:
1. 动作一(批准售后单):我们希望它同步执行,因为这是核心业务,需要立即完成。所以这个 RuleAction 的 async_mode 会设为 False。
2. 动作二(通知客户经理):这个动作相对次要,可以稍微延迟,我们可以让它同步执行,也可以异步。
3. 动作三(增加积分):调用积分系统可能是个外部接口,比较耗时,我们不希望它阻塞主流程。所以我们可以把这个 RuleAction 的 async_mode 设为
True,让它作为一个异步后台作业去执行。
RuleAction 让我们能够对同一个规则触发的一系列动作,进行独立的、精细化的配置。这是直接在 StrategyRule 上加字段无法实现的。
总结
所以,RuleAction 的作用远不止是“方便配置的映射”,它更是我们这个框架实现组合性 (Composability) 和 灵活性 (Flexibility) 的关键所在。
* `StrategyRule` 回答了 “IF... THEN...” 的问题 (如果条件满足,就去做...)。
* `RuleAction` 则具体定义了 “THEN” 后面要做的动作清单,以及每个动作的执行方式。
class StrategyRule(models.Model):
_name = "strategy.rule"
_description = "Strategy Rule"
name = fields.Char(required=True)
code = fields.Char()
model_name = fields.Char()
condition_logic = fields.Selection( [('AND', 'All conditions must be met (AND)'),
('OR', 'Any condition can be met (OR)')],
string="Condition Logic",
default='AND', required=True)
set_id = fields.Many2one('rule.set', string="Rule Set", required=True)
event_name = fields.Char(required=True, index=True)
priority = fields.Integer(default=10)
condition_ids = fields.One2many("rule.condition", "rule_id", string="Conditions")
action_ids = fields.One2many('rule.action', 'rule_id', string="Actions")
stop_on_success = fields.Boolean(default=False, help="If checked, no lower priority rules will be processed if this one succeeds.")
is_fallback = fields.Boolean(default=False, help="Check this for a rule that should run if no other rules match.")
active = fields.Boolean(default=True)
@api.model
def _get_rules_for_event(self, event_name, set_code=None):
domain = [('event_name', '=', event_name), ('active', '=', True)]
if set_code:
set_rec = self.env['rule.set'].search([('code', '=', set_code)], limit=1)
if set_rec:
domain.append(('set_id', '=', set_rec.id))
return self.search(domain, order='priority asc')
def _get_record_from_payload(self, payload):
"""
一个通用的辅助方法,根据规则上定义的 model_name 从 payload 中提取主记录。
"""
if not self.model_name:
return None
# 约定 payload 里的 key 是 'sale_order_id', 'crm_lead_id' 等形式
record_id_key = f"{self.model_name.replace('.', '_')}_id"
record_id = payload.get(record_id_key)
if record_id:
return self.env[self.model_name].browse(record_id).exists()
return None
def evaluate_conditions(self, payload):
"""
重写后的、强大的条件评估方法。
"""
self.ensure_one()
if not self.condition_ids:
return True # 没有条件的规则,默认通过
main_record = self._get_record_from_payload(payload)
# 准备一个安全的执行上下文,用于执行Python表达式
eval_context = {
'env': self.env,
'payload': payload,
'record': main_record,
'user': self.env.user,
}
# 遍历所有条件
results = []
for condition in self.condition_ids:
result = False
if condition.condition_type == 'domain':
# 对于Domain类型的条件,必须有主记录才能评估
if not main_record:
results.append(False)
continue
try:
value = safe_eval(condition.value)
except Exception:
value = condition.value
domain = [(condition.field_path, condition.operator, value), ('id', '=', main_record.id)]
if main_record.search(domain, limit=1):
result = True
elif condition.condition_type == 'python':
# 对于Python表达式,可以直接在上下文中执行
try:
if safe_eval(condition.python_expression, eval_context):
result = True
except Exception as e:
_logger.error(f"Rule {self.id} python condition eval error: {e}")
result = False
results.append(result)
# 根据规则的逻辑组合方式(AND/OR)返回最终结果
if self.condition_logic == 'AND':
return all(results)
else: # OR
return any(results)- StrategyRule 逻辑解析
StrategyRule 、 RuleSet 与 RuleAction 三者的关系:
* `RuleSet` 和 `StrategyRule` 的关系是“一对多”:一个 RuleSet(规则集)可以包含多个 StrategyRule(规则)。RuleSet 的主要作用是分组和分类。比如,我们可以创建
一个叫“售后流程”的规则集,把所有与售后相关的规则都放进去;再创建一个叫“发货流程”的规则集。这让管理变得更清晰。
* `StrategyRule` 和 `RuleAction` 的关系也是“一对多”:一个 StrategyRule(规则)可以包含多个 RuleAction(动作)。
关系图是:
`RuleSet` 1 → N `StrategyRule` 1 → N `RuleAction`
StrategyRule 并没有去“映射” RuleSet 和 RuleAction。它更像是核心主体,它“属于”一个 RuleSet,同时它“拥有”多个 RuleAction。
---
2. 规则引擎的作用:
- 从技术底层来看,它的确像一个非常复杂的“映射表”。你说它是“映射表”是抓住了它的数据结构本质。
- 但从业务和设计层面来看,称它为“引擎”更合适,因为它做的不仅仅是 A → B 的简单映射。它做的是 `Event + Condition → Actions` 的条件化映射。
* 映射表:通常是静态的,Key 直接对应 Value。
* 规则引擎:是动态的,它包含逻辑(`Condition`)。它会根据你给的实时数据(payload)进行判断,然后才决定要映射到哪个动作。
所以,它的作用远不止是“映射”,而是“决策”。它是我们之前比喻的那个“决策大脑”,负责根据情况,动态地决定下一步该怎么走。
---
RuleAction 与 SaleOrderEventHandler 中的方法一一对应,但是可以有多个同样配置的 RuleAction,这样同一个定义在 SaleOrderEventHandler 的方法可以给多个 RuleSet 复用
这个设计的另一个巨大优势:处理器的复用性! 我们来把这个复用链条理清楚:
1. 我们在 SaleOrderEventHandler 中定义了一个可复用的、通用的方法,比如
handle_send_notification(payload)。这个方法只负责发送通知,它不关心是什么事件触发的。
2. 现在,我们可以创建多个 RuleAction 记录,它们的 handler_path 都指向这个 handle_send_notification 方法。
* Action A 指向它,并且通过 params 字段告诉它使用“换货成功”的模板。
* Action B 也指向它,但通过 params 告诉它使用“退货完成”的模板。
3. Action A 可能被一个属于 RuleSet A(比如“换货规则集”)的规则所使用。
4. Action B 可能被一个属于 RuleSet B(比如“退货规则集”)的规则所使用。
最终结果就是: 你只写了一次 handle_send_notification 方法,但它成功地被“换货”和“退货”两个完全不同的业务场景(甚至属于不同的规则集)复用了。
总结:
- 通过 RuleAction 这个中间层,我们将具体的“动作实现”(Handler方法)与“决策逻辑”(Rule)解耦,使得这些“动作实现”能像乐高积木一样,被不同的规则和场景灵活地组合、复用。
如何使用这套增强版框架?
假设你的业务需求是:“对于来自‘亚马逊’店铺的销售订单,如果订单金额超过500元,并且使用了内部折扣码 `SAVE10`,则触发一个特殊动作。”
使用我们这套新框架,你不需要写一行新代码,只需要在Odoo界面或XML中创建以下数据记录:
1. 创建一个 `StrategyRule` 记录:
* name: "亚马逊大额订单-特殊折扣码规则"
* event_name: "exchange_requested" (或 "order_confirmed")
* model_name: "sale.order"
* condition_logic: "AND"
2. 为这条规则创建三个 `RuleCondition` 记录:
* 条件1 (店铺):
* condition_type: "domain"
* field_path: "store_id.name"
* operator: "="
* value: "'Amazon'"
* 条件2 (金额):
* condition_type: "domain"
* field_path: "amount_total"
* operator: ">"
* value: "500"
* 条件3 (折扣码):
* condition_type: "python"
* python_expression: payload.get('promo_code') == 'SAVE10'
你看,我们把所有复杂的业务判断,都变成了可配置、可组合的数据。我们的 evaluate_conditions
方法就像一个通用的解释器,它不关心“店铺”或“折扣码”是什么,它只负责忠实地执行这些数据所定义的判断逻辑。
这套实现,就真正达成了你所期望的——用配置来驱动业务变化,而不是代码。
class RuleExecutionLog(models.Model):
_name = "rule.execution.log"
_description = "Rule execution log"
create_date = fields.Datetime()
event_name = fields.Char()
rule_id = fields.Many2one('strategy.rule')
action_id = fields.Many2one('rule.action')
payload = fields.Text()
matched = fields.Boolean(default=False)
action_executed = fields.Boolean(default=False)
error = fields.Text()这里的 async_mode 字段现在直接控制处理程序是同步执行还是作为后台作业异步执行。
<odoo>
<record id="ruleset_after_sale" model="rule.set">
<field name="name">After Sale Rules</field>
<field name="code">AFTER_SALE</field>
</record>
<!-- VIP 自动批准规则 (同步) -->
<record id="rule_vip_exchange" model="strategy.rule">
<field name="name">VIP Exchange Auto Approve</field>
<field name="set_id" ref="ruleset_after_sale"/>
<field name="event_name">exchange_requested</field>
<field name="priority">1</field>
<field name="condition_type">py</field>
<field name="condition">"order.partner_id.is_vip"</field>
<field name="stop_on_success">True</field>
</record>
<record id="rule_vip_action" model="rule.action">
<field name="rule_id" ref="rule_vip_exchange"/>
<field name="handler_path">sale.order.event.handler.handle_exchange_vip</field>
<field name="async_mode">False</field>
</record>
<!-- 兜底规则 (异步) -->
<record id="rule_default_exchange" model="strategy.rule">
<field name="name">Default Exchange Creation</field>
<field name="set_id" ref="ruleset_after_sale"/>
<field name="event_name">exchange_requested</field>
<field name="priority">100</field>
<field name="condition_type">py</field>
<field name="condition">"True"</field>
<field name="stop_on_success">True</field>
</record>
<record id="rule_default_action" model="rule.action">
<field name="rule_id" ref="rule_default_exchange"/>
<field name="handler_path">sale.order.event.handler.handle_exchange_standard</field>
<field name="async_mode">True</field>
</record>
</odoo># 假设在 Odoo 的某个按钮点击事件中调用
# self 是一个 sale.order 记录
self.apply_exchange_request("客户说尺寸不合适")- 入口简单:事件发布点(如
apply_exchange_request)应只负责调用with_delay(),不做任何业务逻辑。 - 幂等性:Handler 必须设计为幂等的。
queue_job可能会重试失败的作业,要确保多次执行同一个作业不会产生副作用(如此处创建重复的售后单)。 - 事务边界:
with_delay()会在当前事务成功提交后才将作业入队。这意味着,如果apply_exchange_request之后的代码抛出异常,作业将不会被创建。 - 日志记录:保留
rule.execution.log对于追踪和调试异步流程至关重要。 - 监控:使用 Odoo 的
queue_job看板来监控作业状态(等待、失败、完成),并设置重试策略。