Skip to content

Commit ce8dca7

Browse files
authored
Merge branch 'apache:main' into lance-writer
2 parents 1875582 + 1c358ef commit ce8dca7

File tree

106 files changed

+3834
-327
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

106 files changed

+3834
-327
lines changed

.github/workflows/ci-template.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ jobs:
4848
distribution: 'temurin'
4949
- name: Build
5050
run: |
51-
mvn -T 1C -B clean install -DskipTests
51+
mvn -T 1C -B clean install -DskipTests ${{ inputs.maven-parameters }}
5252
- name: Test
5353
timeout-minutes: 60
5454
run: |

fluss-client/src/main/java/com/alibaba/fluss/client/metadata/LakeSnapshot.java

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import com.alibaba.fluss.annotation.PublicEvolving;
2121
import com.alibaba.fluss.metadata.TableBucket;
2222

23+
import java.util.Collections;
2324
import java.util.Map;
2425

2526
/**
@@ -36,17 +37,29 @@ public class LakeSnapshot {
3637
// the specific log offset of the snapshot
3738
private final Map<TableBucket, Long> tableBucketsOffset;
3839

39-
public LakeSnapshot(long snapshotId, Map<TableBucket, Long> tableBucketsOffset) {
40+
// the partition name by partition id of this lake snapshot if
41+
// is a partitioned table, empty if not a partitioned table
42+
private final Map<Long, String> partitionNameById;
43+
44+
public LakeSnapshot(
45+
long snapshotId,
46+
Map<TableBucket, Long> tableBucketsOffset,
47+
Map<Long, String> partitionNameById) {
4048
this.snapshotId = snapshotId;
4149
this.tableBucketsOffset = tableBucketsOffset;
50+
this.partitionNameById = partitionNameById;
4251
}
4352

4453
public long getSnapshotId() {
4554
return snapshotId;
4655
}
4756

4857
public Map<TableBucket, Long> getTableBucketsOffset() {
49-
return tableBucketsOffset;
58+
return Collections.unmodifiableMap(tableBucketsOffset);
59+
}
60+
61+
public Map<Long, String> getPartitionNameById() {
62+
return Collections.unmodifiableMap(partitionNameById);
5063
}
5164

5265
@Override
@@ -56,6 +69,8 @@ public String toString() {
5669
+ snapshotId
5770
+ ", tableBucketsOffset="
5871
+ tableBucketsOffset
72+
+ ", partitionNameById="
73+
+ partitionNameById
5974
+ '}';
6075
}
6176
}

fluss-client/src/main/java/com/alibaba/fluss/client/utils/ClientRpcMessageUtils.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -202,16 +202,20 @@ public static LakeSnapshot toLakeTableSnapshotInfo(GetLatestLakeSnapshotResponse
202202
long snapshotId = response.getSnapshotId();
203203
Map<TableBucket, Long> tableBucketsOffset =
204204
new HashMap<>(response.getBucketSnapshotsCount());
205+
Map<Long, String> partitionNameById = new HashMap<>();
205206
for (PbLakeSnapshotForBucket pbLakeSnapshotForBucket : response.getBucketSnapshotsList()) {
206207
Long partitionId =
207208
pbLakeSnapshotForBucket.hasPartitionId()
208209
? pbLakeSnapshotForBucket.getPartitionId()
209210
: null;
210211
TableBucket tableBucket =
211212
new TableBucket(tableId, partitionId, pbLakeSnapshotForBucket.getBucketId());
213+
if (partitionId != null && pbLakeSnapshotForBucket.hasPartitionName()) {
214+
partitionNameById.put(partitionId, pbLakeSnapshotForBucket.getPartitionName());
215+
}
212216
tableBucketsOffset.put(tableBucket, pbLakeSnapshotForBucket.getLogOffset());
213217
}
214-
return new LakeSnapshot(snapshotId, tableBucketsOffset);
218+
return new LakeSnapshot(snapshotId, tableBucketsOffset, partitionNameById);
215219
}
216220

217221
public static List<FsPathAndFileName> toFsPathAndFileName(

fluss-common/src/main/java/com/alibaba/fluss/lake/committer/BucketOffset.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -31,17 +31,17 @@ public class BucketOffset implements Serializable {
3131
private final long logOffset;
3232
private final int bucket;
3333
private final @Nullable Long partitionId;
34-
private final @Nullable String partitionName;
34+
private final @Nullable String partitionQualifiedName;
3535

3636
public BucketOffset(
3737
long logOffset,
3838
int bucket,
3939
@Nullable Long partitionId,
40-
@Nullable String partitionName) {
40+
@Nullable String partitionQualifiedName) {
4141
this.logOffset = logOffset;
4242
this.bucket = bucket;
4343
this.partitionId = partitionId;
44-
this.partitionName = partitionName;
44+
this.partitionQualifiedName = partitionQualifiedName;
4545
}
4646

4747
public long getLogOffset() {
@@ -58,8 +58,8 @@ public Long getPartitionId() {
5858
}
5959

6060
@Nullable
61-
public String getPartitionName() {
62-
return partitionName;
61+
public String getPartitionQualifiedName() {
62+
return partitionQualifiedName;
6363
}
6464

6565
@Override
@@ -74,6 +74,6 @@ public boolean equals(Object o) {
7474
return bucket == that.bucket
7575
&& logOffset == that.logOffset
7676
&& Objects.equals(partitionId, that.partitionId)
77-
&& Objects.equals(partitionName, that.partitionName);
77+
&& Objects.equals(partitionQualifiedName, that.partitionQualifiedName);
7878
}
7979
}

fluss-common/src/main/java/com/alibaba/fluss/lake/committer/CommittedLakeSnapshot.java

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,10 @@ public class CommittedLakeSnapshot {
3434
// partition bucket
3535
private final Map<Tuple2<Long, Integer>, Long> logEndOffsets = new HashMap<>();
3636

37+
// partition id -> partition name, will be empty if is not a partitioned table
38+
// the partition name is a qualified name in the format: key1=value1/key2=value2
39+
private final Map<Long, String> qualifiedPartitionNameById = new HashMap<>();
40+
3741
public CommittedLakeSnapshot(long lakeSnapshotId) {
3842
this.lakeSnapshotId = lakeSnapshotId;
3943
}
@@ -46,27 +50,34 @@ public void addBucket(int bucketId, long offset) {
4650
logEndOffsets.put(Tuple2.of(null, bucketId), offset);
4751
}
4852

49-
public void addPartitionBucket(Long partitionId, int bucketId, long offset) {
53+
public void addPartitionBucket(
54+
Long partitionId, String partitionQualifiedName, int bucketId, long offset) {
5055
logEndOffsets.put(Tuple2.of(partitionId, bucketId), offset);
56+
qualifiedPartitionNameById.put(partitionId, partitionQualifiedName);
5157
}
5258

5359
public Map<Tuple2<Long, Integer>, Long> getLogEndOffsets() {
5460
return logEndOffsets;
5561
}
5662

63+
public Map<Long, String> getQualifiedPartitionNameById() {
64+
return qualifiedPartitionNameById;
65+
}
66+
5767
@Override
5868
public boolean equals(Object o) {
5969
if (o == null || getClass() != o.getClass()) {
6070
return false;
6171
}
6272
CommittedLakeSnapshot that = (CommittedLakeSnapshot) o;
6373
return lakeSnapshotId == that.lakeSnapshotId
64-
&& Objects.equals(logEndOffsets, that.logEndOffsets);
74+
&& Objects.equals(logEndOffsets, that.logEndOffsets)
75+
&& Objects.equals(qualifiedPartitionNameById, that.qualifiedPartitionNameById);
6576
}
6677

6778
@Override
6879
public int hashCode() {
69-
return Objects.hash(lakeSnapshotId, logEndOffsets);
80+
return Objects.hash(lakeSnapshotId, logEndOffsets, qualifiedPartitionNameById);
7081
}
7182

7283
@Override
@@ -76,6 +87,8 @@ public String toString() {
7687
+ lakeSnapshotId
7788
+ ", logEndOffsets="
7889
+ logEndOffsets
90+
+ ", partitionNameById="
91+
+ qualifiedPartitionNameById
7992
+ '}';
8093
}
8194
}

fluss-common/src/main/java/com/alibaba/fluss/lake/writer/WriterInitContext.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@
2424

2525
import javax.annotation.Nullable;
2626

27+
import java.util.Map;
28+
2729
/**
2830
* The WriterInitContext interface provides the context needed to create a LakeWriter. It includes
2931
* methods to obtain the table path, table bucket, and an optional partition.
@@ -61,4 +63,11 @@ public interface WriterInitContext {
6163
* @return the table schema
6264
*/
6365
Schema schema();
66+
67+
/**
68+
* Returns the table custom properties.
69+
*
70+
* @return the table custom properties
71+
*/
72+
Map<String, String> customProperties();
6473
}

fluss-common/src/main/java/com/alibaba/fluss/metadata/ResolvedPartitionSpec.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,25 @@ public String getPartitionQualifiedName() {
125125
return sb.toString();
126126
}
127127

128+
public static ResolvedPartitionSpec fromPartitionQualifiedName(String qualifiedPartitionName) {
129+
// convert from qualified name to ResolvedPartitionSpec
130+
List<String> keys = new ArrayList<>();
131+
List<String> values = new ArrayList<>();
132+
133+
String[] keyValuePairs = qualifiedPartitionName.split("/");
134+
135+
for (String pair : keyValuePairs) {
136+
String[] keyValue = pair.split("=", 2);
137+
if (keyValue.length != 2) {
138+
throw new IllegalArgumentException(
139+
"Invalid partition name format. Expected key=value, got: " + pair);
140+
}
141+
keys.add(keyValue[0]);
142+
values.add(keyValue[1]);
143+
}
144+
return new ResolvedPartitionSpec(keys, values);
145+
}
146+
128147
@Override
129148
public boolean equals(Object o) {
130149
if (o == null || getClass() != o.getClass()) {

fluss-common/src/main/java/com/alibaba/fluss/metrics/groups/AbstractMetricGroup.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -163,9 +163,13 @@ String getLogicalScope(CharacterFilter filter, char delimiter, int reporterIndex
163163

164164
protected String createLogicalScope(CharacterFilter filter, char delimiter) {
165165
final String groupName = getGroupName(filter);
166-
return parent == null
167-
? groupName
168-
: parent.getLogicalScope(filter, delimiter) + delimiter + groupName;
166+
if (parent == null) {
167+
return groupName;
168+
}
169+
if (groupName == null || groupName.isEmpty()) {
170+
return parent.getLogicalScope(filter, delimiter);
171+
}
172+
return parent.getLogicalScope(filter, delimiter) + delimiter + groupName;
169173
}
170174

171175
/** Return the parent of the metric group. */

fluss-common/src/main/java/com/alibaba/fluss/utils/json/BucketOffsetJsonSerde.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -60,8 +60,8 @@ public void serialize(BucketOffset bucketOffset, JsonGenerator generator) throws
6060
generator.writeNumberField(BUCKET_ID, bucketOffset.getBucket());
6161

6262
// serialize partition name
63-
if (bucketOffset.getPartitionName() != null) {
64-
generator.writeStringField(PARTITION_NAME, bucketOffset.getPartitionName());
63+
if (bucketOffset.getPartitionQualifiedName() != null) {
64+
generator.writeStringField(PARTITION_NAME, bucketOffset.getPartitionQualifiedName());
6565
}
6666

6767
// serialize bucket offset

fluss-common/src/test/java/com/alibaba/fluss/metadata/ResolvedPartitionSpecTest.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,11 @@ void testResolvedPartitionSpec() {
3838
new PartitionSpec(Collections.singletonMap("a", "1")));
3939
assertThat(resolvedPartitionSpec.getPartitionName()).isEqualTo("1");
4040
assertThat(resolvedPartitionSpec.getPartitionQualifiedName()).isEqualTo("a=1");
41+
assertThat(
42+
ResolvedPartitionSpec.fromPartitionQualifiedName(
43+
resolvedPartitionSpec.getPartitionQualifiedName()))
44+
.isEqualTo(resolvedPartitionSpec);
45+
4146
assertThat(resolvedPartitionSpec.getPartitionKeys())
4247
.isEqualTo(Collections.singletonList("a"));
4348
assertThat(resolvedPartitionSpec.getPartitionValues())
@@ -52,5 +57,9 @@ void testResolvedPartitionSpec() {
5257
assertThat(resolvedPartitionSpec.getPartitionQualifiedName()).isEqualTo("a=1/b=2");
5358
assertThat(resolvedPartitionSpec.getPartitionKeys()).isEqualTo(Arrays.asList("a", "b"));
5459
assertThat(resolvedPartitionSpec.getPartitionValues()).isEqualTo(Arrays.asList("1", "2"));
60+
assertThat(
61+
ResolvedPartitionSpec.fromPartitionQualifiedName(
62+
resolvedPartitionSpec.getPartitionQualifiedName()))
63+
.isEqualTo(resolvedPartitionSpec);
5564
}
5665
}

0 commit comments

Comments
 (0)