32 commits
dbc912a
[STIP-23][Zeta] Add engine timer flush core flow and non-blocking Flu…
nzw921rx Apr 22, 2026
a95e733
[STIP-23][Zeta] update TaskExecutionService TimerFlush desc
nzw921rx Apr 22, 2026
ab1e6b0
[STIP-23][Zeta] update TaskExecutionService TimerFlush desc
nzw921rx Apr 22, 2026
66b8732
[STIP-23][Zeta] get "sink.flush.interval" use ReadonlyConfig.fromMap
nzw921rx Apr 22, 2026
0d66872
[STIP-23][Zeta] fix field desc error
nzw921rx Apr 22, 2026
4a36239
[Fix][Zeta] Fix barrier not published and FlushTimer not closed on ex…
nzw921rx Apr 22, 2026
86384b5
[Fix][Zeta] Signal extends Serializable
nzw921rx Apr 22, 2026
4e0d36c
fix(api): allow timerFlush to propagate IOException via RunnableWithE…
nzw921rx Apr 23, 2026
19e39ee
[STIP-23][Zeta] SinkContextProxy impl registerFlushAction and getFlus…
nzw921rx Apr 23, 2026
caf9ff9
[STIP-23][Zeta] send flush signal use sendRecordToNext
nzw921rx Apr 23, 2026
3d89dbb
[STIP-23][Zeta] fix code style
nzw921rx Apr 23, 2026
f31cca0
[STIP-23][Zeta] fix unit test not found sendFlushSignal
nzw921rx Apr 23, 2026
c8cdbd9
[STIP-23][Zeta] Optimize signal triggering processing logic
nzw921rx Apr 23, 2026
63007be
[STIP-23][Zeta] onTimerTick Capture Throwable Exception
nzw921rx Apr 23, 2026
e32861e
[Fix][Connector-V2] Increase configuration for timed refresh of threads
nzw921rx Apr 23, 2026
5c728bb
[Fix][Zeta] update timer-flush-pool-size defaultis 1
nzw921rx Apr 23, 2026
3b35661
[STIP-23][Zeta] Fix multi-table aggregated flush, add FlushSignal ser…
nzw921rx Apr 23, 2026
3162701
[STIP-23][Zeta] SinkWriterContext Add empty judgment
nzw921rx Apr 23, 2026
8e6bf80
[STIP-23][Zeta] Fix signal unresolved issues
nzw921rx Apr 23, 2026
12bd599
[STIP-23][Zeta] Fix the issue of dead loop in queue processing
nzw921rx Apr 23, 2026
782f217
[STIP-23][Zeta] rollback
nzw921rx Apr 23, 2026
9b0bce8
[STIP-23][Zeta] rollback
nzw921rx Apr 23, 2026
3b548b7
[STIP-23][Zeta] fix name
nzw921rx Apr 23, 2026
797066f
[STIP-23][Zeta] add Apache License
nzw921rx Apr 23, 2026
2799b1c
[STIP-23][Zeta] TimerFlushTestSinkFactory user AutoService
nzw921rx Apr 23, 2026
5c5747a
[STIP-23][Zeta] onTimerTick log level update to debug
nzw921rx Apr 24, 2026
3eb4df2
[STIP-23][Zeta] Fix timer-flush MDC propagation for scheduleWithFixed…
nzw921rx Apr 24, 2026
f386b52
[STIP-23][Zeta] Fix closeTimerFlushTask print severe log
nzw921rx Apr 24, 2026
e76f3c0
[STIP-23][Zeta] Fix seatunnel-server e2e assert
nzw921rx Apr 24, 2026
81deb40
[STIP-23][Zeta] Add tests for RecordSerializer, MultiTable flush and …
nzw921rx Apr 27, 2026
8cb0784
[STIP-23][Zeta] FlushSignal.java add serialVersionUID
nzw921rx Apr 27, 2026
2139122
[STIP-23][Zeta] trigger CI
nzw921rx Apr 27, 2026
6 changes: 6 additions & 0 deletions docs/en/introduction/configuration/JobEnvConfig.md
@@ -55,6 +55,12 @@ This parameter is used to specify the location of the savemode when the job is executed.
The default value is `CLUSTER`, which means that the savemode is executed on the cluster. If you want to execute the savemode on the client,
you can set it to `CLIENT`. Please use `CLUSTER` mode as much as possible, because when there are no problems with `CLUSTER` mode, we will remove `CLIENT` mode.

### sink.flush.interval

Interval (ms) at which the engine injects a `FlushSignal` into the pipeline to drive a flush at the Sink. `0` or unset (default) means disabled. Only works in the Zeta engine.

Values below 100ms are not recommended — excessive signals consume pipeline queue capacity, crowding out normal data records, and trigger empty flushes when no data has been buffered yet, increasing Sink I/O overhead.
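For example, a streaming job could enable a 5-second timer flush in its `env` block. This is a sketch assuming the standard SeaTunnel HOCON job-file layout; the connector sections are omitted:

```hocon
env {
  parallelism = 1
  job.mode = "STREAMING"
  # Inject a FlushSignal every 5000 ms; 0 or unset (default) disables timer flush
  sink.flush.interval = 5000
}
```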

## Flink Engine Parameter

Here are some SeaTunnel parameter names corresponding to the names in Flink, not all of them. Please refer to the official [Flink Documentation](https://flink.apache.org/).
6 changes: 6 additions & 0 deletions docs/zh/introduction/configuration/JobEnvConfig.md
@@ -56,6 +56,12 @@
When the value is `CLIENT`, the SaveMode operation is executed during job submission: when the job is submitted with a shell script, it runs in the submitting shell process; when the job is submitted via the REST API, it runs in the thread handling the HTTP request.
Please use `CLUSTER` mode as much as possible, because once `CLUSTER` mode is confirmed to have no problems, we will remove `CLIENT` mode.

### sink.flush.interval

Interval (in milliseconds) at which a `FlushSignal` is periodically injected into the data stream to drive the Sink to flush buffered data. Setting it to `0` or leaving it unset (the default) disables the feature. Only applies to the Zeta engine.

Values below 100ms are not recommended. Overly frequent signals generate large numbers of useless empty-flush signals that occupy data-stream queue capacity and crowd out normal data records, and they trigger pointless writes when the buffer holds no data yet, increasing Sink I/O overhead.

## Flink Engine Parameter

Here are some SeaTunnel parameter names corresponding to the names in Flink, not all of them. Please refer to the official [Flink Documentation](https://flink.apache.org/) for more.
@@ -85,6 +85,14 @@ public class EnvCommonOptions {
.noDefaultValue()
.withDescription("The timeout (in milliseconds) for a checkpoint.");

public static Option<Long> SINK_FLUSH_INTERVAL =
Options.key("sink.flush.interval")
.longType()
.defaultValue(0L)
.withDescription(
"Interval (ms) at which the engine injects a FlushSignal into the pipeline to "
+ "drive a flush at the Sink. 0 means disabled. Values below 100ms will log a WARN.");
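Per the commit notes, the engine reads this option via `ReadonlyConfig.fromMap`. Below is a minimal self-contained sketch of the lookup-with-default pattern only; `FlushIntervalLookup` and `resolveFlushInterval` are hypothetical stand-ins, not the real SeaTunnel classes:

```java
import java.util.HashMap;
import java.util.Map;

public class FlushIntervalLookup {
    // Stand-ins for the Option<Long> key and default defined in EnvCommonOptions.
    static final String KEY = "sink.flush.interval";
    static final long DEFAULT = 0L;

    static long resolveFlushInterval(Map<String, Object> env) {
        Object raw = env.get(KEY);
        if (raw == null) {
            return DEFAULT; // unset means timer flush is disabled
        }
        long interval = Long.parseLong(raw.toString());
        if (interval > 0 && interval < 100) {
            // Mirrors the documented behavior: sub-100ms values get a WARN, not an error
            System.err.println("WARN: sink.flush.interval below 100ms is not recommended");
        }
        return interval;
    }
}
```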

public static Option<Integer> CHECKPOINT_MIN_PAUSE =
Options.key("min-pause")
.intType()
@@ -47,7 +47,8 @@ public OptionRule optionRule() {
EnvCommonOptions.READ_LIMIT_BYTES_PER_SECOND,
EnvCommonOptions.SAVEMODE_EXECUTE_LOCATION,
EnvCommonOptions.CUSTOM_PARAMETERS,
EnvCommonOptions.NODE_TAG_FILTER)
EnvCommonOptions.NODE_TAG_FILTER,
EnvCommonOptions.SINK_FLUSH_INTERVAL)
.build();
}
}
@@ -0,0 +1,67 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.api.signal;

import lombok.Getter;
import lombok.RequiredArgsConstructor;

import java.util.Objects;

@Getter
@RequiredArgsConstructor
public final class FlushSignal implements Signal {

private static final long serialVersionUID = 1L;

private final long jobId;
private final long taskId;
private final long createdTime;

public static FlushSignal of(long jobId, long taskId) {
return new FlushSignal(jobId, taskId, System.currentTimeMillis());
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (!(o instanceof FlushSignal)) {
return false;
}
FlushSignal that = (FlushSignal) o;
return jobId == that.jobId && taskId == that.taskId && createdTime == that.createdTime;
}

@Override
public int hashCode() {
return Objects.hash(jobId, taskId, createdTime);
}

@Override
public String toString() {
return "FlushSignal{"
+ "jobId="
+ jobId
+ ", taskId="
+ taskId
+ ", createdTime="
+ createdTime
+ '}';
}
}
@@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.api.signal;

import java.io.Serializable;

/**
* Interface for control-plane signals that the engine propagates through the data flow.
*
* <p>A {@code Signal} is not a data record: it carries no user payload and is intended to instruct
* an operator (e.g. a {@link org.apache.seatunnel.api.sink.SinkWriter}) to perform a side-effecting
* action such as flushing buffered data.
*
* <p>Every signal carries three pieces of metadata:
*
* <ul>
* <li>{@link #getJobId()} — the job that produced the signal.
* <li>{@link #getTaskId()} — the source task that produced the signal, useful for logging and
* per-subtask diagnostics.
* <li>{@link #getCreatedTime()} — the wall-clock time (epoch millis) at which the signal was
* created by the engine.
* </ul>
*
* <p>Concrete signals should be small and immutable. New signal types are added by introducing new
* implementations of this interface; the engine routes them by type via {@code instanceof}.
*/
public interface Signal extends Serializable {

/** @return the id of the job that created this signal. */
long getJobId();

/** @return the id of the task that created this signal. */
long getTaskId();

/** @return the wall-clock creation time of this signal, in epoch milliseconds. */
long getCreatedTime();
}
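The route-by-type convention described in the javadoc can be sketched as follows. Everything here is a simplified stand-in for the real engine classes; only the `Signal` interface mirrors the one in this diff, and the string return values are purely illustrative:

```java
import java.io.Serializable;

public class SignalRouting {

    // Mirrors the Signal interface added in this PR.
    interface Signal extends Serializable {
        long getJobId();
        long getTaskId();
        long getCreatedTime();
    }

    // Minimal FlushSignal stand-in (the real one also defines equals/hashCode/of()).
    static final class FlushSignal implements Signal {
        private final long jobId;
        private final long taskId;
        private final long createdTime = System.currentTimeMillis();

        FlushSignal(long jobId, long taskId) {
            this.jobId = jobId;
            this.taskId = taskId;
        }

        public long getJobId() { return jobId; }
        public long getTaskId() { return taskId; }
        public long getCreatedTime() { return createdTime; }
    }

    // The engine routes pipeline elements by type via instanceof.
    static String route(Object element) {
        if (element instanceof FlushSignal) {
            return "flush";          // invoke the writer's registered flush action
        }
        if (element instanceof Signal) {
            return "ignored-signal"; // unknown signal types are not written as data
        }
        return "data";               // normal record path
    }
}
```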
@@ -20,6 +20,7 @@
import org.apache.seatunnel.api.common.metrics.MetricsContext;
import org.apache.seatunnel.api.event.EventListener;
import org.apache.seatunnel.api.table.schema.event.SchemaChangeEvent;
import org.apache.seatunnel.common.utils.function.RunnableWithException;

import java.io.IOException;
import java.io.Serializable;
@@ -117,5 +118,28 @@ default int getNumberOfParallelSubtasks() {
* @return
*/
EventListener getEventListener();

/**
* Register an action to be invoked by the engine when a periodic flush signal arrives.
*
* <p>This is the opt-in point for engine-level timer flush. A writer that wants to be
* flushed on a schedule should call this method during its initialization, typically with a
* method reference like {@code context.registerFlushAction(this::flush)}.
*
* @param action the action to invoke on each flush signal, must not be {@code null}
*/
default void registerFlushAction(RunnableWithException action) {}

/**
* Return the flush action previously registered via {@link
* #registerFlushAction(RunnableWithException)}, or {@code null} if the writer has not opted
* in to engine-level timer flush.
*
* <p>Callers must null-check the return value; a {@code null} return means the writer will
* silently ignore flush signals.
*/
default RunnableWithException getFlushAction() {
return null;
}
}
}
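A writer opts in as the javadoc suggests, typically with `context.registerFlushAction(this::flush)` during initialization. Here is a minimal self-contained sketch of that handshake; `Context` and `BufferingWriter` are stand-ins for illustration, not the real SeaTunnel API:

```java
public class TimerFlushOptIn {
    // Stand-in for org.apache.seatunnel.common.utils.function.RunnableWithException.
    interface RunnableWithException {
        void run() throws Exception;
    }

    // Stand-in for the flush-related slice of SinkWriter.Context.
    static class Context {
        private volatile RunnableWithException flushAction;

        void registerFlushAction(RunnableWithException action) {
            this.flushAction = action;
        }

        RunnableWithException getFlushAction() {
            return flushAction; // null means the writer never opted in
        }
    }

    static class BufferingWriter {
        private final StringBuilder buffer = new StringBuilder();
        private String lastFlushed = "";

        BufferingWriter(Context context) {
            // Opt in: the engine timer invokes this on each FlushSignal.
            context.registerFlushAction(this::flush);
        }

        void write(String row) { buffer.append(row); }

        void flush() {
            lastFlushed = buffer.toString();
            buffer.setLength(0);
        }

        String getLastFlushed() { return lastFlushed; }
    }
}
```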
@@ -102,18 +102,22 @@ public SinkWriter<SeaTunnelRow, MultiTableCommitInfo, MultiTableState> createWriter(
SinkWriter.Context context) throws IOException {
Map<SinkIdentifier, SinkWriter<SeaTunnelRow, ?, ?>> writers = new HashMap<>();
Map<SinkIdentifier, SinkWriter.Context> sinkWritersContext = new HashMap<>();
Map<SinkIdentifier, SinkContextProxy> proxyContexts = new HashMap<>();
for (int i = 0; i < replicaNum; i++) {
for (TablePath tablePath : sinks.keySet()) {
SeaTunnelSink sink = sinks.get(tablePath);
int index = context.getIndexOfSubtask() * replicaNum + i;
String tableIdentifier = tablePath.toString();
writers.put(
SinkIdentifier.of(tableIdentifier, index),
sink.createWriter(new SinkContextProxy(index, replicaNum, context)));
sinkWritersContext.put(SinkIdentifier.of(tableIdentifier, index), context);
SinkIdentifier id = SinkIdentifier.of(tablePath.toString(), index);
SinkContextProxy proxy = new SinkContextProxy(index, replicaNum, context);
writers.put(id, sink.createWriter(proxy));
proxyContexts.put(id, proxy);
sinkWritersContext.put(id, context);
}
}
return new MultiTableSinkWriter(writers, replicaNum, sinkWritersContext);
MultiTableSinkWriter writer =
new MultiTableSinkWriter(writers, replicaNum, sinkWritersContext);
registerAggregatedFlushIfNeeded(context, writer, proxyContexts);
return writer;
}

/**
@@ -134,12 +138,14 @@ public SinkWriter<SeaTunnelRow, MultiTableCommitInfo, MultiTableState> restoreWriter(
SinkWriter.Context context, List<MultiTableState> states) throws IOException {
Map<SinkIdentifier, SinkWriter<SeaTunnelRow, ?, ?>> writers = new HashMap<>();
Map<SinkIdentifier, SinkWriter.Context> sinkWritersContext = new HashMap<>();
Map<SinkIdentifier, SinkContextProxy> proxyContexts = new HashMap<>();

for (int i = 0; i < replicaNum; i++) {
for (TablePath tablePath : sinks.keySet()) {
SeaTunnelSink sink = sinks.get(tablePath);
int index = context.getIndexOfSubtask() * replicaNum + i;
SinkIdentifier sinkIdentifier = SinkIdentifier.of(tablePath.toString(), index);
SinkContextProxy proxy = new SinkContextProxy(index, replicaNum, context);
List<?> state =
states.stream()
.map(
@@ -149,19 +155,36 @@ public SinkWriter<SeaTunnelRow, MultiTableCommitInfo, MultiTableState> restoreWriter
.flatMap(Collection::stream)
.collect(Collectors.toList());
if (state.isEmpty()) {
writers.put(
sinkIdentifier,
sink.createWriter(new SinkContextProxy(index, replicaNum, context)));
writers.put(sinkIdentifier, sink.createWriter(proxy));
} else {
writers.put(
sinkIdentifier,
sink.restoreWriter(
new SinkContextProxy(index, replicaNum, context), state));
writers.put(sinkIdentifier, sink.restoreWriter(proxy, state));
}
proxyContexts.put(sinkIdentifier, proxy);
sinkWritersContext.put(sinkIdentifier, context);
}
}
return new MultiTableSinkWriter(writers, replicaNum, sinkWritersContext);
MultiTableSinkWriter writer =
new MultiTableSinkWriter(writers, replicaNum, sinkWritersContext);
registerAggregatedFlushIfNeeded(context, writer, proxyContexts);
return writer;
}

/**
* Registers an aggregated flush action on the parent context if any sub-writer registered a
* flush action via its {@link SinkContextProxy}.
*
* <p>The registered action drains all blocking queues and then calls each sub-writer's flush
* action under the corresponding lock, ensuring safe execution from the engine timer thread.
*/
private void registerAggregatedFlushIfNeeded(
SinkWriter.Context context,
MultiTableSinkWriter writer,
Map<SinkIdentifier, SinkContextProxy> proxyContexts) {
boolean anyFlush =
proxyContexts.values().stream().anyMatch(p -> p.getFlushAction() != null);
if (anyFlush) {
context.registerFlushAction(() -> writer.aggregatedFlush(proxyContexts));
}
}

@Override
@@ -485,6 +485,28 @@ public void close() throws IOException {
}
}

/**
* Aggregated flush triggered by the engine timer. Drains all blocking queues first, then calls
* each sub-writer's flush action under the corresponding {@link MultiTableWriterRunnable} lock
* to prevent concurrent writes.
*
* @param proxyContexts map from sink identifier to its {@link SinkContextProxy}
*/
void aggregatedFlush(Map<SinkIdentifier, SinkContextProxy> proxyContexts) throws Exception {
checkQueueRemain();
subSinkErrorCheck();
for (int i = 0; i < sinkWritersWithIndex.size(); i++) {
synchronized (runnable.get(i)) {
for (SinkIdentifier id : sinkWritersWithIndex.get(i).keySet()) {
SinkContextProxy proxy = proxyContexts.get(id);
if (proxy != null && proxy.getFlushAction() != null) {
proxy.getFlushAction().run();
}
}
}
}
}
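On the engine side, the timer can be pictured as a fixed-delay scheduled task whose tick catches `Throwable` (as the `onTimerTick` commits describe) so one failed tick cannot kill the timer thread. This is an illustrative sketch only, not the actual TaskExecutionService code; `runTicks` is a hypothetical helper:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class TimerFlushLoop {

    /** Runs a fixed-delay timer until it has ticked at least {@code ticks} times. */
    static int runTicks(int ticks, long periodMillis) throws InterruptedException {
        AtomicInteger flushCount = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(ticks);

        ScheduledExecutorService timer = Executors.newScheduledThreadPool(1);
        timer.scheduleWithFixedDelay(
                () -> {
                    try {
                        // Real engine: send a FlushSignal downstream instead of counting.
                        flushCount.incrementAndGet();
                        done.countDown();
                    } catch (Throwable t) {
                        // Catch Throwable so a single bad tick does not stop the timer.
                        t.printStackTrace();
                    }
                },
                0, periodMillis, TimeUnit.MILLISECONDS);

        done.await(10, TimeUnit.SECONDS);
        timer.shutdownNow(); // the real engine closes the timer when the task ends
        return flushCount.get();
    }
}
```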

/**
* Busy-waits until all blocking queues are fully drained.
*
@@ -20,14 +20,16 @@
import org.apache.seatunnel.api.common.metrics.MetricsContext;
import org.apache.seatunnel.api.event.EventListener;
import org.apache.seatunnel.api.sink.SinkWriter;
import org.apache.seatunnel.common.utils.function.RunnableWithException;

import java.util.Objects;

public class SinkContextProxy implements SinkWriter.Context {

private final int index;

private final int replicaNum;

private final SinkWriter.Context context;
private transient volatile RunnableWithException flushAction;

public SinkContextProxy(int index, int replicaNum, SinkWriter.Context context) {
this.index = index;
@@ -54,4 +56,15 @@ public MetricsContext getMetricsContext() {
public EventListener getEventListener() {
return context.getEventListener();
}

@Override
public void registerFlushAction(RunnableWithException action) {
Objects.requireNonNull(action, "flushAction");
this.flushAction = action;
}

@Override
public RunnableWithException getFlushAction() {
return flushAction;
}
}