Skip to content

Commit de2d9a0

Browse files
authored
[lake] Move parse offsets from snapshot property to common method (apache#2208)
1 parent 0964d86 commit de2d9a0

File tree

10 files changed

+116
-167
lines changed

10 files changed

+116
-167
lines changed

fluss-common/src/main/java/org/apache/fluss/lake/committer/CommittedLakeSnapshot.java

Lines changed: 12 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -17,41 +17,30 @@
1717

1818
package org.apache.fluss.lake.committer;
1919

20-
import org.apache.fluss.utils.types.Tuple2;
21-
22-
import java.util.HashMap;
2320
import java.util.Map;
2421
import java.util.Objects;
2522

2623
/**
27-
* The lake already committed snapshot, containing the lake snapshot id and the bucket end offsets
28-
* in this snapshot.
24+
* The lake already committed snapshot, containing the lake snapshot id and the properties stored in
25+
* this snapshot.
2926
*/
3027
public class CommittedLakeSnapshot {
3128

3229
private final long lakeSnapshotId;
33-
// <partition_id, bucket> -> log offset, partition_id will be null if it's not a
34-
// partition bucket
35-
private final Map<Tuple2<Long, Integer>, Long> logEndOffsets = new HashMap<>();
3630

37-
public CommittedLakeSnapshot(long lakeSnapshotId) {
31+
private final Map<String, String> snapshotProperties;
32+
33+
public CommittedLakeSnapshot(long lakeSnapshotId, Map<String, String> snapshotProperties) {
3834
this.lakeSnapshotId = lakeSnapshotId;
35+
this.snapshotProperties = snapshotProperties;
3936
}
4037

4138
public long getLakeSnapshotId() {
4239
return lakeSnapshotId;
4340
}
4441

45-
public void addBucket(int bucketId, long offset) {
46-
logEndOffsets.put(Tuple2.of(null, bucketId), offset);
47-
}
48-
49-
public void addPartitionBucket(Long partitionId, int bucketId, long offset) {
50-
logEndOffsets.put(Tuple2.of(partitionId, bucketId), offset);
51-
}
52-
53-
public Map<Tuple2<Long, Integer>, Long> getLogEndOffsets() {
54-
return logEndOffsets;
42+
public Map<String, String> getSnapshotProperties() {
43+
return snapshotProperties;
5544
}
5645

5746
@Override
@@ -64,21 +53,21 @@ public boolean equals(Object o) {
6453
}
6554
CommittedLakeSnapshot that = (CommittedLakeSnapshot) o;
6655
return lakeSnapshotId == that.lakeSnapshotId
67-
&& Objects.equals(logEndOffsets, that.logEndOffsets);
56+
&& Objects.equals(snapshotProperties, that.snapshotProperties);
6857
}
6958

7059
@Override
7160
public int hashCode() {
72-
return Objects.hash(lakeSnapshotId, logEndOffsets);
61+
return Objects.hash(lakeSnapshotId, snapshotProperties);
7362
}
7463

7564
@Override
7665
public String toString() {
7766
return "CommittedLakeSnapshot{"
7867
+ "lakeSnapshotId="
7968
+ lakeSnapshotId
80-
+ ", logEndOffsets="
81-
+ logEndOffsets
69+
+ ", snapshotProperties="
70+
+ snapshotProperties
8271
+ '}';
8372
}
8473
}

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/committer/FlussTableLakeSnapshotCommitter.java

Lines changed: 4 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020
import org.apache.fluss.client.metadata.MetadataUpdater;
2121
import org.apache.fluss.config.ConfigOptions;
2222
import org.apache.fluss.config.Configuration;
23-
import org.apache.fluss.lake.committer.CommittedLakeSnapshot;
2423
import org.apache.fluss.metadata.TableBucket;
2524
import org.apache.fluss.metrics.registry.MetricRegistry;
2625
import org.apache.fluss.rpc.GatewayClientProxy;
@@ -31,7 +30,6 @@
3130
import org.apache.fluss.rpc.messages.PbLakeTableSnapshotInfo;
3231
import org.apache.fluss.rpc.metrics.ClientMetricGroup;
3332
import org.apache.fluss.utils.ExceptionUtils;
34-
import org.apache.fluss.utils.types.Tuple2;
3533

3634
import java.io.IOException;
3735
import java.util.Map;
@@ -75,17 +73,13 @@ void commit(FlussTableLakeSnapshot flussTableLakeSnapshot) throws IOException {
7573
}
7674
}
7775

78-
public void commit(long tableId, CommittedLakeSnapshot committedLakeSnapshot)
76+
public void commit(long tableId, long snapshotId, Map<TableBucket, Long> logEndOffsets)
7977
throws IOException {
8078
// construct lake snapshot to commit to Fluss
8179
FlussTableLakeSnapshot flussTableLakeSnapshot =
82-
new FlussTableLakeSnapshot(tableId, committedLakeSnapshot.getLakeSnapshotId());
83-
for (Map.Entry<Tuple2<Long, Integer>, Long> entry :
84-
committedLakeSnapshot.getLogEndOffsets().entrySet()) {
85-
Tuple2<Long, Integer> partitionBucket = entry.getKey();
86-
Long partitionId = partitionBucket.f0;
87-
TableBucket tableBucket = new TableBucket(tableId, partitionId, partitionBucket.f1);
88-
flussTableLakeSnapshot.addBucketOffset(tableBucket, entry.getValue());
80+
new FlussTableLakeSnapshot(tableId, snapshotId);
81+
for (Map.Entry<TableBucket, Long> entry : logEndOffsets.entrySet()) {
82+
flussTableLakeSnapshot.addBucketOffset(entry.getKey(), entry.getValue());
8983
}
9084
commit(flussTableLakeSnapshot);
9185
}

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/committer/TieringCommitOperator.java

Lines changed: 46 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -34,10 +34,11 @@
3434
import org.apache.fluss.lake.writer.LakeTieringFactory;
3535
import org.apache.fluss.lake.writer.LakeWriter;
3636
import org.apache.fluss.metadata.TableBucket;
37-
import org.apache.fluss.metadata.TableInfo;
3837
import org.apache.fluss.metadata.TablePath;
3938
import org.apache.fluss.shaded.jackson2.com.fasterxml.jackson.core.JsonFactory;
4039
import org.apache.fluss.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
40+
import org.apache.fluss.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
41+
import org.apache.fluss.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
4142
import org.apache.fluss.utils.ExceptionUtils;
4243
import org.apache.fluss.utils.json.BucketOffsetJsonSerde;
4344

@@ -86,6 +87,8 @@ public class TieringCommitOperator<WriteResult, Committable>
8687
implements OneInputStreamOperator<
8788
TableBucketWriteResult<WriteResult>, CommittableMessage<Committable>> {
8889

90+
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
91+
8992
private static final long serialVersionUID = 1L;
9093

9194
private final Configuration flussConfig;
@@ -223,6 +226,7 @@ private Committable commitWriteResults(
223226
Committable committable = lakeCommitter.toCommittable(writeResults);
224227
// before commit to lake, check fluss not missing any lake snapshot committed by fluss
225228
checkFlussNotMissingLakeSnapshot(
229+
tableId,
226230
tablePath,
227231
lakeCommitter,
228232
committable,
@@ -303,6 +307,7 @@ private LakeSnapshot getLatestLakeSnapshot(TablePath tablePath) throws Exception
303307
}
304308

305309
private void checkFlussNotMissingLakeSnapshot(
310+
long tableId,
306311
TablePath tablePath,
307312
LakeCommitter<WriteResult, Committable> lakeCommitter,
308313
Committable committable,
@@ -318,10 +323,33 @@ private void checkFlussNotMissingLakeSnapshot(
318323
// known lake snapshot, which means the data already has been committed to lake,
319324
// not to commit to lake to avoid data duplicated
320325
if (missingCommittedSnapshot != null) {
326+
if (missingCommittedSnapshot.getSnapshotProperties() == null
327+
|| missingCommittedSnapshot
328+
.getSnapshotProperties()
329+
.get(FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY)
330+
== null) {
331+
throw new IllegalStateException(
332+
String.format(
333+
"Missing required log offsets property '%s' in lake snapshot %d for table: ‘tablePath=%s, tableId=%d’. "
334+
+ "This property is required to commit the missing snapshot to Fluss. "
335+
+ "The snapshot may have been created by an older version of Fluss that did not store this information, "
336+
+ "or the snapshot properties may be corrupted.",
337+
FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY,
338+
missingCommittedSnapshot.getLakeSnapshotId(),
339+
tablePath,
340+
tableId));
341+
}
342+
343+
String logOffsetsProperty =
344+
missingCommittedSnapshot
345+
.getSnapshotProperties()
346+
.get(FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY);
347+
321348
// commit this missing snapshot to fluss
322-
TableInfo tableInfo = admin.getTableInfo(tablePath).get();
323349
flussTableLakeSnapshotCommitter.commit(
324-
tableInfo.getTableId(), missingCommittedSnapshot);
350+
tableId,
351+
missingCommittedSnapshot.getLakeSnapshotId(),
352+
fromLogOffsetProperty(tableId, logOffsetsProperty));
325353
// abort this committable to delete the written files
326354
lakeCommitter.abort(committable);
327355
throw new IllegalStateException(
@@ -331,12 +359,25 @@ private void checkFlussNotMissingLakeSnapshot(
331359
+ " missing snapshot: %s.",
332360
flussCurrentLakeSnapshot,
333361
missingCommittedSnapshot.getLakeSnapshotId(),
334-
tableInfo.getTablePath(),
335-
tableInfo.getTableId(),
362+
tablePath,
363+
tableId,
336364
missingCommittedSnapshot));
337365
}
338366
}
339367

368+
public static Map<TableBucket, Long> fromLogOffsetProperty(
369+
long tableId, String logOffsetsProperty) throws IOException {
370+
Map<TableBucket, Long> logEndOffsets = new HashMap<>();
371+
for (JsonNode node : OBJECT_MAPPER.readTree(logOffsetsProperty)) {
372+
BucketOffset bucketOffset = BucketOffsetJsonSerde.INSTANCE.deserialize(node);
373+
TableBucket tableBucket =
374+
new TableBucket(
375+
tableId, bucketOffset.getPartitionId(), bucketOffset.getBucket());
376+
logEndOffsets.put(tableBucket, bucketOffset.getLogOffset());
377+
}
378+
return logEndOffsets;
379+
}
380+
340381
private void registerTableBucketWriteResult(
341382
long tableId, TableBucketWriteResult<WriteResult> tableBucketWriteResult) {
342383
collectedTableBucketWriteResults

fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/committer/FlussTableLakeSnapshotCommitterTest.java

Lines changed: 6 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919

2020
import org.apache.fluss.client.metadata.LakeSnapshot;
2121
import org.apache.fluss.flink.utils.FlinkTestBase;
22-
import org.apache.fluss.lake.committer.CommittedLakeSnapshot;
2322
import org.apache.fluss.metadata.TableBucket;
2423
import org.apache.fluss.metadata.TablePath;
2524

@@ -71,7 +70,6 @@ void testCommit(boolean isPartitioned) throws Exception {
7170

7271
List<String> partitions;
7372
Map<String, Long> partitionNameAndIds = new HashMap<>();
74-
Map<Long, String> expectedPartitionNameById = new HashMap<>();
7573
if (!isPartitioned) {
7674
FLUSS_CLUSTER_EXTENSION.waitUntilTableReady(tableId);
7775
partitions = Collections.singletonList(null);
@@ -80,32 +78,27 @@ void testCommit(boolean isPartitioned) throws Exception {
8078
partitions = new ArrayList<>(partitionNameAndIds.keySet());
8179
}
8280

83-
CommittedLakeSnapshot committedLakeSnapshot = new CommittedLakeSnapshot(3);
84-
85-
Map<TableBucket, Long> expectedOffsets = new HashMap<>();
81+
Map<TableBucket, Long> logEndOffsets = new HashMap<>();
8682
for (int bucket = 0; bucket < 3; bucket++) {
8783
long bucketOffset = bucket * bucket;
8884
for (String partitionName : partitions) {
8985
if (partitionName == null) {
90-
committedLakeSnapshot.addBucket(bucket, bucketOffset);
91-
expectedOffsets.put(new TableBucket(tableId, bucket), bucketOffset);
86+
logEndOffsets.put(new TableBucket(tableId, bucket), bucketOffset);
9287
} else {
9388
long partitionId = partitionNameAndIds.get(partitionName);
94-
committedLakeSnapshot.addPartitionBucket(partitionId, bucket, bucketOffset);
95-
expectedOffsets.put(
96-
new TableBucket(tableId, partitionId, bucket), bucketOffset);
97-
expectedPartitionNameById.put(partitionId, partitionName);
89+
logEndOffsets.put(new TableBucket(tableId, partitionId, bucket), bucketOffset);
9890
}
9991
}
10092
}
10193

94+
long snapshotId = 3;
10295
// commit offsets
103-
flussTableLakeSnapshotCommitter.commit(tableId, committedLakeSnapshot);
96+
flussTableLakeSnapshotCommitter.commit(tableId, snapshotId, logEndOffsets);
10497
LakeSnapshot lakeSnapshot = admin.getLatestLakeSnapshot(tablePath).get();
10598
assertThat(lakeSnapshot.getSnapshotId()).isEqualTo(3);
10699

107100
// get and check the offsets
108101
Map<TableBucket, Long> bucketLogOffsets = lakeSnapshot.getTableBucketsOffset();
109-
assertThat(bucketLogOffsets).isEqualTo(expectedOffsets);
102+
assertThat(bucketLogOffsets).isEqualTo(logEndOffsets);
110103
}
111104
}

fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/committer/TieringCommitOperatorTest.java

Lines changed: 21 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@
2828
import org.apache.fluss.lake.committer.CommittedLakeSnapshot;
2929
import org.apache.fluss.metadata.TableBucket;
3030
import org.apache.fluss.metadata.TablePath;
31-
import org.apache.fluss.utils.types.Tuple2;
3231

3332
import org.apache.flink.configuration.Configuration;
3433
import org.apache.flink.runtime.jobgraph.OperatorID;
@@ -50,12 +49,16 @@
5049

5150
import javax.annotation.Nullable;
5251

52+
import java.io.IOException;
5353
import java.util.ArrayList;
5454
import java.util.Collections;
5555
import java.util.HashMap;
5656
import java.util.List;
5757
import java.util.Map;
5858

59+
import static org.apache.fluss.flink.tiering.committer.TieringCommitOperator.fromLogOffsetProperty;
60+
import static org.apache.fluss.flink.tiering.committer.TieringCommitOperator.toBucketOffsetsProperty;
61+
import static org.apache.fluss.lake.committer.BucketOffset.FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY;
5962
import static org.apache.fluss.record.TestData.DATA1_PARTITIONED_TABLE_DESCRIPTOR;
6063
import static org.assertj.core.api.Assertions.assertThat;
6164
import static org.assertj.core.api.Assertions.assertThatThrownBy;
@@ -254,8 +257,12 @@ void testCommitMeetsEmptyWriteResult() throws Exception {
254257

255258
@Test
256259
void testTableCommitWhenFlussMissingLakeSnapshot() throws Exception {
260+
TablePath tablePath = TablePath.of("fluss", "test_commit_when_fluss_missing_lake_snapshot");
261+
long tableId = createTable(tablePath, DEFAULT_PK_TABLE_DESCRIPTOR);
262+
int numberOfWriteResults = 3;
263+
257264
CommittedLakeSnapshot mockCommittedSnapshot =
258-
mockCommittedLakeSnapshot(Collections.singletonList(null), 2);
265+
mockCommittedLakeSnapshot(Collections.singletonList(null), tableId, 2);
259266
TestingLakeTieringFactory.TestingLakeCommitter testingLakeCommitter =
260267
new TestingLakeTieringFactory.TestingLakeCommitter(mockCommittedSnapshot);
261268
committerOperator =
@@ -266,10 +273,6 @@ void testTableCommitWhenFlussMissingLakeSnapshot() throws Exception {
266273
new TestingLakeTieringFactory(testingLakeCommitter));
267274
committerOperator.open();
268275

269-
TablePath tablePath = TablePath.of("fluss", "test_commit_when_fluss_missing_lake_snapshot");
270-
long tableId = createTable(tablePath, DEFAULT_PK_TABLE_DESCRIPTOR);
271-
int numberOfWriteResults = 3;
272-
273276
for (int bucket = 0; bucket < 3; bucket++) {
274277
TableBucket tableBucket = new TableBucket(tableId, bucket);
275278
committerOperator.processElement(
@@ -316,7 +319,7 @@ void testPartitionedTableCommitWhenFlussMissingLakeSnapshot() throws Exception {
316319
FLUSS_CLUSTER_EXTENSION.waitUntilPartitionAllReady(tablePath);
317320

318321
CommittedLakeSnapshot mockCommittedSnapshot =
319-
mockCommittedLakeSnapshot(Collections.singletonList(null), 3);
322+
mockCommittedLakeSnapshot(Collections.singletonList(null), tableId, 3);
320323
TestingLakeTieringFactory.TestingLakeCommitter testingLakeCommitter =
321324
new TestingLakeTieringFactory.TestingLakeCommitter(mockCommittedSnapshot);
322325
committerOperator =
@@ -359,36 +362,24 @@ void testPartitionedTableCommitWhenFlussMissingLakeSnapshot() throws Exception {
359362
mockCommittedSnapshot));
360363
}
361364

362-
private CommittedLakeSnapshot mockCommittedLakeSnapshot(List<Long> partitions, int snapshotId) {
363-
CommittedLakeSnapshot mockCommittedSnapshot = new CommittedLakeSnapshot(snapshotId);
365+
private CommittedLakeSnapshot mockCommittedLakeSnapshot(
366+
List<Long> partitions, long tableId, int snapshotId) throws IOException {
367+
Map<TableBucket, Long> logEndOffsets = new HashMap<>();
364368
for (Long partition : partitions) {
365369
for (int bucket = 0; bucket < DEFAULT_BUCKET_NUM; bucket++) {
366-
if (partition == null) {
367-
mockCommittedSnapshot.addBucket(bucket, bucket + 1);
368-
} else {
369-
mockCommittedSnapshot.addPartitionBucket(partition, bucket, bucket + 1);
370-
}
370+
logEndOffsets.put(new TableBucket(tableId, partition, bucket), bucket + 1L);
371371
}
372372
}
373-
return mockCommittedSnapshot;
373+
return new CommittedLakeSnapshot(snapshotId, toBucketOffsetsProperty(logEndOffsets));
374374
}
375375

376376
private Map<TableBucket, Long> getExpectedLogEndOffsets(
377-
long tableId, CommittedLakeSnapshot committedLakeSnapshot) {
378-
Map<TableBucket, Long> expectedLogEndOffsets = new HashMap<>();
379-
for (Map.Entry<Tuple2<Long, Integer>, Long> entry :
380-
committedLakeSnapshot.getLogEndOffsets().entrySet()) {
381-
Tuple2<Long, Integer> partitionBucket = entry.getKey();
382-
if (partitionBucket.f0 == null) {
383-
expectedLogEndOffsets.put(
384-
new TableBucket(tableId, partitionBucket.f1), entry.getValue());
385-
} else {
386-
expectedLogEndOffsets.put(
387-
new TableBucket(tableId, partitionBucket.f0, partitionBucket.f1),
388-
entry.getValue());
389-
}
390-
}
391-
return expectedLogEndOffsets;
377+
long tableId, CommittedLakeSnapshot committedLakeSnapshot) throws IOException {
378+
return fromLogOffsetProperty(
379+
tableId,
380+
committedLakeSnapshot
381+
.getSnapshotProperties()
382+
.get(FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY));
392383
}
393384

394385
private StreamRecord<TableBucketWriteResult<TestingWriteResult>>

0 commit comments

Comments
 (0)