[STIP-23] [Feature][Zeta] Engine-level timer flush support for Sink connectors #10717

@nzw921rx

Search before asking

  • I have searched the existing feature requests and found no similar one.

Description

1. Why the Engine Needs to Provide Timer Flush

Currently, if a Sink connector wants to implement time-based flushing (e.g. batch_interval_ms), there are only two approaches, both with significant drawbacks:

Approach A: Connector-internal ScheduledExecutorService background thread

  • Flush failures can only be stored in flushException and are not surfaced until the next writeRecord / prepareCommit call
  • Under low throughput or idle partitions, the task may stay RUNNING long after the sink has already failed
  • Background thread introduces race conditions with checkpoint and schema evolution paths
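
To make the delayed-failure problem concrete, here is a minimal sketch of the Approach A pattern. The class `BackgroundFlushWriter`, its methods, and the always-failing `flushToExternalSystem` are hypothetical and written only for illustration; they are not taken from any existing connector.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical writer illustrating Approach A; not taken from any SeaTunnel connector.
class BackgroundFlushWriter implements AutoCloseable {
    private final List<String> buffer = new ArrayList<>();
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor(r -> {
                Thread t = new Thread(r, "sink-flush");
                t.setDaemon(true);
                return t;
            });
    // Failure recorded by the background thread; the task thread only sees it later.
    private volatile RuntimeException flushException;

    BackgroundFlushWriter(long batchIntervalMs) {
        scheduler.scheduleWithFixedDelay(
                this::backgroundFlush, batchIntervalMs, batchIntervalMs, TimeUnit.MILLISECONDS);
    }

    // Runs on the background thread, so the buffer must be guarded by a lock.
    private synchronized void backgroundFlush() {
        if (flushException != null || buffer.isEmpty()) {
            return;
        }
        try {
            flushToExternalSystem(buffer);
            buffer.clear();
        } catch (RuntimeException e) {
            flushException = e; // stored, not thrown: nobody is notified yet
        }
    }

    // Stand-in for the real sink call; always fails to demonstrate the delayed error.
    private void flushToExternalSystem(List<String> rows) {
        throw new IllegalStateException("sink unavailable");
    }

    synchronized void writeRecord(String row) {
        checkFlushException(); // the failure surfaces only when the next record arrives
        buffer.add(row);
    }

    boolean hasPendingFailure() {
        return flushException != null;
    }

    private void checkFlushException() {
        if (flushException != null) {
            throw new IllegalStateException("async flush failed", flushException);
        }
    }

    @Override
    public void close() {
        scheduler.shutdownNow();
    }
}
```

In this sketch, if no further record arrives after the failed flush, `hasPendingFailure()` stays true while nothing is thrown on the task thread, which is the situation described in the second bullet.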

Approach B: Synchronous interval check inside writeRecord()

  • Safe, fail-fast, no race conditions
  • But the check only runs when new data arrives; after the source stops sending, buffered data can only be flushed at checkpoint (prepareCommit) or close
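
The idle gap in Approach B can be sketched as follows. `SyncIntervalWriter` and its members are hypothetical names, the clock is injected so the behavior is easy to follow, and the `flushed` list merely stands in for the external system.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongSupplier;

// Hypothetical writer illustrating Approach B; names are illustrative only.
class SyncIntervalWriter {
    private final List<String> buffer = new ArrayList<>();
    private final List<String> flushed = new ArrayList<>(); // stand-in for the external system
    private final long batchIntervalMs;
    private final LongSupplier clock;
    private long lastFlushTime;

    SyncIntervalWriter(long batchIntervalMs, LongSupplier clock) {
        this.batchIntervalMs = batchIntervalMs;
        this.clock = clock;
        this.lastFlushTime = clock.getAsLong();
    }

    // The interval is only evaluated here, i.e. only when a record arrives.
    void writeRecord(String row) {
        buffer.add(row);
        if (clock.getAsLong() - lastFlushTime >= batchIntervalMs) {
            flush();
        }
    }

    // Checkpoint path: always flushes whatever is buffered.
    void prepareCommit() {
        flush();
    }

    // Runs on the caller's thread, so any exception would propagate immediately.
    private void flush() {
        flushed.addAll(buffer);
        buffer.clear();
        lastFlushTime = clock.getAsLong();
    }

    int bufferedCount() {
        return buffer.size();
    }

    int flushedCount() {
        return flushed.size();
    }
}
```

With a 1000 ms interval, a record written at time 0 stays buffered no matter how far the clock advances; it is flushed only when another record arrives or `prepareCommit()` runs.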

Core contradiction: At the connector level, it is impossible to achieve both "wall-clock-driven flush independent of data flow" and "fail-fast error propagation" — because the Zeta engine does not provide the ability to execute timed callbacks on the task thread.

2. Current Situation

| Connector | Approach | Problem |
|---|---|---|
| MongoDB Sink | Approach B: batchIntervalMs is only checked in the write() path, no background thread | No flush during idle periods |
| JDBC Sink (#10609) | Initially used Approach A, switched to Approach B after async failure propagation was blocked in review | Same as above |
| StarRocks / Doris etc. | Each implements time-based flush independently or does not support it | No unified guarantee; each connector behaves differently |

Summary: Either skip time-based flush entirely, have each connector spawn its own thread (unsafe), or use synchronous checks (no idle coverage). There is no unified, safe solution — the root cause is that the engine lacks this capability.

3. Benefits

If the engine provides unified support, the benefits are:

  • Safety: Connectors no longer need to manage background threads, eliminating async race conditions and delayed error propagation
  • Consistency: batch_interval_ms semantics become uniform across all Sinks (JDBC, MongoDB, StarRocks, Doris, etc.) — users won't encounter different behavior when switching connectors
  • Unified flush standard: All connectors follow the same engine-provided flush mechanism, instead of each reinventing "timer flush + error propagation + lifecycle management" independently
  • Idle scenario coverage: True "flush buffered data on wall-clock time even with zero traffic", filling the gap in the current synchronous approach

4. Proposed Direction

Core idea: The Zeta engine provides a timed callback capability on the task thread, allowing Sink connectors to register periodic actions without managing their own threads.

Expected outcomes:

  • Callbacks execute on the task thread — no extra threads, no race conditions, exceptions propagate immediately (fail-fast)
  • Engine manages the lifecycle uniformly (auto-cancel at close / checkpoint boundaries)
  • Universal for all Sinks, no need for each connector to reinvent the wheel
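
One possible shape of such a capability, purely as a discussion aid: the sketch below assumes a hypothetical engine-side helper, `TaskThreadTimerService`, that the task loop consults between records and while idle. Neither the class nor its methods exist in SeaTunnel today, and the actual API is still to be agreed upon.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongSupplier;

// Hypothetical engine-side helper; this class and its methods are assumptions, not an existing API.
class TaskThreadTimerService {
    private static final class Entry {
        final long intervalMs;
        final Runnable action;
        long nextFireTime;

        Entry(long intervalMs, Runnable action, long nextFireTime) {
            this.intervalMs = intervalMs;
            this.action = action;
            this.nextFireTime = nextFireTime;
        }
    }

    private final List<Entry> timers = new ArrayList<>();
    private final LongSupplier clock;

    TaskThreadTimerService(LongSupplier clock) {
        this.clock = clock;
    }

    // Called by a sink writer that opts in to periodic callbacks.
    void registerPeriodic(long intervalMs, Runnable action) {
        if (intervalMs <= 0) {
            throw new IllegalArgumentException("intervalMs must be positive");
        }
        timers.add(new Entry(intervalMs, action, clock.getAsLong() + intervalMs));
    }

    // Called by the task loop on the task thread, between records and while idle.
    void fireDueTimers() {
        long now = clock.getAsLong();
        for (Entry e : new ArrayList<>(timers)) {
            if (now >= e.nextFireTime) {
                e.nextFireTime = now + e.intervalMs;
                e.action.run(); // any exception propagates straight to the task thread
            }
        }
    }

    // Upper bound on how long the task loop may block waiting for input.
    long millisUntilNextTimer() {
        long now = clock.getAsLong();
        long wait = Long.MAX_VALUE;
        for (Entry e : timers) {
            wait = Math.min(wait, Math.max(0L, e.nextFireTime - now));
        }
        return wait;
    }

    // Engine-managed lifecycle: invoked when the task closes.
    void cancelAll() {
        timers.clear();
    }
}
```

Under these assumptions, the task loop would wait for input for at most `millisUntilNextTimer()` milliseconds and then call `fireDueTimers()`, so a flush callback runs on the task thread even when no data arrives, and a failing callback fails the task immediately.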

Compatibility with Exactly-Once / 2PC:

Whether the timer flush callback should fire in transactional mode depends on the connector's transaction model:

| Sink Type | Meaning of flush | Is timer flush safe? |
|---|---|---|
| JDBC XA | executeBatch() runs within the current XA transaction; data is not visible until commit | Safe: only moves the in-memory batch to the database transaction buffer, does not affect transaction boundaries, reduces memory pressure |
| Doris / StarRocks 2PC | stopLoad() ends the current stream load and produces a new txnId | Not suitable: timer flush would interrupt the stream load and create additional transaction boundaries; should only be triggered at checkpoint |
| MongoDB transactions | bulkWrite() runs within the current session transaction | Safe: similar to JDBC XA, does not change transaction boundaries |

Therefore, the engine's timer callback should be an opt-in capability, not mandatory behavior:

  • In exactly-once mode, connectors can choose not to register a timer (e.g. Doris 2PC), relying entirely on checkpoint-driven flushes
  • Connectors can also choose to register a timer to reduce memory pressure (e.g. JDBC XA, where flush executes safely within the transaction)
  • The decision is left to the connector; the engine makes no assumptions
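
A connector-side opt-in decision along these lines could look like the sketch below. The enum `FlushModel` and the helper `TimerOptIn.shouldRegisterTimer` are hypothetical names. The sketch also assumes that outside exactly-once mode a timer flush is always acceptable, which the proposal does not state and individual connectors may decide differently.

```java
// Hypothetical classification of what "flush" means for a sink in transactional mode.
enum FlushModel {
    WITHIN_OPEN_TRANSACTION, // e.g. JDBC XA executeBatch(), MongoDB bulkWrite() in a session
    ENDS_TRANSACTION         // e.g. Doris / StarRocks 2PC, where flush ends the stream load
}

// Hypothetical connector-side helper; the engine itself makes no such decision.
final class TimerOptIn {
    private TimerOptIn() {}

    static boolean shouldRegisterTimer(boolean exactlyOnce, FlushModel model, long batchIntervalMs) {
        if (batchIntervalMs <= 0) {
            return false; // time-based flush not configured
        }
        if (!exactlyOnce) {
            return true; // assumption: timer flush is acceptable outside exactly-once mode
        }
        // In exactly-once mode, only flush on a timer if it keeps transaction boundaries intact.
        return model == FlushModel.WITHIN_OPEN_TRANSACTION;
    }
}
```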

The specific API shape (whether it lives in SinkWriter.Context or as a separate interface, etc.) should be discussed and agreed upon by the community before entering detailed design.

5. Non-Goals

  • No changes to the existing writeRecord / prepareCommit / close lifecycle — the timer callback is a supplement, not a replacement
  • Not implemented at the connector level — the purpose of this proposal is to elevate the timer capability from connectors to the engine
  • Does not cover the translation layer (Flink / Spark) — not supported in Phase 1; can be discussed separately later
  • No redesign of checkpoint semantics: prepareCommit still performs a normal flush at checkpoint; the timer callback addresses the gap between two checkpoints
  • No unification of batch_size at the engine level: batch_size is a connector-specific batching threshold; different connectors use different buffer structures (JDBC batch, List<byte[]>, stream load pipe, etc.) and manage it themselves. The timer flush capability is fundamentally different: it requires the engine to provide wall-clock-driven callbacks on the task thread. Only the engine can deliver unified scheduling and lifecycle management, preventing each Sink from spinning up its own threads independently.

Usage Scenario

No response

Related issues

No response

Are you willing to submit a PR?

  • Yes I am willing to submit a PR!
