Skip to content

Commit 4060c99

Browse files
authored
Merge branch 'main' into autoIncSyntax
2 parents 1b90085 + 1c6c722 commit 4060c99

File tree

409 files changed

+23142
-3659
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

409 files changed

+23142
-3659
lines changed

.github/ISSUE_TEMPLATE/bug.yml

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -39,9 +39,8 @@ body:
3939
description: What Fluss version are you using?
4040
multiple: false
4141
options:
42-
- "0.7.0 (latest release)"
43-
- "0.6.0"
44-
- "0.5.0"
42+
- "0.8.0 (latest release)"
43+
- "0.7.0"
4544
- "main (development)"
4645
validations:
4746
required: true

.github/workflows/docs-check.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ jobs:
4444
run: ./build_versioned_docs.sh
4545
- uses: actions/setup-node@v4
4646
with:
47-
node-version: 18
47+
node-version: 20
4848
- name: Install dependencies
4949
run: npm install
5050
- name: Test build website

LICENSE

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -401,3 +401,7 @@ LightProto
401401
./fluss-protogen/fluss-protogen-generator/src/main/java/org/apache/fluss/protogen/generator/generator/ProtobufMessage.java
402402
./fluss-protogen/fluss-protogen-generator/src/main/java/org/apache/fluss/protogen/generator/generator/ProtobufNumberField.java
403403
./fluss-protogen/fluss-protogen-maven-plugin/src/main/java/org/apache/fluss/protogen/maven/plugin/ProtoGenMojo.java
404+
405+
Apache Maven Wrapper
406+
./mvnw
407+
./mvnw.cmd

NOTICE

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,4 +21,19 @@ with the following copyright notice:
2121
LightProto
2222
Copyright 2020 Splunk Inc.
2323

24-
----------------------------------------------------------
24+
----------------------------------------------------------
25+
26+
This product contains code from the Apache Maven Wrapper Project:
27+
28+
Apache Maven Wrapper
29+
Copyright 2013-2025 The Apache Software Foundation
30+
31+
This product includes software developed at
32+
The Apache Software Foundation (http://www.apache.org/).
33+
34+
The original idea and initial implementation of the maven-wrapper module is derived
35+
from the Gradle Wrapper which was written originally by Hans Dockter and Adam Murdoch.
36+
Copyright 2007 the original author or authors.
37+
38+
----------------------------------------------------------
39+

README.md

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,21 @@
1+
<!--
2+
Licensed to the Apache Software Foundation (ASF) under one
3+
or more contributor license agreements. See the NOTICE file
4+
distributed with this work for additional information
5+
regarding copyright ownership. The ASF licenses this file
6+
to you under the Apache License, Version 2.0 (the
7+
"License"); you may not use this file except in compliance
8+
with the License. You may obtain a copy of the License at
9+
10+
http://www.apache.org/licenses/LICENSE-2.0
11+
12+
Unless required by applicable law or agreed to in writing, software
13+
distributed under the License is distributed on an "AS IS" BASIS,
14+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
See the License for the specific language governing permissions and
16+
limitations under the License.
17+
-->
18+
119
<p align="center">
220
<picture>
321
<source media="(prefers-color-scheme: dark)" srcset="website/static/img/logo/svg/white_color_logo.svg">
@@ -43,15 +61,12 @@ Prerequisites for building Apache Fluss:
4361
- Unix-like environment (we use Linux, Mac OS X, Cygwin, WSL)
4462
- Git
4563
- Maven (we require version >= 3.8.6)
46-
- Java 8 or 11
64+
- Java 11
4765

4866
```bash
4967
git clone https://github.com/apache/fluss.git
5068
cd fluss
51-
# in case of java 11
5269
./mvnw clean package -DskipTests
53-
# or in case of java 8
54-
./mvnw clean package -DskipTests -Pjava8
5570
```
5671

5772
Apache Fluss is now installed in `build-target`. The build command uses Maven Wrapper (`mvnw`) which ensures the correct Maven version is used.

docker/quickstart-flink/README.md

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,21 @@
1+
<!--
2+
Licensed to the Apache Software Foundation (ASF) under one
3+
or more contributor license agreements. See the NOTICE file
4+
distributed with this work for additional information
5+
regarding copyright ownership. The ASF licenses this file
6+
to you under the Apache License, Version 2.0 (the
7+
"License"); you may not use this file except in compliance
8+
with the License. You may obtain a copy of the License at
9+
10+
http://www.apache.org/licenses/LICENSE-2.0
11+
12+
Unless required by applicable law or agreed to in writing, software
13+
distributed under the License is distributed on an "AS IS" BASIS,
14+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
See the License for the specific language governing permissions and
16+
limitations under the License.
17+
-->
18+
119
# Fluss Quickstart Flink Docker
220

321
This directory contains the Docker setup for Fluss Quickstart with Flink integration.

docker/quickstart-flink/prepare_build.sh

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -170,12 +170,12 @@ main() {
170170
# Iceberg Support
171171
log_info "Downloading Iceberg connector JARs..."
172172

173-
# Download iceberg-flink-runtime for Flink 1.20 (version 1.9.1)
173+
# Download iceberg-flink-runtime for Flink 1.20 (version 1.10.0)
174174
download_jar \
175-
"https://repo1.maven.org/maven2/org/apache/iceberg/iceberg-flink-runtime-1.20/1.9.1/iceberg-flink-runtime-1.20-1.9.1.jar" \
176-
"./lib/iceberg-flink-runtime-1.20-1.9.1.jar" \
175+
"https://repo1.maven.org/maven2/org/apache/iceberg/iceberg-flink-runtime-1.20/1.10.0/iceberg-flink-runtime-1.20-1.10.0.jar" \
176+
"./lib/iceberg-flink-runtime-1.20-1.10.0.jar" \
177177
"" \
178-
"iceberg-flink-runtime-1.20-1.9.1"
178+
"iceberg-flink-runtime-1.20-1.10.0"
179179

180180

181181
# Prepare lake tiering JAR
@@ -201,7 +201,7 @@ verify_jars() {
201201
"flink-faker-0.5.3.jar"
202202
"hadoop-apache-3.3.5-2.jar"
203203
"paimon-flink-1.20-1.2.0.jar"
204-
"iceberg-flink-runtime-1.20-1.9.1.jar"
204+
"iceberg-flink-runtime-1.20-1.10.0.jar"
205205
)
206206

207207
local opt_jars=(
@@ -250,7 +250,7 @@ show_summary() {
250250
echo " ✓ Fluss Flink 1.20 connector"
251251
echo " ✓ Fluss Lake Paimon connector"
252252
echo " ✓ Fluss Lake Iceberg connector"
253-
echo " ✓ Iceberg Flink runtime 1.20 (v1.9.1)"
253+
echo " ✓ Iceberg Flink runtime 1.20 (v1.10.0)"
254254
echo " ✓ Paimon Flink 1.20 (v1.2.0)"
255255
echo " ✓ Hadoop Apache (v3.3.5-2)"
256256
echo " ✓ Flink Faker (v0.5.3)"

fluss-client/src/main/java/org/apache/fluss/client/FlussConnection.java

Lines changed: 16 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,6 @@
3333
import org.apache.fluss.config.Configuration;
3434
import org.apache.fluss.exception.FlussRuntimeException;
3535
import org.apache.fluss.fs.FileSystem;
36-
import org.apache.fluss.metadata.TableInfo;
3736
import org.apache.fluss.metadata.TablePath;
3837
import org.apache.fluss.metrics.registry.MetricRegistry;
3938
import org.apache.fluss.rpc.GatewayClientProxy;
@@ -61,6 +60,7 @@ public final class FlussConnection implements Connection {
6160
private volatile LookupClient lookupClient;
6261
private volatile RemoteFileDownloader remoteFileDownloader;
6362
private volatile SecurityTokenManager securityTokenManager;
63+
private volatile Admin admin;
6464

6565
FlussConnection(Configuration conf) {
6666
this(conf, MetricRegistry.create(conf, null));
@@ -93,19 +93,15 @@ public Configuration getConfiguration() {
9393

9494
@Override
9595
public Admin getAdmin() {
96-
return new FlussAdmin(rpcClient, metadataUpdater);
96+
return getOrCreateAdmin();
9797
}
9898

9999
@Override
100100
public Table getTable(TablePath tablePath) {
101-
// force to update the table info from server to avoid stale data in cache
101+
// force to update the table info from server to avoid stale data in cache.
102102
metadataUpdater.updateTableOrPartitionMetadata(tablePath, null);
103-
TableInfo tableInfo = metadataUpdater.getTableInfoOrElseThrow(tablePath);
104-
return new FlussTable(this, tablePath, tableInfo);
105-
}
106-
107-
public RpcClient getRpcClient() {
108-
return rpcClient;
103+
Admin admin = getOrCreateAdmin();
104+
return new FlussTable(this, tablePath, admin.getTableInfo(tablePath).join());
109105
}
110106

111107
public MetadataUpdater getMetadataUpdater() {
@@ -140,6 +136,17 @@ public LookupClient getOrCreateLookupClient() {
140136
return lookupClient;
141137
}
142138

139+
public Admin getOrCreateAdmin() {
140+
if (admin == null) {
141+
synchronized (this) {
142+
if (admin == null) {
143+
admin = new FlussAdmin(rpcClient, metadataUpdater);
144+
}
145+
}
146+
}
147+
return admin;
148+
}
149+
143150
public RemoteFileDownloader getOrCreateRemoteFileDownloader() {
144151
if (remoteFileDownloader == null) {
145152
synchronized (this) {

fluss-client/src/main/java/org/apache/fluss/client/admin/FlussAdmin.java

Lines changed: 6 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -88,8 +88,8 @@
8888
import java.util.List;
8989
import java.util.Map;
9090
import java.util.concurrent.CompletableFuture;
91-
import java.util.stream.Collectors;
9291

92+
import static org.apache.fluss.client.utils.ClientRpcMessageUtils.makeAlterTableRequest;
9393
import static org.apache.fluss.client.utils.ClientRpcMessageUtils.makeCreatePartitionRequest;
9494
import static org.apache.fluss.client.utils.ClientRpcMessageUtils.makeDropPartitionRequest;
9595
import static org.apache.fluss.client.utils.ClientRpcMessageUtils.makeListOffsetsRequest;
@@ -248,18 +248,8 @@ public CompletableFuture<Void> createTable(
248248
public CompletableFuture<Void> alterTable(
249249
TablePath tablePath, List<TableChange> tableChanges, boolean ignoreIfNotExists) {
250250
tablePath.validate();
251-
AlterTableRequest request = new AlterTableRequest();
252-
253-
List<PbAlterConfig> pbFlussTableChanges =
254-
tableChanges.stream()
255-
.map(ClientRpcMessageUtils::toPbAlterConfigs)
256-
.collect(Collectors.toList());
257-
258-
request.addAllConfigChanges(pbFlussTableChanges)
259-
.setIgnoreIfNotExists(ignoreIfNotExists)
260-
.setTablePath()
261-
.setDatabaseName(tablePath.getDatabaseName())
262-
.setTableName(tablePath.getTableName());
251+
AlterTableRequest request =
252+
makeAlterTableRequest(tablePath, tableChanges, ignoreIfNotExists);
263253
return gateway.alterTable(request).thenApply(r -> null);
264254
}
265255

@@ -420,15 +410,16 @@ private ListOffsetsResult listOffsets(
420410
OffsetSpec offsetSpec) {
421411
Long partitionId = null;
422412
metadataUpdater.updateTableOrPartitionMetadata(physicalTablePath.getTablePath(), null);
423-
long tableId = metadataUpdater.getTableId(physicalTablePath.getTablePath());
413+
TableInfo tableInfo = getTableInfo(physicalTablePath.getTablePath()).join();
414+
424415
// if partition name is not null, we need to check and update partition metadata
425416
if (physicalTablePath.getPartitionName() != null) {
426417
metadataUpdater.updatePhysicalTableMetadata(Collections.singleton(physicalTablePath));
427418
partitionId = metadataUpdater.getPartitionIdOrElseThrow(physicalTablePath);
428419
}
429420
Map<Integer, ListOffsetsRequest> requestMap =
430421
prepareListOffsetsRequests(
431-
metadataUpdater, tableId, partitionId, buckets, offsetSpec);
422+
metadataUpdater, tableInfo.getTableId(), partitionId, buckets, offsetSpec);
432423
Map<Integer, CompletableFuture<Long>> bucketToOffsetMap = MapUtils.newConcurrentHashMap();
433424
for (int bucket : buckets) {
434425
bucketToOffsetMap.put(bucket, new CompletableFuture<>());

fluss-client/src/main/java/org/apache/fluss/client/converter/ConverterCommons.java

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,9 @@
2424

2525
import java.math.BigDecimal;
2626
import java.util.Arrays;
27-
import java.util.HashMap;
27+
import java.util.EnumMap;
2828
import java.util.HashSet;
29+
import java.util.List;
2930
import java.util.Map;
3031
import java.util.Set;
3132

@@ -43,7 +44,7 @@ final class ConverterCommons {
4344
static final Map<DataTypeRoot, Set<Class<?>>> SUPPORTED_TYPES = createSupportedTypes();
4445

4546
private static Map<DataTypeRoot, Set<Class<?>>> createSupportedTypes() {
46-
Map<DataTypeRoot, Set<Class<?>>> map = new HashMap<>();
47+
Map<DataTypeRoot, Set<Class<?>>> map = new EnumMap<>(DataTypeRoot.class);
4748
map.put(DataTypeRoot.BOOLEAN, setOf(Boolean.class));
4849
map.put(DataTypeRoot.TINYINT, setOf(Byte.class));
4950
map.put(DataTypeRoot.SMALLINT, setOf(Short.class));
@@ -67,15 +68,15 @@ private static Map<DataTypeRoot, Set<Class<?>>> createSupportedTypes() {
6768

6869
static void validatePojoMatchesTable(PojoType<?> pojoType, RowType tableSchema) {
6970
Set<String> pojoNames = pojoType.getProperties().keySet();
70-
Set<String> tableNames = new HashSet<>(tableSchema.getFieldNames());
71-
if (!pojoNames.equals(tableNames)) {
71+
List<String> fieldNames = tableSchema.getFieldNames();
72+
if (!pojoNames.containsAll(fieldNames)) {
7273
throw new IllegalArgumentException(
7374
String.format(
7475
"POJO fields %s must exactly match table schema fields %s.",
75-
pojoNames, tableNames));
76+
pojoNames, fieldNames));
7677
}
7778
for (int i = 0; i < tableSchema.getFieldCount(); i++) {
78-
String name = tableSchema.getFieldNames().get(i);
79+
String name = fieldNames.get(i);
7980
DataType dt = tableSchema.getTypeAt(i);
8081
PojoType.Property prop = pojoType.getProperty(name);
8182
validateCompatibility(dt, prop);

0 commit comments

Comments
 (0)