Skip to content

Commit 5075fec

Browse files
authored
[paimon] Store lake synchronized bucket offsets in Paimon snapshot property (#1405)
1 parent d3102b9 commit 5075fec

File tree

14 files changed

+446
-141
lines changed

14 files changed

+446
-141
lines changed
Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package com.alibaba.fluss.lake.committer;
19+
20+
import javax.annotation.Nullable;
21+
22+
import java.io.Serializable;
23+
import java.util.Objects;
24+
25+
/** The bucket offset information to be expected to be stored in Lake's snapshot property. */
26+
public class BucketOffset implements Serializable {
27+
28+
private static final long serialVersionUID = 1L;
29+
public static final String FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY = "fluss-offsets";
30+
31+
private final long logOffset;
32+
private final int bucket;
33+
private final @Nullable Long partitionId;
34+
private final @Nullable String partitionName;
35+
36+
public BucketOffset(
37+
long logOffset,
38+
int bucket,
39+
@Nullable Long partitionId,
40+
@Nullable String partitionName) {
41+
this.logOffset = logOffset;
42+
this.bucket = bucket;
43+
this.partitionId = partitionId;
44+
this.partitionName = partitionName;
45+
}
46+
47+
public long getLogOffset() {
48+
return logOffset;
49+
}
50+
51+
public int getBucket() {
52+
return bucket;
53+
}
54+
55+
@Nullable
56+
public Long getPartitionId() {
57+
return partitionId;
58+
}
59+
60+
@Nullable
61+
public String getPartitionName() {
62+
return partitionName;
63+
}
64+
65+
@Override
66+
public boolean equals(Object o) {
67+
if (this == o) {
68+
return true;
69+
}
70+
if (o == null || getClass() != o.getClass()) {
71+
return false;
72+
}
73+
BucketOffset that = (BucketOffset) o;
74+
return bucket == that.bucket
75+
&& logOffset == that.logOffset
76+
&& Objects.equals(partitionId, that.partitionId)
77+
&& Objects.equals(partitionName, that.partitionName);
78+
}
79+
}

fluss-common/src/main/java/com/alibaba/fluss/lake/committer/CommittedLakeSnapshot.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -30,9 +30,9 @@
3030
public class CommittedLakeSnapshot {
3131

3232
private final long lakeSnapshotId;
33-
// <partition_name, bucket> -> log offset, partition_name will be null if it's not a
33+
// <partition_id, bucket> -> log offset, partition_id will be null if it's not a
3434
// partition bucket
35-
private final Map<Tuple2<String, Integer>, Long> logEndOffsets = new HashMap<>();
35+
private final Map<Tuple2<Long, Integer>, Long> logEndOffsets = new HashMap<>();
3636

3737
public CommittedLakeSnapshot(long lakeSnapshotId) {
3838
this.lakeSnapshotId = lakeSnapshotId;
@@ -46,11 +46,11 @@ public void addBucket(int bucketId, long offset) {
4646
logEndOffsets.put(Tuple2.of(null, bucketId), offset);
4747
}
4848

49-
public void addPartitionBucket(String partitionName, int bucketId, long offset) {
50-
logEndOffsets.put(Tuple2.of(partitionName, bucketId), offset);
49+
public void addPartitionBucket(Long partitionId, int bucketId, long offset) {
50+
logEndOffsets.put(Tuple2.of(partitionId, bucketId), offset);
5151
}
5252

53-
public Map<Tuple2<String, Integer>, Long> getLogEndOffsets() {
53+
public Map<Tuple2<Long, Integer>, Long> getLogEndOffsets() {
5454
return logEndOffsets;
5555
}
5656

fluss-common/src/main/java/com/alibaba/fluss/lake/committer/LakeCommitter.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323

2424
import java.io.IOException;
2525
import java.util.List;
26+
import java.util.Map;
2627

2728
/**
2829
* The LakeCommitter interface for committing write results. It extends the AutoCloseable interface
@@ -48,10 +49,12 @@ public interface LakeCommitter<WriteResult, CommittableT> extends AutoCloseable
4849
* Commits the given committable object.
4950
*
5051
* @param committable the committable object
52+
* @param snapshotProperties the properties that the lake supports storing in its snapshot
5153
* @return the committed snapshot ID
5254
* @throws IOException if an I/O error occurs
5355
*/
54-
long commit(CommittableT committable) throws IOException;
56+
long commit(CommittableT committable, Map<String, String> snapshotProperties)
57+
throws IOException;
5558

5659
/**
5760
* Aborts the given committable object.
Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package com.alibaba.fluss.utils.json;
19+
20+
import com.alibaba.fluss.lake.committer.BucketOffset;
21+
import com.alibaba.fluss.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
22+
import com.alibaba.fluss.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
23+
24+
import java.io.IOException;
25+
26+
/** Json serializer and deserializer for {@link BucketOffset}. */
27+
public class BucketOffsetJsonSerde
28+
implements JsonSerializer<BucketOffset>, JsonDeserializer<BucketOffset> {
29+
30+
public static final BucketOffsetJsonSerde INSTANCE = new BucketOffsetJsonSerde();
31+
private static final String PARTITION_ID = "partition_id";
32+
private static final String BUCKET_ID = "bucket_id";
33+
private static final String PARTITION_NAME = "partition_name";
34+
private static final String LOG_OFFSET = "log_offset";
35+
36+
@Override
37+
public BucketOffset deserialize(JsonNode node) {
38+
JsonNode partitionIdNode = node.get(PARTITION_ID);
39+
Long partitionId = partitionIdNode == null ? null : partitionIdNode.asLong();
40+
int bucketId = node.get(BUCKET_ID).asInt();
41+
42+
// deserialize partition name
43+
JsonNode partitionNameNode = node.get(PARTITION_NAME);
44+
String partitionName = partitionNameNode == null ? null : partitionNameNode.asText();
45+
46+
// deserialize log offset
47+
long logOffset = node.get(LOG_OFFSET).asLong();
48+
49+
return new BucketOffset(logOffset, bucketId, partitionId, partitionName);
50+
}
51+
52+
@Override
53+
public void serialize(BucketOffset bucketOffset, JsonGenerator generator) throws IOException {
54+
generator.writeStartObject();
55+
56+
// write partition id
57+
if (bucketOffset.getPartitionId() != null) {
58+
generator.writeNumberField(PARTITION_ID, bucketOffset.getPartitionId());
59+
}
60+
generator.writeNumberField(BUCKET_ID, bucketOffset.getBucket());
61+
62+
// serialize partition name
63+
if (bucketOffset.getPartitionName() != null) {
64+
generator.writeStringField(PARTITION_NAME, bucketOffset.getPartitionName());
65+
}
66+
67+
// serialize bucket offset
68+
generator.writeNumberField(LOG_OFFSET, bucketOffset.getLogOffset());
69+
70+
generator.writeEndObject();
71+
}
72+
}
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package com.alibaba.fluss.utils.json;
19+
20+
import com.alibaba.fluss.lake.committer.BucketOffset;
21+
22+
/** Test for {@link BucketOffset}. */
23+
public class BucketOffsetJsonSerdeTest extends JsonSerdeTestBase<BucketOffset> {
24+
25+
BucketOffsetJsonSerdeTest() {
26+
super(BucketOffsetJsonSerde.INSTANCE);
27+
}
28+
29+
@Override
30+
protected BucketOffset[] createObjects() {
31+
return new BucketOffset[] {
32+
new BucketOffset(10, 1, 1L, "eu-central$2023$12"), new BucketOffset(20, 2, null, null)
33+
};
34+
}
35+
36+
@Override
37+
protected String[] expectedJsons() {
38+
return new String[] {
39+
"{\"partition_id\":1,\"bucket_id\":1,\"partition_name\":\"eu-central$2023$12\",\"log_offset\":10}",
40+
"{\"bucket_id\":2,\"log_offset\":20}"
41+
};
42+
}
43+
}

fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/tiering/committer/FlussTableLakeSnapshotCommitter.java

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -85,18 +85,14 @@ public void commit(
8585
// construct lake snapshot to commit to Fluss
8686
FlussTableLakeSnapshot flussTableLakeSnapshot =
8787
new FlussTableLakeSnapshot(tableId, committedLakeSnapshot.getLakeSnapshotId());
88-
for (Map.Entry<Tuple2<String, Integer>, Long> entry :
88+
for (Map.Entry<Tuple2<Long, Integer>, Long> entry :
8989
committedLakeSnapshot.getLogEndOffsets().entrySet()) {
90-
Tuple2<String, Integer> partitionBucket = entry.getKey();
90+
Tuple2<Long, Integer> partitionBucket = entry.getKey();
9191
TableBucket tableBucket;
9292
if (partitionBucket.f0 == null) {
9393
tableBucket = new TableBucket(tableId, partitionBucket.f1);
9494
} else {
95-
String partitionName = partitionBucket.f0;
96-
// todo: remove this
97-
// in paimon 1.12, we can store this offsets(including partitionId) into snapshot
98-
// properties, then, we won't need to get partitionId from partition name
99-
Long partitionId = partitionIdByName.get(partitionName);
95+
Long partitionId = partitionBucket.f0;
10096
if (partitionId != null) {
10197
tableBucket = new TableBucket(tableId, partitionId, partitionBucket.f1);
10298
} else {

0 commit comments

Comments
 (0)