Skip to content

Commit f56f58d

Browse files
committed
[fluss-lake] Introduce TieringKvSplit and TieringLogSplit for Flink lake tiering service
1 parent 0b6d63f commit f56f58d

File tree

11 files changed

+553
-209
lines changed

11 files changed

+553
-209
lines changed
Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
/*
2+
* Copyright (c) 2025 Alibaba Group Holding Ltd.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.alibaba.fluss.flink.tiering.source.split;
18+
19+
import com.alibaba.fluss.metadata.TableBucket;
20+
import com.alibaba.fluss.metadata.TablePath;
21+
22+
import javax.annotation.Nullable;
23+
24+
import java.util.Objects;
25+
26+
/**
27+
* The table split for tiering service. It's used to describe the snapshot data of a KV table
28+
* bucket.
29+
*/
30+
public class TieringKvSplit extends TieringSplit {
31+
32+
private static final String TIERING_KV_SPLIT_PREFIX = "tiering-kv-split-";
33+
34+
/** The snapshot id. It's used to identify the snapshot for a kv bucket. */
35+
private final long snapshotId;
36+
37+
/** The log offset corresponding to the KV table bucket snapshot finished. */
38+
private final long logOffsetOfSnapshot;
39+
40+
/** The records to skip when reading the snapshot. */
41+
private final long recordsToSkip;
42+
43+
public TieringKvSplit(
44+
TablePath tablePath,
45+
TableBucket tableBucket,
46+
@Nullable String partitionName,
47+
long snapshotId,
48+
long logOffsetOfSnapshot,
49+
long recordsToSkip) {
50+
super(tablePath, tableBucket, partitionName);
51+
this.snapshotId = snapshotId;
52+
this.logOffsetOfSnapshot = logOffsetOfSnapshot;
53+
this.recordsToSkip = recordsToSkip;
54+
}
55+
56+
@Override
57+
public String splitId() {
58+
return toSplitId(TIERING_KV_SPLIT_PREFIX, this.tableBucket);
59+
}
60+
61+
public long getSnapshotId() {
62+
return snapshotId;
63+
}
64+
65+
public long getLogOffsetOfSnapshot() {
66+
return logOffsetOfSnapshot;
67+
}
68+
69+
public long getRecordsToSkip() {
70+
return recordsToSkip;
71+
}
72+
73+
@Override
74+
public String toString() {
75+
return "TieringKvSplit{"
76+
+ "tablePath="
77+
+ tablePath
78+
+ ", tableBucket="
79+
+ tableBucket
80+
+ ", partitionName='"
81+
+ partitionName
82+
+ '\''
83+
+ ", snapshotId="
84+
+ snapshotId
85+
+ ", logOffsetOfSnapshot="
86+
+ logOffsetOfSnapshot
87+
+ ", recordsToSkip="
88+
+ recordsToSkip
89+
+ '}';
90+
}
91+
92+
@Override
93+
public boolean equals(Object object) {
94+
if (!(object instanceof TieringKvSplit)) {
95+
return false;
96+
}
97+
TieringKvSplit that = (TieringKvSplit) object;
98+
return snapshotId == that.snapshotId
99+
&& logOffsetOfSnapshot == that.logOffsetOfSnapshot
100+
&& recordsToSkip == that.recordsToSkip;
101+
}
102+
103+
@Override
104+
public int hashCode() {
105+
return Objects.hash(snapshotId, logOffsetOfSnapshot, recordsToSkip);
106+
}
107+
}
Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
/*
2+
* Copyright (c) 2025 Alibaba Group Holding Ltd.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.alibaba.fluss.flink.tiering.source.split;
18+
19+
import com.alibaba.fluss.metadata.TableBucket;
20+
import com.alibaba.fluss.metadata.TablePath;
21+
22+
import javax.annotation.Nullable;
23+
24+
import java.util.Objects;
25+
26+
/** The table split for tiering service. It's used to describe the log data of a table bucket. */
27+
public class TieringLogSplit extends TieringSplit {
28+
29+
private static final String TIERING_LOG_SPLIT_PREFIX = "tiering-log-split-";
30+
31+
private final long startingOffset;
32+
private final long stoppingOffset;
33+
34+
public TieringLogSplit(
35+
TablePath tablePath,
36+
TableBucket tableBucket,
37+
@Nullable String partitionName,
38+
long startingOffset,
39+
long stoppingOffset) {
40+
super(tablePath, tableBucket, partitionName);
41+
this.startingOffset = startingOffset;
42+
this.stoppingOffset = stoppingOffset;
43+
}
44+
45+
@Override
46+
public String splitId() {
47+
return toSplitId(TIERING_LOG_SPLIT_PREFIX, this.tableBucket);
48+
}
49+
50+
public long getStartingOffset() {
51+
return startingOffset;
52+
}
53+
54+
public long getStoppingOffset() {
55+
return stoppingOffset;
56+
}
57+
58+
@Override
59+
public String toString() {
60+
return "TieringLogSplit{"
61+
+ "tablePath="
62+
+ tablePath
63+
+ ", tableBucket="
64+
+ tableBucket
65+
+ ", partitionName='"
66+
+ partitionName
67+
+ '\''
68+
+ ", startingOffset="
69+
+ startingOffset
70+
+ ", stoppingOffset="
71+
+ stoppingOffset
72+
+ '}';
73+
}
74+
75+
@Override
76+
public boolean equals(Object object) {
77+
if (!(object instanceof TieringLogSplit)) {
78+
return false;
79+
}
80+
TieringLogSplit that = (TieringLogSplit) object;
81+
return startingOffset == that.startingOffset && stoppingOffset == that.stoppingOffset;
82+
}
83+
84+
@Override
85+
public int hashCode() {
86+
return Objects.hash(startingOffset, stoppingOffset);
87+
}
88+
}

fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/tiering/source/split/TieringSplit.java

Lines changed: 37 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -23,103 +23,81 @@
2323

2424
import javax.annotation.Nullable;
2525

26-
import java.util.Objects;
26+
/** The base table split for tiering service. */
27+
public abstract class TieringSplit implements SourceSplit {
2728

28-
/** The table split for tiering service. It's used to describe the log data of a table bucket. */
29-
public class TieringSplit implements SourceSplit {
29+
private static final String TIERING_SPLIT_PREFIX = "tiering-";
30+
public static final byte TIERING_KV_SPLIT_FLAG = 1;
31+
public static final byte TIERING_LOG_SPLIT_FLAG = 2;
3032

31-
private static final String TIERING_SPLIT_PREFIX = "tiering-split-";
32-
33-
private final TablePath tablePath;
33+
protected final TablePath tablePath;
3434
protected final TableBucket tableBucket;
3535
@Nullable protected final String partitionName;
36-
private final long startingOffset;
37-
private final long stoppingOffset;
3836

3937
public TieringSplit(
40-
TablePath tablePath,
41-
TableBucket tableBucket,
42-
@Nullable String partitionName,
43-
long startingOffset,
44-
long stoppingOffset) {
38+
TablePath tablePath, TableBucket tableBucket, @Nullable String partitionName) {
4539
this.tablePath = tablePath;
4640
this.tableBucket = tableBucket;
4741
this.partitionName = partitionName;
48-
this.startingOffset = startingOffset;
49-
this.stoppingOffset = stoppingOffset;
5042
if ((tableBucket.getPartitionId() == null && partitionName != null)
5143
|| (tableBucket.getPartitionId() != null && partitionName == null)) {
5244
throw new IllegalArgumentException(
5345
"Partition name and partition id must be both null or both not null.");
5446
}
5547
}
5648

57-
public long getStartingOffset() {
58-
return startingOffset;
49+
/** Checks whether this split is a kv split to tier. */
50+
public final boolean isTieringKvSplit() {
51+
return getClass() == TieringKvSplit.class;
5952
}
6053

61-
public long getStoppingOffset() {
62-
return stoppingOffset;
54+
/** Casts this split into a {@link TieringKvSplit}. */
55+
public TieringKvSplit asTieringKvSplit() {
56+
return (TieringKvSplit) this;
6357
}
64-
65-
public TablePath getTablePath() {
66-
return tablePath;
58+
/** Checks whether this split is a log split to tier. */
59+
public final boolean isTieringLogSplit() {
60+
return getClass() == TieringLogSplit.class;
6761
}
6862

69-
public TableBucket getTableBucket() {
70-
return tableBucket;
63+
/** Casts this split into a {@link TieringLogSplit}. */
64+
public TieringLogSplit asTieringLogSplit() {
65+
return (TieringLogSplit) this;
7166
}
7267

73-
@Nullable
74-
public String getPartitionName() {
75-
return partitionName;
68+
protected byte splitKind() {
69+
if (isTieringKvSplit()) {
70+
return TIERING_KV_SPLIT_FLAG;
71+
} else if (isTieringLogSplit()) {
72+
return TIERING_LOG_SPLIT_FLAG;
73+
} else {
74+
throw new IllegalArgumentException("Unsupported split kind for " + getClass());
75+
}
7676
}
7777

78-
public String splitId() {
78+
protected static String toSplitId(String splitPrefix, TableBucket tableBucket) {
7979
if (tableBucket.getPartitionId() != null) {
80-
return TIERING_SPLIT_PREFIX
80+
return splitPrefix
8181
+ tableBucket.getTableId()
8282
+ "-p"
8383
+ tableBucket.getPartitionId()
8484
+ "-"
8585
+ tableBucket.getBucket();
8686
} else {
87-
return TIERING_SPLIT_PREFIX + tableBucket.getTableId() + "-" + tableBucket.getBucket();
87+
return splitPrefix + tableBucket.getTableId() + "-" + tableBucket.getBucket();
8888
}
8989
}
9090

91-
@Override
92-
public String toString() {
93-
return "TieringSplit{"
94-
+ "tablePath="
95-
+ tablePath
96-
+ ", tableBucket="
97-
+ tableBucket
98-
+ ", partitionName='"
99-
+ partitionName
100-
+ '\''
101-
+ ", startingOffset="
102-
+ startingOffset
103-
+ ", stoppingOffset="
104-
+ stoppingOffset
105-
+ '}';
91+
public TablePath getTablePath() {
92+
return tablePath;
10693
}
10794

108-
@Override
109-
public boolean equals(Object object) {
110-
if (!(object instanceof TieringSplit)) {
111-
return false;
112-
}
113-
TieringSplit that = (TieringSplit) object;
114-
return startingOffset == that.startingOffset
115-
&& stoppingOffset == that.stoppingOffset
116-
&& Objects.equals(tablePath, that.tablePath)
117-
&& Objects.equals(tableBucket, that.tableBucket)
118-
&& Objects.equals(partitionName, that.partitionName);
95+
public TableBucket getTableBucket() {
96+
return tableBucket;
11997
}
12098

121-
@Override
122-
public int hashCode() {
123-
return Objects.hash(tablePath, tableBucket, partitionName, startingOffset, stoppingOffset);
99+
@Nullable
100+
public String getPartitionName() {
101+
return partitionName;
124102
}
125103
}

0 commit comments

Comments
 (0)