Skip to content

Commit 62902ff

Browse files
authored
Merge branch 'apache:main' into lance-writer
2 parents 709b748 + 7f70265 commit 62902ff

File tree

181 files changed

+6924
-2028
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

181 files changed

+6924
-2028
lines changed

.github/workflows/ci-template.yaml

Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
################################################################################
2+
# Licensed to the Apache Software Foundation (ASF) under one
3+
# or more contributor license agreements. See the NOTICE file
4+
# distributed with this work for additional information
5+
# regarding copyright ownership. The ASF licenses this file
6+
# to you under the Apache License, Version 2.0 (the
7+
# "License"); you may not use this file except in compliance
8+
# with the License. You may obtain a copy of the License at
9+
#
10+
# http://www.apache.org/licenses/LICENSE-2.0
11+
#
12+
# Unless required by applicable law or agreed to in writing, software
13+
# distributed under the License is distributed on an "AS IS" BASIS,
14+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
# See the License for the specific language governing permissions and
16+
# limitations under the License.
17+
################################################################################
18+
19+
# .github/workflows/ci-template.yaml
20+
name: CI Template
21+
22+
on:
23+
workflow_call:
24+
inputs:
25+
java-version:
26+
required: true
27+
type: string
28+
maven-parameters:
29+
description: "Any parameters of the Maven command."
30+
required: false
31+
type: string
32+
default: ""
33+
jobs:
34+
build:
35+
runs-on: self-hosted
36+
strategy:
37+
fail-fast: false
38+
matrix:
39+
module: [ core, flink ]
40+
name: "${{ matrix.module }}"
41+
steps:
42+
- name: Checkout code
43+
uses: actions/checkout@v2
44+
- name: Set up JDK
45+
uses: actions/setup-java@v4
46+
with:
47+
java-version: ${{ inputs.java-version }}
48+
distribution: 'temurin'
49+
- name: Build
50+
run: |
51+
mvn -T 1C -B clean install -DskipTests
52+
- name: Test
53+
timeout-minutes: 60
54+
run: |
55+
TEST_MODULES=$(./.github/workflows/stage.sh ${{ matrix.module }})
56+
echo "github ref: ${{ github.ref }}"
57+
echo "Start testing modules: $TEST_MODULES"
58+
mvn -B verify $TEST_MODULES -Ptest-coverage -Ptest-${{ matrix.module }} -Dlog.dir=${{ runner.temp }}/fluss-logs -Dlog4j.configurationFile=${{ github.workspace }}/tools/ci/log4j.properties ${{ inputs.maven-parameters }}
59+
env:
60+
MAVEN_OPTS: -Xmx4096m
61+
ARTIFACTS_OSS_ENDPOINT: ${{ secrets.ARTIFACTS_OSS_ENDPOINT }}
62+
ARTIFACTS_OSS_REGION: ${{ secrets.ARTIFACTS_OSS_REGION }}
63+
ARTIFACTS_OSS_BUCKET: ${{ secrets.ARTIFACTS_OSS_BUCKET }}
64+
ARTIFACTS_OSS_ACCESS_KEY: ${{ secrets.ARTIFACTS_OSS_ACCESS_KEY }}
65+
ARTIFACTS_OSS_SECRET_KEY: ${{ secrets.ARTIFACTS_OSS_SECRET_KEY }}
66+
ARTIFACTS_OSS_STS_ENDPOINT: ${{ secrets.ARTIFACTS_OSS_STS_ENDPOINT }}
67+
ARTIFACTS_OSS_ROLE_ARN: ${{ secrets.ARTIFACTS_OSS_ROLE_ARN }}
68+
- name: Upload build logs
69+
uses: actions/upload-artifact@v4
70+
if: ${{ failure() }}
71+
with:
72+
name: logs-test-${{ matrix.module }}-${{ github.run_number}}#${{ github.run_attempt }}
73+
path: ${{ runner.temp }}/fluss-logs/*
74+
- name: Upload JaCoCo coverage report
75+
uses: actions/upload-artifact@v4
76+
if: ${{ success() && github.ref == 'refs/heads/main' }}
77+
with:
78+
name: jacoco-report-${{ matrix.module }}-${{ github.run_number}}#${{ github.run_attempt }}
79+
path: ${{ github.workspace }}/fluss-test-coverage/target/site/jacoco-aggregate/*

.github/workflows/ci.yaml

Lines changed: 19 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -22,52 +22,36 @@ on:
2222
- main
2323
- release-**
2424
- ci-**
25+
paths-ignore:
26+
- 'website/**'
27+
- '**/*.md'
2528
pull_request:
2629
paths-ignore:
2730
- 'website/**'
2831
- '**/*.md'
32+
2933
concurrency:
3034
group: ${{ github.workflow }}-${{ github.event_name }}-${{ github.event.number || github.run_id }}
3135
cancel-in-progress: true
3236

3337
jobs:
34-
build:
35-
runs-on: self-hosted
36-
strategy:
37-
fail-fast: false
38-
matrix:
39-
module: [ core, flink ]
38+
compile-on-jdk8:
39+
name: "Compile Java 8"
40+
runs-on: ubuntu-latest
4041
steps:
4142
- name: Checkout code
4243
uses: actions/checkout@v2
44+
- name: Set up JDK 8
45+
uses: actions/setup-java@v4
46+
with:
47+
java-version: '8'
48+
distribution: 'temurin'
4349
- name: Build
4450
run: |
45-
mvn -T 1C -B clean install -DskipTests
46-
- name: Test
47-
timeout-minutes: 60
48-
run: |
49-
TEST_MODULES=$(./.github/workflows/stage.sh ${{ matrix.module }})
50-
echo "github ref: ${{ github.ref }}"
51-
echo "Start testing modules: $TEST_MODULES"
52-
mvn -B verify $TEST_MODULES -Ptest-coverage -Ptest-${{ matrix.module }} -Dlog.dir=${{ runner.temp }}/fluss-logs -Dlog4j.configurationFile=${{ github.workspace }}/tools/ci/log4j.properties
53-
env:
54-
MAVEN_OPTS: -Xmx4096m
55-
ARTIFACTS_OSS_ENDPOINT: ${{ secrets.ARTIFACTS_OSS_ENDPOINT }}
56-
ARTIFACTS_OSS_REGION: ${{ secrets.ARTIFACTS_OSS_REGION }}
57-
ARTIFACTS_OSS_BUCKET: ${{ secrets.ARTIFACTS_OSS_BUCKET }}
58-
ARTIFACTS_OSS_ACCESS_KEY: ${{ secrets.ARTIFACTS_OSS_ACCESS_KEY }}
59-
ARTIFACTS_OSS_SECRET_KEY: ${{ secrets.ARTIFACTS_OSS_SECRET_KEY }}
60-
ARTIFACTS_OSS_STS_ENDPOINT: ${{ secrets.ARTIFACTS_OSS_STS_ENDPOINT }}
61-
ARTIFACTS_OSS_ROLE_ARN: ${{ secrets.ARTIFACTS_OSS_ROLE_ARN }}
62-
- name: Upload build logs
63-
uses: actions/upload-artifact@v4
64-
if: ${{ failure() }}
65-
with:
66-
name: logs-test-${{ matrix.module }}-${{ github.run_number}}#${{ github.run_attempt }}
67-
path: ${{ runner.temp }}/fluss-logs/*
68-
- name: Upload JaCoCo coverage report
69-
uses: actions/upload-artifact@v4
70-
if: ${{ success() && github.ref == 'refs/heads/main' }}
71-
with:
72-
name: jacoco-report-${{ matrix.module }}-${{ github.run_number}}#${{ github.run_attempt }}
73-
path: ${{ github.workspace }}/fluss-test-coverage/target/site/jacoco-aggregate/*
51+
mvn -T 1C -B clean install -DskipTests -Pjava8
52+
53+
build-on-jdk11:
54+
name: "Java 11"
55+
uses: ./.github/workflows/ci-template.yaml
56+
with:
57+
java-version: "11"

.github/workflows/license-check.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ jobs:
3737
- name: Set JDK
3838
uses: actions/setup-java@v4
3939
with:
40-
java-version: 8
40+
java-version: 11
4141
distribution: 'temurin'
4242

4343
- name: Build

.github/workflows/nightly.yaml

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
################################################################################
2+
# Licensed to the Apache Software Foundation (ASF) under one
3+
# or more contributor license agreements. See the NOTICE file
4+
# distributed with this work for additional information
5+
# regarding copyright ownership. The ASF licenses this file
6+
# to you under the Apache License, Version 2.0 (the
7+
# "License"); you may not use this file except in compliance
8+
# with the License. You may obtain a copy of the License at
9+
#
10+
# http://www.apache.org/licenses/LICENSE-2.0
11+
#
12+
# Unless required by applicable law or agreed to in writing, software
13+
# distributed under the License is distributed on an "AS IS" BASIS,
14+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
# See the License for the specific language governing permissions and
16+
# limitations under the License.
17+
################################################################################
18+
name: Nightly
19+
on:
20+
schedule:
21+
# Run at 20:00 UTC daily which is the lowest traffic time for Fluss project.
22+
- cron: "0 20 * * *"
23+
24+
concurrency:
25+
group: ${{ github.workflow }}-${{ github.event_name }}-${{ github.event.number || github.run_id }}
26+
cancel-in-progress: true
27+
28+
jobs:
29+
build-on-jdk8:
30+
name: "Java 8"
31+
uses: ./.github/workflows/ci-template.yaml
32+
with:
33+
java-version: "8"
34+
maven-parameters: "-Pjava8"

LICENSE

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -359,6 +359,33 @@ Apache Kafka
359359
./fluss-server/src/main/java/com/alibaba/fluss/server/utils/timer/TimingWheel.java
360360

361361
Apache Paimon
362+
./fluss-common/src/main/java/com/alibaba/fluss/predicate/And.java
363+
./fluss-common/src/main/java/com/alibaba/fluss/predicate/CompareUtils.java
364+
./fluss-common/src/main/java/com/alibaba/fluss/predicate/CompoundPredicate.java
365+
./fluss-common/src/main/java/com/alibaba/fluss/predicate/Contains.java
366+
./fluss-common/src/main/java/com/alibaba/fluss/predicate/EndsWith.java
367+
./fluss-common/src/main/java/com/alibaba/fluss/predicate/Equal.java
368+
./fluss-common/src/main/java/com/alibaba/fluss/predicate/FieldRef.java
369+
./fluss-common/src/main/java/com/alibaba/fluss/predicate/FunctionVisitor.java
370+
./fluss-common/src/main/java/com/alibaba/fluss/predicate/GreaterOrEqual.java
371+
./fluss-common/src/main/java/com/alibaba/fluss/predicate/GreaterThan.java
372+
./fluss-common/src/main/java/com/alibaba/fluss/predicate/In.java
373+
./fluss-common/src/main/java/com/alibaba/fluss/predicate/IsNotNull.java
374+
./fluss-common/src/main/java/com/alibaba/fluss/predicate/IsNull.java
375+
./fluss-common/src/main/java/com/alibaba/fluss/predicate/LeafFunction.java
376+
./fluss-common/src/main/java/com/alibaba/fluss/predicate/LeafPredicate.java
377+
./fluss-common/src/main/java/com/alibaba/fluss/predicate/LeafUnaryFunction.java
378+
./fluss-common/src/main/java/com/alibaba/fluss/predicate/LessOrEqual.java
379+
./fluss-common/src/main/java/com/alibaba/fluss/predicate/LessThan.java
380+
./fluss-common/src/main/java/com/alibaba/fluss/predicate/NotEqual.java
381+
./fluss-common/src/main/java/com/alibaba/fluss/predicate/NotIn.java
382+
./fluss-common/src/main/java/com/alibaba/fluss/predicate/NullFalseLeafBinaryFunction.java
383+
./fluss-common/src/main/java/com/alibaba/fluss/predicate/Or.java
384+
./fluss-common/src/main/java/com/alibaba/fluss/predicate/PartitionPredicateVisitor.java
385+
./fluss-common/src/main/java/com/alibaba/fluss/predicate/Predicate.java
386+
./fluss-common/src/main/java/com/alibaba/fluss/predicate/PredicateBuilder.java
387+
./fluss-common/src/main/java/com/alibaba/fluss/predicate/PredicateVisitor.java
388+
./fluss-common/src/main/java/com/alibaba/fluss/predicate/StartsWith.java
362389
./fluss-common/src/main/java/com/alibaba/fluss/row/encode/paimon/PaimonBinaryRowWriter.java
363390
./fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lakehouse/paimon/reader/PaimonSnapshotScanner.java
364391

fluss-client/src/main/java/com/alibaba/fluss/client/lookup/Lookuper.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,8 @@ public interface Lookuper {
3434
* Lookups certain row from the given lookup key.
3535
*
3636
* <p>The lookup key must be a primary key if the lookuper is a Primary Key Lookuper (created by
37-
* {@code table.newLookuper().create()}), or be the prefix key if the lookuper is a Prefix Key
38-
* Lookuper (created by {@code table.newLookuper().withLookupColumns(prefixKeys).create()}).
37+
* {@code table.newLookup().createLookuper()}), or be the prefix key if the lookuper is a Prefix
38+
* Key Lookuper (created by {@code table.newLookup().lookupBy(prefixKeys).createLookuper()}).
3939
*
4040
* @param lookupKey the lookup key.
4141
* @return the result of lookup.

fluss-common/src/main/java/com/alibaba/fluss/bucketing/BucketingFunction.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,8 @@ static BucketingFunction of(@Nullable DataLakeFormat lakeFormat) {
4141
return new PaimonBucketingFunction();
4242
} else if (lakeFormat == DataLakeFormat.LANCE) {
4343
return new FlussBucketingFunction();
44+
} else if (lakeFormat == DataLakeFormat.ICEBERG) {
45+
return new IcebergBucketingFunction();
4446
} else {
4547
throw new UnsupportedOperationException("Unsupported lake format: " + lakeFormat);
4648
}
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package com.alibaba.fluss.bucketing;
19+
20+
import com.alibaba.fluss.shaded.guava32.com.google.common.hash.HashFunction;
21+
import com.alibaba.fluss.shaded.guava32.com.google.common.hash.Hashing;
22+
23+
/** An implementation of {@link BucketingFunction} to follow Iceberg's bucketing strategy. */
24+
public class IcebergBucketingFunction implements BucketingFunction {
25+
26+
private static final HashFunction MURMUR3 = Hashing.murmur3_32_fixed();
27+
28+
@Override
29+
public int bucketing(byte[] bucketKey, int numBuckets) {
30+
if (bucketKey == null || bucketKey.length == 0) {
31+
throw new IllegalArgumentException("bucketKey must not be null or empty");
32+
}
33+
if (numBuckets <= 0) {
34+
throw new IllegalArgumentException("numBuckets must be positive");
35+
}
36+
return (MURMUR3.hashBytes(bucketKey).asInt() & Integer.MAX_VALUE) % numBuckets;
37+
}
38+
}
Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package com.alibaba.fluss.lake.committer;
19+
20+
import javax.annotation.Nullable;
21+
22+
import java.io.Serializable;
23+
import java.util.Objects;
24+
25+
/** The bucket offset information to be expected to be stored in Lake's snapshot property. */
26+
public class BucketOffset implements Serializable {
27+
28+
private static final long serialVersionUID = 1L;
29+
public static final String FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY = "fluss-offsets";
30+
31+
private final long logOffset;
32+
private final int bucket;
33+
private final @Nullable Long partitionId;
34+
private final @Nullable String partitionName;
35+
36+
public BucketOffset(
37+
long logOffset,
38+
int bucket,
39+
@Nullable Long partitionId,
40+
@Nullable String partitionName) {
41+
this.logOffset = logOffset;
42+
this.bucket = bucket;
43+
this.partitionId = partitionId;
44+
this.partitionName = partitionName;
45+
}
46+
47+
public long getLogOffset() {
48+
return logOffset;
49+
}
50+
51+
public int getBucket() {
52+
return bucket;
53+
}
54+
55+
@Nullable
56+
public Long getPartitionId() {
57+
return partitionId;
58+
}
59+
60+
@Nullable
61+
public String getPartitionName() {
62+
return partitionName;
63+
}
64+
65+
@Override
66+
public boolean equals(Object o) {
67+
if (this == o) {
68+
return true;
69+
}
70+
if (o == null || getClass() != o.getClass()) {
71+
return false;
72+
}
73+
BucketOffset that = (BucketOffset) o;
74+
return bucket == that.bucket
75+
&& logOffset == that.logOffset
76+
&& Objects.equals(partitionId, that.partitionId)
77+
&& Objects.equals(partitionName, that.partitionName);
78+
}
79+
}

fluss-common/src/main/java/com/alibaba/fluss/lake/committer/CommittedLakeSnapshot.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -30,9 +30,9 @@
3030
public class CommittedLakeSnapshot {
3131

3232
private final long lakeSnapshotId;
33-
// <partition_name, bucket> -> log offset, partition_name will be null if it's not a
33+
// <partition_id, bucket> -> log offset, partition_id will be null if it's not a
3434
// partition bucket
35-
private final Map<Tuple2<String, Integer>, Long> logEndOffsets = new HashMap<>();
35+
private final Map<Tuple2<Long, Integer>, Long> logEndOffsets = new HashMap<>();
3636

3737
public CommittedLakeSnapshot(long lakeSnapshotId) {
3838
this.lakeSnapshotId = lakeSnapshotId;
@@ -46,11 +46,11 @@ public void addBucket(int bucketId, long offset) {
4646
logEndOffsets.put(Tuple2.of(null, bucketId), offset);
4747
}
4848

49-
public void addPartitionBucket(String partitionName, int bucketId, long offset) {
50-
logEndOffsets.put(Tuple2.of(partitionName, bucketId), offset);
49+
public void addPartitionBucket(Long partitionId, int bucketId, long offset) {
50+
logEndOffsets.put(Tuple2.of(partitionId, bucketId), offset);
5151
}
5252

53-
public Map<Tuple2<String, Integer>, Long> getLogEndOffsets() {
53+
public Map<Tuple2<Long, Integer>, Long> getLogEndOffsets() {
5454
return logEndOffsets;
5555
}
5656

0 commit comments

Comments
 (0)