Description
Background
Currently, only iceberg-sink supports exactly-once semantics. The logic to support exactly-once is embedded inside the implementation of iceberg-sink and works as follows:
IcebergSinkWriter receives checkpoint barrier from LogReader
-> finishes the file writer and sends CommitRequest to SinkCoordinator
-> SinkCoordinator collects CommitRequests from all parallelisms
-> SinkCoordinator calls IcebergSinkCommitter::commit
-> IcebergSinkCommitter waits for the checkpoint of the epoch to commit
-> IcebergSinkCommitter persists the metadata to commit, together with the epoch, in a meta store table
-> IcebergSinkCommitter idempotently commits the metadata to the external iceberg table
-> IcebergSinkCommitter marks the epoch as committed and returns from IcebergSinkCommitter::commit
-> SinkCoordinator sends CommitResponse to IcebergSinkWriter
-> IcebergSinkWriter acks the checkpoint barrier
During recovery, IcebergSinkCommitter::init checks the meta store table for uncommitted metadata, idempotently commits the pending metadata, and marks the epoch as committed.
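To illustrate what "idempotently commits" means here, below is a minimal sketch with hypothetical helper names (`already_committed` and `apply_commit` are stand-ins, not the actual iceberg-sink code): before applying the metadata, the committer checks whether the epoch was already committed to the external table, so that replaying the commit after a failure is a no-op.

```rust
use anyhow::Result;

// Hypothetical sketch of an idempotent commit; not the actual iceberg-sink code.
async fn commit_idempotently(epoch: u64, metadata: &[u8]) -> Result<()> {
    // If the external iceberg table already contains a commit for this epoch,
    // the commit finished before the failure and must not be applied twice.
    if already_committed(epoch).await? {
        return Ok(());
    }
    apply_commit(epoch, metadata).await
}

// Stand-in for querying the external iceberg table for the last committed epoch.
async fn already_committed(_epoch: u64) -> Result<bool> {
    todo!("inspect the latest committed state of the iceberg table")
}

// Stand-in for committing the data files described by `metadata`.
async fn apply_commit(_epoch: u64, _metadata: &[u8]) -> Result<()> {
    todo!("commit the data files to the iceberg table")
}
```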
The current implementation has the following drawbacks:
- It requires the sink to enable sink_decouple: the checkpoint barrier is only acked after the checkpoint has finished, and without the log store enabled by sink decoupling there would be a deadlock.
- The completion of IcebergSinkCommitter::commit has to wait for the checkpoint, which greatly increases latency; the IcebergSinkWriter is effectively blocked by the checkpoint of the upstream streaming job.
- The logic to support exactly-once (e.g., storing to and updating the meta store table) is embedded inside the implementation of IcebergSinkCommitter, which makes it hard to reuse for other sinks that want exactly-once.
Design
In the new design, the two-phase commit semantics will be inherently expressed in the SinkCommitCoordinator trait:
```rust
#[async_trait]
pub trait SinkCommitCoordinator {
    async fn init(&mut self) -> Result<Option<u64>>;
    async fn pre_commit(
        &mut self,
        epoch: u64,
        metadata: Vec<SinkMetadata>,
        add_columns: Option<Vec<Field>>,
    ) -> Result<Option<Bytes>>;
    async fn commit(&mut self, commit_metadata: Bytes) -> Result<()>;
    async fn abort(&mut self, commit_metadata: Bytes); // no error
}
```
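With this split, a sink only has to implement the trait, and the generic coordinator owns the meta store bookkeeping. Below is a minimal hypothetical implementor as a usage sketch: `InMemoryCoordinator` and the stand-in `SinkMetadata`/`Field` definitions are illustrative, `Result` is assumed to be anyhow's, and the `SinkCommitCoordinator` trait above is assumed to be in scope.

```rust
use anyhow::Result; // stand-in for the crate's Result type
use async_trait::async_trait;
use bytes::Bytes;

// Illustrative stand-ins for the framework types used by the trait.
pub struct SinkMetadata(pub Vec<u8>);
pub struct Field;

// A hypothetical implementor that "commits" into memory, to show the shape of
// the two phases.
pub struct InMemoryCoordinator {
    committed: Vec<Bytes>,
}

#[async_trait]
impl SinkCommitCoordinator for InMemoryCoordinator {
    async fn init(&mut self) -> Result<Option<u64>> {
        // Assumption: the returned epoch tells the framework up to which epoch
        // the external system has already committed (None if unknown).
        Ok(None)
    }

    async fn pre_commit(
        &mut self,
        _epoch: u64,
        metadata: Vec<SinkMetadata>,
        _add_columns: Option<Vec<Field>>,
    ) -> Result<Option<Bytes>> {
        // Phase 1: merge the per-parallelism metadata into one serialized blob
        // that the framework persists and passes back to `commit` later.
        let merged: Vec<u8> = metadata.into_iter().flat_map(|m| m.0).collect();
        Ok(Some(Bytes::from(merged)))
    }

    async fn commit(&mut self, commit_metadata: Bytes) -> Result<()> {
        // Phase 2: apply the pre-committed metadata to the external system.
        self.committed.push(commit_metadata);
        Ok(())
    }

    async fn abort(&mut self, _commit_metadata: Bytes) {
        // Drop staged state for a dirty epoch; by contract this cannot fail.
    }
}
```

Note that abort returns no error: presumably an aborted epoch will never be committed, so cleanup only needs to be best-effort.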
We will have a meta store table to store the metadata of pending uncommitted epochs. The meta store table will have the following schema:
```sql
CREATE TABLE pending_sink_state (
    sink_id int,
    epoch bigint,
    status int, -- an enum with variants Pending, Aborted, Committed
    metadata bytea,
    primary key (sink_id, epoch)
);
```
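For reference, the status column could map to a Rust enum like the sketch below; the concrete integer values are an assumption, since the schema above only specifies an int-encoded enum.

```rust
// Hypothetical Rust-side mapping for the `status` column; the integer values
// are assumptions, the schema only specifies an int-encoded enum.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(i32)]
pub enum PendingSinkStatus {
    Pending = 0,
    Aborted = 1,
    Committed = 2,
}
```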
The general flow works as follows (a coordinator-side sketch is given after the list):
IcebergSinkWriter receives checkpoint barrier from LogReader
-> finishes the file writer and sends CommitRequest to SinkCoordinator
-> SinkCoordinator collects CommitRequests from all parallelisms
-> SinkCoordinator calls IcebergSinkCommitter::pre_commit and gets the serialized metadata to commit later
-> SinkCoordinator persists (sink_id, epoch, Status::Pending, metadata) to the meta store table
-> SinkCoordinator sends CommitResponse to IcebergSinkWriter
-> IcebergSinkWriter acks the checkpoint barrier
-> SinkCoordinator waits for the checkpoint of the epoch to commit
-> SinkCoordinator calls IcebergSinkCommitter::commit
-> SinkCoordinator marks the status of the epoch in the meta store table as Committed
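The coordinator side of this flow can be sketched as below. `MetaStoreHandle`, `wait_for_checkpoint`, and the helper methods are hypothetical stand-ins (the `SinkCommitCoordinator` trait and the stand-in types are the ones sketched above); the CommitResponse plumbing is elided.

```rust
use anyhow::Result;
use bytes::Bytes;

// Hypothetical handle to the meta store table `pending_sink_state`.
pub struct MetaStoreHandle;

impl MetaStoreHandle {
    // Insert (sink_id, epoch, Status::Pending, metadata).
    pub async fn insert_pending(&mut self, _sink_id: i32, _epoch: u64, _meta: &Bytes) -> Result<()> {
        todo!("INSERT a Pending row into pending_sink_state")
    }
    // Flip the row's status from Pending to Committed.
    pub async fn mark_committed(&mut self, _sink_id: i32, _epoch: u64) -> Result<()> {
        todo!("UPDATE the row's status to Committed")
    }
}

// Stand-in for waiting until the checkpoint of `epoch` becomes durable.
pub async fn wait_for_checkpoint(_epoch: u64) -> Result<()> {
    todo!("subscribe to checkpoint completion")
}

// One commit round on the coordinator, following the flow above.
pub async fn handle_epoch(
    coordinator: &mut impl SinkCommitCoordinator,
    meta_store: &mut MetaStoreHandle,
    sink_id: i32,
    epoch: u64,
    metadata: Vec<SinkMetadata>,
) -> Result<()> {
    // Phase 1: let the sink turn per-parallelism metadata into one blob.
    // (None from pre_commit would mean there is nothing to commit this epoch.)
    if let Some(commit_metadata) = coordinator.pre_commit(epoch, metadata, None).await? {
        // Persist the Pending row before acking, so that a failure after this
        // point can be recovered by re-driving phase 2.
        meta_store.insert_pending(sink_id, epoch, &commit_metadata).await?;
        // ... send CommitResponse here: the writers ack the barrier without
        // waiting for the checkpoint ...
        // Phase 2 runs only after the checkpoint of this epoch is durable.
        wait_for_checkpoint(epoch).await?;
        coordinator.commit(commit_metadata).await?;
        meta_store.mark_committed(sink_id, epoch).await?;
    }
    Ok(())
}
```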
During recovery, since we now persist the metadata to the meta store table before the checkpoint, the table may contain metadata of dirty epochs. Therefore, we need to mark the status of epochs greater than the latest committed epoch as Aborted in the meta store table.
After recovery, when the sink executors are launched and then launch the SinkCoordinator, the SinkCoordinator will retrieve all epochs in the meta store table, call abort for epochs with status Aborted, and call commit for epochs with status Pending.
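This recovery pass can be sketched as below, reusing the hypothetical `PendingSinkStatus` and stand-in types from the earlier sketches; it runs after dirty epochs have already been flipped to Aborted.

```rust
use anyhow::Result;
use bytes::Bytes;

// Hypothetical recovery pass over the rows of `pending_sink_state` for one sink.
pub async fn recover(
    coordinator: &mut impl SinkCommitCoordinator,
    rows: Vec<(u64, PendingSinkStatus, Bytes)>, // (epoch, status, metadata)
) -> Result<()> {
    for (_epoch, status, metadata) in rows {
        match status {
            // Epochs newer than the latest committed checkpoint were marked
            // Aborted before recovery: drop their staged data.
            PendingSinkStatus::Aborted => coordinator.abort(metadata).await,
            // Pre-committed before the failure but never committed: the
            // checkpoint covering them is durable, so finish phase 2 now.
            PendingSinkStatus::Pending => coordinator.commit(metadata).await?,
            // Nothing to do for rows already marked Committed.
            PendingSinkStatus::Committed => {}
        }
    }
    Ok(())
}
```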
Future Optimizations
No response
Discussions
No response
Q&A
No response