Open
Labels: type/enhancement (New feature or request)
Description
Enhancement
When I started using flink-tidb-connector-1.14 to sink data to TiDB, following README_unified_batch_streaming.md.

However, I insert very little data (only three rows), while tikv.sink.buffer-size defaults to 1000, so the row-count threshold is never reached and the flush is never triggered.
Code block (`TiDBWriteOperator`):

```java
@Override
public void processElement(StreamRecord<Row> element) throws Exception {
  Row row = element.getValue();
  if (buffer.isFull()) { // flush is triggered only by the number of buffered rows
    flushRows();
  }
  boolean added = buffer.add(row);
  if (!added && !sinkOptions.isDeduplicate()) {
    throw new IllegalStateException(
        "Duplicate index in one batch, please enable deduplicate, row = " + row);
  }
}
```

The issue: `flushRows()` is triggered only by buffer size, never by elapsed time.
Proposal: add a maximum commit wait time, for example `tikv.sink.max.wait.ms: 5000`, and flush the buffer when it expires. Two possible implementations:
- On each incoming row, also check whether the wait time has elapsed: if the row-count threshold has not been reached but the deadline has passed, flush the buffered rows.
- Handle the timeout separately: run a dedicated background task that periodically checks whether the buffer has been held past the wait time, and flushes it when the deadline is reached.
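The first approach could be sketched as a time-aware buffer that tracks when its oldest row arrived. This is only an illustration of the idea, not the connector's actual API: `TimedBuffer`, `maxWaitMs`, and `shouldFlush` are hypothetical names, and the real change would live inside `processElement` / the existing buffer class.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch (not the connector's real buffer): flush when either
// the row limit (cf. tikv.sink.buffer-size) or the configured maximum wait
// time (cf. the proposed tikv.sink.max.wait.ms) has been reached.
class TimedBuffer<T> {
  private final int maxRows;
  private final long maxWaitMs;
  private final List<T> rows = new ArrayList<>();
  private long firstRowTimeMs = -1; // arrival time of the oldest buffered row

  TimedBuffer(int maxRows, long maxWaitMs) {
    this.maxRows = maxRows;
    this.maxWaitMs = maxWaitMs;
  }

  /** True if the caller should flush before adding more rows. */
  boolean shouldFlush(long nowMs) {
    if (rows.size() >= maxRows) {
      return true; // row-count threshold reached
    }
    // Deadline reached even though the buffer is not full.
    return firstRowTimeMs >= 0 && nowMs - firstRowTimeMs >= maxWaitMs;
  }

  void add(T row, long nowMs) {
    if (rows.isEmpty()) {
      firstRowTimeMs = nowMs;
    }
    rows.add(row);
  }

  /** Removes and returns all buffered rows, resetting the deadline. */
  List<T> drain() {
    List<T> out = new ArrayList<>(rows);
    rows.clear();
    firstRowTimeMs = -1;
    return out;
  }

  int size() {
    return rows.size();
  }
}
```

With this shape, `processElement` would call `shouldFlush(System.currentTimeMillis())` before adding each row, so three rows sitting in the buffer would be flushed roughly 5 seconds after the first one arrived, even though the 1000-row limit was never hit.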
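The second approach could look like the sketch below: a single scheduled thread that periodically runs a flush check. `PeriodicFlusher` and `flushIfExpired` are hypothetical names; in the real operator the callback would have to synchronize with `processElement`, since both touch the same buffer.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Illustrative sketch of the background-checker proposal: a dedicated
// single-threaded scheduler periodically invokes a flush check.
class PeriodicFlusher implements AutoCloseable {
  private final ScheduledExecutorService scheduler =
      Executors.newSingleThreadScheduledExecutor();

  /**
   * @param flushIfExpired callback that flushes the buffer if it has been
   *     held longer than the configured wait time (hypothetical; must be
   *     thread-safe with respect to processElement)
   * @param checkIntervalMs how often to run the check
   */
  PeriodicFlusher(Runnable flushIfExpired, long checkIntervalMs) {
    scheduler.scheduleAtFixedRate(
        flushIfExpired, checkIntervalMs, checkIntervalMs, TimeUnit.MILLISECONDS);
  }

  @Override
  public void close() {
    scheduler.shutdownNow(); // stop the checker when the operator closes
  }
}
```

A design note: this variant flushes late rows even when no new element ever arrives, which the per-element check cannot do, at the cost of a lock shared between the checker thread and the processing thread.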