Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
310 changes: 310 additions & 0 deletions docs/en/concept/error-handling.md

Large diffs are not rendered by default.

310 changes: 310 additions & 0 deletions docs/zh/concept/error-handling.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,310 @@
# 错误数据处理(实验性功能)

在 SeaTunnel 中,默认行为:只要某个 Connector 或 Transform 抛出异常,**整个作业就会失败**。

从本实验性能力开始,用户可以改变这一行为,由引擎 **捕获异常数据,将其路由到错误 Sink,并在条件允许时继续推进作业**。

> **状态:实验性(Experimental)**
>
> 目前错误处理 JDBC Sink 是经过验证的;错误处理与行级错误路由默认关闭,配置和语义在后续版本中可能有调整。

## 适用场景

推荐启用错误处理的典型场景包括但不限于:

- 大批量离线任务中存在少量脏数据(例如非法时间、字符串超长等);
- Sink 表偶发出现主键或唯一约束冲突;
- 需要在存在个别异常记录的情况下保持作业整体可用性,并将错误数据单独记录以便后续排查和补数。

不建议或需谨慎启用错误处理的场景包括:

- 对“所有合法数据必须严格写入”具有较强 at-least-once 或 exactly-once 语义要求;
- 使用复杂的多表 Sink 并希望在多个表之间保持严格一致性语义的场景。

## 整体思路

启用错误处理后,引擎对于每条记录的处理逻辑可概括为:

1. 首先按照原有逻辑,由 Transform / Sink 正常处理该记录;
2. 在处理过程中如发生异常,引擎会尝试区分:
- **行级错误**:由于该条数据本身引起的异常(例如数据格式错误、约束冲突等);
- **系统级错误**:例如连接中断、资源不足(OOM)等基础设施问题;
3. 对于系统级错误,行为与默认一致:直接失败作业;
4. 对于被判定为行级错误的情况,引擎会将该记录及异常信息交给 **错误处理器(ErrorHandler)**:
- `mode = LOG`:仅记录日志;
- `mode = ROUTE`:在记录日志的基础上,将错误记录写入单独配置的 **错误 Sink**(例如 JDBC 错误表)。

其余正常记录仍会沿原有链路向下游传递。

错误处理行为通过 **env 配置** 控制:

- **阶段级(env)**:在 `env.transform_error_handler` / `env.sink_error_handler` 中统一配置该阶段默认行为;
- **全局(env)**:在 `env.error_handler` 中给所有阶段提供默认值。

部分 Transform(例如 JsonPath、DataValidator)仍然保留自身早期的 `row_error_handle_way` 等行错误控制选项,这些选项与本文介绍的引擎级错误处理机制可以并行存在,但目前尚未与 `env.*_error_handler` 做自动合并。

## 核心概念

### 模式(mode)

配置中最常见的字段为 `mode`:

- `DISABLE`:关闭该阶段的错误处理(默认行为);
- `LOG`:仅记录行级错误日志,不路由到错误 Sink;
- `ROUTE`:记录并将行级错误路由到错误 Sink。

如果完全不配置上述选项,SeaTunnel 的行为与历史版本保持一致:任意异常都会导致作业失败。

### 错误 Sink

**错误 Sink** 是专门用于接收错误数据的一条 Sink,需要在 `..._error_handler.sink` 下进行配置,例如:

```hocon
env {
sink_error_handler {
mode = "ROUTE"

sink {
plugin_name = "Jdbc"
error_table = "orders_sink_error_basic"
# 这里配置错误表对应的 Jdbc Sink 选项
}
}
}
```

一种常见用法是:

- 主 Sink 写入业务表(例如 `orders_from_sink`);
- 错误 Sink 写入错误表(例如 `orders_sink_error_*`),用于后续排查和补数。

### 行级错误 vs 系统级错误

在大多数情况下,用户无需手动编写逻辑来判断“是否为行级错误”。

引擎会尝试区分:

- **行级错误**:通常由单条数据本身导致,引擎可在配置允许时旁路该条数据并继续作业;
- **系统级错误**:通常是连接中断、资源不足(OOM)等基础设施问题,会直接导致作业失败。

当前版本的默认分类策略:

- **Sink 阶段**:若 Sink Connector 未实现 `SupportRowLevelError`,其异常将被当作系统级错误处理(即使配置了 `sink_error_handler` 也会失败作业)。
- **Transform 阶段**:若 Transform 未实现 `SupportRowLevelError`,其异常将被当作系统级错误处理(即使配置了 `transform_error_handler` 也会失败作业)。

对于部分 Connector(例如 JDBC),Connector 本身会通过接口显式声明“哪些异常属于行级错误”。引擎会优先采用这类显式声明。

只有实现了 `SupportRowLevelError` 的 Connector/Transform,才能触发行级错误;否则所有异常都会被当作系统级错误处理并导致作业失败。

> 说明
>
> 本文描述的是当前版本的**通用引擎级流程**。后续会逐步推动更多内置 Transform 实现 `SupportRowLevelError`,以便更准确地区分“行级错误”与“系统级错误”。

### Transform 阶段发生行级错误时会怎样

当 Transform 被判定为行级错误时,**该条记录会从主链路中被丢弃**,不会进入后续 Transform,也不会进入下游 Sink:

- 对 `map(...)`:返回 `null`,等价于“过滤掉该条记录”;
- 对 `flatMap(...)`:返回空列表,等价于“丢弃该条记录”。

如果同时开启了 `mode = ROUTE` 且配置了错误 Sink,则该条原始记录及异常信息仍可被写入错误表用于排查和补数。

## 配置与参数说明

### 配置位置

错误处理目前主要通过 **env 配置** 生效:

- **阶段级(env)**:在 `env.transform_error_handler` / `env.sink_error_handler` 中统一配置该阶段默认行为,例如:

```hocon
env {
transform_error_handler {
mode = "ROUTE"

sink {
plugin_name = "Jdbc"
error_table = "orders_transform_error_from_env"
}
}

sink_error_handler {
mode = "ROUTE"
queue_capacity = 10000
queue_overflow_policy = "FAIL"

sink {
plugin_name = "Jdbc"
error_table = "orders_sink_error_from_env"
}
}
}
```

- **全局(env)**:在 `env.error_handler` 中为所有阶段提供默认值,例如:

```hocon
env {
error_handler {
mode = "LOG"
include_original_data = true
include_stacktrace = false
}
}
```

同名参数的覆盖顺序(由高到低):

1. 阶段级 `env.transform_error_handler` / `env.sink_error_handler`;
2. 全局 `env.error_handler`(默认 `DISABLE`)。

各个 Transform / Sink 插件自身已有的错误处理选项(例如 JsonPath / DataValidator 的 `row_error_handle_way`)目前与上述 env 配置**相互独立**:插件内部选项仅影响该插件内部行为,而 `env.*_error_handler` 控制的是引擎级的行级错误旁路能力。

### 通用参数一览

| 参数 | 类型 | 默认值 | 说明 / 取值 |
|------------------------|--------|--------|------------------------------------------------------------------------------------------|
| `mode` | String | `DISABLE` | 行级错误处理模式:`DISABLE`(关闭)、`LOG`(只记录)、`ROUTE`(记录并路由到错误 Sink)。 |
| `max_error_ratio` | Double | `0.0` | 允许的错误比例,0.0–1.0;例如 `0.01` 表示错误记录超过 1% 时失败作业;`0.0` 表示不按比例触发失败。 |
| `max_error_ratio_min_records` | Integer | `100` | `max_error_ratio` 的预热阈值:当总处理记录数小于该值时,不进行比例触发,避免在样本很小时误触发失败。 |
| `max_error_records` | Long | `0` | 允许的错误记录总数上限;`0` 表示不按错误条数触发失败。 |
| `queue_capacity` | Integer | `10000` | 内部错误队列(缓冲区)容量上限,队列中最多可同时缓存的错误记录数量。 |
| `queue_overflow_policy`| String | `FAIL` | 错误队列已满时的策略:`FAIL`(失败作业)、`DROP`(丢弃新错误记录)、`BLOCK`(阻塞生产错误的线程,可能影响吞吐)。 |
| `include_original_data`| Boolean | `false` | 是否在错误记录中包含原始数据内容。 |
| `include_stacktrace` | Boolean | `false` | 是否在错误记录中包含完整 Java 异常堆栈;开启会增加单条错误记录的体积。 |
| `original_data_format` | String | `TEXT` | **预留参数**。当前版本仅支持 `TEXT`,内部统一按字符串形式写入错误表(`original_data` 为记录的字符串表示,即 `String.valueOf(row)`)。 |
| `original_data_max_length` | Integer | `8192` | 原始数据序列化后的最大长度,超过部分将被截断,用于控制单条错误记录大小。 |

阈值统计口径:当前版本阈值使用内部计数器:Sink 每次 `write(...)` 计 1;Transform 链中每个 `map(...)`/`flatMap(...)` 调用计 1;同一条 Transform 链上的多个算子共享同一个计数器。

### 错误 Sink 相关参数一览

在 `..._error_handler.sink` 下配置错误记录要写到哪里:

| 参数 | 类型 | 说明 |
|-----------------|------|--------------------------------------------------------------------|
| `plugin_name` | String | 错误 Sink 使用的 Connector 名称,例如 `Jdbc`。 |
| `error_table` | String | (JDBC 专用)错误记录要写入的目标表名,例如 `orders_sink_error_basic`。 |

除此之外,错误 Sink 还需要配置各自 Connector 的常规参数,例如 JDBC 的 `url`、`username`、`password`、`driver` 等,写法与普通 Sink 完全一致。

如果 `mode = ROUTE` 但没有配置 `sink { ... }`(或 `plugin_name` 为空),则行级错误会被识别并记录日志,但由于没有可用的错误 Sink,错误记录不会写入错误表。

### 错误表结构

当前引擎为错误 Sink 构造了一张统一的错误表 Schema(以 JDBC 为例):

- `error_stage`:字符串,错误发生的阶段(例如 `TRANSFORM` / `SINK`);
- `plugin_type`:字符串,插件类型(例如 `TRANSFORM` / `SINK`);
- `plugin_name`:字符串,插件名称(例如 `Jdbc` 等);
- `source_table_path`:字符串,源表路径或标识;
- `error_message`:字符串,异常的简要错误信息(已按照内部上限截断);
- `exception_class`:字符串,异常类名;
- `stacktrace`:字符串,完整堆栈信息(仅在 `include_stacktrace = true` 时填写);
- `original_data`:字符串,原始数据内容(仅在 `include_original_data = true` 时填写,长度受 `original_data_max_length` 控制);
- `occur_time`:时间戳,错误发生时间(UTC)。

上述字段名称在不同错误表中保持一致,便于统一查询和分析。

## JDBC 错误处理如何工作(重点)

JDBC 是当前最主要使用行级错误处理能力的 Connector。

### JDBC 里什么算“行级错误”?

`JdbcSinkWriter` 会检查 `SQLException` 链,如果发现:

- `SQLState` 以 `22` 开头——数据异常(比如数据太长、类型不匹配);
- `SQLState` 以 `23` 开头——完整性约束异常(比如主键/唯一键冲突);

则会将其视为 **行级错误**。否则视为 **系统级错误**,直接让作业失败。

对于其他 Sink,如果未实现 `SupportRowLevelError` 接口,引擎会更保守地将异常视为系统级错误:即使配置了 `sink_error_handler`,这类异常也不会被当作行级错误旁路,而是直接失败作业。

### 发生行级错误时,批处理会怎样?

JDBC Sink 通常会把多条记录放在一个 JDBC batch 里,一次性发送给数据库。

当写入某条记录时发生了**行级错误**:

- Connector 会捕获这个异常;
- 如果判断这是“行级数据错误”,会调用一个帮助方法,**清空当前内存中的 JDBC batch**。

这意味着:

- 当前 batch 中所有“还没有真正发到数据库、但已经加入 batch 的记录”都会被一起清空;
- 这条坏记录会被交给错误处理器(写日志 / 写错误表);
- 同一批次中的其它“好记录”**不会被自动重试**。

从使用者角度可以理解为:

> **一旦这个批次中出现行级错误,这整个批次就被当作“错误批次”处理。**

因此,在“**启用了 batch 且启用了错误处理**”的组合下:

- 可能存在极少量原本合法的记录由于与错误数据处于同一批次而未写入目标库;
- 对“所有合法记录”的严格 at-least-once 语义,在此配置组合下不再具备正式保证。

上述行为属于 Connector 级别的当前实现细节,后续会逐步对不同 Sink 的实现进行优化,针对错误的批量提交进行优化,找出具体错误数据进行错误处理,以降低误伤合法记录的概率并提升可追溯性。

### JDBC 使用建议

- 若更关注作业稳定性,并能够接受少量合法记录在错误批次中被丢弃:
- 可以启用错误处理并保留批写入;
- 可通过错误表和日志对异常数据进行事后分析与补数。

- 若对“任何合法记录都不得丢失”有严格要求:
- 可考虑关闭 JDBC 行级错误处理,或
- 在启用错误处理的同时将 `batch_size` 调小(甚至设置为 `1`),使每个 batch 最多仅包含一条记录;
- 强烈建议在测试环境中结合实际数据库和 JDBC 驱动充分验证后,再在生产环境中启用该能力。

## 多表 Sink 的当前状态

> **实验性能力,尚未完全支持。**

## 基本配置示例(单表 JDBC Sink)

下面给出一个最小示例,用于演示如何在 Sink 阶段将行级错误路由到 JDBC 错误表:

```hocon
env {
sink_error_handler {
mode = "ROUTE" # 或 LOG / DISABLE
max_error_ratio = 0.01 # 错误比例 > 1% 时失败作业
max_error_records = 1000 # 或错误总数 > 1000 时失败作业
queue_capacity = 10000
queue_overflow_policy = "FAIL" # FAIL / DROP / BLOCK

include_original_data = true
include_stacktrace = false
original_data_format = "TEXT"
original_data_max_length = 8192

sink {
plugin_name = "Jdbc"
error_table = "orders_sink_error_basic"
# 这里配置错误表对应的 Jdbc Sink 选项
}
}
}
```

### MySQL 错误表结构

如果您使用 MySQL 作为错误 Sink,需要手动创建如下结构的表:

```sql
CREATE TABLE sink_error_basic (
error_stage VARCHAR(50),
plugin_type VARCHAR(50),
plugin_name VARCHAR(100),
source_table_path VARCHAR(255),
error_message TEXT,
exception_class VARCHAR(255),
stacktrace TEXT,
original_data TEXT,
occur_time TIMESTAMP
);
```

对于 Transform 阶段,可以通过 `transform_error_handler` 进行类似配置。
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.api.common;

/**
* Marker interface for connectors/transforms that can classify exceptions as row-level or
* system-level errors.
*/
public interface SupportRowLevelError<T> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Recommended name SupportRowLevelErrorClassifier


/**
* Determines if the exception is a row-level error that can be bypassed.
*
* @param t the thrown error
* @param row the row being processed
* @return true if row-level error, false if system-level error
*/
boolean isRowError(Throwable t, T row);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This method is recommended to return an enumeration type to deal with different error classifications for subsequent expansion.

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import org.apache.seatunnel.api.common.metrics.MetricsContext;
import org.apache.seatunnel.api.event.EventListener;
import org.apache.seatunnel.api.sink.error.RowErrorCollector;
import org.apache.seatunnel.api.table.schema.event.SchemaChangeEvent;

import java.io.IOException;
Expand Down Expand Up @@ -117,5 +118,12 @@ default int getNumberOfParallelSubtasks() {
* @return
*/
EventListener getEventListener();

/**
* Row-level error collector provided by the engine for reporting errors outside write().
*/
default Optional<RowErrorCollector> getRowErrorCollector() {
return Optional.empty();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.api.sink.error;

/** Collector for sink row-level errors that occur outside SinkWriter.write(). */
@FunctionalInterface
public interface RowErrorCollector {
void collect(RowErrorEvent event) throws Exception;
}
Loading