Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;

import static java.util.stream.Collectors.toList;
import static org.apache.flink.runtime.checkpoint.CheckpointIDCounter.INITIAL_CHECKPOINT_ID;
Expand All @@ -62,6 +63,12 @@ public class IcebergCommitter implements Committer<WriteResultWrapper> {

public static final String TABLE_GROUP_KEY = "table";

/** Snapshot summary key for the batch index; used to resume partial commits on retry. */
static final String FLINK_BATCH_INDEX = "flink.batch-index";

/** Snapshot summary key for the checkpoint ID on intermediate batch commits. */
static final String FLINK_CHECKPOINT_ID_PROP = "flink.checkpoint-id";

private final Catalog catalog;

private final SinkCommitterMetricGroup metricGroup;
Expand Down Expand Up @@ -96,74 +103,140 @@ private void commit(List<WriteResultWrapper> writeResultWrappers) {
if (writeResultWrappers.isEmpty()) {
return;
}
// All write results in one commit call share the same checkpoint ID.
long checkpointId = writeResultWrappers.get(0).getCheckpointId();
String newFlinkJobId = writeResultWrappers.get(0).getJobId();
String operatorId = writeResultWrappers.get(0).getOperatorId();

Map<TableId, List<WriteResult>> tableMap = new HashMap<>();
for (WriteResultWrapper writeResultWrapper : writeResultWrappers) {
List<WriteResult> writeResult =
tableMap.getOrDefault(writeResultWrapper.getTableId(), new ArrayList<>());
writeResult.add(writeResultWrapper.getWriteResult());
tableMap.put(writeResultWrapper.getTableId(), writeResult);
LOGGER.info(writeResultWrapper.buildDescription());
Map<TableId, List<WriteResultWrapper>> tableMap = new HashMap<>();
for (WriteResultWrapper w : writeResultWrappers) {
tableMap.computeIfAbsent(w.getTableId(), k -> new ArrayList<>()).add(w);
}
for (Map.Entry<TableId, List<WriteResult>> entry : tableMap.entrySet()) {

for (Map.Entry<TableId, List<WriteResultWrapper>> entry : tableMap.entrySet()) {
TableId tableId = entry.getKey();

// Group by batchIndex so wrappers from different subtasks for the same batch
// are merged into one snapshot, not committed separately.
TreeMap<Integer, List<WriteResultWrapper>> batchGroups = new TreeMap<>();
for (WriteResultWrapper w : entry.getValue()) {
batchGroups.computeIfAbsent(w.getBatchIndex(), k -> new ArrayList<>()).add(w);
LOGGER.info(w.buildDescription());
}

Comment thread
spoorthibasu marked this conversation as resolved.
Table table =
catalog.loadTable(
TableIdentifier.of(tableId.getSchemaName(), tableId.getTableName()));

int startBatchIndex = 0;
Snapshot snapshot = table.currentSnapshot();
if (snapshot != null) {
Iterable<Snapshot> ancestors =
SnapshotUtil.ancestorsOf(snapshot.snapshotId(), table::snapshot);
long lastCheckpointId =
long lastCommittedCheckpointId =
getMaxCommittedCheckpointId(ancestors, newFlinkJobId, operatorId);
if (lastCheckpointId == checkpointId) {
if (lastCommittedCheckpointId >= checkpointId) {
LOGGER.warn(
"Checkpoint id {} has been committed to table {}, skipping",
checkpointId,
tableId.identifier());
continue;
}
ancestors = SnapshotUtil.ancestorsOf(snapshot.snapshotId(), table::snapshot);
startBatchIndex =
getLastCommittedBatchIndex(
ancestors, newFlinkJobId, operatorId, checkpointId)
+ 1;
}
Comment thread
spoorthibasu marked this conversation as resolved.

Optional<TableMetric> tableMetric = getTableMetric(tableId);
tableMetric.ifPresent(TableMetric::increaseCommitTimes);

List<WriteResult> results = entry.getValue();
List<DataFile> dataFiles =
results.stream()
.filter(payload -> payload.dataFiles() != null)
.flatMap(payload -> Arrays.stream(payload.dataFiles()))
.filter(dataFile -> dataFile.recordCount() > 0)
.collect(toList());
List<DeleteFile> deleteFiles =
results.stream()
.filter(payload -> payload.deleteFiles() != null)
.flatMap(payload -> Arrays.stream(payload.deleteFiles()))
.filter(deleteFile -> deleteFile.recordCount() > 0)
.collect(toList());
if (dataFiles.isEmpty() && deleteFiles.isEmpty()) {
LOGGER.info(String.format("Nothing to commit to table %s, skipping", table.name()));
} else {
int lastNonEmptyBatchIndex = -1;
for (Map.Entry<Integer, List<WriteResultWrapper>> g : batchGroups.entrySet()) {
List<DataFile> df = collectDataFilesFromGroup(g.getValue());
List<DeleteFile> del = collectDeleteFilesFromGroup(g.getValue());
if (!df.isEmpty() || !del.isEmpty()) {
lastNonEmptyBatchIndex = g.getKey();
}
}

// Commit each batch as a separate snapshot so sequence numbers increase per batch.
for (Map.Entry<Integer, List<WriteResultWrapper>> g : batchGroups.entrySet()) {
int batchIdx = g.getKey();
if (batchIdx < startBatchIndex) {
LOGGER.info(
"Batch {} for checkpoint {} of table {} already committed, skipping",
batchIdx,
checkpointId,
tableId.identifier());
continue;
}

List<DataFile> dataFiles = collectDataFilesFromGroup(g.getValue());
List<DeleteFile> deleteFiles = collectDeleteFilesFromGroup(g.getValue());

if (dataFiles.isEmpty() && deleteFiles.isEmpty()) {
LOGGER.info(
"Batch {} for checkpoint {} of table {} has nothing to commit, skipping",
batchIdx,
checkpointId,
tableId.identifier());
continue;
}

SnapshotUpdate<?> operation;
if (deleteFiles.isEmpty()) {
AppendFiles append = table.newAppend();
dataFiles.forEach(append::appendFile);
commitOperation(append, newFlinkJobId, operatorId, checkpointId);
operation = append;
} else {
RowDelta delta = table.newRowDelta();
dataFiles.forEach(delta::addRows);
deleteFiles.forEach(delta::addDeletes);
commitOperation(delta, newFlinkJobId, operatorId, checkpointId);
operation = delta;
}

operation.set(SinkUtil.FLINK_JOB_ID, newFlinkJobId);
operation.set(SinkUtil.OPERATOR_ID, operatorId);
operation.set(FLINK_BATCH_INDEX, String.valueOf(batchIdx));
operation.set(FLINK_CHECKPOINT_ID_PROP, String.valueOf(checkpointId));
if (batchIdx == lastNonEmptyBatchIndex) {
operation.set(
SinkUtil.MAX_COMMITTED_CHECKPOINT_ID, String.valueOf(checkpointId));
}
operation.commit();
}
}
}

/**
 * Merges the data files of every wrapper in one batch group into a single list.
 *
 * <p>Each wrapper's write result is passed through {@code collectDataFiles}, and the
 * resulting lists are concatenated in the iteration order of {@code group}.
 *
 * @param group wrappers belonging to the same batch
 * @return a new list holding the data files selected from all wrappers; empty when none
 */
private static List<DataFile> collectDataFilesFromGroup(List<WriteResultWrapper> group) {
    List<DataFile> merged = new ArrayList<>();
    for (WriteResultWrapper wrapper : group) {
        merged.addAll(collectDataFiles(wrapper.getWriteResult()));
    }
    return merged;
}

/**
 * Merges the delete files of every wrapper in one batch group into a single list.
 *
 * <p>Each wrapper's write result is passed through {@code collectDeleteFiles}, and the
 * resulting lists are concatenated in the iteration order of {@code group}.
 *
 * @param group wrappers belonging to the same batch
 * @return a new list holding the delete files selected from all wrappers; empty when none
 */
private static List<DeleteFile> collectDeleteFilesFromGroup(List<WriteResultWrapper> group) {
    List<DeleteFile> merged = new ArrayList<>();
    for (WriteResultWrapper wrapper : group) {
        merged.addAll(collectDeleteFiles(wrapper.getWriteResult()));
    }
    return merged;
}

/**
 * Extracts the data files of a write result that contain at least one record.
 *
 * @param result the write result to inspect
 * @return a new list of data files whose record count is greater than zero; empty when the
 *     result carries no data-file array
 */
private static List<DataFile> collectDataFiles(WriteResult result) {
    List<DataFile> nonEmptyFiles = new ArrayList<>();
    DataFile[] candidates = result.dataFiles();
    if (candidates != null) {
        for (DataFile candidate : candidates) {
            // Files without records contribute nothing to a commit, so they are left out.
            if (candidate.recordCount() > 0) {
                nonEmptyFiles.add(candidate);
            }
        }
    }
    return nonEmptyFiles;
}

/**
 * Extracts the delete files of a write result that contain at least one record.
 *
 * @param result the write result to inspect
 * @return a new list of delete files whose record count is greater than zero; empty when the
 *     result carries no delete-file array
 */
private static List<DeleteFile> collectDeleteFiles(WriteResult result) {
    List<DeleteFile> nonEmptyFiles = new ArrayList<>();
    DeleteFile[] candidates = result.deleteFiles();
    if (candidates != null) {
        for (DeleteFile candidate : candidates) {
            // Files without records contribute nothing to a commit, so they are left out.
            if (candidate.recordCount() > 0) {
                nonEmptyFiles.add(candidate);
            }
        }
    }
    return nonEmptyFiles;
}

private static long getMaxCommittedCheckpointId(
Iterable<Snapshot> ancestors, String flinkJobId, String operatorId) {
long lastCommittedCheckpointId = INITIAL_CHECKPOINT_ID - 1;
Expand All @@ -185,15 +258,35 @@ private static long getMaxCommittedCheckpointId(
return lastCommittedCheckpointId;
}

private static void commitOperation(
SnapshotUpdate<?> operation,
String newFlinkJobId,
String operatorId,
long checkpointId) {
operation.set(SinkUtil.MAX_COMMITTED_CHECKPOINT_ID, Long.toString(checkpointId));
operation.set(SinkUtil.FLINK_JOB_ID, newFlinkJobId);
operation.set(SinkUtil.OPERATOR_ID, operatorId);
operation.commit();
/**
 * Looks up the batch index recorded for {@code checkpointId} in the snapshot history.
 *
 * <p>Snapshots are examined in the iteration order of {@code ancestors}. The first snapshot
 * that belongs to this job and operator and carries a matching
 * {@link #FLINK_CHECKPOINT_ID_PROP} decides the result. Callers use the value to skip batches
 * that were already persisted when a commit is retried.
 *
 * <p>NOTE(review): this yields the <em>highest</em> committed batch only if ancestors are
 * iterated newest-first and batches are committed in ascending order; confirm against the
 * caller.
 *
 * @param ancestors snapshots to scan
 * @param flinkJobId job id a snapshot summary must carry to be considered
 * @param operatorId operator id a snapshot must match when its summary records one
 * @param checkpointId checkpoint whose batch commits are being looked up
 * @return the batch index stored on the matching snapshot, {@code 0} if that snapshot has no
 *     batch index entry, or {@code -1} if no matching snapshot is found
 */
private static int getLastCommittedBatchIndex(
        Iterable<Snapshot> ancestors, String flinkJobId, String operatorId, long checkpointId) {
    for (Snapshot candidate : ancestors) {
        Map<String, String> props = candidate.summary();

        // Ignore snapshots written by another job, or by another operator. A snapshot
        // whose summary has no operator id is treated as belonging to this operator.
        String recordedOperator = props.get(SinkUtil.OPERATOR_ID);
        boolean sameJob = flinkJobId.equals(props.get(SinkUtil.FLINK_JOB_ID));
        boolean sameOperator = recordedOperator == null || recordedOperator.equals(operatorId);
        if (!sameJob || !sameOperator) {
            continue;
        }

        // A snapshot marking an earlier checkpoint as fully committed ends the search:
        // nothing is reported for the requested checkpoint.
        String committedUpTo = props.get(SinkUtil.MAX_COMMITTED_CHECKPOINT_ID);
        if (committedUpTo != null && Long.parseLong(committedUpTo) < checkpointId) {
            return -1;
        }

        String recordedCheckpoint = props.get(FLINK_CHECKPOINT_ID_PROP);
        if (recordedCheckpoint == null || Long.parseLong(recordedCheckpoint) != checkpointId) {
            continue;
        }

        // Matching snapshot found; a missing batch index entry is read as batch 0.
        String recordedBatch = props.get(FLINK_BATCH_INDEX);
        if (recordedBatch == null) {
            return 0;
        }
        return Integer.parseInt(recordedBatch);
    }
    return -1;
}

private Optional<TableMetric> getTableMetric(TableId tableId) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,11 @@ public class IcebergWriter

private final List<WriteResultWrapper> temporaryWriteResult;

/**
* Per-table batch index; incremented on each schema-change flush, even when no writer exists.
*/
private Map<TableId, Integer> tableBatchIndexMap;

private Catalog catalog;

private final int taskId;
Expand Down Expand Up @@ -102,6 +107,7 @@ public IcebergWriter(
writerFactoryMap = new HashMap<>();
writerMap = new HashMap<>();
schemaMap = new HashMap<>();
tableBatchIndexMap = new HashMap<>();
temporaryWriteResult = new ArrayList<>();
this.taskId = taskId;
this.attemptId = attemptId;
Expand Down Expand Up @@ -129,6 +135,7 @@ public Collection<WriteResultWrapper> prepareCommit() throws IOException {
list.addAll(temporaryWriteResult);
list.addAll(getWriteResult());
temporaryWriteResult.clear();
tableBatchIndexMap.clear();
lastCheckpointId++;
return list;
}
Expand Down Expand Up @@ -166,6 +173,11 @@ public void write(Event event, Context context) throws IOException {
} else {
SchemaChangeEvent schemaChangeEvent = (SchemaChangeEvent) event;
TableId tableId = schemaChangeEvent.tableId();
// Flush only when the table is already known; skip on initial CreateTableEvent since
// no data has been written yet and there is nothing to split.
if (schemaMap.containsKey(tableId)) {
flushTableWriter(tableId);
}
TableSchemaWrapper tableSchemaWrapper = schemaMap.get(tableId);

Schema newSchema =
Expand All @@ -179,21 +191,46 @@ public void write(Event event, Context context) throws IOException {

@Override
public void flush(boolean flush) throws IOException {
// Notice: flush method may be called many times during one checkpoint.
temporaryWriteResult.addAll(getWriteResult());
// Clear the factory cache so the next write picks up the latest catalog schema.
// Writers keep running; schema-change splits are handled in flushTableWriter.
writerFactoryMap.clear();
}

/**
 * Closes out the current batch of a table and starts the next one.
 *
 * <p>The table's writer, if one exists, is removed and completed. Its result is stored in
 * {@code temporaryWriteResult}, tagged with the batch index that was current on entry and
 * with checkpoint id {@code lastCheckpointId + 1}. The table's cached writer factory is then
 * dropped. The table's batch index is advanced on every call.
 *
 * @param tableId table whose writer should be flushed
 * @throws IOException if the method's I/O fails (completing the writer is the only step here
 *     expected to perform any)
 */
private void flushTableWriter(TableId tableId) throws IOException {
    TaskWriter<RowData> activeWriter = writerMap.remove(tableId);

    // The index moves forward whether or not a writer exists, so that all subtasks
    // stay aligned on the same batch numbering.
    final int closingBatch = tableBatchIndexMap.getOrDefault(tableId, 0);
    tableBatchIndexMap.put(tableId, closingBatch + 1);

    if (activeWriter != null) {
        WriteResultWrapper flushed =
                new WriteResultWrapper(
                        activeWriter.complete(),
                        tableId,
                        lastCheckpointId + 1,
                        jobId,
                        operatorId,
                        closingBatch);
        temporaryWriteResult.add(flushed);
        LOGGER.info(flushed.buildDescription());
        writerFactoryMap.remove(tableId);
    }
}

private List<WriteResultWrapper> getWriteResult() throws IOException {
long currentCheckpointId = lastCheckpointId + 1;
List<WriteResultWrapper> writeResults = new ArrayList<>();
for (Map.Entry<TableId, TaskWriter<RowData>> entry : writerMap.entrySet()) {
TableId tableId = entry.getKey();
int batchIndex = tableBatchIndexMap.getOrDefault(tableId, 0);
WriteResultWrapper writeResultWrapper =
new WriteResultWrapper(
entry.getValue().complete(),
entry.getKey(),
tableId,
currentCheckpointId,
jobId,
operatorId);
operatorId,
batchIndex);
writeResults.add(writeResultWrapper);
LOGGER.info(writeResultWrapper.buildDescription());
}
Expand Down Expand Up @@ -225,6 +262,11 @@ public void close() throws Exception {
writerFactoryMap = null;
}

if (tableBatchIndexMap != null) {
tableBatchIndexMap.clear();
tableBatchIndexMap = null;
}

catalog = null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,17 +40,31 @@ public class WriteResultWrapper implements Serializable {

private final String operatorId;

/** Batch index within the checkpoint for this table; increments on each schema-change flush. */
private final int batchIndex;

public WriteResultWrapper(
WriteResult writeResult,
TableId tableId,
long checkpointId,
String jobId,
String operatorId) {
String operatorId,
int batchIndex) {
this.writeResult = writeResult;
this.tableId = tableId;
this.checkpointId = checkpointId;
this.jobId = jobId;
this.operatorId = operatorId;
this.batchIndex = batchIndex;
}

/**
 * Creates a wrapper whose batch index is {@code 0}.
 *
 * <p>Delegates to the six-argument constructor, passing {@code 0} as the batch index.
 *
 * @param writeResult the write result to wrap
 * @param tableId the table the write result belongs to
 * @param checkpointId the checkpoint id associated with the write result
 * @param jobId the job id associated with the write result
 * @param operatorId the operator id associated with the write result
 */
public WriteResultWrapper(
WriteResult writeResult,
TableId tableId,
long checkpointId,
String jobId,
String operatorId) {
this(writeResult, tableId, checkpointId, jobId, operatorId, 0);
}

public WriteResult getWriteResult() {
Expand All @@ -73,6 +87,10 @@ public String getOperatorId() {
return operatorId;
}

/**
 * Returns the batch index this wrapper was constructed with.
 *
 * @return the batch index within the checkpoint for this table
 */
public int getBatchIndex() {
return batchIndex;
}

/** Build a simple description for the write result. */
public String buildDescription() {
long addCount = 0;
Expand All @@ -95,6 +113,8 @@ public String buildDescription() {
+ jobId
+ ", OperatorId: "
+ operatorId
+ ", BatchIndex: "
+ batchIndex
+ ", AddCount: "
+ addCount
+ ", DeleteCount: "
Expand Down
Loading
Loading