diff --git a/docs/en/introduction/configuration/JobEnvConfig.md b/docs/en/introduction/configuration/JobEnvConfig.md index 9320c28ccc2c..40a23cad869b 100644 --- a/docs/en/introduction/configuration/JobEnvConfig.md +++ b/docs/en/introduction/configuration/JobEnvConfig.md @@ -55,6 +55,12 @@ This parameter is used to specify the location of the savemode when the job is e The default value is `CLUSTER`, which means that the savemode is executed on the cluster. If you want to execute the savemode on the client, you can set it to `CLIENT`. Please use `CLUSTER` mode as much as possible, because when there are no problems with `CLUSTER` mode, we will remove `CLIENT` mode. +### sink.flush.interval + +Interval (ms) at which the engine injects a `FlushSignal` into the pipeline to drive a flush at the Sink. `0` or unset (default) means disabled. Only works in the Zeta engine. + +Values below 100ms are not recommended — excessive signals consume pipeline queue capacity, crowding out normal data records, and trigger empty flushes when no data has been buffered yet, increasing Sink I/O overhead. + ## Flink Engine Parameter Here are some SeaTunnel parameter names corresponding to the names in Flink, not all of them. Please refer to the official [Flink Documentation](https://flink.apache.org/). diff --git a/docs/zh/introduction/configuration/JobEnvConfig.md b/docs/zh/introduction/configuration/JobEnvConfig.md index a8443099218d..a90c15dd8b09 100644 --- a/docs/zh/introduction/configuration/JobEnvConfig.md +++ b/docs/zh/introduction/configuration/JobEnvConfig.md @@ -56,6 +56,12 @@ 当值为`CLIENT`时,SaveMode操作在作业提交的过程中执行,使用shell脚本提交作业时,该过程在提交作业的shell进程中执行。使用rest api提交作业时,该过程在http请求的处理线程中执行。 请尽量使用`CLUSTER`模式,因为当`CLUSTER`模式没有问题时,我们将删除`CLIENT`模式。 +### sink.flush.interval + +定时向数据流中注入 `FlushSignal` 的间隔(毫秒),驱动 Sink 刷写缓冲数据。设置为 `0` 或不配置(默认)时不生效。仅适用于 Zeta 引擎。 + +建议不低于 100ms。过于频繁会产生大量无效空刷信号占用数据流队列容量,挤压正常数据记录的传输空间,并在缓冲区尚无数据时触发无意义的写出,增加 Sink I/O 开销。 + ## Flink 引擎参数 这里列出了一些与 Flink 中名称相对应的 SeaTunnel 参数名称,并非全部,更多内容请参考官方 [Flink Documentation](https://flink.apache.org/) for more. diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/options/EnvCommonOptions.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/options/EnvCommonOptions.java index 4ce8d8436b54..fa6ec303669b 100644 --- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/options/EnvCommonOptions.java +++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/options/EnvCommonOptions.java @@ -85,6 +85,14 @@ public class EnvCommonOptions { .noDefaultValue() .withDescription("The timeout (in milliseconds) for a checkpoint."); + public static Option SINK_FLUSH_INTERVAL = + Options.key("sink.flush.interval") + .longType() + .defaultValue(0L) + .withDescription( + "Interval (ms) at which the engine injects a FlushSignal into the pipeline to " + + "drive a flush at the Sink. 0 means disabled. Values below 100ms will log a WARN."); + public static Option CHECKPOINT_MIN_PAUSE = Options.key("min-pause") .intType() diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/options/EnvOptionRule.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/options/EnvOptionRule.java index bb025aa4d202..51a1dbb2ab89 100644 --- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/options/EnvOptionRule.java +++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/options/EnvOptionRule.java @@ -47,7 +47,8 @@ public OptionRule optionRule() { EnvCommonOptions.READ_LIMIT_BYTES_PER_SECOND, EnvCommonOptions.SAVEMODE_EXECUTE_LOCATION, EnvCommonOptions.CUSTOM_PARAMETERS, - EnvCommonOptions.NODE_TAG_FILTER) + EnvCommonOptions.NODE_TAG_FILTER, + EnvCommonOptions.SINK_FLUSH_INTERVAL) .build(); } } diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/signal/FlushSignal.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/signal/FlushSignal.java new file mode 100644 index 000000000000..fed6c3a63451 --- /dev/null +++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/signal/FlushSignal.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.api.signal; + +import lombok.Getter; +import lombok.RequiredArgsConstructor; + +import java.util.Objects; + +@Getter +@RequiredArgsConstructor +public final class FlushSignal implements Signal { + + private static final long serialVersionUID = 1L; + + private final long jobId; + private final long taskId; + private final long createdTime; + + public static FlushSignal of(long jobId, long taskId) { + return new FlushSignal(jobId, taskId, System.currentTimeMillis()); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (!(o instanceof FlushSignal)) { + return false; + } + FlushSignal that = (FlushSignal) o; + return jobId == that.jobId && taskId == that.taskId && createdTime == that.createdTime; + } + + @Override + public int hashCode() { + return Objects.hash(jobId, taskId, createdTime); + } + + @Override + public String toString() { + return "FlushSignal{" + + "jobId=" + + jobId + + ", taskId=" + + taskId + + ", createdTime=" + + createdTime + + '}'; + } +} diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/signal/Signal.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/signal/Signal.java new file mode 100644 index 000000000000..c0dfade04bcf --- /dev/null +++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/signal/Signal.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.api.signal; + +import java.io.Serializable; + +/** + * Interface for control-plane signals that the engine propagates through the data flow. + * + *

A {@code Signal} is not a data record: it carries no user payload and is intended to instruct + * an operator (e.g. a {@link org.apache.seatunnel.api.sink.SinkWriter}) to perform a side-effecting + * action such as flushing buffered data. + * + *

Every signal carries three pieces of metadata: + * + *

+ * + *

Concrete signals should be small and immutable. New signal types are added by introducing new + * implementations of this interface; the engine routes them by type via {@code instanceof}. + */ +public interface Signal extends Serializable { + + /** @return the id of the job that created this signal. */ + long getJobId(); + + /** @return the id of the task that created this signal. */ + long getTaskId(); + + /** @return the wall-clock creation time of this signal, in epoch milliseconds. */ + long getCreatedTime(); +} diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkWriter.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkWriter.java index 103b282a24bd..6c6a63c84c0e 100644 --- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkWriter.java +++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkWriter.java @@ -20,6 +20,7 @@ import org.apache.seatunnel.api.common.metrics.MetricsContext; import org.apache.seatunnel.api.event.EventListener; import org.apache.seatunnel.api.table.schema.event.SchemaChangeEvent; +import org.apache.seatunnel.common.utils.function.RunnableWithException; import java.io.IOException; import java.io.Serializable; @@ -117,5 +118,28 @@ default int getNumberOfParallelSubtasks() { * @return */ EventListener getEventListener(); + + /** + * Register an action to be invoked by the engine when a periodic flush signal arrives. + * + *

This is the opt-in point for engine-level timer flush. A writer that wants to be + * flushed on a schedule should call this method during its initialization, typically with a + * method reference like {@code context.registerFlushAction(this::flush)}. + * + * @param action the action to invoke on each flush signal, must not be {@code null} + */ + default void registerFlushAction(RunnableWithException action) {} + + /** + * Return the flush action previously registered via {@link + * #registerFlushAction(RunnableWithException)}, or {@code null} if the writer has not opted + * in to engine-level timer flush. + * + *

Callers must null-check the return value; a {@code null} return means the writer will + * silently ignore flush signals. + */ + default RunnableWithException getFlushAction() { + return null; + } } } diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableSink.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableSink.java index ff13ceeb049a..c458ab2fc463 100644 --- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableSink.java +++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableSink.java @@ -102,18 +102,22 @@ public SinkWriter createWri SinkWriter.Context context) throws IOException { Map> writers = new HashMap<>(); Map sinkWritersContext = new HashMap<>(); + Map proxyContexts = new HashMap<>(); for (int i = 0; i < replicaNum; i++) { for (TablePath tablePath : sinks.keySet()) { SeaTunnelSink sink = sinks.get(tablePath); int index = context.getIndexOfSubtask() * replicaNum + i; - String tableIdentifier = tablePath.toString(); - writers.put( - SinkIdentifier.of(tableIdentifier, index), - sink.createWriter(new SinkContextProxy(index, replicaNum, context))); - sinkWritersContext.put(SinkIdentifier.of(tableIdentifier, index), context); + SinkIdentifier id = SinkIdentifier.of(tablePath.toString(), index); + SinkContextProxy proxy = new SinkContextProxy(index, replicaNum, context); + writers.put(id, sink.createWriter(proxy)); + proxyContexts.put(id, proxy); + sinkWritersContext.put(id, context); } } - return new MultiTableSinkWriter(writers, replicaNum, sinkWritersContext); + MultiTableSinkWriter writer = + new MultiTableSinkWriter(writers, replicaNum, sinkWritersContext); + registerAggregatedFlushIfNeeded(context, writer, proxyContexts); + return writer; } /** @@ -134,12 +138,14 @@ public SinkWriter restoreWr SinkWriter.Context context, List states) throws IOException { Map> writers = new HashMap<>(); Map sinkWritersContext = new HashMap<>(); + Map proxyContexts = new HashMap<>(); for (int i = 0; i < replicaNum; i++) { for (TablePath tablePath : sinks.keySet()) { SeaTunnelSink sink = sinks.get(tablePath); int index = context.getIndexOfSubtask() * replicaNum + i; SinkIdentifier sinkIdentifier = SinkIdentifier.of(tablePath.toString(), index); + SinkContextProxy proxy = new SinkContextProxy(index, replicaNum, context); List state = states.stream() .map( @@ -149,19 +155,36 @@ public SinkWriter restoreWr .flatMap(Collection::stream) .collect(Collectors.toList()); if (state.isEmpty()) { - writers.put( - sinkIdentifier, - sink.createWriter(new SinkContextProxy(index, replicaNum, context))); + writers.put(sinkIdentifier, sink.createWriter(proxy)); } else { - writers.put( - sinkIdentifier, - sink.restoreWriter( - new SinkContextProxy(index, replicaNum, context), state)); + writers.put(sinkIdentifier, sink.restoreWriter(proxy, state)); } + proxyContexts.put(sinkIdentifier, proxy); sinkWritersContext.put(sinkIdentifier, context); } } - return new MultiTableSinkWriter(writers, replicaNum, sinkWritersContext); + MultiTableSinkWriter writer = + new MultiTableSinkWriter(writers, replicaNum, sinkWritersContext); + registerAggregatedFlushIfNeeded(context, writer, proxyContexts); + return writer; + } + + /** + * Registers an aggregated flush action on the parent context if any sub-writer registered a + * flush action via its {@link SinkContextProxy}. + * + *

The registered action drains all blocking queues and then calls each sub-writer's flush + * action under the corresponding lock, ensuring safe execution from the engine timer thread. + */ + private void registerAggregatedFlushIfNeeded( + SinkWriter.Context context, + MultiTableSinkWriter writer, + Map proxyContexts) { + boolean anyFlush = + proxyContexts.values().stream().anyMatch(p -> p.getFlushAction() != null); + if (anyFlush) { + context.registerFlushAction(() -> writer.aggregatedFlush(proxyContexts)); + } } @Override diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableSinkWriter.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableSinkWriter.java index 84837536fbe1..686cce1124cd 100644 --- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableSinkWriter.java +++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableSinkWriter.java @@ -485,6 +485,28 @@ public void close() throws IOException { } } + /** + * Aggregated flush triggered by the engine timer. Drains all blocking queues first, then calls + * each sub-writer's flush action under the corresponding {@link MultiTableWriterRunnable} lock + * to prevent concurrent writes. + * + * @param proxyContexts map from sink identifier to its {@link SinkContextProxy} + */ + void aggregatedFlush(Map proxyContexts) throws Exception { + checkQueueRemain(); + subSinkErrorCheck(); + for (int i = 0; i < sinkWritersWithIndex.size(); i++) { + synchronized (runnable.get(i)) { + for (SinkIdentifier id : sinkWritersWithIndex.get(i).keySet()) { + SinkContextProxy proxy = proxyContexts.get(id); + if (proxy != null && proxy.getFlushAction() != null) { + proxy.getFlushAction().run(); + } + } + } + } + } + /** * Busy-waits until all blocking queues are fully drained. * diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/SinkContextProxy.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/SinkContextProxy.java index 5f4bf75f6fff..1e3aed695518 100644 --- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/SinkContextProxy.java +++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/SinkContextProxy.java @@ -20,14 +20,16 @@ import org.apache.seatunnel.api.common.metrics.MetricsContext; import org.apache.seatunnel.api.event.EventListener; import org.apache.seatunnel.api.sink.SinkWriter; +import org.apache.seatunnel.common.utils.function.RunnableWithException; + +import java.util.Objects; public class SinkContextProxy implements SinkWriter.Context { private final int index; - private final int replicaNum; - private final SinkWriter.Context context; + private transient volatile RunnableWithException flushAction; public SinkContextProxy(int index, int replicaNum, SinkWriter.Context context) { this.index = index; @@ -54,4 +56,15 @@ public MetricsContext getMetricsContext() { public EventListener getEventListener() { return context.getEventListener(); } + + @Override + public void registerFlushAction(RunnableWithException action) { + Objects.requireNonNull(action, "flushAction"); + this.flushAction = action; + } + + @Override + public RunnableWithException getFlushAction() { + return flushAction; + } } diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/RecordSerializerIT.java b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/RecordSerializerIT.java new file mode 100644 index 000000000000..dbbc016c2e12 --- /dev/null +++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/RecordSerializerIT.java @@ -0,0 +1,364 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.engine.e2e; + +import org.apache.seatunnel.api.signal.FlushSignal; +import org.apache.seatunnel.api.table.type.Record; +import org.apache.seatunnel.api.table.type.RowKind; +import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.engine.common.config.ConfigProvider; +import org.apache.seatunnel.engine.common.config.SeaTunnelConfig; +import org.apache.seatunnel.engine.core.checkpoint.CheckpointType; +import org.apache.seatunnel.engine.server.SeaTunnelServerStarter; +import org.apache.seatunnel.engine.server.checkpoint.CheckpointBarrier; +import org.apache.seatunnel.engine.server.execution.TaskGroupLocation; +import org.apache.seatunnel.engine.server.execution.TaskLocation; + +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; + +import com.hazelcast.config.Config; +import com.hazelcast.instance.impl.HazelcastInstanceImpl; +import com.hazelcast.map.IMap; +import lombok.extern.slf4j.Slf4j; + +import java.io.IOException; +import java.net.ServerSocket; +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +import static org.awaitility.Awaitility.await; + +@Slf4j +public class RecordSerializerIT { + + private static final String MAP_NAME = "test-record-serializer"; + private static HazelcastInstanceImpl instance1; + private static HazelcastInstanceImpl instance2; + + @BeforeAll + static void setUp() { + String clusterName = TestUtils.getClusterName("RecordSerializerIT_hzSerializationTest"); + int[] ports = findTwoFreePorts(); + instance1 = createHazelcastInstance(clusterName, ports[0], ports[1]); + instance2 = createHazelcastInstance(clusterName, ports[1], ports[0]); + await().atMost(30, TimeUnit.SECONDS) + .until(() -> instance1.getCluster().getMembers().size() == 2); + } + + @AfterAll + static void tearDown() { + if (instance1 != null) { + instance1.shutdown(); + } + if (instance2 != null) { + instance2.shutdown(); + } + } + + @Test + public void testSeaTunnelRowRoundTrip() { + SeaTunnelRow row = new SeaTunnelRow(3); + row.setTableId("test_db.test_table"); + row.setRowKind(RowKind.INSERT); + row.setField(0, "hello"); + row.setField(1, 42); + row.setField(2, 3.14); + + Record original = new Record<>(row); + + IMap> writerMap = instance1.getMap(MAP_NAME); + IMap> readerMap = instance2.getMap(MAP_NAME); + + String key = "row-insert"; + writerMap.put(key, original); + + await().atMost(10, TimeUnit.SECONDS).until(() -> readerMap.containsKey(key)); + + Record deserialized = readerMap.get(key); + Assertions.assertNotNull(deserialized); + Assertions.assertInstanceOf(SeaTunnelRow.class, deserialized.getData()); + + SeaTunnelRow actual = (SeaTunnelRow) deserialized.getData(); + Assertions.assertEquals("test_db.test_table", actual.getTableId()); + Assertions.assertEquals(RowKind.INSERT, actual.getRowKind()); + Assertions.assertEquals(3, actual.getArity()); + Assertions.assertEquals("hello", actual.getField(0)); + Assertions.assertEquals(42, actual.getField(1)); + Assertions.assertEquals(3.14, actual.getField(2)); + Assertions.assertEquals(row, actual); + + writerMap.remove(key); + } + + @Test + public void testSeaTunnelRowAllRowKinds() { + for (RowKind rowKind : RowKind.values()) { + SeaTunnelRow row = new SeaTunnelRow(1); + row.setTableId("db.tbl"); + row.setRowKind(rowKind); + row.setField(0, rowKind.shortString()); + + Record original = new Record<>(row); + + IMap> writerMap = instance1.getMap(MAP_NAME); + IMap> readerMap = instance2.getMap(MAP_NAME); + + String key = "row-kind-" + rowKind.name(); + writerMap.put(key, original); + + await().atMost(10, TimeUnit.SECONDS).until(() -> readerMap.containsKey(key)); + + Record deserialized = readerMap.get(key); + SeaTunnelRow actual = (SeaTunnelRow) deserialized.getData(); + Assertions.assertEquals("db.tbl", actual.getTableId()); + Assertions.assertEquals(rowKind, actual.getRowKind()); + Assertions.assertEquals(1, actual.getArity()); + Assertions.assertEquals(rowKind.shortString(), actual.getField(0)); + Assertions.assertEquals(row, actual); + + writerMap.remove(key); + } + } + + @Test + public void testCheckpointBarrierRoundTrip() { + CheckpointBarrier barrier = + new CheckpointBarrier( + 100L, + System.currentTimeMillis(), + CheckpointType.CHECKPOINT_TYPE, + Collections.emptySet(), + Collections.emptySet()); + + Record original = new Record<>(barrier); + + IMap> writerMap = instance1.getMap(MAP_NAME); + IMap> readerMap = instance2.getMap(MAP_NAME); + + String key = "barrier-checkpoint"; + writerMap.put(key, original); + + await().atMost(10, TimeUnit.SECONDS).until(() -> readerMap.containsKey(key)); + + Record deserialized = readerMap.get(key); + Assertions.assertNotNull(deserialized); + Assertions.assertInstanceOf(CheckpointBarrier.class, deserialized.getData()); + + CheckpointBarrier actual = (CheckpointBarrier) deserialized.getData(); + Assertions.assertEquals(barrier.getId(), actual.getId()); + Assertions.assertEquals(barrier.getTimestamp(), actual.getTimestamp()); + Assertions.assertEquals(CheckpointType.CHECKPOINT_TYPE, actual.getCheckpointType()); + Assertions.assertEquals(Collections.emptySet(), actual.getPrepareCloseTasks()); + Assertions.assertEquals(Collections.emptySet(), actual.getClosedTasks()); + Assertions.assertEquals(barrier, actual); + + writerMap.remove(key); + } + + @Test + public void testCheckpointBarrierWithTaskLocations() { + TaskGroupLocation groupLoc1 = new TaskGroupLocation(1L, 1, 100L); + TaskGroupLocation groupLoc2 = new TaskGroupLocation(1L, 1, 200L); + TaskLocation taskLoc1 = new TaskLocation(groupLoc1, 1L, 0); + TaskLocation taskLoc2 = new TaskLocation(groupLoc2, 2L, 1); + + Set prepareClose = new HashSet<>(); + prepareClose.add(taskLoc1); + Set closed = new HashSet<>(); + closed.add(taskLoc2); + + CheckpointBarrier barrier = + new CheckpointBarrier( + 300L, + System.currentTimeMillis(), + CheckpointType.SAVEPOINT_TYPE, + prepareClose, + closed); + + Record original = new Record<>(barrier); + + IMap> writerMap = instance1.getMap(MAP_NAME); + IMap> readerMap = instance2.getMap(MAP_NAME); + + String key = "barrier-with-tasks"; + writerMap.put(key, original); + + await().atMost(10, TimeUnit.SECONDS).until(() -> readerMap.containsKey(key)); + + Record deserialized = readerMap.get(key); + Assertions.assertNotNull(deserialized); + Assertions.assertInstanceOf(CheckpointBarrier.class, deserialized.getData()); + + CheckpointBarrier actual = (CheckpointBarrier) deserialized.getData(); + Assertions.assertEquals(300L, actual.getId()); + Assertions.assertEquals(barrier.getTimestamp(), actual.getTimestamp()); + Assertions.assertEquals(CheckpointType.SAVEPOINT_TYPE, actual.getCheckpointType()); + Assertions.assertEquals(1, actual.getPrepareCloseTasks().size()); + Assertions.assertTrue(actual.getPrepareCloseTasks().contains(taskLoc1)); + Assertions.assertEquals(1, actual.getClosedTasks().size()); + Assertions.assertTrue(actual.getClosedTasks().contains(taskLoc2)); + + writerMap.remove(key); + } + + @Test + public void testCheckpointBarrierAllTypes() { + for (CheckpointType type : CheckpointType.values()) { + CheckpointBarrier barrier = + new CheckpointBarrier( + 200L + type.ordinal(), + System.currentTimeMillis(), + type, + Collections.emptySet(), + Collections.emptySet()); + + Record original = new Record<>(barrier); + + IMap> writerMap = instance1.getMap(MAP_NAME); + IMap> readerMap = instance2.getMap(MAP_NAME); + + String key = "barrier-type-" + type.name(); + writerMap.put(key, original); + + await().atMost(10, TimeUnit.SECONDS).until(() -> readerMap.containsKey(key)); + + Record deserialized = readerMap.get(key); + CheckpointBarrier actual = (CheckpointBarrier) deserialized.getData(); + Assertions.assertEquals(type, actual.getCheckpointType()); + Assertions.assertEquals(200L + type.ordinal(), actual.getId()); + Assertions.assertEquals(barrier.getTimestamp(), actual.getTimestamp()); + Assertions.assertEquals(Collections.emptySet(), actual.getPrepareCloseTasks()); + Assertions.assertEquals(Collections.emptySet(), actual.getClosedTasks()); + Assertions.assertEquals(barrier, actual); + + writerMap.remove(key); + } + } + + @Test + public void testFlushSignalRoundTrip() { + FlushSignal flushSignal = new FlushSignal(12345L, 67890L, System.currentTimeMillis()); + + Record original = new Record<>(flushSignal); + + IMap> writerMap = instance1.getMap(MAP_NAME); + IMap> readerMap = instance2.getMap(MAP_NAME); + + String key = "flush-signal"; + writerMap.put(key, original); + + await().atMost(10, TimeUnit.SECONDS).until(() -> readerMap.containsKey(key)); + + Record deserialized = readerMap.get(key); + Assertions.assertNotNull(deserialized); + Assertions.assertInstanceOf(FlushSignal.class, deserialized.getData()); + + FlushSignal actual = (FlushSignal) deserialized.getData(); + Assertions.assertEquals(12345L, actual.getJobId()); + Assertions.assertEquals(67890L, actual.getTaskId()); + Assertions.assertEquals(flushSignal.getCreatedTime(), actual.getCreatedTime()); + Assertions.assertEquals(flushSignal, actual); + + writerMap.remove(key); + } + + @Test + public void testSeaTunnelRowWithNullFields() { + SeaTunnelRow row = new SeaTunnelRow(3); + row.setTableId("db.nullable_table"); + row.setRowKind(RowKind.INSERT); + row.setField(0, null); + row.setField(1, "non-null"); + row.setField(2, null); + + Record original = new Record<>(row); + + IMap> writerMap = instance1.getMap(MAP_NAME); + IMap> readerMap = instance2.getMap(MAP_NAME); + + String key = "row-nullable"; + writerMap.put(key, original); + + await().atMost(10, TimeUnit.SECONDS).until(() -> readerMap.containsKey(key)); + + Record deserialized = readerMap.get(key); + Assertions.assertNotNull(deserialized); + Assertions.assertInstanceOf(SeaTunnelRow.class, deserialized.getData()); + + SeaTunnelRow actual = (SeaTunnelRow) deserialized.getData(); + Assertions.assertEquals("db.nullable_table", actual.getTableId()); + Assertions.assertEquals(RowKind.INSERT, actual.getRowKind()); + Assertions.assertEquals(3, actual.getArity()); + Assertions.assertNull(actual.getField(0)); + Assertions.assertEquals("non-null", actual.getField(1)); + Assertions.assertNull(actual.getField(2)); + Assertions.assertEquals(row, actual); + + writerMap.remove(key); + } + + private static HazelcastInstanceImpl createHazelcastInstance( + String clusterName, int localPort, int peerPort) { + SeaTunnelConfig seaTunnelConfig = ConfigProvider.locateAndGetSeaTunnelConfig(); + seaTunnelConfig.getEngineConfig().getHttpConfig().setEnabled(false); + Config hazelcastConfig = + Config.loadFromString(buildHazelcastConfig(clusterName, localPort, peerPort)); + seaTunnelConfig.setHazelcastConfig(hazelcastConfig); + return SeaTunnelServerStarter.createHazelcastInstance(seaTunnelConfig); + } + + private static String buildHazelcastConfig(String clusterName, int localPort, int peerPort) { + return "hazelcast:\n" + + " cluster-name: " + + clusterName + + "\n" + + " network:\n" + + " join:\n" + + " tcp-ip:\n" + + " enabled: true\n" + + " member-list:\n" + + " - 127.0.0.1:" + + localPort + + "\n" + + " - 127.0.0.1:" + + peerPort + + "\n" + + " port:\n" + + " auto-increment: false\n" + + " port-count: 1\n" + + " port: " + + localPort + + "\n"; + } + + private static int[] findTwoFreePorts() { + try (ServerSocket first = new ServerSocket(0); + ServerSocket second = new ServerSocket(0)) { + first.setReuseAddress(true); + second.setReuseAddress(true); + return new int[] {first.getLocalPort(), second.getLocalPort()}; + } catch (IOException e) { + throw new RuntimeException("No free Hazelcast ports available", e); + } + } +} diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/timerflush/MultiTableFlushTestSink.java b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/timerflush/MultiTableFlushTestSink.java new file mode 100644 index 000000000000..73df5bebbf20 --- /dev/null +++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/timerflush/MultiTableFlushTestSink.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.engine.e2e.timerflush; + +import org.apache.seatunnel.api.sink.SinkWriter; +import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSimpleSink; +import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter; + +public class MultiTableFlushTestSink extends AbstractSimpleSink { + + @Override + public AbstractSinkWriter createWriter(SinkWriter.Context context) { + return new MultiTableFlushTestSinkWriter(context); + } + + @Override + public String getPluginName() { + return MultiTableFlushTestSinkFactory.IDENTIFIER; + } +} diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/timerflush/MultiTableFlushTestSinkFactory.java b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/timerflush/MultiTableFlushTestSinkFactory.java new file mode 100644 index 000000000000..63b46e744bce --- /dev/null +++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/timerflush/MultiTableFlushTestSinkFactory.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.engine.e2e.timerflush; + +import org.apache.seatunnel.api.configuration.util.OptionRule; +import org.apache.seatunnel.api.table.connector.TableSink; +import org.apache.seatunnel.api.table.factory.Factory; +import org.apache.seatunnel.api.table.factory.TableSinkFactory; +import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext; + +import com.google.auto.service.AutoService; + +@AutoService(Factory.class) +public class MultiTableFlushTestSinkFactory implements TableSinkFactory { + + public static final String IDENTIFIER = "MultiTableFlushTest"; + + @Override + public String factoryIdentifier() { + return IDENTIFIER; + } + + @Override + public OptionRule optionRule() { + return OptionRule.builder().build(); + } + + @Override + public TableSink createSink(TableSinkFactoryContext context) { + return () -> new MultiTableFlushTestSink(); + } +} diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/timerflush/MultiTableFlushTestSinkWriter.java b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/timerflush/MultiTableFlushTestSinkWriter.java new file mode 100644 index 000000000000..61909a3a8944 --- /dev/null +++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/timerflush/MultiTableFlushTestSinkWriter.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.engine.e2e.timerflush; + +import org.apache.seatunnel.api.sink.SinkWriter; +import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; + +public class MultiTableFlushTestSinkWriter extends AbstractSinkWriter { + + public static final ConcurrentMap FLUSH_COUNTS = + new ConcurrentHashMap<>(); + + public static final ConcurrentMap WRITE_COUNTS = new ConcurrentHashMap<>(); + + public static final ConcurrentMap FLUSHED_ROW_TOTALS = + new ConcurrentHashMap<>(); + + public static final CopyOnWriteArrayList FLUSH_SNAPSHOTS = + new CopyOnWriteArrayList<>(); + + private final ConcurrentMap buffer = new ConcurrentHashMap<>(); + + public MultiTableFlushTestSinkWriter(SinkWriter.Context context) { + context.registerFlushAction(this::flush); + } + + @Override + public void write(SeaTunnelRow element) { + String tableId = element.getTableId(); + if (tableId != null) { + buffer.computeIfAbsent(tableId, k -> new AtomicLong(0)).incrementAndGet(); + WRITE_COUNTS.computeIfAbsent(tableId, k -> new AtomicLong(0)).incrementAndGet(); + } + } + + private void flush() { + List tableCounts = new ArrayList<>(); + for (ConcurrentMap.Entry entry : buffer.entrySet()) { + long count = entry.getValue().getAndSet(0); + if (count > 0) { + tableCounts.add(new FlushSnapshot.TableCount(entry.getKey(), count)); + FLUSH_COUNTS + .computeIfAbsent(entry.getKey(), k -> new AtomicInteger(0)) + .incrementAndGet(); + FLUSHED_ROW_TOTALS + .computeIfAbsent(entry.getKey(), k -> new AtomicLong(0)) + .addAndGet(count); + } + } + if (!tableCounts.isEmpty()) { + FLUSH_SNAPSHOTS.add( + new FlushSnapshot( + System.nanoTime(), Thread.currentThread().getName(), tableCounts)); + } + } + + @Override + public void close() {} + + public static void reset() { + FLUSH_COUNTS.clear(); + WRITE_COUNTS.clear(); + FLUSHED_ROW_TOTALS.clear(); + FLUSH_SNAPSHOTS.clear(); + } + + public static class FlushSnapshot { + public final long timestampNanos; + public final String threadName; + public final List tableCounts; + + FlushSnapshot(long timestampNanos, String threadName, List tableCounts) { + this.timestampNanos = timestampNanos; + this.threadName = threadName; + this.tableCounts = tableCounts; + } + + public static class TableCount { + public final String tableId; + public final long rowCount; + + TableCount(String tableId, long rowCount) { + this.tableId = tableId; + this.rowCount = rowCount; + } + } + } +} diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/timerflush/TimerFlushIT.java b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/timerflush/TimerFlushIT.java new file mode 100644 index 000000000000..10ab517fef5d --- /dev/null +++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/timerflush/TimerFlushIT.java @@ -0,0 +1,218 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.engine.e2e.timerflush; + +import org.apache.seatunnel.common.config.Common; +import org.apache.seatunnel.common.config.DeployMode; +import org.apache.seatunnel.engine.client.SeaTunnelClient; +import org.apache.seatunnel.engine.client.job.ClientJobExecutionEnvironment; +import org.apache.seatunnel.engine.client.job.ClientJobProxy; +import org.apache.seatunnel.engine.common.config.ConfigProvider; +import org.apache.seatunnel.engine.common.config.JobConfig; +import org.apache.seatunnel.engine.common.config.SeaTunnelConfig; +import org.apache.seatunnel.engine.e2e.TestUtils; +import org.apache.seatunnel.engine.server.SeaTunnelServerStarter; + +import org.awaitility.Awaitility; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import com.hazelcast.client.config.ClientConfig; +import com.hazelcast.instance.impl.HazelcastInstanceImpl; + +import java.util.List; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.Collectors; + +import static org.awaitility.Awaitility.await; + +public class TimerFlushIT { + + private static final String CLUSTER_NAME = "TimerFlushIT"; + + private HazelcastInstanceImpl hazelcastInstance; + private SeaTunnelConfig seaTunnelConfig; + + @BeforeEach + void setUp() { + TimerFlushTestSinkWriter.reset(); + MultiTableFlushTestSinkWriter.reset(); + seaTunnelConfig = ConfigProvider.locateAndGetSeaTunnelConfig(); + seaTunnelConfig.getHazelcastConfig().setClusterName(TestUtils.getClusterName(CLUSTER_NAME)); + hazelcastInstance = SeaTunnelServerStarter.createHazelcastInstance(seaTunnelConfig); + } + + @AfterEach + void tearDown() throws InterruptedException { + if (hazelcastInstance != null) { + hazelcastInstance.shutdown(); + } + } + + @Test + void testFlushTriggeredByEngineTimer() throws Exception { + Common.setDeployMode(DeployMode.CLIENT); + String filePath = TestUtils.getResource("timer_flush_enabled.conf"); + JobConfig jobConfig = new JobConfig(); + jobConfig.setName("timer-flush-enabled"); + + ClientConfig clientConfig = ConfigProvider.locateAndGetClientConfig(); + clientConfig.setClusterName(TestUtils.getClusterName(CLUSTER_NAME)); + + try (SeaTunnelClient client = new SeaTunnelClient(clientConfig)) { + ClientJobExecutionEnvironment env = + client.createExecutionContext(filePath, jobConfig, seaTunnelConfig); + ClientJobProxy jobProxy = env.execute(); + + // Timer flush can run before any rows reach the sink (empty buffer flush still + // increments FLUSH_COUNT). Wait until a flush actually moved rows into FLUSHED_ROWS. + await().atMost(15, TimeUnit.SECONDS) + .pollInterval(200, TimeUnit.MILLISECONDS) + .untilAsserted( + () -> + Assertions.assertFalse( + TimerFlushTestSinkWriter.FLUSHED_ROWS.isEmpty(), + "Expected at least one flush with buffered rows; flushCount=" + + TimerFlushTestSinkWriter.FLUSH_COUNT.get())); + + jobProxy.cancelJob(); + } + } + + @Test + void testNoFlushWhenTimerDisabled() throws Exception { + Common.setDeployMode(DeployMode.CLIENT); + String filePath = TestUtils.getResource("timer_flush_disabled.conf"); + JobConfig jobConfig = new JobConfig(); + jobConfig.setName("timer-flush-disabled"); + + ClientConfig clientConfig = ConfigProvider.locateAndGetClientConfig(); + clientConfig.setClusterName(TestUtils.getClusterName(CLUSTER_NAME)); + + try (SeaTunnelClient client = new SeaTunnelClient(clientConfig)) { + ClientJobExecutionEnvironment env = + client.createExecutionContext(filePath, jobConfig, seaTunnelConfig); + ClientJobProxy jobProxy = env.execute(); + + // Assert that flushCount stays at 0 for 8 continuous seconds. + // No timer is registered when sink.flush.interval is unset, so nothing should leak. + Awaitility.await() + .during(8, TimeUnit.SECONDS) + .atMost(9, TimeUnit.SECONDS) + .pollInterval(500, TimeUnit.MILLISECONDS) + .until(() -> TimerFlushTestSinkWriter.FLUSH_COUNT.get() == 0); + + Assertions.assertTrue( + TimerFlushTestSinkWriter.FLUSHED_ROWS.isEmpty(), + "Flushed rows must remain empty when timer flush is disabled"); + + jobProxy.cancelJob(); + } + } + + @Test + void testMultiTableAggregatedFlush() throws Exception { + Common.setDeployMode(DeployMode.CLIENT); + String filePath = TestUtils.getResource("timer_flush_multi_table.conf"); + JobConfig jobConfig = new JobConfig(); + jobConfig.setName("timer-flush-multi-table"); + + ClientConfig clientConfig = ConfigProvider.locateAndGetClientConfig(); + clientConfig.setClusterName(TestUtils.getClusterName(CLUSTER_NAME)); + + try (SeaTunnelClient client = new SeaTunnelClient(clientConfig)) { + ClientJobExecutionEnvironment env = + client.createExecutionContext(filePath, jobConfig, seaTunnelConfig); + ClientJobProxy jobProxy = env.execute(); + + // 1. Wait until all three tables have been flushed at least 3 times + String[] tableIds = {"db1.table_a", "db1.table_b", "db1.table_c"}; + await().atMost(30, TimeUnit.SECONDS) + .pollInterval(200, TimeUnit.MILLISECONDS) + .untilAsserted( + () -> { + for (String tableId : tableIds) { + AtomicInteger count = + MultiTableFlushTestSinkWriter.FLUSH_COUNTS.get(tableId); + Assertions.assertNotNull( + count, tableId + " should have been flushed"); + Assertions.assertTrue( + count.get() >= 3, + tableId + + " flush count should be >= 3, got: " + + count.get()); + } + }); + + // 2. Every flush snapshot must contain at least one row (queue drain before flush) + List snapshots = + MultiTableFlushTestSinkWriter.FLUSH_SNAPSHOTS; + Assertions.assertFalse(snapshots.isEmpty(), "Should have recorded flush snapshots"); + for (int i = 0; i < snapshots.size(); i++) { + MultiTableFlushTestSinkWriter.FlushSnapshot snap = snapshots.get(i); + Assertions.assertFalse( + snap.tableCounts.isEmpty(), + "Flush snapshot #" + i + " should have table counts"); + for (MultiTableFlushTestSinkWriter.FlushSnapshot.TableCount tc : snap.tableCounts) { + Assertions.assertTrue( + tc.rowCount > 0, + "Flush snapshot #" + + i + + " table=" + + tc.tableId + + " should have rowCount > 0, got: " + + tc.rowCount); + } + } + + // 3. Verify concurrency: multiple writer instances participated in flushing + Set flushThreads = + snapshots.stream().map(s -> s.threadName).collect(Collectors.toSet()); + Assertions.assertTrue( + flushThreads.size() >= 1, + "At least one flush thread expected, got: " + flushThreads); + + // 4. No row loss: flushed row totals <= written row totals for each table + for (String tableId : MultiTableFlushTestSinkWriter.WRITE_COUNTS.keySet()) { + AtomicLong written = MultiTableFlushTestSinkWriter.WRITE_COUNTS.get(tableId); + AtomicLong flushed = MultiTableFlushTestSinkWriter.FLUSHED_ROW_TOTALS.get(tableId); + Assertions.assertNotNull( + flushed, "Table " + tableId + " should have flushed row totals"); + Assertions.assertTrue( + flushed.get() <= written.get(), + "Table " + + tableId + + " flushed rows (" + + flushed.get() + + ") should not exceed written rows (" + + written.get() + + ")"); + Assertions.assertTrue( + flushed.get() > 0, + "Table " + tableId + " should have flushed at least some rows"); + } + + jobProxy.cancelJob(); + } + } +} diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/timerflush/TimerFlushTestSink.java b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/timerflush/TimerFlushTestSink.java new file mode 100644 index 000000000000..35c948e69b87 --- /dev/null +++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/timerflush/TimerFlushTestSink.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.engine.e2e.timerflush; + +import org.apache.seatunnel.api.sink.SinkWriter; +import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSimpleSink; +import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter; + +public class TimerFlushTestSink extends AbstractSimpleSink { + + @Override + public AbstractSinkWriter createWriter(SinkWriter.Context context) { + return new TimerFlushTestSinkWriter(context); + } + + @Override + public String getPluginName() { + return TimerFlushTestSinkFactory.IDENTIFIER; + } +} diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/timerflush/TimerFlushTestSinkFactory.java b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/timerflush/TimerFlushTestSinkFactory.java new file mode 100644 index 000000000000..c647c1693681 --- /dev/null +++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/timerflush/TimerFlushTestSinkFactory.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.engine.e2e.timerflush; + +import org.apache.seatunnel.api.configuration.util.OptionRule; +import org.apache.seatunnel.api.table.connector.TableSink; +import org.apache.seatunnel.api.table.factory.Factory; +import org.apache.seatunnel.api.table.factory.TableSinkFactory; +import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext; + +import com.google.auto.service.AutoService; + +@AutoService(Factory.class) +public class TimerFlushTestSinkFactory implements TableSinkFactory { + + public static final String IDENTIFIER = "TimerFlushTest"; + + @Override + public String factoryIdentifier() { + return IDENTIFIER; + } + + @Override + public OptionRule optionRule() { + return OptionRule.builder().build(); + } + + @Override + public TableSink createSink(TableSinkFactoryContext context) { + return () -> new TimerFlushTestSink(); + } +} diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/timerflush/TimerFlushTestSinkWriter.java b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/timerflush/TimerFlushTestSinkWriter.java new file mode 100644 index 000000000000..bf150c45e5b8 --- /dev/null +++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/timerflush/TimerFlushTestSinkWriter.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.engine.e2e.timerflush; + +import org.apache.seatunnel.api.sink.SinkWriter; +import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.atomic.AtomicInteger; + +public class TimerFlushTestSinkWriter extends AbstractSinkWriter { + + public static final ConcurrentLinkedQueue FLUSHED_ROWS = + new ConcurrentLinkedQueue<>(); + public static final AtomicInteger FLUSH_COUNT = new AtomicInteger(0); + + private final List buffer = new ArrayList<>(); + + public TimerFlushTestSinkWriter(SinkWriter.Context context) { + context.registerFlushAction(this::flush); + } + + @Override + public void write(SeaTunnelRow element) { + buffer.add(element); + } + + private void flush() { + List snapshot = new ArrayList<>(buffer); + buffer.clear(); + FLUSHED_ROWS.addAll(snapshot); + FLUSH_COUNT.incrementAndGet(); + } + + @Override + public void close() {} + + public static void reset() { + FLUSHED_ROWS.clear(); + FLUSH_COUNT.set(0); + } +} diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/timer_flush_disabled.conf b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/timer_flush_disabled.conf new file mode 100644 index 000000000000..f1407f03daea --- /dev/null +++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/timer_flush_disabled.conf @@ -0,0 +1,40 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +env { + job.mode = "STREAMING" + checkpoint.interval = 300000 + # sink.flush.interval is not set — timer flush is disabled +} + +source { + FakeSource { + row.num = 100 + split.num = 1 + parallelism = 1 + schema = { + fields { + id = int + name = string + } + } + } +} + +sink { + TimerFlushTest {} +} diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/timer_flush_enabled.conf b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/timer_flush_enabled.conf new file mode 100644 index 000000000000..b99a711e39c3 --- /dev/null +++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/timer_flush_enabled.conf @@ -0,0 +1,40 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +env { + job.mode = "STREAMING" + checkpoint.interval = 300000 + sink.flush.interval = 200 +} + +source { + FakeSource { + row.num = 10000 + split.num = 1 + parallelism = 1 + schema = { + fields { + id = int + name = string + } + } + } +} + +sink { + TimerFlushTest {} +} diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/timer_flush_multi_table.conf b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/timer_flush_multi_table.conf new file mode 100644 index 000000000000..a8396f5c34d5 --- /dev/null +++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/timer_flush_multi_table.conf @@ -0,0 +1,64 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +env { + job.mode = "STREAMING" + parallelism = 2 + checkpoint.interval = 300000 + sink.flush.interval = 100 +} + +source { + FakeSource { + tables_configs = [ + { + row.num = 10000 + schema = { + table = "db1.table_a" + columns = [ + { name = id, type = int } + { name = val, type = string } + ] + } + }, + { + row.num = 10000 + schema = { + table = "db1.table_b" + columns = [ + { name = id, type = int } + { name = val, type = string } + ] + } + }, + { + row.num = 10000 + schema = { + table = "db1.table_c" + columns = [ + { name = id, type = int } + { name = val, type = string } + ] + } + } + ] + } +} + +sink { + MultiTableFlushTest {} +} diff --git a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/EngineConfig.java b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/EngineConfig.java index 5edb6f83f8ae..361aad1f33e6 100644 --- a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/EngineConfig.java +++ b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/EngineConfig.java @@ -82,6 +82,9 @@ public class EngineConfig { private QueueType queueType = ServerConfigOptions.WorkerServerConfigOptions.QUEUE_TYPE.defaultValue(); + + private int timerFlushPoolSize = + ServerConfigOptions.WorkerServerConfigOptions.TIMER_FLUSH_POOL_SIZE.defaultValue(); private int historyJobExpireMinutes = ServerConfigOptions.MasterServerConfigOptions.HISTORY_JOB_EXPIRE_MINUTES.defaultValue(); @@ -177,4 +180,12 @@ public EngineConfig setEventReportHttpHeaders(Map eventReportHtt this.eventReportHttpHeaders = eventReportHttpHeaders; return this; } + + public void setTimerFlushPoolSize(int timerFlushPoolSize) { + checkPositive( + timerFlushPoolSize, + ServerConfigOptions.WorkerServerConfigOptions.TIMER_FLUSH_POOL_SIZE.key() + + " must be > 0"); + this.timerFlushPoolSize = timerFlushPoolSize; + } } diff --git a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/YamlSeaTunnelDomConfigProcessor.java b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/YamlSeaTunnelDomConfigProcessor.java index d6aec92f99fa..631570651453 100644 --- a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/YamlSeaTunnelDomConfigProcessor.java +++ b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/YamlSeaTunnelDomConfigProcessor.java @@ -206,6 +206,14 @@ private void parseEngineConfig(Node engineNode, SeaTunnelConfig config) { .key() .equals(name)) { engineConfig.setSlotServiceConfig(parseSlotServiceConfig(node)); + } else if (ServerConfigOptions.WorkerServerConfigOptions.TIMER_FLUSH_POOL_SIZE + .key() + .equals(name)) { + engineConfig.setTimerFlushPoolSize( + getIntegerValue( + ServerConfigOptions.WorkerServerConfigOptions.TIMER_FLUSH_POOL_SIZE + .key(), + getTextContent(node))); } else if (ServerConfigOptions.MasterServerConfigOptions.CHECKPOINT .key() .equals(name)) { diff --git a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/ServerConfigOptions.java b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/ServerConfigOptions.java index d4dd2060237f..2cd8677f8236 100644 --- a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/ServerConfigOptions.java +++ b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/ServerConfigOptions.java @@ -432,5 +432,11 @@ public static class WorkerServerConfigOptions { // The options for slot end ///////////////////////////////////////////////// + public static final Option TIMER_FLUSH_POOL_SIZE = + Options.key("timer-flush-pool-size") + .intType() + .defaultValue(1) + .withDescription( + "The number of threads in the timer flush worker pool used to inject FlushSignals into the pipeline."); } } diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java index 593ba3bd1d1c..14094581b1e3 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java @@ -22,6 +22,7 @@ import org.apache.seatunnel.api.common.metrics.MetricTags; import org.apache.seatunnel.api.event.Event; import org.apache.seatunnel.api.tracing.MDCExecutorService; +import org.apache.seatunnel.api.tracing.MDCScheduledExecutorService; import org.apache.seatunnel.api.tracing.MDCTracer; import org.apache.seatunnel.common.utils.ExceptionUtils; import org.apache.seatunnel.common.utils.StringFormatUtils; @@ -91,6 +92,8 @@ import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -137,10 +140,16 @@ public class TaskExecutionService implements DynamicMetricsProvider { private final ConcurrentMap> cancellationFutures = new ConcurrentHashMap<>(); + + private final ConcurrentMap>> + timerFlushFutures = new ConcurrentHashMap<>(); + private final SeaTunnelConfig seaTunnelConfig; private final ScheduledExecutorService scheduledExecutorService; + private final ScheduledThreadPoolExecutor timerFlushWorker; + private final ServerConnectorPackageClient serverConnectorPackageClient; private final EventService eventService; @@ -172,6 +181,12 @@ public TaskExecutionService( new ServerConnectorPackageClient(nodeEngine, seaTunnelConfig); this.eventService = eventService; + + int timerPoolSize = seaTunnelConfig.getEngineConfig().getTimerFlushPoolSize(); + timerFlushWorker = + new ScheduledThreadPoolExecutor(timerPoolSize, new TimerFlushThreadFactory()); + timerFlushWorker.setRemoveOnCancelPolicy(true); + timerFlushWorker.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); } public void start() { @@ -182,6 +197,7 @@ public void shutdown() { isRunning = false; executorService.shutdownNow(); scheduledExecutorService.shutdown(); + timerFlushWorker.shutdown(); } public TaskGroupContext getExecutionContext(TaskGroupLocation taskGroupLocation) { @@ -657,6 +673,96 @@ public void printTaskExecutionRuntimeInfo() { taskCount)); } } + /** + * Register or replace a periodic timer-flush task for one source subtask. + * + *

If a timer already exists for the same {@link TaskLocation}, cancel it first. The task is + * scheduled with fixed delay on {@code timerFlushWorker} and stored in {@code + * timerFlushFutures}. + * + * @param taskLocation source subtask location (map key) + * @param callback flush callback to run on each tick + * @param intervalMs flush interval in milliseconds, must be > 0 + * @return scheduled future for later cancellation + * @throws IllegalArgumentException if intervalMs <= 0 + */ + public ScheduledFuture registerTimerFlushTask( + TaskLocation taskLocation, Runnable callback, long intervalMs) { + if (intervalMs <= 0) { + throw new IllegalArgumentException("intervalMs must be positive, got: " + intervalMs); + } + TaskGroupLocation groupLocation = taskLocation.getTaskGroupLocation(); + ConcurrentMap> groupFutures = + timerFlushFutures.computeIfAbsent(groupLocation, k -> new ConcurrentHashMap<>()); + + ScheduledFuture existing = groupFutures.remove(taskLocation); + if (existing != null && !existing.isDone()) { + existing.cancel(false); + } + + MDCScheduledExecutorService mdcTimerFlushWorker = MDCTracer.tracing(timerFlushWorker); + Runnable namedCallback = new NamedTaskWrapper(callback, "TimerFlush-" + taskLocation); + ScheduledFuture future = + mdcTimerFlushWorker.scheduleWithFixedDelay( + namedCallback, intervalMs, intervalMs, TimeUnit.MILLISECONDS); + groupFutures.put(taskLocation, future); + logger.info( + String.format( + "Registered timer-flush task for %s, intervalMs=%d", + taskLocation, intervalMs)); + return future; + } + + /** + * Cancel and remove the timer-flush task for one source subtask. + * + *

No-op if the task group or task entry does not exist. If the task-group bucket becomes + * empty, remove the bucket as well. + * + * @param taskLocation source subtask location + */ + public void closeTimerFlushTask(TaskLocation taskLocation) { + TaskGroupLocation groupLocation = taskLocation.getTaskGroupLocation(); + ConcurrentMap> groupFutures = + timerFlushFutures.get(groupLocation); + if (groupFutures == null) { + return; + } + ScheduledFuture future = groupFutures.remove(taskLocation); + if (future != null && !future.isDone()) { + future.cancel(false); + } + if (groupFutures.isEmpty()) { + timerFlushFutures.remove(groupLocation, groupFutures); + } + logger.info(String.format("Closed timer-flush task for %s", taskLocation)); + } + + /** + * Cancel and remove all timer-flush tasks in one task group. + * + *

No-op if the group has no registered timers. + * + * @param taskGroupLocation task group location + */ + private void cancelTimerFlushForTaskGroup(TaskGroupLocation taskGroupLocation) { + ConcurrentMap> groupFutures = + timerFlushFutures.remove(taskGroupLocation); + if (groupFutures == null) { + return; + } + groupFutures + .values() + .forEach( + f -> { + if (!f.isDone()) { + f.cancel(false); + } + }); + logger.info( + String.format( + "Cancelled all timer-flush tasks for task group %s", taskGroupLocation)); + } public void reportEvent(Event e) { eventService.reportEvent(e); @@ -923,6 +1029,7 @@ private void cancelAllTask(TaskGroupLocation taskGroupLocation) { // ignore } cancelAsyncFunction(taskGroupLocation); + cancelTimerFlushForTaskGroup(taskGroupLocation); } private void cancelAsyncFunction(TaskGroupLocation taskGroupLocation) { @@ -955,6 +1062,11 @@ void taskDone(Task task) { } catch (Throwable t) { logger.severe("cancel async function failed", t); } + try { + cancelTimerFlushForTaskGroup(taskGroupLocation); + } catch (Throwable t) { + logger.severe("cancel timer-flush tasks failed", t); + } try { updateMetricsContextInImap(); } catch (Throwable t) { @@ -1007,6 +1119,19 @@ public ServerConnectorPackageClient getServerConnectorPackageClient() { return serverConnectorPackageClient; } + private final class TimerFlushThreadFactory implements ThreadFactory { + private final AtomicInteger seq = new AtomicInteger(); + + @Override + public Thread newThread(@NonNull Runnable r) { + return new Thread( + r, + String.format( + "hz.%s.seaTunnel.timer-flush-%d", + hzInstanceName, seq.getAndIncrement())); + } + } + public static class NamedTaskWrapper implements Runnable { private final Runnable task; private final String threadName; diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/RecordSerializer.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/RecordSerializer.java index 975810693d9f..9a1593914460 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/RecordSerializer.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/RecordSerializer.java @@ -17,6 +17,7 @@ package org.apache.seatunnel.engine.server.serializable; +import org.apache.seatunnel.api.signal.FlushSignal; import org.apache.seatunnel.api.table.type.Record; import org.apache.seatunnel.api.table.type.RowKind; import org.apache.seatunnel.api.table.type.SeaTunnelRow; @@ -33,7 +34,8 @@ public class RecordSerializer implements StreamSerializer { enum RecordDataType { CHECKPOINT_BARRIER, - SEATUNNEL_ROW; + SEATUNNEL_ROW, + FLUSH_SIGNAL } @Override @@ -56,6 +58,12 @@ public void write(ObjectDataOutput out, Record record) throws IOException { for (Object field : row.getFields()) { out.writeObject(field); } + } else if (data instanceof FlushSignal) { + FlushSignal row = (FlushSignal) data; + out.writeByte(RecordDataType.FLUSH_SIGNAL.ordinal()); + out.writeLong(row.getJobId()); + out.writeLong(row.getTaskId()); + out.writeLong(row.getCreatedTime()); } else { throw new UnsupportedEncodingException( "Unsupported serialize class: " + data.getClass()); @@ -74,6 +82,8 @@ public Record read(ObjectDataInput in) throws IOException { CheckpointType.fromName(in.readString()), in.readObject(), in.readObject()); + } else if (dataType == RecordDataType.FLUSH_SIGNAL.ordinal()) { + data = new FlushSignal(in.readLong(), in.readLong(), in.readLong()); } else if (dataType == RecordDataType.SEATUNNEL_ROW.ordinal()) { String tableId = in.readString(); byte rowKind = in.readByte(); diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelTask.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelTask.java index 3b5c24fe1524..f35b993e6673 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelTask.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelTask.java @@ -216,6 +216,9 @@ protected void stateProcess() throws Exception { break; case STARTING: currState = RUNNING; + for (FlowLifeCycle cycle : allCycles) { + cycle.hook(); + } break; case RUNNING: collect(); diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSeaTunnelTask.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSeaTunnelTask.java index ba02ec8b72af..df6c4eead7ea 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSeaTunnelTask.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSeaTunnelTask.java @@ -18,6 +18,7 @@ package org.apache.seatunnel.engine.server.task; import org.apache.seatunnel.api.common.metrics.MetricsContext; +import org.apache.seatunnel.api.configuration.ReadonlyConfig; import org.apache.seatunnel.api.serialization.Serializer; import org.apache.seatunnel.api.source.SourceSplit; import org.apache.seatunnel.api.table.catalog.CatalogTable; @@ -45,6 +46,8 @@ import java.util.Map; import java.util.stream.Collectors; +import static org.apache.seatunnel.api.options.EnvCommonOptions.SINK_FLUSH_INTERVAL; + public class SourceSeaTunnelTask extends SeaTunnelTask { private static final ILogger LOGGER = Logger.getLogger(SourceSeaTunnelTask.class); @@ -112,6 +115,15 @@ public void init() throws Exception { SourceConfig config, CompletableFuture completableFuture, MetricsContext metricsContext) { + + long flushIntervalMs = ReadonlyConfig.fromMap(envOption).get(SINK_FLUSH_INTERVAL); + if (flushIntervalMs > 0 && flushIntervalMs < 100) { + LOGGER.warning( + String.format( + "sink.flush.interval=%dms is too small (< 100ms), " + + "this may cause excessive flush overhead.", + flushIntervalMs)); + } return new SourceFlowLifeCycle<>( sourceAction, indexID, @@ -119,7 +131,8 @@ public void init() throws Exception { this, taskLocation, completableFuture, - metricsContext); + metricsContext, + flushIntervalMs); } @Override diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/context/SinkWriterContext.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/context/SinkWriterContext.java index 431582aa07a9..ff141431cb17 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/context/SinkWriterContext.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/context/SinkWriterContext.java @@ -22,6 +22,9 @@ import org.apache.seatunnel.api.common.metrics.MetricsContext; import org.apache.seatunnel.api.event.EventListener; import org.apache.seatunnel.api.sink.SinkWriter; +import org.apache.seatunnel.common.utils.function.RunnableWithException; + +import java.util.Objects; public class SinkWriterContext implements SinkWriter.Context { @@ -30,6 +33,7 @@ public class SinkWriterContext implements SinkWriter.Context { private final int numberOfParallelSubtasks; private final MetricsContext metricsContext; private final EventListener eventListener; + private transient volatile RunnableWithException flushAction; public SinkWriterContext( int numberOfParallelSubtasks, @@ -64,4 +68,15 @@ public MetricsContext getMetricsContext() { public EventListener getEventListener() { return eventListener; } + + @Override + public void registerFlushAction(RunnableWithException action) { + Objects.requireNonNull(action, "flushAction"); + this.flushAction = action; + } + + @Override + public RunnableWithException getFlushAction() { + return flushAction; + } } diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/FlowLifeCycle.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/FlowLifeCycle.java index cc30ee62ce72..59ba4db3b10c 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/FlowLifeCycle.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/FlowLifeCycle.java @@ -27,5 +27,7 @@ default void open() throws Exception {} default void close() throws IOException {} + default void hook() throws IOException {} + default void prepareClose() throws IOException {} } diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java index 9eab57e88443..fdfa212f9898 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java @@ -20,6 +20,8 @@ import org.apache.seatunnel.api.common.metrics.MetricsContext; import org.apache.seatunnel.api.event.EventListener; import org.apache.seatunnel.api.serialization.Serializer; +import org.apache.seatunnel.api.signal.FlushSignal; +import org.apache.seatunnel.api.signal.Signal; import org.apache.seatunnel.api.sink.SinkCommitter; import org.apache.seatunnel.api.sink.SinkWriter; import org.apache.seatunnel.api.sink.SinkWriter.Context; @@ -255,6 +257,14 @@ public void received(Record record) { // todo remove deprecated method writer.applySchemaChange(event); } + } else if (record.getData() instanceof Signal) { + if (prepareClose) { + return; + } + Signal signal = (Signal) record.getData(); + if (signal instanceof FlushSignal && writerContext.getFlushAction() != null) { + writerContext.getFlushAction().run(); + } } else { if (prepareClose) { return; diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SourceFlowLifeCycle.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SourceFlowLifeCycle.java index 6f473c6c9f48..ecfaf1f9071c 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SourceFlowLifeCycle.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SourceFlowLifeCycle.java @@ -20,6 +20,7 @@ import org.apache.seatunnel.api.common.metrics.MetricsContext; import org.apache.seatunnel.api.event.EventListener; import org.apache.seatunnel.api.serialization.Serializer; +import org.apache.seatunnel.api.signal.FlushSignal; import org.apache.seatunnel.api.source.SourceEvent; import org.apache.seatunnel.api.source.SourceReader; import org.apache.seatunnel.api.source.SourceSplit; @@ -59,6 +60,7 @@ import java.util.List; import java.util.Objects; import java.util.concurrent.ExecutionException; +import java.util.concurrent.ScheduledFuture; import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; @@ -106,6 +108,10 @@ public class SourceFlowLifeCycle extends ActionFl private final AtomicReference schemaChangePhase = new AtomicReference<>(); + private final long flushIntervalMs; + + private transient volatile ScheduledFuture flushFuture; + public SourceFlowLifeCycle( SourceAction sourceAction, int indexID, @@ -113,13 +119,15 @@ public SourceFlowLifeCycle( SeaTunnelTask runningTask, TaskLocation currentTaskLocation, CompletableFuture completableFuture, - MetricsContext metricsContext) { + MetricsContext metricsContext, + long flushIntervalMs) { super(sourceAction, runningTask, completableFuture); this.sourceAction = sourceAction; this.indexID = indexID; this.enumeratorTaskLocation = enumeratorTaskLocation; this.currentTaskLocation = currentTaskLocation; this.metricsContext = metricsContext; + this.flushIntervalMs = flushIntervalMs; this.eventListener = new JobEventListener(currentTaskLocation, runningTask.getExecutionContext()); } @@ -166,6 +174,28 @@ public void open() throws Exception { register(); } + /** + * Timer callback invoked by the {@code timerFlushWorker} thread pool. + * + *

Acquires the {@code checkpointLock} (the same monitor that {@link #triggerBarrier} uses) + * so that flush signals and barriers are strictly serialized — a FlushSignal either completes + * entirely before a Barrier or queues behind it, never crossing it. + */ + private void onTimerTick() { + if (prepareClose) { + return; + } + try { + FlushSignal flushSignal = + FlushSignal.of(currentTaskLocation.getJobId(), currentTaskLocation.getTaskID()); + log.debug("Broadcasting FlushSignal {} ", flushSignal); + Record flushSignalRecord = new Record<>(flushSignal); + collector.sendRecordToNext(flushSignalRecord); + } catch (Throwable e) { + log.warn("Failed to broadcast FlushSignal from task {}", currentTaskLocation, e); + } + } + private Address getEnumeratorTaskAddress() throws ExecutionException, InterruptedException { return (Address) runningTask @@ -176,9 +206,18 @@ private Address getEnumeratorTaskAddress() throws ExecutionException, Interrupte @Override public void close() throws IOException { - context.getEventListener().onEvent(new ReaderCloseEvent()); - reader.close(); - super.close(); + try { + context.getEventListener().onEvent(new ReaderCloseEvent()); + reader.close(); + super.close(); + } finally { + closeFlushTimer(); + } + } + + @Override + public void hook() throws IOException { + startFlushTimer(); } /** @@ -302,6 +341,38 @@ private void register() { } } + private void startFlushTimer() { + if (flushIntervalMs <= 0) { + return; + } + try { + flushFuture = + runningTask + .getExecutionContext() + .getTaskExecutionService() + .registerTimerFlushTask( + currentTaskLocation, this::onTimerTick, flushIntervalMs); + } catch (Exception e) { + log.warn("Failed to register flush timer for task {}", currentTaskLocation, e); + throw new RuntimeException(e); + } + } + + private void closeFlushTimer() { + if (flushFuture == null) { + return; + } + try { + runningTask + .getExecutionContext() + .getTaskExecutionService() + .closeTimerFlushTask(currentTaskLocation); + } catch (Exception e) { + log.warn("Failed to close flush timer for task {}", currentTaskLocation, e); + } + flushFuture = null; + } + /** * Sends a split request to the remote split enumerator. * diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/TransformFlowLifeCycle.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/TransformFlowLifeCycle.java index 8eaf1963585f..404279f3ea6a 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/TransformFlowLifeCycle.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/TransformFlowLifeCycle.java @@ -17,6 +17,7 @@ package org.apache.seatunnel.engine.server.task.flow; +import org.apache.seatunnel.api.signal.Signal; import org.apache.seatunnel.api.table.schema.event.SchemaChangeEvent; import org.apache.seatunnel.api.table.type.Record; import org.apache.seatunnel.api.transform.Collector; @@ -114,6 +115,11 @@ public void received(Record record) { if (event != null) { collector.collect(new Record<>(event)); } + } else if (record.getData() instanceof Signal) { + if (prepareClose) { + return; + } + collector.collect(record); } else { if (prepareClose) { return; diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/group/queue/IntermediateBlockingQueue.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/group/queue/IntermediateBlockingQueue.java index 08afa086f642..de1edb40d175 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/group/queue/IntermediateBlockingQueue.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/group/queue/IntermediateBlockingQueue.java @@ -18,16 +18,21 @@ package org.apache.seatunnel.engine.server.task.group.queue; import org.apache.seatunnel.api.common.metrics.Counter; +import org.apache.seatunnel.api.signal.Signal; import org.apache.seatunnel.api.table.type.Record; import org.apache.seatunnel.api.transform.Collector; import org.apache.seatunnel.common.utils.function.ConsumerWithException; +import org.apache.seatunnel.common.utils.function.FunctionWithException; import org.apache.seatunnel.engine.server.checkpoint.CheckpointBarrier; import org.apache.seatunnel.engine.server.task.record.Barrier; +import lombok.extern.slf4j.Slf4j; + import java.io.IOException; import java.util.concurrent.BlockingQueue; import java.util.concurrent.TimeUnit; +@Slf4j public class IntermediateBlockingQueue extends AbstractIntermediateQueue>> { private final Counter intermediateQueueSize; @@ -40,9 +45,16 @@ public IntermediateBlockingQueue( @Override public void received(Record record) { + boolean handled; try { - handleRecord(record, getIntermediateQueue()::put); - intermediateQueueSize.inc(); + if (record.getData() instanceof Signal) { + handled = handleSignalRecord(record, getIntermediateQueue()::offer); + } else { + handled = handleRecord(record, getIntermediateQueue()::put); + } + if (handled) { + intermediateQueueSize.inc(); + } } catch (Exception e) { throw new RuntimeException(e); } @@ -66,7 +78,7 @@ public void close() throws IOException { getIntermediateQueue().clear(); } - private void handleRecord(Record record, ConsumerWithException> consumer) + private boolean handleRecord(Record record, ConsumerWithException> consumer) throws Exception { if (record.getData() instanceof Barrier) { CheckpointBarrier barrier = (CheckpointBarrier) record.getData(); @@ -77,9 +89,28 @@ private void handleRecord(Record record, ConsumerWithException> con consumer.accept(record); } else { if (getIntermediateQueueFlowLifeCycle().getPrepareClose()) { - return; + return false; } consumer.accept(record); } + + return true; + } + + private boolean handleSignalRecord( + Record record, FunctionWithException, Boolean, Exception> function) + throws Exception { + if (getIntermediateQueueFlowLifeCycle().getPrepareClose()) { + return false; + } + boolean result = function.apply(record); + if (!result) { + log.warn( + "Downstream sink consuming too slow, intermediate queue backlogged, " + + "FlushSignal dropped. queueSize={}, remainingCapacity={}", + getIntermediateQueue().size(), + getIntermediateQueue().remainingCapacity()); + } + return result; } } diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/group/queue/IntermediateDisruptor.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/group/queue/IntermediateDisruptor.java index e2f3ed69d775..359012a6239b 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/group/queue/IntermediateDisruptor.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/group/queue/IntermediateDisruptor.java @@ -37,7 +37,6 @@ public IntermediateDisruptor(Disruptor queue) { @Override public void received(Record record) { - getIntermediateQueue().getRingBuffer(); RecordEventProducer.onData( record, getIntermediateQueue().getRingBuffer(), diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/group/queue/disruptor/RecordEventProducer.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/group/queue/disruptor/RecordEventProducer.java index ea47f83a7971..af2dd0e35be1 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/group/queue/disruptor/RecordEventProducer.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/group/queue/disruptor/RecordEventProducer.java @@ -17,13 +17,16 @@ package org.apache.seatunnel.engine.server.task.group.queue.disruptor; +import org.apache.seatunnel.api.signal.Signal; import org.apache.seatunnel.api.table.type.Record; import org.apache.seatunnel.engine.server.checkpoint.CheckpointBarrier; import org.apache.seatunnel.engine.server.task.flow.IntermediateQueueFlowLifeCycle; import org.apache.seatunnel.engine.server.task.record.Barrier; import com.lmax.disruptor.RingBuffer; +import lombok.extern.slf4j.Slf4j; +@Slf4j public class RecordEventProducer { public static void onData( @@ -38,12 +41,31 @@ public static void onData( intermediateQueueFlowLifeCycle.getRunningTask().getTaskLocation())) { intermediateQueueFlowLifeCycle.setPrepareClose(true); } + publishRecord(record, ringBuffer); + } else if (record.getData() instanceof Signal) { + if (intermediateQueueFlowLifeCycle.getPrepareClose()) { + return; + } + publishSignalRecord(record, ringBuffer); } else { if (intermediateQueueFlowLifeCycle.getPrepareClose()) { return; } + publishRecord(record, ringBuffer); } + } + + private static void publishSignalRecord(Record record, RingBuffer ringBuffer) { + boolean published = ringBuffer.tryPublishEvent((event, seq) -> event.setRecord(record)); + if (!published) { + log.warn( + "Downstream sink consuming too slow, RingBuffer backlogged, " + + "FlushSignal dropped. bufferSize={}", + ringBuffer.getBufferSize()); + } + } + private static void publishRecord(Record record, RingBuffer ringBuffer) { long sequence = ringBuffer.next(); try { RecordEvent recordEvent = ringBuffer.get(sequence); diff --git a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/TaskExecutionServiceTest.java b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/TaskExecutionServiceTest.java index 5c902bf63384..3b6649d296ed 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/TaskExecutionServiceTest.java +++ b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/TaskExecutionServiceTest.java @@ -33,6 +33,7 @@ import org.apache.seatunnel.engine.server.execution.TaskGroupDefaultImpl; import org.apache.seatunnel.engine.server.execution.TaskGroupLocation; import org.apache.seatunnel.engine.server.execution.TaskGroupType; +import org.apache.seatunnel.engine.server.execution.TaskLocation; import org.apache.seatunnel.engine.server.execution.TestTask; import org.apache.seatunnel.engine.server.task.TaskGroupImmutableInformation; @@ -55,6 +56,7 @@ import java.util.List; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -477,4 +479,64 @@ public List buildStopTestTask( } return taskQueue; } + + @Test + public void testRegisterTimerFlushRejectsNonPositiveInterval() { + TaskExecutionService taskExecutionService = server.getTaskExecutionService(); + TaskGroupLocation groupLocation = new TaskGroupLocation(jobId, pipeLineId, 200L); + TaskLocation taskLocation = new TaskLocation(groupLocation, 1L, 1); + + Assertions.assertThrows( + IllegalArgumentException.class, + () -> taskExecutionService.registerTimerFlushTask(taskLocation, () -> {}, 0L)); + Assertions.assertThrows( + IllegalArgumentException.class, + () -> taskExecutionService.registerTimerFlushTask(taskLocation, () -> {}, -1L)); + } + + @Test + public void testRegisterAndCloseTimerFlushTask() { + TaskExecutionService taskExecutionService = server.getTaskExecutionService(); + TaskGroupLocation groupLocation = new TaskGroupLocation(jobId, pipeLineId, 201L); + TaskLocation taskLocation = new TaskLocation(groupLocation, 1L, 1); + + ScheduledFuture future = + taskExecutionService.registerTimerFlushTask(taskLocation, () -> {}, 1_000L); + Assertions.assertNotNull(future); + Assertions.assertFalse(future.isCancelled()); + + taskExecutionService.closeTimerFlushTask(taskLocation); + Assertions.assertTrue(future.isCancelled()); + + // closing again is idempotent + Assertions.assertDoesNotThrow(() -> taskExecutionService.closeTimerFlushTask(taskLocation)); + } + + @Test + public void testReRegisterTimerFlushCancelsPreviousFuture() { + TaskExecutionService taskExecutionService = server.getTaskExecutionService(); + TaskGroupLocation groupLocation = new TaskGroupLocation(jobId, pipeLineId, 202L); + TaskLocation taskLocation = new TaskLocation(groupLocation, 1L, 1); + + ScheduledFuture first = + taskExecutionService.registerTimerFlushTask(taskLocation, () -> {}, 1_000L); + ScheduledFuture second = + taskExecutionService.registerTimerFlushTask(taskLocation, () -> {}, 2_000L); + + Assertions.assertNotSame(first, second); + Assertions.assertTrue( + first.isCancelled(), "previous future must be cancelled on re-register"); + Assertions.assertFalse(second.isCancelled(), "new future must remain active"); + + taskExecutionService.closeTimerFlushTask(taskLocation); + } + + @Test + public void testCloseTimerFlushOnUnknownLocationIsNoop() { + TaskExecutionService taskExecutionService = server.getTaskExecutionService(); + TaskGroupLocation groupLocation = new TaskGroupLocation(jobId, pipeLineId, 203L); + TaskLocation unknown = new TaskLocation(groupLocation, 1L, 99); + + Assertions.assertDoesNotThrow(() -> taskExecutionService.closeTimerFlushTask(unknown)); + } } diff --git a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/task/SeaTunnelSourceCollectorFlushSignalTest.java b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/task/SeaTunnelSourceCollectorFlushSignalTest.java new file mode 100644 index 000000000000..483cfa0753a5 --- /dev/null +++ b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/task/SeaTunnelSourceCollectorFlushSignalTest.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.engine.server.task; + +import org.apache.seatunnel.api.common.metrics.MetricsContext; +import org.apache.seatunnel.api.signal.FlushSignal; +import org.apache.seatunnel.api.table.type.Record; +import org.apache.seatunnel.api.table.type.SeaTunnelRowType; +import org.apache.seatunnel.core.starter.flowcontrol.FlowControlStrategy; +import org.apache.seatunnel.engine.server.task.flow.OneInputFlowLifeCycle; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.mockito.Mockito; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +class SeaTunnelSourceCollectorFlushSignalTest { + + @Test + void sendFlushSignalBroadcastsExactlyOneRecordPerOutput() throws Exception { + RecordingOutput a = new RecordingOutput(); + RecordingOutput b = new RecordingOutput(); + SeaTunnelSourceCollector collector = newCollector(a, b); + + FlushSignal flushSignal = FlushSignal.of(7L, 42L); + Record flushSignalRecord = new Record<>(flushSignal); + collector.sendRecordToNext(flushSignalRecord); + + Assertions.assertEquals(1, a.records.size()); + Assertions.assertEquals(1, b.records.size()); + Assertions.assertTrue(a.records.get(0).getData() instanceof FlushSignal); + FlushSignal signal = (FlushSignal) a.records.get(0).getData(); + Assertions.assertEquals(7L, signal.getJobId()); + Assertions.assertEquals(42L, signal.getTaskId()); + Assertions.assertTrue( + signal.getCreatedTime() > 0L, "createdTime should be populated at construction"); + } + + @Test + void sendFlushSignalIsNoopWhenOutputsEmpty() throws Exception { + SeaTunnelSourceCollector collector = newCollector(); + + Assertions.assertDoesNotThrow( + () -> { + FlushSignal flushSignal = FlushSignal.of(7L, 42L); + Record flushSignalRecord = new Record<>(flushSignal); + collector.sendRecordToNext(flushSignalRecord); + }); + } + + private static SeaTunnelSourceCollector newCollector(RecordingOutput... outputs) { + List>> outputList = new ArrayList<>(); + Collections.addAll(outputList, outputs); + return new SeaTunnelSourceCollector<>( + new Object(), + outputList, + Mockito.mock(MetricsContext.class, Mockito.RETURNS_DEEP_STUBS), + FlowControlStrategy.builder().build(), + new SeaTunnelRowType( + new String[0], + new org.apache.seatunnel.api.table.type.SeaTunnelDataType[0]), + Collections.emptyList()); + } + + private static class RecordingOutput implements OneInputFlowLifeCycle> { + private final List> records = new ArrayList<>(); + + @Override + public void received(Record record) { + records.add(record); + } + } +} diff --git a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/task/SeaTunnelTaskStateTest.java b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/task/SeaTunnelTaskStateTest.java new file mode 100644 index 000000000000..af23cedbec6a --- /dev/null +++ b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/task/SeaTunnelTaskStateTest.java @@ -0,0 +1,487 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.engine.server.task; + +import org.apache.seatunnel.engine.common.utils.concurrent.CompletableFuture; +import org.apache.seatunnel.engine.server.TaskExecutionService; +import org.apache.seatunnel.engine.server.execution.TaskExecutionContext; +import org.apache.seatunnel.engine.server.execution.TaskLocation; +import org.apache.seatunnel.engine.server.task.flow.AbstractFlowLifeCycle; +import org.apache.seatunnel.engine.server.task.flow.FlowLifeCycle; +import org.apache.seatunnel.engine.server.task.flow.SourceFlowLifeCycle; +import org.apache.seatunnel.engine.server.task.statemachine.SeaTunnelTaskState; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.InOrder; +import org.mockito.Mockito; + +import java.lang.reflect.Field; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ScheduledFuture; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.Mockito.doCallRealMethod; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.inOrder; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +public class SeaTunnelTaskStateTest { + + private SeaTunnelTask task; + private TaskExecutionService mockTaskExecutionService; + private SourceFlowLifeCycle sourceLifeCycle; + + @BeforeEach + void setUp() throws Exception { + task = mock(SeaTunnelTask.class, Mockito.CALLS_REAL_METHODS); + + mockTaskExecutionService = mock(TaskExecutionService.class); + when(mockTaskExecutionService.registerTimerFlushTask(any(), any(), anyLong())) + .thenReturn(mock(ScheduledFuture.class)); + + TaskExecutionContext mockContext = mock(TaskExecutionContext.class); + when(mockContext.getTaskExecutionService()).thenReturn(mockTaskExecutionService); + + sourceLifeCycle = mock(SourceFlowLifeCycle.class); + doCallRealMethod().when(sourceLifeCycle).hook(); + setField(AbstractFlowLifeCycle.class, "runningTask", sourceLifeCycle, task); + setField(SourceFlowLifeCycle.class, "flushIntervalMs", sourceLifeCycle, 200L); + setField( + SourceFlowLifeCycle.class, + "currentTaskLocation", + sourceLifeCycle, + mock(TaskLocation.class)); + + when(task.getExecutionContext()).thenReturn(mockContext); + + List cycles = new ArrayList<>(); + cycles.add(sourceLifeCycle); + + setField(AbstractTask.class, "progress", task, new Progress()); + setField(AbstractTask.class, "startCalled", task, false); + setField(AbstractTask.class, "closeCalled", task, false); + setField(AbstractTask.class, "prepareCloseStatus", task, false); + + CompletableFuture restoreComplete = new CompletableFuture<>(); + restoreComplete.complete(null); + setField(AbstractTask.class, "restoreComplete", task, restoreComplete); + + setField(SeaTunnelTask.class, "currState", task, SeaTunnelTaskState.INIT); + setField(SeaTunnelTask.class, "allCycles", task, cycles); + + doNothing().when(task).reportTaskStatus(any()); + doNothing().when(task).collect(); + doNothing().when(task).close(); + } + + // ==================== State Machine Transition Tests ==================== + + @Test + void testFullStateMachineTransition() throws Exception { + Assertions.assertEquals(SeaTunnelTaskState.INIT, getState()); + + task.stateProcess(); + Assertions.assertEquals(SeaTunnelTaskState.WAITING_RESTORE, getState()); + + task.stateProcess(); + Assertions.assertEquals(SeaTunnelTaskState.READY_START, getState()); + + task.startCall(); + task.stateProcess(); + Assertions.assertEquals(SeaTunnelTaskState.STARTING, getState()); + + task.stateProcess(); + Assertions.assertEquals(SeaTunnelTaskState.RUNNING, getState()); + + task.stateProcess(); + Assertions.assertEquals(SeaTunnelTaskState.RUNNING, getState()); + + setField(AbstractTask.class, "prepareCloseStatus", task, true); + task.stateProcess(); + Assertions.assertEquals(SeaTunnelTaskState.PREPARE_CLOSE, getState()); + + setField(AbstractTask.class, "closeCalled", task, true); + task.stateProcess(); + Assertions.assertEquals(SeaTunnelTaskState.CLOSED, getState()); + } + + @Test + void testInitTransitionsToWaitingRestore() throws Exception { + Assertions.assertEquals(SeaTunnelTaskState.INIT, getState()); + task.stateProcess(); + Assertions.assertEquals(SeaTunnelTaskState.WAITING_RESTORE, getState()); + verify(task, times(1)).reportTaskStatus(SeaTunnelTaskState.WAITING_RESTORE); + } + + @Test + void testWaitingRestoreBlocksWhenRestoreNotDone() throws Exception { + CompletableFuture pendingRestore = new CompletableFuture<>(); + setField(AbstractTask.class, "restoreComplete", task, pendingRestore); + + advanceTo(SeaTunnelTaskState.WAITING_RESTORE); + + task.stateProcess(); + Assertions.assertEquals( + SeaTunnelTaskState.WAITING_RESTORE, + getState(), + "Should stay in WAITING_RESTORE when restore is not complete"); + verify(sourceLifeCycle, never()).open(); + } + + @Test + void testWaitingRestoreToReadyStartCallsOpen() throws Exception { + advanceTo(SeaTunnelTaskState.WAITING_RESTORE); + + task.stateProcess(); + Assertions.assertEquals(SeaTunnelTaskState.READY_START, getState()); + verify(sourceLifeCycle, times(1)).open(); + verify(task, times(1)).reportTaskStatus(SeaTunnelTaskState.READY_START); + } + + @Test + void testReadyStartBlocksUntilStartCalled() throws Exception { + advanceTo(SeaTunnelTaskState.READY_START); + + task.stateProcess(); + Assertions.assertEquals( + SeaTunnelTaskState.READY_START, + getState(), + "Should stay in READY_START when startCalled is false"); + + task.stateProcess(); + Assertions.assertEquals(SeaTunnelTaskState.READY_START, getState()); + } + + @Test + void testReadyStartToStartingAfterStartCall() throws Exception { + advanceTo(SeaTunnelTaskState.READY_START); + + task.startCall(); + task.stateProcess(); + Assertions.assertEquals(SeaTunnelTaskState.STARTING, getState()); + } + + @Test + void testStartingToRunningCallsHook() throws Exception { + advanceTo(SeaTunnelTaskState.STARTING); + + task.stateProcess(); + Assertions.assertEquals(SeaTunnelTaskState.RUNNING, getState()); + verify(sourceLifeCycle, times(1)).hook(); + } + + @Test + void testRunningCallsCollect() throws Exception { + advanceTo(SeaTunnelTaskState.RUNNING); + + task.stateProcess(); + Assertions.assertEquals(SeaTunnelTaskState.RUNNING, getState()); + verify(task, times(1)).collect(); + } + + @Test + void testRunningStaysWhenPrepareCloseIsFalse() throws Exception { + advanceTo(SeaTunnelTaskState.RUNNING); + + task.stateProcess(); + task.stateProcess(); + task.stateProcess(); + Assertions.assertEquals(SeaTunnelTaskState.RUNNING, getState()); + verify(task, times(3)).collect(); + } + + @Test + void testRunningToPrepareCloseWhenFlagSet() throws Exception { + advanceTo(SeaTunnelTaskState.RUNNING); + + setField(AbstractTask.class, "prepareCloseStatus", task, true); + task.stateProcess(); + Assertions.assertEquals(SeaTunnelTaskState.PREPARE_CLOSE, getState()); + } + + @Test + void testPrepareCloseBlocksUntilCloseCalled() throws Exception { + advanceTo(SeaTunnelTaskState.RUNNING); + setField(AbstractTask.class, "prepareCloseStatus", task, true); + task.stateProcess(); + Assertions.assertEquals(SeaTunnelTaskState.PREPARE_CLOSE, getState()); + + task.stateProcess(); + Assertions.assertEquals( + SeaTunnelTaskState.PREPARE_CLOSE, + getState(), + "Should stay in PREPARE_CLOSE when closeCalled is false"); + } + + @Test + void testPrepareCloseToClosedWhenCloseCalledSet() throws Exception { + advanceTo(SeaTunnelTaskState.RUNNING); + setField(AbstractTask.class, "prepareCloseStatus", task, true); + task.stateProcess(); + + setField(AbstractTask.class, "closeCalled", task, true); + task.stateProcess(); + Assertions.assertEquals(SeaTunnelTaskState.CLOSED, getState()); + } + + @Test + void testClosedCallsCloseAndMarksDone() throws Exception { + advanceTo(SeaTunnelTaskState.RUNNING); + setField(AbstractTask.class, "prepareCloseStatus", task, true); + task.stateProcess(); + setField(AbstractTask.class, "closeCalled", task, true); + task.stateProcess(); + Assertions.assertEquals(SeaTunnelTaskState.CLOSED, getState()); + + Progress progress = getProgress(); + task.stateProcess(); + Assertions.assertTrue(progress.toState().isDone(), "Progress should be marked done"); + verify(task, times(1)).close(); + } + + @Test + void testCancellingToCancel() throws Exception { + setField(SeaTunnelTask.class, "currState", task, SeaTunnelTaskState.CANCELLING); + + Progress progress = getProgress(); + task.stateProcess(); + Assertions.assertEquals(SeaTunnelTaskState.CANCELED, getState()); + Assertions.assertTrue(progress.toState().isDone(), "Progress should be marked done"); + verify(task, times(1)).close(); + } + + // ==================== Timer Registration Tests ==================== + + /** + * Verifies that {@code SourceFlowLifeCycle.hook()} → {@code startFlushTimer()} → {@code + * registerTimerFlushTask()} is called exactly once, and only during the STARTING → RUNNING + * transition. + */ + @Test + void testTimerRegistrationOnlyAtStartingToRunning() throws Exception { + // INIT → WAITING_RESTORE + task.stateProcess(); + verify(mockTaskExecutionService, never()).registerTimerFlushTask(any(), any(), anyLong()); + + // WAITING_RESTORE → READY_START + task.stateProcess(); + verify(mockTaskExecutionService, never()).registerTimerFlushTask(any(), any(), anyLong()); + + // READY_START stays (startCalled = false) + task.stateProcess(); + verify(mockTaskExecutionService, never()).registerTimerFlushTask(any(), any(), anyLong()); + + task.startCall(); + // READY_START → STARTING + task.stateProcess(); + Assertions.assertEquals(SeaTunnelTaskState.STARTING, getState()); + verify(mockTaskExecutionService, never()).registerTimerFlushTask(any(), any(), anyLong()); + + // STARTING → RUNNING (hook → startFlushTimer → registerTimerFlushTask) + task.stateProcess(); + Assertions.assertEquals(SeaTunnelTaskState.RUNNING, getState()); + verify(mockTaskExecutionService, times(1)).registerTimerFlushTask(any(), any(), anyLong()); + + // Multiple RUNNING iterations — registerTimerFlushTask stays at 1 + task.stateProcess(); + task.stateProcess(); + task.stateProcess(); + verify(mockTaskExecutionService, times(1)).registerTimerFlushTask(any(), any(), anyLong()); + } + + @Test + void testTimerNotRegisteredBeforeRunning() throws Exception { + advanceTo(SeaTunnelTaskState.READY_START); + + task.stateProcess(); + task.stateProcess(); + + Assertions.assertEquals(SeaTunnelTaskState.READY_START, getState()); + verify(mockTaskExecutionService, never()).registerTimerFlushTask(any(), any(), anyLong()); + } + + @Test + void testTimerNotRegisteredWhenFlushIntervalIsZero() throws Exception { + setField(SourceFlowLifeCycle.class, "flushIntervalMs", sourceLifeCycle, 0L); + + advanceTo(SeaTunnelTaskState.STARTING); + task.stateProcess(); + Assertions.assertEquals(SeaTunnelTaskState.RUNNING, getState()); + verify(mockTaskExecutionService, never()).registerTimerFlushTask(any(), any(), anyLong()); + } + + @Test + void testTimerNotRegisteredWhenFlushIntervalIsNegative() throws Exception { + setField(SourceFlowLifeCycle.class, "flushIntervalMs", sourceLifeCycle, -1L); + + advanceTo(SeaTunnelTaskState.STARTING); + task.stateProcess(); + Assertions.assertEquals(SeaTunnelTaskState.RUNNING, getState()); + verify(mockTaskExecutionService, never()).registerTimerFlushTask(any(), any(), anyLong()); + } + + @Test + void testTimerRegisteredWithCorrectInterval() throws Exception { + long expectedInterval = 500L; + setField(SourceFlowLifeCycle.class, "flushIntervalMs", sourceLifeCycle, expectedInterval); + + advanceTo(SeaTunnelTaskState.STARTING); + task.stateProcess(); + + verify(mockTaskExecutionService, times(1)) + .registerTimerFlushTask( + any(TaskLocation.class), any(Runnable.class), Mockito.eq(expectedInterval)); + } + + // ==================== reportTaskStatus Order Tests ==================== + + @Test + void testReportTaskStatusCalledInOrder() throws Exception { + InOrder ordered = inOrder(task); + + // INIT → WAITING_RESTORE + task.stateProcess(); + ordered.verify(task).reportTaskStatus(SeaTunnelTaskState.WAITING_RESTORE); + + // WAITING_RESTORE → READY_START + task.stateProcess(); + ordered.verify(task).reportTaskStatus(SeaTunnelTaskState.READY_START); + } + + @Test + void testReportStatusNotCalledOnReadyStartToStarting() throws Exception { + advanceTo(SeaTunnelTaskState.READY_START); + Mockito.clearInvocations(task); + + task.startCall(); + task.stateProcess(); + Assertions.assertEquals(SeaTunnelTaskState.STARTING, getState()); + verify(task, never()).reportTaskStatus(any()); + } + + @Test + void testReportStatusNotCalledOnStartingToRunning() throws Exception { + advanceTo(SeaTunnelTaskState.STARTING); + Mockito.clearInvocations(task); + + task.stateProcess(); + Assertions.assertEquals(SeaTunnelTaskState.RUNNING, getState()); + verify(task, never()).reportTaskStatus(any()); + } + + // ==================== open() / hook() Ordering Tests ==================== + + @Test + void testOpenCalledBeforeHook() throws Exception { + InOrder ordered = inOrder(sourceLifeCycle); + + advanceTo(SeaTunnelTaskState.RUNNING); + + ordered.verify(sourceLifeCycle).open(); + ordered.verify(sourceLifeCycle).hook(); + } + + @Test + void testHookNotCalledDuringOpen() throws Exception { + advanceTo(SeaTunnelTaskState.WAITING_RESTORE); + + task.stateProcess(); + Assertions.assertEquals(SeaTunnelTaskState.READY_START, getState()); + + verify(sourceLifeCycle, times(1)).open(); + verify(sourceLifeCycle, never()).hook(); + } + + // ==================== Multiple FlowLifeCycle Tests ==================== + + @Test + void testMultipleCyclesAllReceiveOpenAndHook() throws Exception { + FlowLifeCycle secondCycle = mock(FlowLifeCycle.class); + List cycles = new ArrayList<>(); + cycles.add(sourceLifeCycle); + cycles.add(secondCycle); + setField(SeaTunnelTask.class, "allCycles", task, cycles); + + advanceTo(SeaTunnelTaskState.RUNNING); + + verify(sourceLifeCycle, times(1)).open(); + verify(secondCycle, times(1)).open(); + verify(sourceLifeCycle, times(1)).hook(); + verify(secondCycle, times(1)).hook(); + } + + private void advanceTo(SeaTunnelTaskState target) throws Exception { + if (target == SeaTunnelTaskState.INIT) { + return; + } + + // INIT → WAITING_RESTORE + task.stateProcess(); + if (target == SeaTunnelTaskState.WAITING_RESTORE) { + return; + } + + // WAITING_RESTORE → READY_START + task.stateProcess(); + if (target == SeaTunnelTaskState.READY_START) { + return; + } + + // READY_START → STARTING + task.startCall(); + task.stateProcess(); + if (target == SeaTunnelTaskState.STARTING) { + return; + } + + // STARTING → RUNNING + task.stateProcess(); + if (target == SeaTunnelTaskState.RUNNING) { + return; + } + + throw new IllegalArgumentException("advanceTo does not support state: " + target); + } + + private SeaTunnelTaskState getState() throws Exception { + Field f = SeaTunnelTask.class.getDeclaredField("currState"); + f.setAccessible(true); + return (SeaTunnelTaskState) f.get(task); + } + + private Progress getProgress() throws Exception { + Field f = AbstractTask.class.getDeclaredField("progress"); + f.setAccessible(true); + return (Progress) f.get(task); + } + + private static void setField(Class clazz, String name, Object target, Object value) + throws Exception { + Field f = clazz.getDeclaredField(name); + f.setAccessible(true); + f.set(target, value); + } +} diff --git a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/task/context/SinkWriterContextTest.java b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/task/context/SinkWriterContextTest.java new file mode 100644 index 000000000000..e0c78d396e68 --- /dev/null +++ b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/task/context/SinkWriterContextTest.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.engine.server.task.context; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.util.concurrent.atomic.AtomicInteger; + +class SinkWriterContextTest { + + @Test + void flushActionIsNullByDefault() { + SinkWriterContext ctx = new SinkWriterContext(1, 0, null, null); + Assertions.assertNull( + ctx.getFlushAction(), + "writer that did not opt in should expose a null flush action"); + } + + @Test + void registerFlushActionStoresLastRegistered() throws Exception { + SinkWriterContext ctx = new SinkWriterContext(2, 0, null, null); + AtomicInteger first = new AtomicInteger(); + AtomicInteger second = new AtomicInteger(); + + ctx.registerFlushAction(first::incrementAndGet); + Assertions.assertSame(ctx.getFlushAction(), ctx.getFlushAction()); + ctx.getFlushAction().run(); + Assertions.assertEquals(1, first.get()); + + ctx.registerFlushAction(second::incrementAndGet); + ctx.getFlushAction().run(); + Assertions.assertEquals(1, first.get(), "old action must not be invoked after replace"); + Assertions.assertEquals(1, second.get()); + } + + @Test + void registerFlushActionRejectsNull() { + SinkWriterContext ctx = new SinkWriterContext(1, 0, null, null); + Assertions.assertThrows(NullPointerException.class, () -> ctx.registerFlushAction(null)); + } +} diff --git a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/task/group/queue/IntermediateBlockingQueueSignalTest.java b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/task/group/queue/IntermediateBlockingQueueSignalTest.java new file mode 100644 index 000000000000..00c551fc392c --- /dev/null +++ b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/task/group/queue/IntermediateBlockingQueueSignalTest.java @@ -0,0 +1,164 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.engine.server.task.group.queue; + +import org.apache.seatunnel.api.common.metrics.ThreadSafeCounter; +import org.apache.seatunnel.api.signal.FlushSignal; +import org.apache.seatunnel.api.table.type.Record; +import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.api.transform.Collector; +import org.apache.seatunnel.engine.core.checkpoint.CheckpointType; +import org.apache.seatunnel.engine.server.checkpoint.CheckpointBarrier; +import org.apache.seatunnel.engine.server.execution.TaskGroupLocation; +import org.apache.seatunnel.engine.server.execution.TaskLocation; +import org.apache.seatunnel.engine.server.task.SeaTunnelTask; +import org.apache.seatunnel.engine.server.task.flow.IntermediateQueueFlowLifeCycle; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.Mockito; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; + +class IntermediateBlockingQueueSignalTest { + + private static final TaskLocation TASK_LOCATION = + new TaskLocation(new TaskGroupLocation(1L, 1, 1L), 1L, 1); + + private BlockingQueue> backing; + private ThreadSafeCounter sizeCounter; + private IntermediateBlockingQueue queue; + private IntermediateQueueFlowLifeCycle flow; + + @BeforeEach + void setUp() { + backing = new ArrayBlockingQueue<>(4); + sizeCounter = new ThreadSafeCounter("intermediateQueueSize"); + queue = new IntermediateBlockingQueue(backing, sizeCounter); + + SeaTunnelTask task = Mockito.mock(SeaTunnelTask.class); + Mockito.when(task.getTaskLocation()).thenReturn(TASK_LOCATION); + + flow = Mockito.mock(IntermediateQueueFlowLifeCycle.class); + final boolean[] prepareClose = {false}; + Mockito.when(flow.getPrepareClose()).thenAnswer(inv -> prepareClose[0]); + Mockito.doAnswer( + inv -> { + prepareClose[0] = inv.getArgument(0); + return null; + }) + .when(flow) + .setPrepareClose(Mockito.anyBoolean()); + + queue.setRunningTask(task); + queue.setIntermediateQueueFlowLifeCycle(flow); + } + + @Test + void signalIsEnqueuedAndDeliveredToDownstream() throws Exception { + queue.received(new Record<>(FlushSignal.of(1L, 42L))); + + Assertions.assertEquals(1, backing.size()); + Assertions.assertEquals(1L, sizeCounter.getCount()); + + RecordingCollector downstream = new RecordingCollector(); + queue.collect(downstream); + + Assertions.assertEquals(0, backing.size(), "queue should be drained after collect"); + Assertions.assertEquals( + 0L, sizeCounter.getCount(), "size counter should return to zero after drain"); + Assertions.assertEquals(1, downstream.collected.size(), "signal must reach downstream"); + Assertions.assertTrue(downstream.collected.get(0).getData() instanceof FlushSignal); + } + + @Test + void signalIsDroppedWhenQueueIsFullAndCounterStaysAccurate() throws Exception { + for (int i = 0; i < 4; i++) { + backing.add(new Record<>(new SeaTunnelRow(new Object[] {i}))); + sizeCounter.inc(); + } + Assertions.assertEquals(4, backing.size()); + Assertions.assertEquals(4L, sizeCounter.getCount()); + + queue.received(new Record<>(FlushSignal.of(1L, 42L))); + + Assertions.assertEquals( + 4, backing.size(), "signal must not be enqueued when queue is full"); + Assertions.assertEquals( + 4L, sizeCounter.getCount(), "size counter must not be incremented on drop"); + } + + @Test + void signalIsDroppedInPrepareCloseWithoutMetricInc() throws Exception { + flow.setPrepareClose(true); + + queue.received(new Record<>(FlushSignal.of(1L, 42L))); + + Assertions.assertTrue(backing.isEmpty(), "signal must be dropped in prepareClose"); + Assertions.assertEquals(0L, sizeCounter.getCount()); + } + + @Test + void dataRecordFollowsBlockingPutPath() throws Exception { + queue.received(new Record<>(new SeaTunnelRow(new Object[] {"v"}))); + + Assertions.assertEquals(1, backing.size()); + Assertions.assertEquals(1L, sizeCounter.getCount()); + + RecordingCollector downstream = new RecordingCollector(); + queue.collect(downstream); + + Assertions.assertEquals(1, downstream.collected.size()); + Assertions.assertEquals(0L, sizeCounter.getCount()); + } + + @Test + void prepareCloseBarrierDrainsSubsequentDataRecords() throws Exception { + CheckpointBarrier closeBarrier = + new CheckpointBarrier( + 1L, + System.currentTimeMillis(), + CheckpointType.COMPLETED_POINT_TYPE, + Collections.emptySet(), + Collections.emptySet()); + queue.received(new Record<>(closeBarrier)); + Mockito.verify(flow).setPrepareClose(true); + + queue.received(new Record<>(new SeaTunnelRow(new Object[] {"after-close"}))); + + Assertions.assertEquals(1, backing.size(), "only the barrier should remain in queue"); + Assertions.assertEquals(1L, sizeCounter.getCount()); + } + + private static class RecordingCollector implements Collector> { + private final List> collected = new ArrayList<>(); + + @Override + public void collect(Record record) { + collected.add(record); + } + + @Override + public void close() {} + } +} diff --git a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/task/group/queue/disruptor/RecordEventProducerTest.java b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/task/group/queue/disruptor/RecordEventProducerTest.java new file mode 100644 index 000000000000..fa03459d6077 --- /dev/null +++ b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/task/group/queue/disruptor/RecordEventProducerTest.java @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.engine.server.task.group.queue.disruptor; + +import org.apache.seatunnel.api.signal.FlushSignal; +import org.apache.seatunnel.api.table.type.Record; +import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.engine.core.checkpoint.CheckpointType; +import org.apache.seatunnel.engine.server.checkpoint.CheckpointBarrier; +import org.apache.seatunnel.engine.server.execution.TaskGroupLocation; +import org.apache.seatunnel.engine.server.execution.TaskLocation; +import org.apache.seatunnel.engine.server.task.SeaTunnelTask; +import org.apache.seatunnel.engine.server.task.flow.IntermediateQueueFlowLifeCycle; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.Mockito; + +import com.lmax.disruptor.RingBuffer; +import com.lmax.disruptor.Sequence; +import com.lmax.disruptor.YieldingWaitStrategy; +import com.lmax.disruptor.dsl.ProducerType; + +import java.util.Collections; + +class RecordEventProducerTest { + + private static final TaskLocation TASK_LOCATION = + new TaskLocation(new TaskGroupLocation(1L, 1, 1L), 1L, 1); + + private RingBuffer ringBuffer; + + @SuppressWarnings("rawtypes") + private IntermediateQueueFlowLifeCycle flow; + + @BeforeEach + void setUp() { + ringBuffer = + RingBuffer.create( + ProducerType.SINGLE, RecordEvent::new, 4, new YieldingWaitStrategy()); + + SeaTunnelTask task = Mockito.mock(SeaTunnelTask.class); + Mockito.when(task.getTaskLocation()).thenReturn(TASK_LOCATION); + + flow = Mockito.mock(IntermediateQueueFlowLifeCycle.class); + final boolean[] prepareClose = {false}; + Mockito.when(flow.getRunningTask()).thenReturn(task); + Mockito.when(flow.getPrepareClose()).thenAnswer(inv -> prepareClose[0]); + Mockito.doAnswer( + inv -> { + prepareClose[0] = inv.getArgument(0); + return null; + }) + .when(flow) + .setPrepareClose(Mockito.anyBoolean()); + } + + @Test + void signalIsPublishedWhenCapacityAvailable() { + long seqBefore = ringBuffer.getCursor(); + + RecordEventProducer.onData(new Record<>(FlushSignal.of(1L, 42L)), ringBuffer, flow); + + long seqAfter = ringBuffer.getCursor(); + Assertions.assertEquals(seqBefore + 1, seqAfter, "signal should advance cursor by 1"); + RecordEvent event = ringBuffer.get(seqAfter); + Assertions.assertTrue( + event.getRecord().getData() instanceof FlushSignal, + "published event must carry FlushSignal"); + } + + @Test + void signalIsDroppedSilentlyWhenRingBufferIsFull() { + // Pin a gating sequence at -1 so the ring buffer tracks available capacity correctly. + Sequence gatingSeq = new Sequence(-1L); + ringBuffer.addGatingSequences(gatingSeq); + for (int i = 0; i < 4; i++) { + ringBuffer.publishEvent( + (e, s) -> e.setRecord(new Record<>(new SeaTunnelRow(new Object[] {"x"})))); + } + Assertions.assertEquals(0L, ringBuffer.remainingCapacity()); + long cursorBefore = ringBuffer.getCursor(); + + Assertions.assertDoesNotThrow( + () -> + RecordEventProducer.onData( + new Record<>(FlushSignal.of(1L, 42L)), ringBuffer, flow)); + + Assertions.assertEquals( + cursorBefore, + ringBuffer.getCursor(), + "cursor must not advance when signal was dropped"); + } + + @Test + void dataRecordIsDroppedInPrepareCloseAndNotPublished() { + flow.setPrepareClose(true); + long cursorBefore = ringBuffer.getCursor(); + + RecordEventProducer.onData( + new Record<>(new SeaTunnelRow(new Object[] {"v"})), ringBuffer, flow); + + Assertions.assertEquals( + cursorBefore, + ringBuffer.getCursor(), + "data record must be dropped during prepareClose"); + } + + @Test + void barrierIsAlwaysPublishedAndFlipsPrepareCloseForFinalCheckpoint() { + CheckpointBarrier closeBarrier = + new CheckpointBarrier( + 1L, + System.currentTimeMillis(), + CheckpointType.COMPLETED_POINT_TYPE, + Collections.emptySet(), + Collections.emptySet()); + + RecordEventProducer.onData(new Record<>(closeBarrier), ringBuffer, flow); + + Mockito.verify(flow).setPrepareClose(true); + Mockito.verify(flow.getRunningTask()).ack(closeBarrier); + } +}