[PD] Fix PD interaction and error response#7500

Open
juncaipeng wants to merge 1 commit into PaddlePaddle:develop from juncaipeng:refine_pd

Conversation

@juncaipeng
Collaborator

Motivation

💡 If this PR is a Cherry Pick, the PR title needs to follow the format by adding the [Cherry-Pick] label at the very beginning and appending the original PR ID at the end. For example, [Cherry-Pick][CI] Add check trigger and logic(#5191)


Modifications

Usage or Command

Accuracy Tests

Checklist

  • Add at least one tag in the PR title.
    • Tag list: [[FDConfig], [APIServer], [Engine], [Scheduler], [PD Disaggregation], [Executor], [Graph Optimization], [Speculative Decoding], [RL], [Models], [Quantization], [Loader], [OP], [KVCache], [DataProcessor], [BugFix], [Docs], [CI], [Optimization], [Feature], [Benchmark], [Others], [XPU], [HPU], [GCU], [DCU], [Iluvatar], [Metax]]
    • You may add new tags based on the PR content, but their semantics must be clear.
  • Format your code and run pre-commit before committing.
  • Add unit tests. If there are no unit tests, explain why in this PR.
  • Provide accuracy results.
  • If this PR targets a release branch, make sure it has first been submitted to the develop branch, then cherry-pick it to the release branch with the [Cherry-Pick] PR tag.

Copilot AI review requested due to automatic review settings April 20, 2026 05:15
@paddle-bot

paddle-bot bot commented Apr 20, 2026

Thanks for your contribution!

Contributor

Copilot AI left a comment


Pull request overview

This PR improves PD interaction and error reporting in Splitwise (Prefill/Decode disaggregation) mode: when the decode side runs out of resources or preempts a request, a "PD Error" is propagated upstream more consistently, and the Router retries the request and returns a friendlier error response.

Changes:

  • The Router gains preempt-retry support in splitwise mode, with matching new CLI parameters.
  • Error propagation along the PD path is unified and extended: the decode->prefill cache_sync send logic, plus engine-side and output-side error codes and messages.
  • The OpenAI protocol layer extends finish_reason; on error responses the API layer returns already-generated content where possible and marks pd_reschedule.

Reviewed changes

Copilot reviewed 7 out of 7 changed files in this pull request and generated 8 comments.

Show a summary per file
File Description
fastdeploy/splitwise/splitwise_connector.py — Reworks how decode aggregates and sends cache_sync to prefill, widening error-notification coverage.
fastdeploy/router/router.py — Adds preempt-retry parameters and the retry main loop (optionally excluding the last decode instance).
fastdeploy/output/token_processor.py — Adjusts the error code and message when prefill fails to send cache (introduces the "PD Error" wording).
fastdeploy/input/base_processor.py — Skips token decoding on error responses and passes them straight upstream.
fastdeploy/entrypoints/openai/serving_chat.py — Fills in outputs and returns already-generated text in non-streaming error cases; adds pd_reschedule finish_reason handling.
fastdeploy/entrypoints/openai/protocol.py — Extends the OpenAI protocol finish_reason values with pd_reschedule.
fastdeploy/engine/common_engine.py — Adjusts PD-related error logs, error-response wording, and error codes (including the preempted case).

Comment on lines +177 to +191
async def select_pd(self, exclude_decode=None):
    """Select one prefill and one decode server, optionally excluding a decode instance."""
    async with self.lock:
        if not self.prefill_servers:
            raise RuntimeError(f"No prefill servers available (decode={len(self.decode_servers)})")
        if not self.decode_servers:
            raise RuntimeError(f"No decode servers available (prefill={len(self.prefill_servers)})")
        pidx = random.randint(0, len(self.prefill_servers) - 1)
        available_decode = (
            [d for d in self.decode_servers if d is not exclude_decode] if exclude_decode else self.decode_servers
        )
        if not available_decode:
            available_decode = self.decode_servers
        didx = random.randint(0, len(available_decode) - 1)
        return self.prefill_servers[pidx], available_decode[didx]

Copilot AI Apr 20, 2026


select_pd excludes exclude_decode by object identity (`is not`); but _update_or_add_instance in register_instance may replace the list element with a new InstanceInfo object, so the same decode instance cannot be excluded reliably on retry. Consider comparing and excluding by a unique instance key (e.g. inst.get_key()/url()).
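Key-based exclusion could be sketched as below; `select_decode`, the dict-shaped server records, and the `key` accessor are hypothetical stand-ins for the real InstanceInfo API (e.g. get_key()/url()), not code from this PR:

```python
import random


def select_decode(decode_servers, exclude_key=None, key=lambda inst: inst["url"]):
    """Pick a decode instance, excluding by a stable key instead of object identity.

    Comparing keys survives the instance object being replaced on
    re-registration, which defeats an `is not` comparison.
    """
    available = [d for d in decode_servers if exclude_key is None or key(d) != exclude_key]
    if not available:  # everything excluded: fall back to the full list
        available = decode_servers
    return available[random.randint(0, len(available) - 1)]
```

Even if the object stored for a given url is swapped out, the key comparison still excludes it.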

Comment on lines 864 to +868
if data.get("error_msg", None) is not None and "Aborted" in data["error_msg"]:
    finish_reason = "abort"

if data.get("error_msg", None) is not None and "PD Error" in data["error_msg"]:
    finish_reason = "pd_reschedule"

Copilot AI Apr 20, 2026


The finish_reason assignments in _create_chat_completion_choice are consecutive ifs: if error_msg contains both "Aborted" and "PD Error" (e.g. upstream concatenated or forwarded multiple messages), the later "pd_reschedule" overwrites the earlier "abort". Use elif or an explicit priority (e.g. abort wins over pd_reschedule).
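A minimal sketch of the elif-based priority, assuming abort should win when both substrings are present; `resolve_finish_reason` is a hypothetical helper, not the actual method:

```python
def resolve_finish_reason(error_msg):
    """Map an upstream error message to a single finish_reason with explicit priority.

    "Aborted" takes precedence over "PD Error", so a concatenated message
    containing both yields "abort" instead of being silently overwritten.
    """
    if error_msg is None:
        return None
    if "Aborted" in error_msg:
        return "abort"
    elif "PD Error" in error_msg:
        return "pd_reschedule"
    return None
```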

Comment on lines +52 to +59
preempt_retry_count: int = 3
"""
Max retry count when decode instance preempts a request in splitwise mode.
"""
preempt_retry_exclude_decode: bool = False
"""
Whether to exclude the previously used decode instance when retrying after preemption.
"""

Copilot AI Apr 20, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The PR title currently uses the "[PD]" tag, which is not in the template's tag list; consider a tag from that list (e.g. [PD Disaggregation] or [BugFix]) to comply with the repository conventions.

Comment on lines +52 to +59
preempt_retry_count: int = 3
"""
Max retry count when decode instance preempts a request in splitwise mode.
"""
preempt_retry_exclude_decode: bool = False
"""
Whether to exclude the previously used decode instance when retrying after preemption.
"""

Copilot AI Apr 20, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The PR description is still mostly template content, with no concrete Motivation/Modifications/Usage/testing information. Please add: the specific PD preempt/error scenarios this addresses, the actual changes, and how to reproduce and verify them (including whether there are unit tests or manual test commands).

Comment on lines +221 to +260
last_decode_server = None
max_attempts = self.preempt_retry_count + 1

for attempt in range(max_attempts):
    prefill_server, decode_server = await self.select_pd(
        exclude_decode=last_decode_server if self.preempt_retry_exclude_decode else None
    )
    logger.debug(f"Selected prefill server: {prefill_server}, decode server: {decode_server}")

    if prefill_server.tp_size != decode_server.tp_size and decode_server.tp_size != 1:
        raise HTTPException(
            status_code=400,
            detail="The tp_size of prefill and decode should be equal or the tp_size of decode is 1",
        )

    # TODO: unify the disaggregate_info in server and remove redundancy params
    is_same_node = prefill_server.host_ip == decode_server.host_ip
    is_support_ipc = "ipc" in prefill_server.transfer_protocol and "ipc" in decode_server.transfer_protocol
    is_same_tp_size = prefill_server.tp_size == decode_server.tp_size
    use_ipc = is_same_node and is_support_ipc and is_same_tp_size

    disaggregate_info = {
        "prefill_ip": prefill_server.host_ip,
        "decode_ip": decode_server.host_ip,
        "prefill_connector_port": prefill_server.connector_port,
        "decode_connector_port": decode_server.connector_port,
        "decode_device_ids": decode_server.device_ids,
        "decode_rdma_ports": decode_server.rdma_ports,
        "transfer_protocol": "ipc" if use_ipc else "rdma",
        "decode_tp_size": decode_server.tp_size,
    }

    modified_request = request_data.copy()
    modified_request["disaggregate_info"] = disaggregate_info
    # Preserve client request_id on first attempt; append retry suffix on subsequent attempts
    base_request_id = request_data.get("request_id") or str(uuid4())
    if attempt == 0:
        modified_request["request_id"] = base_request_id
    else:
        modified_request["request_id"] = f"{base_request_id}-retry{attempt}"

Copilot AI Apr 20, 2026


In handle_splitwise_request, base_request_id is recomputed with uuid4() inside the for loop: when the client does not provide a request_id and a retry happens, every attempt gets a different base_request_id, so retries cannot be correlated, and after the loop the base_request_id used in logs/responses is not the first attempt's value. Move base_request_id generation outside the loop (generate it once) and keep the modified_request request_id scheme consistent and traceable across all attempts.
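The id scheme with the base resolved once, outside the loop, could look like the sketch below; `make_request_ids` is a hypothetical helper illustrating the suggestion, not code from this PR:

```python
from uuid import uuid4


def make_request_ids(request_data, max_attempts):
    """Yield one request_id per attempt, all derived from a single base id.

    The base is resolved exactly once, so retries stay correlated even when
    the client supplied no request_id and a fresh UUID had to be generated.
    """
    base_request_id = request_data.get("request_id") or str(uuid4())
    for attempt in range(max_attempts):
        yield base_request_id if attempt == 0 else f"{base_request_id}-retry{attempt}"
```

With this shape, the id logged after the loop is guaranteed to be the same base used on every attempt.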

Comment on lines +221 to +285
last_decode_server = None
max_attempts = self.preempt_retry_count + 1

for attempt in range(max_attempts):
    prefill_server, decode_server = await self.select_pd(
        exclude_decode=last_decode_server if self.preempt_retry_exclude_decode else None
    )
    logger.debug(f"Selected prefill server: {prefill_server}, decode server: {decode_server}")

    if prefill_server.tp_size != decode_server.tp_size and decode_server.tp_size != 1:
        raise HTTPException(
            status_code=400,
            detail="The tp_size of prefill and decode should be equal or the tp_size of decode is 1",
        )

    # TODO: unify the disaggregate_info in server and remove redundancy params
    is_same_node = prefill_server.host_ip == decode_server.host_ip
    is_support_ipc = "ipc" in prefill_server.transfer_protocol and "ipc" in decode_server.transfer_protocol
    is_same_tp_size = prefill_server.tp_size == decode_server.tp_size
    use_ipc = is_same_node and is_support_ipc and is_same_tp_size

    disaggregate_info = {
        "prefill_ip": prefill_server.host_ip,
        "decode_ip": decode_server.host_ip,
        "prefill_connector_port": prefill_server.connector_port,
        "decode_connector_port": decode_server.connector_port,
        "decode_device_ids": decode_server.device_ids,
        "decode_rdma_ports": decode_server.rdma_ports,
        "transfer_protocol": "ipc" if use_ipc else "rdma",
        "decode_tp_size": decode_server.tp_size,
    }

    modified_request = request_data.copy()
    modified_request["disaggregate_info"] = disaggregate_info
    # Preserve client request_id on first attempt; append retry suffix on subsequent attempts
    base_request_id = request_data.get("request_id") or str(uuid4())
    if attempt == 0:
        modified_request["request_id"] = base_request_id
    else:
        modified_request["request_id"] = f"{base_request_id}-retry{attempt}"

    logger.debug(f"Modified request: {modified_request}")

    if request_data.get("stream", False):
        return await self._generate_stream(
            modified_request, [prefill_server.url(), decode_server.url()], endpoint=endpoint_name
        )
    else:
        ret_json, status_code = await self._do_generate(
            modified_request, [prefill_server.url(), decode_server.url()], endpoint=endpoint_name
        )
        logger.debug(f"Get response of req {modified_request['request_id']}: {ret_json}")

        if self._is_preempted_response(ret_json):
            last_decode_server = decode_server
            logger.warning(
                f"Preemption detected on attempt {attempt+1}/{max_attempts}, "
                f"decode={decode_server.url()}, req_id {modified_request['request_id']}, "
                f"retrying with new PD instances..."
            )
        else:
            break

logger.error(f"Return response of req_id {base_request_id}: {ret_json}")
return ORJSONResponse(content=ret_json, status_code=status_code)

Copilot AI Apr 20, 2026


preempt_retry_count is not validated as non-negative: if a user passes a negative value, max_attempts can be <= 0, the for loop never executes, and the subsequent references to base_request_id/ret_json/status_code raise UnboundLocalError. Validate preempt_retry_count >= 0 when parsing args or on entry to handle_splitwise_request (raise 400/ValueError for invalid values).
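A minimal sketch of the suggested validation; `preflight_max_attempts` is a hypothetical helper name, not part of the PR:

```python
def preflight_max_attempts(preempt_retry_count):
    """Reject negative retry counts before entering the retry loop.

    A negative value would make max_attempts <= 0, skip the loop entirely,
    and later raise UnboundLocalError on ret_json/status_code.
    """
    if preempt_retry_count < 0:
        raise ValueError(f"preempt_retry_count must be >= 0, got {preempt_retry_count}")
    return preempt_retry_count + 1  # max_attempts, always >= 1
```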

        else:
            break

logger.error(f"Return response of req_id {base_request_id}: {ret_json}")

Copilot AI Apr 20, 2026


After the loop, "Return response ..." is logged with logger.error regardless of success, so normal requests are also recorded as errors, which disrupts production alerting and debugging. Log at error level only when the final result is still preempted/failed (use debug/info on the success path), and avoid the misleading wording on success.

Suggested change
-logger.error(f"Return response of req_id {base_request_id}: {ret_json}")
+is_failed_response = self._is_preempted_response(ret_json) or status_code >= 400
+if is_failed_response:
+    logger.error(
+        f"Returning failed response of req_id {base_request_id} "
+        f"(status_code={status_code}): {ret_json}"
+    )
+else:
+    logger.info(
+        f"Returning successful response of req_id {base_request_id} "
+        f"(status_code={status_code})"
+    )

Comment on lines +289 to +298
# ChatCompletionResponse format: choices[0].finish_reason == "preempted"
choices = ret_json.get("choices", [])
if choices:
    finish_reason = choices[0].get("finish_reason", "")
    if finish_reason == "preempted":
        return True
# ErrorResponse format compatibility
error = ret_json.get("error", {})
if isinstance(error, dict) and "not enough blocks" in str(error.get("message", "")):
    return True

Copilot AI Apr 20, 2026


_is_preempted_response currently only recognizes finish_reason == "preempted" or an error.message containing "not enough blocks"; but this PR adds the finish_reason "pd_reschedule" to the OpenAI protocol and standardizes error messages on a "PD Error" prefix, so the router's retry logic may never trigger. Include "pd_reschedule" (and/or an error_msg containing "PD Error") in the preempt check so retries take effect.

Suggested change
-# ChatCompletionResponse format: choices[0].finish_reason == "preempted"
-choices = ret_json.get("choices", [])
-if choices:
-    finish_reason = choices[0].get("finish_reason", "")
-    if finish_reason == "preempted":
-        return True
-# ErrorResponse format compatibility
-error = ret_json.get("error", {})
-if isinstance(error, dict) and "not enough blocks" in str(error.get("message", "")):
-    return True
+# ChatCompletionResponse format: choices[0].finish_reason indicates a retryable preemption.
+choices = ret_json.get("choices", [])
+if choices:
+    finish_reason = choices[0].get("finish_reason", "")
+    if finish_reason in {"preempted", "pd_reschedule"}:
+        return True
+# ErrorResponse format compatibility for both legacy and current PD error messages.
+error = ret_json.get("error", {})
+if isinstance(error, dict):
+    error_message = str(error.get("message", ""))
+    if "not enough blocks" in error_message or "PD Error" in error_message:
+        return True


@PaddlePaddle-bot PaddlePaddle-bot left a comment


🤖 AI Code Review | 2026-04-20 13:26 CST

📋 Review Summary

PR overview: fixes the interaction logic and error-response handling in the PD disaggregation architecture and adds a preemption retry mechanism at the Router layer
Scope of change: Router, Engine, APIServer, Splitwise Connector, DataProcessor
Impact tags: [PD Disaggregation] [APIServer] [Engine]

📝 PR Convention Check

The Motivation and Modifications sections of the PR description contain no concrete content; only the template placeholders remain.

Suggested title (copy-paste ready):

  • [PD Disaggregation] Fix PD interaction and error response, add preempt retry

Suggested description (copy-paste ready):

## Motivation
Fix the error-handling logic in PD disaggregated deployment mode, unify the error-message format (the "PD Error" prefix), and add automatic retry at the Router layer after a decode-side preemption.

## Modifications
1. Unify the PD-related error-message format, adding a "PD Error" prefix so upstream components can recognize it
2. Add preempt_retry_count / preempt_retry_exclude_decode options to the Router, supporting preemption retry for non-streaming requests
3. Add the finish_reason "pd_reschedule" and carry already-generated tokens in the error response
4. Fix the send_cache_info_to_prefill logic in splitwise_connector so cache_info is sent in all cases

Issues

Severity File Summary
🔴 Bug | router.py:293 | _is_preempted_response checks finish_reason == "preempted" but the actual value is "pd_reschedule"; the retry logic can never trigger via this path
🔴 Bug | router.py:256 | base_request_id is computed inside the loop; if request_data has no request_id, each retry generates a different UUID
🟡 Suggestion | router.py:284 | logger.error runs unconditionally, so successful requests are also logged at ERROR level

Overall Assessment

The retry mechanism's design is sound, but there is a key inconsistency: the Router's _is_preempted_response check does not match the finish_reason value actually set downstream, so the retry mechanism cannot trigger. In addition, regenerating base_request_id inside the loop makes retry request IDs inconsistent. Please fix these issues and add unit tests.

choices = ret_json.get("choices", [])
if choices:
    finish_reason = choices[0].get("finish_reason", "")
    if finish_reason == "preempted":


🔴 Bug: the finish_reason check does not match the actual value, so retry never triggers

serving_chat.py sets finish_reason to "pd_reschedule" for PD errors (line 868), but this code checks for "preempted"; the two never match. The Router therefore cannot detect preemption via the choices[0].finish_reason path, and the retry mechanism effectively relies on the "not enough blocks" string match below, which also fails because common_engine.py now says "does not have enough blocks".

Suggested fix:

if finish_reason == "pd_reschedule":
    return True

Also update the error-message matching below to use the same keyword as common_engine.py (e.g. "PD Error").

modified_request = request_data.copy()
modified_request["disaggregate_info"] = disaggregate_info
# Preserve client request_id on first attempt; append retry suffix on subsequent attempts
base_request_id = request_data.get("request_id") or str(uuid4())


🔴 Bug: base_request_id is computed inside the loop body, so retry IDs are inconsistent

When request_data has no request_id, str(uuid4()) generates a new UUID on every iteration. As a result:

  1. The second retry's base_request_id differs from the first's
  2. The retry suffixes (-retry1, -retry2) cannot be tied back to the same original request
  3. After the loop, base_request_id holds the last iteration's value, so logs cannot trace the full request chain

Move the base_request_id computation before the for loop:

base_request_id = request_data.get("request_id") or str(uuid4())
for attempt in range(max_attempts):
    ...

        else:
            break

logger.error(f"Return response of req_id {base_request_id}: {ret_json}")


🟡 Suggestion: logger.error runs unconditionally and pollutes the logs

The response is logged at ERROR level here whether or not the request succeeded. Successful requests should not be recorded as errors; under high concurrency this generates a large amount of log noise and makes real failures harder to find.

Distinguish the success and failure cases:

if self._is_preempted_response(ret_json):
    logger.error(f"All {max_attempts} attempts exhausted for req_id {base_request_id}: {ret_json}")
else:
    logger.debug(f"Return response of req_id {base_request_id}")

text = ""
data["outputs"] = {
    "text": text,
    "completion_tokens": text,


❓ Question: the completion_tokens field is assigned the text string `text`

Here "completion_tokens": text assigns the decoded text rather than a token count. Please confirm this is what downstream consumers expect. The field name suggests an integer count; if it really holds text content, consider renaming the field to something unambiguous.

