
[Feature][Zeta] STIP-23 Phase 1: Add engine timer flush core flow and FlushSignal handling#10800

Open
nzw921rx wants to merge 32 commits into apache:dev from nzw921rx:feature/stip-23-timer-flush-core

Conversation

@nzw921rx
Collaborator

@nzw921rx nzw921rx commented Apr 21, 2026

Purpose of this pull request

Related: #10717

Phase 1 engine-core implementation for STIP-23 (Engine-Level Timer Flush for Sink Connectors). Full design: issue comment.

This PR builds the engine-side data path: timer scheduling → FlushSignal propagation → sink-side flush action invocation. No connector is modified — connector adoption (JDBC) follows in Phase 2.

Changes:

  • New Signal / FlushSignal API in seatunnel-api — control-plane signal abstraction
  • New env config sink.flush.interval (default 0 = disabled)
  • SinkWriter.Context added registerFlushAction(Runnable) / getFlushAction() (default methods, zero impact on existing connectors; a usage sketch follows this list)
  • TaskExecutionService — timerFlushWorker pool + timer lifecycle management
  • SourceFlowLifeCycle — timer registration/callback, injects FlushSignal via collector.sendFlushSignal() under checkpointLock
  • TransformFlowLifeCycle — signal passthrough
  • SinkFlowLifeCycle — invokes flushAction.run() on consume thread when FlushSignal arrives
  • IntermediateBlockingQueue / RecordEventProducer — non-blocking delivery for signals (offer() / tryPublishEvent()), drop on backpressure
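A minimal sketch of how a buffered writer could opt in via the new context hook. Only SinkWriter.Context#registerFlushAction comes from this PR (Runnable-based as listed above; a later commit in this PR switches it to RunnableWithException); the class, buffer, and flushBuffer method below are hypothetical.

import org.apache.seatunnel.api.sink.SinkWriter;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;

import java.util.ArrayList;
import java.util.List;

// Hypothetical buffered writer; only the registerFlushAction call reflects this PR.
class BufferedExampleWriter {

    private final List<SeaTunnelRow> buffer = new ArrayList<>();

    BufferedExampleWriter(SinkWriter.Context context) {
        // Opt in to engine timer flush; without this call the sink task
        // receives FlushSignal and simply ignores it.
        context.registerFlushAction(this::flushBuffer);
    }

    void flushBuffer() {
        // Push buffered rows to the external system, then clear the buffer.
        buffer.clear();
    }
}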

Does this PR introduce any user-facing change?

Yes, but no impact on existing jobs.

  • New env config sink.flush.interval (Long, default 0). Disabled by default. Only takes effect in the Zeta engine when a connector also registers a flush action.
  • New default methods on SinkWriter.Context — existing connectors require zero changes.
  • Updated docs/en and docs/zh JobEnvConfig pages.

How was this patch tested?

5 new unit test classes:

  • TaskExecutionServiceTest — timer register/close/re-register lifecycle, parameter validation
  • SeaTunnelSourceCollectorFlushSignalTest — signal broadcast to multiple outputs
  • SinkWriterContextTest — flush action register/replace/null-check
  • IntermediateBlockingQueueSignalTest — signal enqueue, backpressure drop, prepareClose drop, counter accuracy
  • RecordEventProducerTest — Disruptor path: signal publish, RingBuffer-full drop, prepareClose behavior

E2E tests and the connector-level option enable_timer_flush will both be introduced in Phase 2 alongside JDBC connector adoption; neither is part of this PR.

@nzw921rx nzw921rx changed the title Feature/stip 23 timer flush core [STIP-23] [Feature][Zeta] Engine-level timer flush support for Sink connectors design discussion Apr 21, 2026
@nzw921rx nzw921rx changed the title [STIP-23] [Feature][Zeta] Engine-level timer flush support for Sink connectors design discussion [Feature][Zeta] STIP-23 Phase 1: Add engine timer flush core flow and FlushSignal handling Apr 21, 2026
@nzw921rx nzw921rx added the don't merge There needs to be a specific reason in the PR, and it cannot be merged for the time being. label Apr 21, 2026
@nzw921rx nzw921rx marked this pull request as draft April 21, 2026 12:33
@nzw921rx nzw921rx removed the don't merge There needs to be a specific reason in the PR, and it cannot be merged for the time being. label Apr 21, 2026
@nzw921rx nzw921rx marked this pull request as ready for review April 21, 2026 13:30
@davidzollo
Contributor

Here is the review result from GPT; I think you can refer to it.

What This PR Solves

The user-facing pain point is that some sinks buffer writes, and in low-throughput or idle-source scenarios, if no data keeps flowing to trigger a flush, users may wait a long time before seeing data land. The fix introduces sink.flush.interval: the Source side injects FlushSignal periodically via a timer, Transforms pass it through transparently, and the Sink executes the registered flush action upon receiving the signal.

In one sentence: this is Phase 1 of STIP-23, wiring up the "engine-level periodic flush signal" main path for the Zeta engine.

A simple example: if a Sink only flushes after buffering 1,000 rows, but the source only produces 10 rows per minute, the old logic could leave data stuck in the buffer indefinitely. This PR makes the engine send a FlushSignal at regular intervals to prompt the Sink to flush its buffered data.


Execution Flow
text
SourceSeaTunnelTask.createSourceFlowLifeCycle()
-> read env.sink.flush.interval
-> SourceFlowLifeCycle.open()
-> reader.open()
-> register()
-> registerFlushTimer()
-> TaskExecutionService.registerTimerFlushTask()

timerFlushWorker tick
-> SourceFlowLifeCycle.onTimerTick()
-> SeaTunnelSourceCollector.sendFlushSignal()
-> sendRecordToNext() under checkpointLock
-> IntermediateBlockingQueue / IntermediateDisruptor
-> TransformFlowLifeCycle passes Signal through
-> SinkFlowLifeCycle receives FlushSignal
-> writerContext.getFlushAction().run()


Key Findings

The main path is triggered when sink.flush.interval > 0, but no connector in the current repository calls registerFlushAction, so existing connectors will receive and silently ignore the signal for now. The non-blocking delivery logic for Signal in the BlockingQueue branch is broadly reasonable — dropping flush signals on a full queue prevents the timer thread from being back-pressured. However, the Disruptor branch introduces a serious regression: checkpoint barriers are no longer published to the ring buffer, which breaks the downstream checkpoint chain. The Source timer's lifecycle cleanup is also fragile: if reader.close() throws, closeFlushTimer() will never execute. Finally, the documentation and Option description reference enable_timer_flush, but no such config key exists in the code — the actual opt-in mechanism is SinkWriter.Context.registerFlushAction(...).
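For reference, the BlockingQueue-side delivery described above boils down to something like the following sketch; the queue field and surrounding method are assumed names, and only the offer()-and-drop-on-backpressure behavior is taken from this PR.

// Illustrative only; the real IntermediateBlockingQueue internals are not shown here.
void handleRecord(Record<?> record) throws InterruptedException {
    if (record.getData() instanceof Signal) {
        // Control signals must never block the timer thread on backpressure.
        if (!queue.offer(record)) {
            log.warn("Queue is full, dropping signal {}", record.getData());
        }
    } else {
        // Data records and barriers keep the original blocking behavior.
        queue.put(record);
    }
}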


Issue 1: Disruptor Branch Swallows Checkpoint Barriers

Location: RecordEventProducer.java:37
Severity: High

In the baseline, after processing a Barrier, execution falls through to the unified ringBuffer.next() / publish() path. After this PR, the barrier branch only calls ack() and setPrepareClose() without invoking publishRecord(record, ringBuffer). Since RecordEventHandler only forwards records to collector.collect(record) after consuming them from the ring buffer, checkpoint barriers in Source/Transform/Sink chains using INTERMEDIATE_DISRUPTOR_QUEUE will stall on the producer side and never propagate to downstream Transforms or Sinks. This can result in incomplete checkpoints, broken final-checkpoint/prepare-close semantics, and an unreliable recovery path.

Recommended fix: Barriers must still go through the blocking publish path. tryPublishEvent can be kept for Signal, but after the barrier branch handles ack/prepareClose, it must call publishRecord(record, ringBuffer). Tests should be added to assert that the cursor advances and that the handler can collect the barrier.
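A rough sketch of that shape; ack, setPrepareClose, publishRecord, and the non-blocking signal path are names taken from this thread, while the exact guards and signatures are assumptions rather than the actual RecordEventProducer source.

// Illustrative fix shape only, not the real RecordEventProducer code.
public void onData(Record<?> record) {
    if (record.getData() instanceof Barrier) {
        ack(record);                        // existing barrier bookkeeping
        setPrepareClose();                  // existing prepare-close handling (guard elided)
        publishRecord(record, ringBuffer);  // the fix: the barrier still reaches the ring buffer
    } else if (record.getData() instanceof Signal) {
        // Keep the tryPublishEvent-based non-blocking path for signals (hypothetical helper name).
        tryPublishSignal(record, ringBuffer);
    } else {
        publishRecord(record, ringBuffer);
    }
}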


Issue 2: Flush Timer Leaks When reader.close() Throws During Source Shutdown

Location: SourceFlowLifeCycle.java:204
Severity: Medium

The current close() sequence is reader.close(); closeFlushTimer(); super.close();. If the connector's reader.close() throws an IOException, the newly registered timer is never cancelled.

This means that during an abnormal shutdown or task cancellation, the timer thread may continue holding references to SourceFlowLifeCycle, the collector, and the task context, and keep attempting to send FlushSignal, resulting in incomplete resource cleanup and misleading WARN log noise.

Recommended fix: Move timer cleanup into a finally block. More robustly: cancel the timer first, then close the reader, then call super.close(). At minimum, closeFlushTimer() must execute regardless of the outcome of reader.close().
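A minimal sketch of the "at minimum" variant, assuming the reader.close() / closeFlushTimer() / super.close() sequence quoted above:

// Sketch only: closeFlushTimer() and super.close() run even if reader.close() throws.
@Override
public void close() throws IOException {
    try {
        reader.close();
    } finally {
        try {
            closeFlushTimer();
        } finally {
            super.close();
        }
    }
}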


Issue 3: enable_timer_flush Is a Phantom Config in the Documentation

Location: EnvCommonOptions.java:88, JobEnvConfig.md:58
Severity: Low

Both the Option description and the English/Chinese documentation state that the Sink connector must enable enable_timer_flush. However, a codebase-wide search finds this string only in descriptions and docs — there is no corresponding Option definition or parsing logic. The real enablement mechanism is for the writer to call context.registerFlushAction(...) during initialization.

Users will attempt to configure a parameter that does not exist, and connector authors may mistakenly believe they need to add a new connector-level config rather than registering a flush callback.

Recommended fix: Update the documentation to read: "the connector must register a flush action via SinkWriter.Context.registerFlushAction." If a connector-level toggle is needed in the future, it should be defined in a dedicated connector PR with a documented default value.


Test Coverage

The new tests cover timer registration/cancellation, Signal delivery, SinkWriterContext, BlockingQueue Signal, and Disruptor Signal. However, RecordEventProducerTest.barrierIsAlwaysPublishedAndFlipsPrepareCloseForFinalCheckpoint only asserts ack and setPrepareClose — it does not verify that the barrier is actually published to the ring buffer, which is why Issue 1 was missed. It is recommended to add assertions on barrier cursor advancement and handler collection, as well as a lifecycle test verifying that the timer is cancelled when reader.close() throws.
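A sketch of the missing assertion shape; producer, ringBuffer, and barrierRecord are placeholders, and RecordEventProducerTest in this PR owns the real fixture.

// Illustrative: assert that publishing a barrier actually advances the ring buffer
// cursor, instead of only checking the ack/setPrepareClose side effects.
long cursorBefore = ringBuffer.getCursor();
producer.onData(barrierRecord); // barrierRecord: a Record<?> wrapping a checkpoint barrier from the fixture
Assertions.assertEquals(
        cursorBefore + 1,
        ringBuffer.getCursor(),
        "barrier must be published to the ring buffer, not only acked");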


Compatibility and Side Effects

API compatibility is maintained via a default method on SinkWriter.Context, preserving binary compatibility. The new env option defaults to 0, so existing jobs are unaffected. There is no additional performance overhead by default; when enabled, each source subtask registers a periodic task, and signals are dropped when the queue is full to prevent the timer thread from being stalled by downstream back-pressure. One point worth confirming: whether Signal as a Record payload should implement Serializable, consistent with other control-plane payloads such as CheckpointBarrier and SchemaChangeEvent.


Conclusion: Merge After Fixes

  1. Blocking: Issue 1 must be fixed — without it, checkpoint barrier propagation is broken in any pipeline using the Disruptor queue. Issue 2 is also recommended for inclusion in this PR, as it is a reliability concern directly introduced by the new timer lifecycle.
  2. Recommended: Issue 3 is a documentation/API inconsistency. It does not necessarily block the core code merge, but it is best corrected within this PR to avoid misleading users and connector authors after STIP-23 Phase 1 lands.

Overall assessment: the direction is right, and the abstraction is appropriately restrained. The Signal → Transform passthrough → Sink flushAction design is a solid foundation for the long-term solution. The current implementation should not be merged as-is; fixing the Disruptor barrier regression will make this PR substantially more stable.

@DanielLeens

Hi @nzw921rx, I rechecked the current PR head locally as seatunnel-review-10800 at f86a217bd9a2. I reviewed the full diff against upstream/dev and did not run local Maven/tests in this batch; this is a source-level review.

This PR wires a real Zeta timer-flush control path:

SourceSeaTunnelTask.createSourceFlowLifeCycle()
  -> read env.sink.flush.interval
  -> SourceFlowLifeCycle.open()
      -> reader.open()
      -> registerFlushTimer()
          -> TaskExecutionService.registerTimerFlushTask(...)

timer tick
  -> SourceFlowLifeCycle.onTimerTick()
      -> SeaTunnelSourceCollector.sendFlushSignal(...)
      -> IntermediateBlockingQueue / IntermediateDisruptor
      -> TransformFlowLifeCycle passes Signal through
      -> SinkFlowLifeCycle receives FlushSignal
          -> writerContext.getFlushAction().run()

The feature direction is good, but I found two blockers in the current head:

  1. RecordEventProducer.onData(...) handles Barrier at RecordEventProducer.java:37-44 but never calls publishRecord(...) for that barrier. In the Disruptor queue path, the checkpoint/final barrier is acked locally but not published into the ring buffer for downstream handling. That can break checkpoint propagation and prepare-close behavior.
  2. SourceFlowLifeCycle.close() calls reader.close() before closeFlushTimer() at SourceFlowLifeCycle.java:205-209. If reader.close() throws, the timer task is not cancelled and can keep references to the source lifecycle/collector.

There is also a documentation/API mismatch: JobEnvConfig.md and EnvCommonOptions mention enable_timer_flush, but I only find it in docs/descriptions; the actual opt-in is SinkWriter.Context.registerFlushAction(...).

Conclusion: can merge after fixes

Blocking items:

  1. Publish checkpoint barriers in the Disruptor branch after ack/prepare-close handling.
  2. Make flush-timer cancellation happen in a finally path during source close.
  3. Replace the phantom enable_timer_flush wording with the real registerFlushAction(...) contract.

CI is currently still running/in progress in the fetched metadata, so this also needs a green build after the fixes.

@nzw921rx nzw921rx force-pushed the feature/stip-23-timer-flush-core branch from 33fbc02 to f9268bc on April 21, 2026 at 15:55
@nzw921rx
Collaborator Author

@davidzollo Thank you for your review. The suggestions were great and have been fixed

@nzw921rx
Collaborator Author

@zhangshenghang @corgy-w @liunaijie @dybyte PTAL when you have time. Thanks

…ception in SourceFlowLifeCycle

- RecordEventProducer: barrier branch was missing publishRecord call, causing checkpoint barriers to be silently dropped
- SourceFlowLifeCycle#close: wrap in try/finally so closeFlushTimer always runs even if reader.close or super.close throws
- RecordEventProducerTest: add gating sequence so remainingCapacity correctly reflects ring buffer fullness
@nzw921rx nzw921rx force-pushed the feature/stip-23-timer-flush-core branch from bffc467 to 86384b5 on April 22, 2026 at 02:39
…xception

- Replace Runnable with RunnableWithException in SinkWriter.Context#registerFlushAction / getFlushAction
- Update SinkWriterContext field and implementation accordingly
- Remove the need for connector-level try-catch wrapping of IOException in timerFlush()
- Fix SinkWriterContextTest to declare throws Exception on affected test method
@DanielLeens

Hi @nzw921rx, thanks for the careful update here. I re-reviewed the latest head locally on seatunnel-review-10800 (f31cca006, compared against base 60f1007ab) and walked the full Zeta path again rather than only checking the incremental diff.

What This PR Fixes

  • User pain point: when a streaming source is idle or very low throughput, buffered sink data may wait until checkpoint or close before being flushed, which can make end-to-end latency look much worse than expected.
  • Fix approach: Zeta now creates an engine-level FlushSignal from source subtasks according to env.sink.flush.interval; sink writers opt in through SinkWriter.Context.registerFlushAction(...).
  • One-sentence value: this PR adds the engine foundation for scheduled sink flushing during idle periods.

Core Flow Reviewed

Zeta source task creation
  -> SourceSeaTunnelTask.createSourceFlowLifeCycle() reads env.sink.flush.interval [SourceSeaTunnelTask.java:119-127]
  -> SourceFlowLifeCycle.open() opens reader, registers with enumerator, then registers the flush timer [SourceFlowLifeCycle.java:171-175]
  -> TaskExecutionService.registerTimerFlushTask() schedules a fixed-delay callback [TaskExecutionService.java:688-707]
  -> SourceFlowLifeCycle.onTimerTick() creates and emits FlushSignal [SourceFlowLifeCycle.java:185-194]
  -> SeaTunnelSourceCollector.sendRecordToNext() forwards it under checkpointLock [SeaTunnelSourceCollector.java:191-196]
  -> RecordEventProducer publishes Signal non-blockingly; it drops with WARN if the queue is full [RecordEventProducer.java:45-65]
  -> TransformFlowLifeCycle forwards Signal
  -> SinkFlowLifeCycle.received() calls writerContext.getFlushAction().run() for FlushSignal [SinkFlowLifeCycle.java:260-266]

The two earlier risks I cared about are now addressed:

  • Barrier is still published after ACK/prepare-close handling (RecordEventProducer.java:37-45).
  • Source timer cleanup now runs from SourceFlowLifeCycle.close() in a finally block (SourceFlowLifeCycle.java:209-216).

Findings

Issue 1: Phase 1 documents a connector option that is not actually available in this PR

  • Location: seatunnel-api/src/main/java/org/apache/seatunnel/api/options/EnvCommonOptions.java:88, docs/en/introduction/configuration/JobEnvConfig.md:58, docs/zh/introduction/configuration/JobEnvConfig.md:58
  • Severity: Medium

The engine contract introduced by this PR is SinkWriter.Context.registerFlushAction(...) (SinkWriter.java:122-131). However, the option description and JobEnvConfig docs say the sink connector must support and enable enable_timer_flush. That user-facing connector option is not introduced in Phase 1, so after this PR alone users would see documentation for a setting they cannot actually configure.

Suggested fix: in Phase 1, describe only the engine-level contract and say sinks must opt in by registering a flush action. If you want to mention enable_timer_flush, please mark it clearly as a connector-side option introduced by the follow-up adoption PR, not as a currently available setting in this PR.

Compatibility

  • API: compatible; SinkWriter.Context uses default methods.
  • Config: compatible; sink.flush.interval defaults to 0, so existing jobs keep the old behavior.
  • Protocol/serialization: internal Signal/FlushSignal is added, with no user data format change.
  • Historical behavior: unchanged unless users explicitly enable the interval.

Performance And Side Effects

  • CPU/object allocation scale with source parallelism and interval; default is disabled.
  • SourceSeaTunnelTask.java:119-125 warns when the interval is below 100 ms.
  • Flush signals are sent under the checkpoint lock, which preserves ordering with barriers.
  • Disruptor publication intentionally drops flush signals under backpressure, protecting the data path.
  • Timer cleanup is now covered by close/finally and TaskExecutionService.closeTimerFlushTask(...).

Tests And Docs

The unit coverage around timer registration, signal forwarding, sink writer context, blocking queue, and Disruptor publication is meaningful. The remaining gap is connector adoption, which belongs to the follow-up PR. The docs need the wording fix above.

Merge Conclusion

Conclusion: merge after fixes

  1. Blocking items:
  • Issue 1: adjust the Phase 1 option/docs wording so it does not expose enable_timer_flush as a currently available connector option.
  • CI: the current Build check is still QUEUED (72652367894), so it still needs to pass before merge.
  2. Non-blocking suggestions:
  • Keep the current drop-with-WARN behavior for flush signals when the queue is full; it is the right trade-off for protecting the main data path.

Overall, the engine-side design is solid and the previous concurrency/resource-lifecycle concerns are in much better shape now. Once the documentation contract is tightened and CI is green, this should be in good shape.

@DanielLeens

Hi @nzw921rx, I rechecked the latest head locally.

What changed after Daniel's previous review

  • The remaining blocker from my last review was the per-tick production log amplification in SourceFlowLifeCycle.onTimerTick().
  • That is fixed now:
    • seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SourceFlowLifeCycle.java:189-193 changed the timer-flush log from INFO to DEBUG.
    • TaskExecutionService.java:700-703 now wraps the scheduled timer worker with MDCTracer.tracing(...), which is a good follow-up for log context consistency.

Runtime chain I rechecked

SourceFlowLifeCycle.hook()
  -> TaskExecutionService.registerTimerFlushTask(...)
  -> scheduled timer tick
  -> SourceFlowLifeCycle.onTimerTick()
      -> collector.sendRecordToNext(new Record<>(FlushSignal))
  -> serializer / queue
  -> SinkFlowLifeCycle.received(FlushSignal)
      -> registered flush action

Findings

  1. The earlier production-noise blocker is fixed on the current head.
  2. I do not see a reopened code blocker in the timer-flush control path.
  3. The current GitHub Build is green.

Merge conclusion

Conclusion: can merge

Blocking items:

  • None from my side.

Non-blocking note:

  • If you want more observability later, I would still prefer metrics over per-tick higher-level logs.

Thanks for the follow-up iterations here. The current head looks ready to merge from my side.

@davidzollo
Contributor

Thank you very much for quickly implementing so many improvements, which have yielded remarkable results!

The E2E environment uses a single-node setup with a dedicated test sink. It can verify that the new registerFlushAction hook is triggered, but it cannot guarantee coverage across task-group and Hazelcast serializer boundaries, nor confirm stability in MultiTable scenarios.

Potential Risks:
This PR modifies protocols, lifecycle management, message queues, and multi-table writers, rather than making minor adjustments to common connectors. Without lower-level tests for these core modules, later changes to RecordSerializer, FlowLifeCycle.hook(), or the flush aggregation logic of MultiTableSink by other developers could easily break the existing path, and E2E tests may not catch such regressions promptly.

Given the substantial scope of this change, I propose the following suggestions:

  • Add a RecordSerializer round-trip test to explicitly cover the FLUSH_SIGNAL logic (a minimal sketch follows this list).
  • Add an aggregated flush test for MultiTableSink / MultiTableSinkWriter to verify the execution of multiple proxy actions.
  • Add a source lifecycle test to confirm that timer registration is completed strictly after the RUNNING state.
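For the round-trip suggestion, a minimal sketch of what such a test could look like, using plain Java serialization for brevity (the engine path would go through the Hazelcast serializer). The FlushSignal.of(...) factory, the getter names asserted here, and the java.io / JUnit 5 imports are assumptions about the FlushSignal API rather than code from this PR.

// Illustrative only: a plain Java serialization round trip, not the Hazelcast path
// a real test should also cover; getJobId()/getTaskId() are assumed accessor names.
@Test
void flushSignalSurvivesJavaSerializationRoundTrip() throws Exception {
    FlushSignal original = FlushSignal.of(1L, 2L);

    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
        out.writeObject(original);
    }

    FlushSignal copy;
    try (ObjectInputStream in =
            new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
        copy = (FlushSignal) in.readObject();
    }

    Assertions.assertEquals(original.getJobId(), copy.getJobId());
    Assertions.assertEquals(original.getTaskId(), copy.getTaskId());
}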

@DanielLeens

Hi @nzw921rx, I rechecked the latest head locally after the latest follow-up comment.

The earlier issues that needed blocking review are now fixed on the current head:

  • the Disruptor barrier path still publishes the barrier downstream
  • the flush timer is cleaned up in finally
  • the misleading enable_timer_flush wording is gone from the docs/config description

Runtime chain I rechecked

SourceFlowLifeCycle.hook()
  -> register timer flush task
  -> timer tick
      -> FlushSignal enters the pipeline
      -> queue / transform forward it
  -> SinkFlowLifeCycle.received(FlushSignal)
      -> registered flush action runs

Current findings:

  • I do not see a reopened code blocker in the latest head.
  • The current Build check is green.

Conclusion: can merge

Blocking items:

  • None from my side.

Non-blocking note:

  • If you want more observability later, I would still prefer metrics over raising the timer-flush log level again.

Thanks for the follow-up iterations here. The current head looks ready to merge from my side.

@nzw921rx
Collaborator Author

@davidzollo Thanks for the thorough review — all three risk areas are spot on. I've added the corresponding tests:

  1. RecordSerializer round-trip test

Added RecordSerializerIT, which spins up a 2-node Hazelcast cluster and verifies round-trip serialization/deserialization through IMap for all three Record<T> data types (SeaTunnelRow, CheckpointBarrier, FlushSignal), with strict full-field comparison and non-empty TaskLocation set scenarios.

  2. MultiTableSink aggregated flush test

Created MultiTableFlushTestSinkWriter/Sink/Factory test components with 3 tables × 10,000 rows, parallelism=2, flush.interval=100ms. Assertions verify:

  • Each table flushed ≥ 3 times
  • Every flush snapshot has rowCount > 0 (queues drained before flush)
  • flushed ≤ written (no double-counting)

  3. Source lifecycle + state machine test

Added SeaTunnelTaskStateTest (19 test methods) using Mockito partial mock of SeaTunnelTask + real SourceFlowLifeCycle.hook(), covering:

  • State machine (14): full INIT→CLOSED flow, boundary conditions for each transition (blocks when restore incomplete, blocks when startCalled is false, CANCELLING→CANCELED), open/close/collect invocation timing, progress.done marking
  • Timer registration (5): registerTimerFlushTask called exactly once at STARTING→RUNNING, skipped when flushInterval=0/-1, correct interval value passed
  • Call ordering (4): reportTaskStatus order, open() strictly before hook(), multiple FlowLifeCycles all receive open/hook

@DanielLeens

Hi @nzw921rx, I pulled the latest head locally again and rechecked the new follow-up after your latest comment.

What this PR solves

  • User pain: when a Zeta source goes idle, buffered sink data can sit until checkpoint/close unless the engine has an explicit timer-driven flush path.
  • Fix approach: Phase 1 wires FlushSignal through the engine path, and this latest follow-up adds the missing lower-level regression coverage that was raised in discussion.
  • One-line summary: the code-side blockers still look closed on the current head, and the new delta mainly strengthens the test boundary.

What I rechecked in this follow-up

1. Cross-worker serializer boundary
  -> RecordSerializerIT now does a 2-node Hazelcast round-trip for SeaTunnelRow / CheckpointBarrier / FlushSignal

2. Multi-table aggregated flush
  -> TimerFlushIT.testMultiTableAggregatedFlush()
  -> verifies repeated flushes, non-empty drained snapshots, and flushed <= written

3. Source lifecycle timing
  -> SeaTunnelTaskStateTest
  -> verifies registerTimerFlushTask() is called exactly at STARTING -> RUNNING, not earlier

I do not see a reopened code blocker from these additions. FlushSignal.equals/hashCode() is only supporting the strict serializer assertions and does not change the production contract.

Conclusion: merge after fixes

  1. Blocking items
  • The current GitHub Build is still QUEUED, so I would still wait for the latest CI result before merging this engine/API-level PR.
  2. Suggested follow-up
  • None newly blocking from my side. The latest test additions are aligned with the discussion points and improve confidence in the Phase 1 foundation.

From Daniel’s side, the current head still looks technically sound; the remaining gate is CI on the newest head.

@nzw921rx nzw921rx requested a review from davidzollo April 27, 2026 07:29
@davidzollo
Contributor

I found some new issues, as follows:

1. Critical Concurrency Risk: Missing checkpointLock in onTimerTick()

File: SourceFlowLifeCycle.java

Currently, onTimerTick() is triggered asynchronously by the timerFlushWorker thread pool, and it directly pushes the FlushSignal into the collector without acquiring the checkpointLock.

// Current Implementation
private void onTimerTick() {
    if (prepareClose) return;
    try {
        FlushSignal flushSignal = FlushSignal.of(currentTaskLocation.getJobId(), currentTaskLocation.getTaskID());
        Record<FlushSignal> flushSignalRecord = new Record<>(flushSignal);
        collector.sendRecordToNext(flushSignalRecord); // <-- DANGER: No lock
    }

Since the Checkpoint Barrier generation and downstream dispatch (triggerBarrier()) are also processed within this flow, failing to synchronize onTimerTick with the checkpointLock allows FlushSignal to be injected into the data stream at arbitrary points, potentially interleaving with a Checkpoint Barrier. This unstructured ordering could corrupt state machines or offset alignment in future stateful connectors.

Suggestion: Must wrap the signal emission logic inside synchronized (checkpointLock).

private void onTimerTick() {
    if (prepareClose) return;
    try {
        synchronized (checkpointLock) { // Ensure total ordering with Checkpoint Barriers
            if (prepareClose) return;
            FlushSignal flushSignal = FlushSignal.of(currentTaskLocation.getJobId(), currentTaskLocation.getTaskID());
            collector.sendRecordToNext(new Record<>(flushSignal));
        }
    } catch (Throwable e) {
        log.warn("Failed to broadcast FlushSignal from task {}", currentTaskLocation, e);
    }
}

2. High Risk: Sink Uncaught Exception Avalanche

File: SinkFlowLifeCycle.java inside received()

if (signal instanceof FlushSignal && writerContext.getFlushAction() != null) {
    writerContext.getFlushAction().run(); // <-- No exception handling
}

The actual flush actions (Phase 2, e.g., JDBC flush or Iceberg commit) will likely involve network or disk I/O, which frequently throw exceptions (like connection timeouts). Without a try-catch block here, an exception thrown during the flush action will bubble up and crash the entire Sink Task Worker thread, leading to task failure and unnecessary job restarts. A helper flush signal should never be allowed to crash the main data processing thread.

Suggestion:

if (signal instanceof FlushSignal && writerContext.getFlushAction() != null) {
    try {
        writerContext.getFlushAction().run();
    } catch (Exception e) {
        log.warn("Flush action failed for FlushSignal from task {}, continuing processing data...", signal, e);
    }
}

3. Minor Improvement: serialVersionUID

For FlushSignal.java, since it implements Serializable, it’s strongly recommended to define a serialVersionUID:

private static final long serialVersionUID = 1L;

This guarantees backward compatibility for Checkpoint state persistence if the FlushSignal fields are changed in future phases.

Looking forward to the fixes.

@nzw921rx
Collaborator Author

nzw921rx commented Apr 27, 2026

@davidzollo Thank you for your detailed suggestions

  1. sendRecordToNext already acquires checkpointLock at the lower layer:
 public void sendRecordToNext(Record<?> record) throws IOException {
        synchronized (checkpointLock) {
            for (OneInputFlowLifeCycle<Record<?>> output : outputs) {
                output.received(record);
            }
        }
    }
  2. If a JDBC write times out during writeRecord on the normal data path, the same problem occurs there as well. We need to ensure the downstream FlushSignal is actually processed; if the exception were swallowed here, the subsequent checkpoint would still restart the task when JDBC is unstable (timeouts and similar):
if (signal instanceof FlushSignal && writerContext.getFlushAction() != null) {
    writerContext.getFlushAction().run(); // <-- No exception handling
}
  3. Fixed.

@nzw921rx
Collaborator Author

@dybyte Please help review it. Any suggestions would be greatly appreciated

@nzw921rx nzw921rx closed this Apr 27, 2026
@nzw921rx nzw921rx reopened this Apr 27, 2026

@DanielLeens DanielLeens left a comment


Hi @gabby1996, first on the earlier thread: on the newest head, I agree the checkpointLock concern is already covered indirectly because SeaTunnelSourceCollector.sendRecordToNext() synchronizes on that same lock. I also would keep flush-action failures task-failing rather than swallowing them, because a failed flush is still a real sink write/visibility failure, not something the engine should silently treat as success.

I pulled the latest head locally on seatunnel-review-10800, rechecked the full source -> transform -> sink signal path against upstream/dev, and reviewed the current head commit 213912224292031cf7e9e1f73a712f35525e95f5. I did not run local Maven in this batch; this is a source-level review plus current GitHub check metadata.

What this PR fixes

  • User pain: buffered sinks can leave data sitting too long in low-throughput or idle-source cases.
  • Fix approach: add sink.flush.interval, emit FlushSignal from the source side on a timer, pass it through transforms, and let the sink execute the registered flush action.
  • One-line summary: the signal path is much healthier now, but the current flush cadence semantics are still wrong on multi-source fan-in topologies.

Runtime chain I checked

job startup
  -> SourceSeaTunnelTask.createSourceFlowLifeCycle() [119-135]
      -> read sink.flush.interval
      -> create one SourceFlowLifeCycle per source subtask

timer registration
  -> SourceFlowLifeCycle.startFlushTimer() [344-358]
      -> TaskExecutionService.registerTimerFlushTask(...) [689-714]
      -> one scheduled task per source subtask

timer tick
  -> SourceFlowLifeCycle.onTimerTick() [184-196]
      -> collector.sendRecordToNext(new Record<>(FlushSignal))
          -> SeaTunnelSourceCollector.sendRecordToNext() [191-196]
              -> broadcast to every downstream output under checkpointLock

middle stages
  -> TransformFlowLifeCycle.received(...) [81-93,115-120]
      -> Signal is passed through unchanged

sink execution
  -> SinkFlowLifeCycle.received(...) [260-267]
      -> every received FlushSignal runs writerContext.getFlushAction().run()

Key review findings

  1. The latest head fixed the earlier barrier-publish concern and tightened timer cleanup, which is good progress.
  2. The remaining blocker is the trigger dimension: the timer is attached to each source subtask, not to each sink writer.
  3. That means a sink task receives one flush signal per upstream source timer tick, so the real flush rate scales with source parallelism.
  4. Example: with sink.flush.interval=1000ms, 10 source subtasks feeding 1 sink task can drive roughly 10 sink flushes per second instead of the user-expected ~1.
  5. Current GitHub Build is still pending on the latest head, but my blocking conclusion here is code-level and does not depend on that check result.

Blocking issue

Issue 1: sink.flush.interval does not currently mean “one sink flush cadence”

  • Location: SourceSeaTunnelTask.java:119, SourceFlowLifeCycle.java:184, SinkFlowLifeCycle.java:260
  • Why this is a real problem:
    the current design makes the source side define the sink flush frequency. In fan-in topologies, the sink sees N independent periodic signals rather than one cadence.
  • Risk:
    excessive flush frequency, more small-batch I/O, more sink overhead, more timeout/limit exposure, and user-visible behavior that does not match the config meaning.
  • Suggested fix:
    Option A is the cleaner one: move the timer ownership to the sink writer / sink task side so one sink writer owns one flush cadence.
    Option B would be to deduplicate signals at the sink side within a time window, but that is more of a mitigation than a clean design (a rough sketch follows).
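As a rough illustration of Option B only; the field and method names below are placeholders rather than code from this PR, and java.util.concurrent.TimeUnit is assumed imported.

// Illustrative mitigation: rate-limit flush execution on the sink side so that N
// upstream signals inside one interval trigger at most one real flush.
private volatile long lastFlushNanos;

private void onFlushSignal(long flushIntervalMillis) throws Exception {
    long now = System.nanoTime();
    if (now - lastFlushNanos < TimeUnit.MILLISECONDS.toNanos(flushIntervalMillis)) {
        return; // A flush already ran within this interval; ignore the extra signal.
    }
    lastFlushNanos = now;
    if (writerContext.getFlushAction() != null) {
        writerContext.getFlushAction().run();
    }
}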

Conclusion

Conclusion: merge after fixes

  1. Blocking items
  • Issue 1 must be fixed first. The core semantics of sink.flush.interval are not right yet on the normal multi-source path.
  2. Suggested follow-up
  • Please add a regression test for multi-source fan-in to one sink, and assert that one interval produces only one effective sink flush.

The direction here is promising, and the latest head is clearly stronger than the earlier revisions. The remaining blocker is just important enough that I would not merge before it is corrected.

@nzw921rx
Collaborator Author

nzw921rx commented Apr 27, 2026

@DanielLeens Thanks for the detailed review — I rechecked this with a targeted physical-plan test and the current engine topology does not match the assumed fan-in shape.

I added a PhysicalPlanGenerator test for source=10, transform=4, sink=4 and printed the generated runtime task counts:

[source=10, transform=4, sink=4] generated counts => tasks=20, sourceTasks=10, transformTasks=10, sinkTasks=10

This indicates that in the current execution model, sink tasks are expanded along the same per-subtask task-chain pattern, i.e., we do not get a 10 source -> 1 sink writer runtime fan-in in this shape. In other words, the specific “N source timers amplify one sink writer flush rate by N” scenario is not reproduced on the current plan generation path.

That said, if we later introduce a true cross-chain fan-in topology (multiple upstream subtasks converging on a single sink writer), this concern would become valid and should be re-evaluated then.

case:

@Test
    @SetEnvironmentVariable(key = SKIP_CHECK_JAR, value = "true")
    public void testSource10Transform4Sink4PhysicalExpand() throws MalformedURLException {
        IdGenerator idGenerator = new IdGenerator();

        Action sourceAction =
                new SourceAction<>(
                        idGenerator.getNextId(),
                        "fake-source",
                        createFakeSource(),
                        Sets.newHashSet(new URL("file:///fake.jar")),
                        Collections.emptySet());
        LogicalVertex sourceVertex = new LogicalVertex(sourceAction.getId(), sourceAction, 10);

        CatalogTable table = createSimpleCatalogTable("default_table");
        Action transformAction =
                new TransformAction(
                        idGenerator.getNextId(),
                        "noop-transform",
                        new ArrayList<>(Collections.singleton(sourceAction)),
                        createNoopTransform(table),
                        Sets.newHashSet(new URL("file:///transform.jar")),
                        Collections.emptySet());
        LogicalVertex transformVertex =
                new LogicalVertex(transformAction.getId(), transformAction, 4);

        Action sinkAction =
                new SinkAction<>(
                        idGenerator.getNextId(),
                        "console-sink",
                        new ArrayList<>(Collections.singleton(transformAction)),
                        new ConsoleSink(table, ReadonlyConfig.fromMap(new HashMap<>())),
                        Sets.newHashSet(new URL("file:///console.jar")),
                        Collections.emptySet());
        LogicalVertex sinkVertex = new LogicalVertex(sinkAction.getId(), sinkAction, 4);

        JobConfig config = new JobConfig();
        config.setName("source10-transform4-sink4");
        LogicalDag logicalDag = new LogicalDag(config, idGenerator);
        logicalDag.addLogicalVertex(sourceVertex);
        logicalDag.addLogicalVertex(transformVertex);
        logicalDag.addLogicalVertex(sinkVertex);
        logicalDag.addEdge(new LogicalEdge(sourceVertex, transformVertex));
        logicalDag.addEdge(new LogicalEdge(transformVertex, sinkVertex));

        JobImmutableInformation jobImmutableInformation =
                new JobImmutableInformation(
                        2L,
                        "source10-transform4-sink4",
                        nodeEngine.getSerializationService(),
                        logicalDag,
                        Collections.emptyList(),
                        Collections.emptyList());
        IMap<Object, Object> runningJobState =
                nodeEngine.getHazelcastInstance().getMap("testRunningJobState_source10_t4");
        IMap<Object, Long[]> runningJobStateTimestamp =
                nodeEngine
                        .getHazelcastInstance()
                        .getMap("testRunningJobStateTimestamp_source10_t4");

        PhysicalPlan physicalPlan =
                PlanUtils.fromLogicalDAG(
                                logicalDag,
                                nodeEngine,
                                jobImmutableInformation,
                                System.currentTimeMillis(),
                                Executors.newCachedThreadPool(),
                                server.getClassLoaderService(),
                                instance.getFlakeIdGenerator(Constant.SEATUNNEL_ID_GENERATOR_NAME),
                                runningJobState,
                                runningJobStateTimestamp,
                                QueueType.BLOCKINGQUEUE,
                                new EngineConfig())
                        .f0();

        SubPlan subPlan = physicalPlan.getPipelineList().get(0);
        Assertions.assertEquals(10, subPlan.getPhysicalVertexList().size());
        Assertions.assertTrue(
                subPlan.getPhysicalVertexList().stream()
                        .allMatch(v -> v.getTaskGroupImmutableInformation().getTasksData().size() == 2));

        int taskCount = 0;
        int sourceTaskCount = 0;
        int transformTaskCount = 0;
        int sinkTaskCount = 0;
        for (PhysicalVertex physicalVertex : subPlan.getPhysicalVertexList()) {
            for (Task task : physicalVertex.getTaskGroup().getTasks()) {
                taskCount++;
                if (task instanceof SeaTunnelTask) {
                    SeaTunnelTask seaTunnelTask = (SeaTunnelTask) task;
                    boolean containsSource = false;
                    boolean containsTransform = false;
                    boolean containsSink = false;
                    for (ActionStateKey actionStateKey : seaTunnelTask.getActionStateKeys()) {
                        String keyName = actionStateKey.getName();
                        containsSource = containsSource || keyName.contains("fake-source");
                        containsTransform = containsTransform || keyName.contains("noop-transform");
                        containsSink = containsSink || keyName.contains("console-sink");
                    }
                    if (containsSource) {
                        sourceTaskCount++;
                    }
                    if (containsTransform) {
                        transformTaskCount++;
                    }
                    if (containsSink) {
                        sinkTaskCount++;
                    }
                }
            }
        }
        System.out.println(
                "[source=10, transform=4, sink=4] generated counts => tasks="
                        + taskCount
                        + ", sourceTasks="
                        + sourceTaskCount
                        + ", transformTasks="
                        + transformTaskCount
                        + ", sinkTasks="
                        + sinkTaskCount);
    }

result:
[source=10, transform=4, sink=4] generated counts => tasks=20, sourceTasks=10, transformTasks=10, sinkTasks=10

@davidzollo
Copy link
Copy Markdown
Contributor

I found some new issues as follows:

1. Critical Concurrency Risk: Missing checkpointLock in onTimerTick()

File: SourceFlowLifeCycle.java
Currently, onTimerTick() is triggered asynchronously by the timerFlushWorker thread pool, and it directly pushes the FlushSignal into the collector without acquiring the checkpointLock.

// Current Implementation
private void onTimerTick() {
    if (prepareClose) return;
    try {
        FlushSignal flushSignal = FlushSignal.of(currentTaskLocation.getJobId(), currentTaskLocation.getTaskID());
        Record<FlushSignal> flushSignalRecord = new Record<>(flushSignal);
        collector.sendRecordToNext(flushSignalRecord); // <-- DANGER: No lock
    } catch (Throwable e) {
        // error handling elided in this excerpt
    }
}

Since Checkpoint Barrier generation and downstream dispatch (triggerBarrier()) are also processed within this flow, failing to synchronize onTimerTick with the checkpointLock allows the FlushSignal to be injected into the data stream at arbitrary points, potentially interleaving with a Checkpoint Barrier. That unordered interleaving can corrupt state machines or offset alignment in future stateful connectors.
Suggestion: wrap the signal emission logic in synchronized (checkpointLock):

private void onTimerTick() {
    if (prepareClose) return;
    try {
        synchronized (checkpointLock) { // Ensure total ordering with Checkpoint Barriers
            if (prepareClose) return;
            FlushSignal flushSignal = FlushSignal.of(currentTaskLocation.getJobId(), currentTaskLocation.getTaskID());
            collector.sendRecordToNext(new Record<>(flushSignal));
        }
    } catch (Throwable e) {
        log.warn("Failed to broadcast FlushSignal from task {}", currentTaskLocation, e);
    }
}

2. High Risk: Sink Uncaught Exception Avalanche

File: SinkFlowLifeCycle.java inside received()

if (signal instanceof FlushSignal && writerContext.getFlushAction() != null) {
    writerContext.getFlushAction().run(); // <-- No exception handling
}

The actual flush actions (Phase 2, e.g., a JDBC flush or an Iceberg commit) will likely involve network or disk I/O, which frequently throws exceptions such as connection timeouts. Without a try-catch block here, an exception thrown during the flush action will bubble up and crash the entire Sink Task Worker thread, leading to task failure and unnecessary job restarts. A helper flush signal should never be allowed to crash the main data-processing thread.
Suggestion:

if (signal instanceof FlushSignal && writerContext.getFlushAction() != null) {
    try {
        writerContext.getFlushAction().run();
    } catch (Exception e) {
        log.warn("Flush action failed for FlushSignal from task {}, continuing processing data...", signal, e);
    }
}

3. Minor Improvement: serialVersionUID

For FlushSignal.java, since it implements Serializable, it’s strongly recommended to define a serialVersionUID:

private static final long serialVersionUID = 1L;

This guarantees backward compatibility for Checkpoint state persistence if the FlushSignal fields are changed in future phases.
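
A minimal sketch of where the field would sit is below; the jobId/taskId fields and the factory-method shape are assumptions inferred from the FlushSignal.of(...) call quoted above, not the actual class (the relation to the Signal abstraction is also elided here):

import java.io.Serializable;

// Sketch only — field names/types are assumptions, not the real FlushSignal source.
public final class FlushSignal implements Serializable {
    // Pin the serialized form so future field additions stay checkpoint-compatible.
    private static final long serialVersionUID = 1L;

    private final long jobId;
    private final long taskId;

    private FlushSignal(long jobId, long taskId) {
        this.jobId = jobId;
        this.taskId = taskId;
    }

    public static FlushSignal of(long jobId, long taskId) {
        return new FlushSignal(jobId, taskId);
    }

    public long getJobId() {
        return jobId;
    }

    public long getTaskId() {
        return taskId;
    }
}
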
Looking forward to the fixes.

@davidzollo Thank you for your detailed suggestions

  1. sendRecordToNext already acquires the checkpointLock in its underlying implementation:
public void sendRecordToNext(Record<?> record) throws IOException {
    synchronized (checkpointLock) {
        for (OneInputFlowLifeCycle<Record<?>> output : outputs) {
            output.received(record);
        }
    }
}
  2. If a JDBC timeout can occur on the normal writeRecord path, the same failure can occur here, and the downstream FlushSignal must be genuinely processed. Swallowing the exception only hides the problem: a subsequent checkpoint would still restart the task when JDBC is unstable (e.g., timing out), so the flush failure should propagate fail-fast (a minimal sketch of that alternative follows this list).
if (signal instanceof FlushSignal && writerContext.getFlushAction() != null) {
    writerContext.getFlushAction().run(); // <-- No exception handling
}
  3. Fixed.
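
A minimal sketch of the fail-fast alternative from point 2 (the wrapping exception type here is an assumption for illustration, not the actual engine code):

// Hypothetical sketch: keep fail-fast semantics consistent with writeRecord,
// but attach context before the failure propagates and fails the task.
if (signal instanceof FlushSignal && writerContext.getFlushAction() != null) {
    try {
        writerContext.getFlushAction().run();
    } catch (Exception e) {
        // Do not swallow: an unstable sink should fail the task now rather than at the next checkpoint.
        throw new RuntimeException("Timer flush action failed for " + signal, e);
    }
}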

You're right! Good job.

Copy link
Copy Markdown
Contributor

@davidzollo davidzollo left a comment

+1
LGTM

@nzw921rx
Copy link
Copy Markdown
Collaborator Author

+1 LGTM

@davidzollo Thanks a lot for the thorough review and approval!

@DanielLeens
Copy link
Copy Markdown

Thanks for the follow-up discussion. I re-reviewed the latest head locally and traced the timer-flush path end to end.

What this PR solves

  • User pain: sinks that only flush on checkpoint can hold buffered data for too long in long-running jobs.
  • Fix approach: emit a FlushSignal from the source side on a timer and let the sink run writerContext.getFlushAction() when the signal arrives.
  • One-line value: the goal is valid, but the current implementation still attaches flush cadence to source parallelism instead of sink-writer cadence.

Runtime path

Task startup
  -> SourceSeaTunnelTask.createSourceFlowLifeCycle() [SourceSeaTunnelTask.java:113-135]
     -> reads sink.flush.interval from env [119]
     -> creates one SourceFlowLifeCycle per source subtask

Timer tick
  -> SourceFlowLifeCycle.onTimerTick() [SourceFlowLifeCycle.java:184-196]
     -> builds FlushSignal
     -> sends it downstream

Sink side
  -> SinkFlowLifeCycle.received() [SinkFlowLifeCycle.java:260-267]
     -> if signal is FlushSignal
        -> writerContext.getFlushAction().run()

Result
  -> source parallelism = N
  -> sink flushes N times per interval, not once per sink writer

Review findings

Issue 1: Flush cadence is multiplied by source parallelism on the normal fan-in path

  • Location: SourceSeaTunnelTask.java:119; SourceFlowLifeCycle.java:184; SinkFlowLifeCycle.java:265
  • Why this matters: every source subtask starts its own timer and emits its own FlushSignal, and the sink executes a flush for every signal it receives.
  • Risk: higher flush frequency than configured, reduced batching efficiency, throughput loss, and unnecessary load on external systems.
  • Better fix: move the timer to the sink-writer side, or deduplicate and throttle FlushSignal at the sink-writer boundary so each writer flushes at most once per interval (a minimal throttling sketch follows this list).
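
For reference, a minimal sketch of the sink-side throttling idea (onFlushSignal and lastFlushEpochMs are hypothetical names; only writerContext.getFlushAction() comes from this PR):

// Hypothetical sink-side throttle: flush at most once per interval window,
// no matter how many FlushSignals fan in from upstream source subtasks.
private volatile long lastFlushEpochMs = 0L;

private void onFlushSignal(long flushIntervalMs) {
    long now = System.currentTimeMillis();
    if (now - lastFlushEpochMs < flushIntervalMs) {
        return; // drop redundant signals inside the current window
    }
    lastFlushEpochMs = now;
    Runnable flushAction = writerContext.getFlushAction();
    if (flushAction != null) {
        flushAction.run();
    }
}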

Merge conclusion

Conclusion: Merge after fixes

Blocking items:

  • Issue 1 should be fixed first.
  • CI is not stable yet. The Apache-side Build check is red (73208205703), while the linked fork run 25000271118 was still in_progress at my review snapshot.

Non-blocking suggestions:

  • Please add a fan-in regression test with multiple source subtasks feeding one sink writer.

CI status:

  • The PR check is red, and the underlying fork run state was still moving during this review.

@nzw921rx
Copy link
Copy Markdown
Collaborator Author

@DanielLeens The core design of source-side injection remains unchanged; please refer to #10717 for the detailed design.

@nzw921rx nzw921rx requested a review from dybyte April 28, 2026 02:22
Copy link
Copy Markdown

@DanielLeens DanielLeens left a comment

Hi @nzw921rx, thanks for the follow-up. I also went back and read #10717 before re-checking the current head locally, so I’m responding with the design background in mind rather than only the current diff.

I agree with the core goal in #10717: Zeta should provide a task-thread timer callback so sink connectors do not have to create their own background scheduler threads, and flush failures can still fail fast on the task path.

However, after re-tracing the current implementation, I still do not think the remaining blocker is resolved by that design note.

What this PR solves:

  • User pain: buffered sinks can hold data too long in low-throughput / idle scenarios.
  • Fix approach: source-side timers emit FlushSignal, transforms pass it through, and sinks run writerContext.getFlushAction() when the signal arrives.
  • One-line summary: the direction is good, but the current cadence owner is still the source side, so the runtime semantics of sink.flush.interval are wrong on normal fan-in topologies.

Runtime chain I re-verified:

task startup
  -> SourceSeaTunnelTask.createSourceFlowLifeCycle() [SourceSeaTunnelTask.java:113-135]
     -> read sink.flush.interval [119]
     -> create one SourceFlowLifeCycle per source subtask

timer registration
  -> SourceFlowLifeCycle.startFlushTimer() [344-355]
     -> register one scheduled task per source subtask

timer tick
  -> SourceFlowLifeCycle.onTimerTick() [184-193]
     -> build FlushSignal
     -> collector.sendRecordToNext(...)

sink side
  -> SinkFlowLifeCycle.received() [260-267]
     -> each received FlushSignal runs writerContext.getFlushAction().run()

The remaining blocker is still:

  1. sink.flush.interval is currently multiplied by source parallelism on the normal fan-in path.
    • Every source subtask owns its own timer.
    • Every timer emits its own FlushSignal.
    • The sink executes one flush per signal.
    • So with N source subtasks feeding one sink writer, the sink can flush roughly N times per interval instead of once per sink-writer cadence.

Why I do not think #10717 removes this concern:

  • #10717 explains why the engine should provide timed callbacks on the task thread.
  • It does not prove that the timer owner must be the source side.
  • The current code still makes the sink flush cadence depend on source topology, not sink-writer semantics.

That is why I still consider this a code-level blocker rather than just a design-style disagreement.

Suggested fix order:

  • Preferred: move timer ownership to the sink writer / sink task boundary, so each sink writer owns one flush cadence.
  • Smaller mitigation: keep source-side signal injection, but add sink-side dedup/throttling so each writer flushes at most once per interval window.

Test gap that still matters:

  • I still do not see a regression test for multiple source subtasks -> one sink writer, proving that one interval causes only one effective sink flush.

CI note:

  • The current Apache-side Build is red, and the linked fork-run metadata was still moving when I checked. That said, my blocking conclusion here is code semantics, not CI status.

Conclusion: merge after fixes

Blocking items:

  • Fix the cadence ownership problem so sink.flush.interval really means one sink-writer flush cadence, not one source-subtask signal cadence.

Non-blocking suggestion:

  • Add the multi-source fan-in regression test once the cadence semantics are corrected.

Overall, I still like the direction and the STIP motivation. The remaining gap is specifically about where the timer semantics are anchored in the runtime path.

@nzw921rx
Copy link
Copy Markdown
Collaborator Author

@DanielLeens

In the current operational topology of Zeta, [source=10, transform=4, sink=4] generates source tasks=10, transform tasks=10, and sink tasks=10, so the "multiple source subtasks -> one sink writer" fan-in relationship you are assuming does not exist. Therefore sink.flush.interval is not linearly amplified by the source parallelism. If there is a reproducible fan-in-to-single-writer DAG, please provide the specific job configuration before continuing the evaluation.

Copy link
Copy Markdown

@DanielLeens DanielLeens left a comment

Hi @nzw921rx, thanks for pushing back with the concrete topology example. I went back through the current Zeta planner/runtime chain end to end instead of continuing from my earlier assumption, and your latest point is correct: on the current topology I cannot reproduce the multiple source subtasks -> one shared sink writer fan-in path that my previous blocker relied on. So I am withdrawing that blocker.

I re-reviewed the latest head locally against upstream/dev as a source-level review. I did not run local Maven in this batch.

What this PR solves

  • User pain: buffered sinks can hold visible data for too long in low-throughput or briefly idle streaming jobs.
  • Fix approach: add an engine-level timer-flush capability. A source-side timer emits FlushSignal, transforms pass it through, and sinks execute writerContext.getFlushAction() when the writer explicitly registers one.
  • One-line summary: this PR lands the Zeta core path for STIP-23 Phase 1 and moves periodic flush handling onto the engine task path instead of connector-owned background threads.

Runtime chain I re-verified

union / fan-in planning
  -> PipelineGenerator.checkCanSplit() [PipelineGenerator.java:136-138]
      -> union inputs enter split logic
  -> PipelineGenerator.splitUnionVertex() [PipelineGenerator.java:140-183]
      -> rebuilds downstream vertices per source pipeline
      -> downstream parallelism follows the upstream pipeline branch

source -> sink physical plan
  -> PhysicalPlanGenerator.getSourceTask() [PhysicalPlanGenerator.java:378-508]
      -> creates one task group per source parallelism index
      -> if sourceWithSink(flow)=true, calls splitSinkFromFlow(flow)
  -> PhysicalPlanGenerator.splitSinkFromFlow() [PhysicalPlanGenerator.java:569-600]
      -> rewrites sink behind IntermediateQueue
      -> puts the queue -> sink subflow into the same source task group
  -> TaskGroupWithIntermediateBlockingQueue.getQueueCache() [TaskGroupWithIntermediateBlockingQueue.java:49-70]
  -> TaskGroupWithIntermediateDisruptor.getQueueCache() [TaskGroupWithIntermediateDisruptor.java:49-73]
      -> queue/disruptor cache is task-group local, not shared across task groups

task runtime
  -> SeaTunnelTask.stateProcess() STARTING -> RUNNING [SeaTunnelTask.java:217-221]
      -> calls hook() on all cycles
  -> SourceFlowLifeCycle.hook() [SourceFlowLifeCycle.java:218-220]
      -> startFlushTimer() [344-355]
  -> timer tick
      -> SourceFlowLifeCycle.onTimerTick() [184-195]
          -> collector.sendRecordToNext(new Record<>(FlushSignal))
  -> SeaTunnelSourceCollector.sendRecordToNext() [SeaTunnelSourceCollector.java:191-196]
      -> broadcasts under checkpointLock within the current task-group outputs
  -> SeaTunnelTask.convertFlowToActionLifeCycle() [SeaTunnelTask.java:279-337]
      -> queue -> sink subflow becomes IntermediateQueueFlowLifeCycle -> SinkFlowLifeCycle
  -> TransformFlowLifeCycle.received(Signal) [TransformFlowLifeCycle.java:118-122]
      -> passes Signal through unchanged
  -> SinkFlowLifeCycle.received() [SinkFlowLifeCycle.java:260-267]
      -> runs writerContext.getFlushAction().run() only for FlushSignal and only when a flush action is registered

Key findings

  1. After re-tracing the current planner/runtime chain, the earlier cadence-multiplied-by-source-parallelism blocker does not hit the reachable runtime topology of the current Zeta implementation.
  2. The sink flow is cloned into source task groups through splitSinkFromFlow(...), and the intermediate queue/disruptor instances are task-group local, so the current runtime is a paired path rather than a shared sink-writer fan-in path.
  3. The remaining issue I see is non-blocking and documentation-facing: the option/docs wording currently reads like sink.flush.interval is immediately effective for sinks in general, while the runtime only flushes when a sink writer explicitly registers flushAction.

Review conclusion

Conclusion: can merge

  1. Blocking items
  • No code-level blocking issue remains in the current head after re-checking the real runtime topology.
  2. Suggested follow-up
  • Please tighten the wording in:
    • docs/en/introduction/configuration/JobEnvConfig.md:58-62
    • docs/zh/introduction/configuration/JobEnvConfig.md:59-63
    • seatunnel-api/src/main/java/org/apache/seatunnel/api/options/EnvCommonOptions.java:88-94
  • Right now the runtime only flushes when SinkFlowLifeCycle.received() finds writerContext.getFlushAction() != null (SinkFlowLifeCycle.java:260-267), and the opt-in happens through SinkWriter.Context.registerFlushAction(...) (SinkWriter.java:122-143). Clarifying that this PR introduces the engine capability first, while connector adoption is follow-up work, would make the user-facing contract much clearer (a minimal opt-in sketch follows this list).
  3. CI note
  • The current Apache-side Build check is red. The linked fork workflow run failed in connector-jdbc-e2e-ddl on JDK 8, specifically org.apache.seatunnel.connectors.jdbc.SqlServerSchemaChangeIT with expected: <hawk9821> but was: <scooter>.
  • I checked the current upstream/dev changes in that JDBC E2E module path from this PR's merge-base to the latest head and did not find a corresponding fix there, so this does not currently look like a simple "rebase to pick up an existing dev-side fix" case.
  • Because of that, I treat the red CI as a separate follow-up rather than a blocker on the code-review conclusion above.
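
For the docs follow-up, a hypothetical Phase 2-style opt-in from a connector writer (the class and buffer names below are made up for illustration; only SinkWriter.Context.registerFlushAction(Runnable) from this PR is assumed):

import org.apache.seatunnel.api.sink.SinkWriter;

import java.util.ArrayList;
import java.util.List;

// Hypothetical helper showing the opt-in contract; a real connector would do this
// inside its SinkWriter implementation (Phase 2 work, not part of this PR).
public class BufferedFlushOptInExample {
    private final List<Object> buffer = new ArrayList<>();

    public BufferedFlushOptInExample(SinkWriter.Context context) {
        // Without this registration, sink.flush.interval has no visible effect for this writer.
        context.registerFlushAction(this::flushBuffer);
    }

    private synchronized void flushBuffer() {
        if (buffer.isEmpty()) {
            return;
        }
        // Push buffered rows to the external system here, then clear the buffer.
        buffer.clear();
    }
}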

Overall, the direction here is good, and after re-checking the real task-planning/runtime topology, I am comfortable with the core engine path in this PR. Thanks again for the detailed clarification in the discussion; it helped narrow the review back to the code paths that are actually reachable today.

@nzw921rx
Copy link
Copy Markdown
Collaborator Author

@DanielLeens Thanks for the suggestion.

Please see the Phase 3 (#10802) sink capability tables, where timer flush support is listed per connector.

@DanielLeens
Copy link
Copy Markdown

Thanks for the follow-up. Pointing me to #10802 is helpful, because that connector capability table is exactly the kind of visibility follow-up I was asking for on the docs side.

I rechecked the current head locally again before replying. There is still no new source delta after my earlier approval, so this is a follow-up discussion review rather than a re-review of changed code.

What this PR solves

  • User pain: buffered sinks can keep data invisible for too long in low-throughput or briefly idle jobs.
  • Fix approach: Zeta emits a timed FlushSignal, transforms pass it through, and sinks only flush when the writer explicitly opts in through registerFlushAction(...).
  • One-line summary: this PR lands the engine-side Phase 1 path for STIP-23.

Runtime chain I rechecked

Zeta task creation
  -> SourceSeaTunnelTask.createSourceFlowLifeCycle()
      -> reads env.sink.flush.interval
      -> creates SourceFlowLifeCycle per source task

Runtime hook
  -> SourceFlowLifeCycle.hook()
      -> startFlushTimer()
      -> TaskExecutionService.registerTimerFlushTask(...)

Timer tick
  -> SourceFlowLifeCycle.onTimerTick()
      -> collector.sendRecordToNext(new Record<>(FlushSignal))

Middle stages
  -> TransformFlowLifeCycle.received(Signal)
      -> passes FlushSignal through unchanged

Sink side
  -> SinkFlowLifeCycle.received()
      -> runs writerContext.getFlushAction().run() only when a flush action was registered
  -> opt-in point
      -> SinkWriter.Context.registerFlushAction(...)
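
For readers tracing the hook step above, a minimal lifecycle sketch of what a timerFlushWorker-style registration could look like (the class body is an assumption for illustration; only the registerTimerFlushTask name is reused from the trace):

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch, not the actual TaskExecutionService code.
public class TimerFlushRegistrySketch {
    private final ScheduledExecutorService timerFlushWorker =
            Executors.newSingleThreadScheduledExecutor();

    public ScheduledFuture<?> registerTimerFlushTask(long intervalMs, Runnable onTick) {
        // One periodic tick per registered source flow; a non-positive interval should be rejected upstream.
        return timerFlushWorker.scheduleAtFixedRate(onTick, intervalMs, intervalMs, TimeUnit.MILLISECONDS);
    }

    public void cancel(ScheduledFuture<?> handle) {
        handle.cancel(false); // stop ticks when a source flow prepares to close
    }

    public void shutdown() {
        timerFlushWorker.shutdownNow(); // release the worker with the task group
    }
}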

Current conclusion after this follow-up

  1. Your pointer to #10802 addresses the visibility concern I had about showing connector-by-connector timer-flush adoption.
  2. I do not see a reopened code-level blocker on the current head.
  3. My source-level conclusion on this revision is unchanged from the earlier approval.

One remaining practical note

  • gh pr checks 10800 --repo apache/seatunnel still shows the current Apache-side Build as red in fetched metadata, so the branch is not practically merge-ready yet even though I am not reopening a code blocker here.

Conclusion: merge after fixes

Blocking item:

  • No new source-level blocking issue from Daniel’s side.
  • The remaining merge gate is the current red Build check.

Non-blocking follow-up:

  • When #10802 lands, please keep the wording between this Phase 1 env option and the Phase 3 connector capability table aligned, so users do not read sink.flush.interval as “all sinks flush automatically” before the connector adoption table is in place.

Overall, thanks for the clarification. The Phase 3 docs pointer answers the follow-up I had, and my code-review conclusion on this head stays the same.
