Skip to content

Commit 6598ddf

Browse files
authored
Merge branch 'main' into lakesense
2 parents bf03205 + 49ce39d commit 6598ddf

File tree

369 files changed

+21182
-3335
lines changed

Some content is hidden

Large commits have some content hidden by default. Use the search box below to find content that may be hidden.

369 files changed

+21182
-3335
lines changed

.github/ISSUE_TEMPLATE/bug.yml

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -39,9 +39,8 @@ body:
3939
description: What Fluss version are you using?
4040
multiple: false
4141
options:
42-
- "0.7.0 (latest release)"
43-
- "0.6.0"
44-
- "0.5.0"
42+
- "0.8.0 (latest release)"
43+
- "0.7.0"
4544
- "main (development)"
4645
validations:
4746
required: true

.github/workflows/docs-check.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ jobs:
4444
run: ./build_versioned_docs.sh
4545
- uses: actions/setup-node@v4
4646
with:
47-
node-version: 18
47+
node-version: 20
4848
- name: Install dependencies
4949
run: npm install
5050
- name: Test build website

README.md

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -61,15 +61,12 @@ Prerequisites for building Apache Fluss:
6161
- Unix-like environment (we use Linux, Mac OS X, Cygwin, WSL)
6262
- Git
6363
- Maven (we require version >= 3.8.6)
64-
- Java 8 or 11
64+
- Java 11
6565

6666
```bash
6767
git clone https://github.com/apache/fluss.git
6868
cd fluss
69-
# in case of java 11
7069
./mvnw clean package -DskipTests
71-
# or in case of java 8
72-
./mvnw clean package -DskipTests -Pjava8
7370
```
7471

7572
Apache Fluss is now installed in `build-target`. The build command uses Maven Wrapper (`mvnw`) which ensures the correct Maven version is used.

docker/quickstart-flink/prepare_build.sh

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -170,12 +170,12 @@ main() {
170170
# Iceberg Support
171171
log_info "Downloading Iceberg connector JARs..."
172172

173-
# Download iceberg-flink-runtime for Flink 1.20 (version 1.9.1)
173+
# Download iceberg-flink-runtime for Flink 1.20 (version 1.10.0)
174174
download_jar \
175-
"https://repo1.maven.org/maven2/org/apache/iceberg/iceberg-flink-runtime-1.20/1.9.1/iceberg-flink-runtime-1.20-1.9.1.jar" \
176-
"./lib/iceberg-flink-runtime-1.20-1.9.1.jar" \
175+
"https://repo1.maven.org/maven2/org/apache/iceberg/iceberg-flink-runtime-1.20/1.10.0/iceberg-flink-runtime-1.20-1.10.0.jar" \
176+
"./lib/iceberg-flink-runtime-1.20-1.10.0.jar" \
177177
"" \
178-
"iceberg-flink-runtime-1.20-1.9.1"
178+
"iceberg-flink-runtime-1.20-1.10.0"
179179

180180

181181
# Prepare lake tiering JAR
@@ -201,7 +201,7 @@ verify_jars() {
201201
"flink-faker-0.5.3.jar"
202202
"hadoop-apache-3.3.5-2.jar"
203203
"paimon-flink-1.20-1.2.0.jar"
204-
"iceberg-flink-runtime-1.20-1.9.1.jar"
204+
"iceberg-flink-runtime-1.20-1.10.0.jar"
205205
)
206206

207207
local opt_jars=(
@@ -250,7 +250,7 @@ show_summary() {
250250
echo " ✓ Fluss Flink 1.20 connector"
251251
echo " ✓ Fluss Lake Paimon connector"
252252
echo " ✓ Fluss Lake Iceberg connector"
253-
echo " ✓ Iceberg Flink runtime 1.20 (v1.9.1)"
253+
echo " ✓ Iceberg Flink runtime 1.20 (v1.10.0)"
254254
echo " ✓ Paimon Flink 1.20 (v1.2.0)"
255255
echo " ✓ Hadoop Apache (v3.3.5-2)"
256256
echo " ✓ Flink Faker (v0.5.3)"

fluss-client/src/main/java/org/apache/fluss/client/FlussConnection.java

Lines changed: 29 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -29,11 +29,11 @@
2929
import org.apache.fluss.client.token.SecurityTokenManager;
3030
import org.apache.fluss.client.token.SecurityTokenProvider;
3131
import org.apache.fluss.client.write.WriterClient;
32+
import org.apache.fluss.cluster.ServerNode;
3233
import org.apache.fluss.config.ConfigOptions;
3334
import org.apache.fluss.config.Configuration;
3435
import org.apache.fluss.exception.FlussRuntimeException;
3536
import org.apache.fluss.fs.FileSystem;
36-
import org.apache.fluss.metadata.TableInfo;
3737
import org.apache.fluss.metadata.TablePath;
3838
import org.apache.fluss.metrics.registry.MetricRegistry;
3939
import org.apache.fluss.rpc.GatewayClientProxy;
@@ -43,6 +43,7 @@
4343

4444
import java.time.Duration;
4545
import java.util.HashMap;
46+
import java.util.HashSet;
4647
import java.util.List;
4748

4849
import static org.apache.fluss.client.utils.MetadataUtils.getOneAvailableTabletServerNode;
@@ -61,6 +62,7 @@ public final class FlussConnection implements Connection {
6162
private volatile LookupClient lookupClient;
6263
private volatile RemoteFileDownloader remoteFileDownloader;
6364
private volatile SecurityTokenManager securityTokenManager;
65+
private volatile Admin admin;
6466

6567
FlussConnection(Configuration conf) {
6668
this(conf, MetricRegistry.create(conf, null));
@@ -93,19 +95,15 @@ public Configuration getConfiguration() {
9395

9496
@Override
9597
public Admin getAdmin() {
96-
return new FlussAdmin(rpcClient, metadataUpdater);
98+
return getOrCreateAdmin();
9799
}
98100

99101
@Override
100102
public Table getTable(TablePath tablePath) {
101-
// force to update the table info from server to avoid stale data in cache
103+
// force to update the table info from server to avoid stale data in cache.
102104
metadataUpdater.updateTableOrPartitionMetadata(tablePath, null);
103-
TableInfo tableInfo = metadataUpdater.getTableInfoOrElseThrow(tablePath);
104-
return new FlussTable(this, tablePath, tableInfo);
105-
}
106-
107-
public RpcClient getRpcClient() {
108-
return rpcClient;
105+
Admin admin = getOrCreateAdmin();
106+
return new FlussTable(this, tablePath, admin.getTableInfo(tablePath).join());
109107
}
110108

111109
public MetadataUpdater getMetadataUpdater() {
@@ -140,6 +138,17 @@ public LookupClient getOrCreateLookupClient() {
140138
return lookupClient;
141139
}
142140

141+
public Admin getOrCreateAdmin() {
142+
if (admin == null) {
143+
synchronized (this) {
144+
if (admin == null) {
145+
admin = new FlussAdmin(rpcClient, metadataUpdater);
146+
}
147+
}
148+
}
149+
return admin;
150+
}
151+
143152
public RemoteFileDownloader getOrCreateRemoteFileDownloader() {
144153
if (remoteFileDownloader == null) {
145154
synchronized (this) {
@@ -155,9 +164,17 @@ public RemoteFileDownloader getOrCreateRemoteFileDownloader() {
155164
// todo: may add retry logic when no any available tablet server?
156165
AdminReadOnlyGateway gateway =
157166
GatewayClientProxy.createGatewayProxy(
158-
() ->
159-
getOneAvailableTabletServerNode(
160-
metadataUpdater.getCluster()),
167+
() -> {
168+
ServerNode serverNode =
169+
getOneAvailableTabletServerNode(
170+
metadataUpdater.getCluster(),
171+
new HashSet<>());
172+
if (serverNode == null) {
173+
throw new FlussRuntimeException(
174+
"no available tablet server");
175+
}
176+
return serverNode;
177+
},
161178
rpcClient,
162179
AdminReadOnlyGateway.class);
163180
SecurityTokenProvider securityTokenProvider =

fluss-client/src/main/java/org/apache/fluss/client/admin/FlussAdmin.java

Lines changed: 16 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -88,8 +88,8 @@
8888
import java.util.List;
8989
import java.util.Map;
9090
import java.util.concurrent.CompletableFuture;
91-
import java.util.stream.Collectors;
9291

92+
import static org.apache.fluss.client.utils.ClientRpcMessageUtils.makeAlterTableRequest;
9393
import static org.apache.fluss.client.utils.ClientRpcMessageUtils.makeCreatePartitionRequest;
9494
import static org.apache.fluss.client.utils.ClientRpcMessageUtils.makeDropPartitionRequest;
9595
import static org.apache.fluss.client.utils.ClientRpcMessageUtils.makeListOffsetsRequest;
@@ -248,18 +248,8 @@ public CompletableFuture<Void> createTable(
248248
public CompletableFuture<Void> alterTable(
249249
TablePath tablePath, List<TableChange> tableChanges, boolean ignoreIfNotExists) {
250250
tablePath.validate();
251-
AlterTableRequest request = new AlterTableRequest();
252-
253-
List<PbAlterConfig> pbFlussTableChanges =
254-
tableChanges.stream()
255-
.map(ClientRpcMessageUtils::toPbAlterConfigs)
256-
.collect(Collectors.toList());
257-
258-
request.addAllConfigChanges(pbFlussTableChanges)
259-
.setIgnoreIfNotExists(ignoreIfNotExists)
260-
.setTablePath()
261-
.setDatabaseName(tablePath.getDatabaseName())
262-
.setTableName(tablePath.getTableName());
251+
AlterTableRequest request =
252+
makeAlterTableRequest(tablePath, tableChanges, ignoreIfNotExists);
263253
return gateway.alterTable(request).thenApply(r -> null);
264254
}
265255

@@ -420,15 +410,21 @@ private ListOffsetsResult listOffsets(
420410
OffsetSpec offsetSpec) {
421411
Long partitionId = null;
422412
metadataUpdater.updateTableOrPartitionMetadata(physicalTablePath.getTablePath(), null);
423-
long tableId = metadataUpdater.getTableId(physicalTablePath.getTablePath());
413+
TableInfo tableInfo = getTableInfo(physicalTablePath.getTablePath()).join();
414+
424415
// if partition name is not null, we need to check and update partition metadata
425416
if (physicalTablePath.getPartitionName() != null) {
426417
metadataUpdater.updatePhysicalTableMetadata(Collections.singleton(physicalTablePath));
427418
partitionId = metadataUpdater.getPartitionIdOrElseThrow(physicalTablePath);
428419
}
429420
Map<Integer, ListOffsetsRequest> requestMap =
430421
prepareListOffsetsRequests(
431-
metadataUpdater, tableId, partitionId, buckets, offsetSpec);
422+
metadataUpdater,
423+
tableInfo.getTableId(),
424+
partitionId,
425+
buckets,
426+
offsetSpec,
427+
tableInfo.getTablePath());
432428
Map<Integer, CompletableFuture<Long>> bucketToOffsetMap = MapUtils.newConcurrentHashMap();
433429
for (int bucket : buckets) {
434430
bucketToOffsetMap.put(bucket, new CompletableFuture<>());
@@ -545,10 +541,13 @@ private static Map<Integer, ListOffsetsRequest> prepareListOffsetsRequests(
545541
long tableId,
546542
@Nullable Long partitionId,
547543
Collection<Integer> buckets,
548-
OffsetSpec offsetSpec) {
544+
OffsetSpec offsetSpec,
545+
TablePath tablePath) {
549546
Map<Integer, List<Integer>> nodeForBucketList = new HashMap<>();
550547
for (Integer bucketId : buckets) {
551-
int leader = metadataUpdater.leaderFor(new TableBucket(tableId, partitionId, bucketId));
548+
int leader =
549+
metadataUpdater.leaderFor(
550+
tablePath, new TableBucket(tableId, partitionId, bucketId));
552551
nodeForBucketList.computeIfAbsent(leader, k -> new ArrayList<>()).add(bucketId);
553552
}
554553

fluss-client/src/main/java/org/apache/fluss/client/lookup/AbstractLookupQuery.java

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,29 +19,46 @@
1919

2020
import org.apache.fluss.annotation.Internal;
2121
import org.apache.fluss.metadata.TableBucket;
22+
import org.apache.fluss.metadata.TablePath;
2223

2324
import java.util.concurrent.CompletableFuture;
2425

2526
/** Abstract Class to represent a lookup operation. */
2627
@Internal
2728
public abstract class AbstractLookupQuery<T> {
2829

30+
private final TablePath tablePath;
2931
private final TableBucket tableBucket;
3032
private final byte[] key;
33+
private int retries;
3134

32-
public AbstractLookupQuery(TableBucket tableBucket, byte[] key) {
35+
public AbstractLookupQuery(TablePath tablePath, TableBucket tableBucket, byte[] key) {
36+
this.tablePath = tablePath;
3337
this.tableBucket = tableBucket;
3438
this.key = key;
39+
this.retries = 0;
3540
}
3641

3742
public byte[] key() {
3843
return key;
3944
}
4045

46+
public TablePath tablePath() {
47+
return tablePath;
48+
}
49+
4150
public TableBucket tableBucket() {
4251
return tableBucket;
4352
}
4453

54+
public int retries() {
55+
return retries;
56+
}
57+
58+
public void incrementRetries() {
59+
retries++;
60+
}
61+
4562
public abstract LookupType lookupType();
4663

4764
public abstract CompletableFuture<T> future();

0 commit comments

Comments
 (0)