Skip to content

Commit 4345663

Browse files
committed
[server] Use unified batch-concurrent implementation for ZooKeeper operations and remove redundant code
This commit replaces all single-operation async calls with a unified batch-concurrent implementation. This eliminates the need to maintain three separate code paths (batch concurrent, single async, and sync), significantly reducing code duplication in ZooKeeperClient. Going forward, all sync ZooKeeper operations will be replaced by the batch-concurrent approach, which improves coordinator performance through better I/O utilization and reduced latency. Additionally, the `cacheOnly` flag has been removed from the current metadata request. Support for cache-only semantics will be reintroduced in a future `MetadataRequestV2` with a cleaner design.
1 parent 32d50b8 commit 4345663

File tree

26 files changed

+1102
-1876
lines changed

26 files changed

+1102
-1876
lines changed

fluss-client/src/test/java/org/apache/fluss/client/table/AutoPartitionedTableITCase.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -327,7 +327,7 @@ void testOperateNotExistPartitionShouldThrowException() throws Exception {
327327
LogScanner logScanner = table.newScan().createLogScanner();
328328
assertThatThrownBy(() -> logScanner.subscribe(100L, 0, 0))
329329
.isInstanceOf(PartitionNotExistException.class)
330-
.hasMessageContaining("Partition not exist for partition ids: [100]");
330+
.hasMessageContaining("The partition id '100' does not exist");
331331
logScanner.close();
332332

333333
// todo: test the case that client produce to a partition to a server, but

fluss-common/src/main/java/org/apache/fluss/exception/CacheMissException.java

Lines changed: 0 additions & 36 deletions
This file was deleted.
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
#
2+
# Licensed to the Apache Software Foundation (ASF) under one
3+
# or more contributor license agreements. See the NOTICE file
4+
# distributed with this work for additional information
5+
# regarding copyright ownership. The ASF licenses this file
6+
# to you under the Apache License, Version 2.0 (the
7+
# "License"); you may not use this file except in compliance
8+
# with the License. You may obtain a copy of the License at
9+
#
10+
# http://www.apache.org/licenses/LICENSE-2.0
11+
#
12+
# Unless required by applicable law or agreed to in writing, software
13+
# distributed under the License is distributed on an "AS IS" BASIS,
14+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
# See the License for the specific language governing permissions and
16+
# limitations under the License.
17+
#
18+
19+
org.apache.fluss.testutils.common.TestLoggerExtension
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
#
2+
# Licensed to the Apache Software Foundation (ASF) under one
3+
# or more contributor license agreements. See the NOTICE file
4+
# distributed with this work for additional information
5+
# regarding copyright ownership. The ASF licenses this file
6+
# to you under the Apache License, Version 2.0 (the
7+
# "License"); you may not use this file except in compliance
8+
# with the License. You may obtain a copy of the License at
9+
#
10+
# http://www.apache.org/licenses/LICENSE-2.0
11+
#
12+
# Unless required by applicable law or agreed to in writing, software
13+
# distributed under the License is distributed on an "AS IS" BASIS,
14+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
# See the License for the specific language governing permissions and
16+
# limitations under the License.
17+
#
18+
19+
# Set root logger level to OFF to not flood build logs
20+
# set manually to INFO for debugging purposes
21+
rootLogger.level = OFF
22+
rootLogger.appenderRef.test.ref = TestLogger
23+
24+
appender.testlogger.name = TestLogger
25+
appender.testlogger.type = CONSOLE
26+
appender.testlogger.target = SYSTEM_ERR
27+
appender.testlogger.layout.type = PatternLayout
28+
appender.testlogger.layout.pattern = %-4r [%t] %-5p %c %x - %m%n
29+
30+
# suppress the duplicated logger extension
31+
logger.flink.name = org.apache.flink.util.TestLoggerExtension
32+
logger.flink.level = OFF
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
#
2+
# Licensed to the Apache Software Foundation (ASF) under one
3+
# or more contributor license agreements. See the NOTICE file
4+
# distributed with this work for additional information
5+
# regarding copyright ownership. The ASF licenses this file
6+
# to you under the Apache License, Version 2.0 (the
7+
# "License"); you may not use this file except in compliance
8+
# with the License. You may obtain a copy of the License at
9+
#
10+
# http://www.apache.org/licenses/LICENSE-2.0
11+
#
12+
# Unless required by applicable law or agreed to in writing, software
13+
# distributed under the License is distributed on an "AS IS" BASIS,
14+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
# See the License for the specific language governing permissions and
16+
# limitations under the License.
17+
#
18+
19+
org.apache.fluss.testutils.common.TestLoggerExtension
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
#
2+
# Licensed to the Apache Software Foundation (ASF) under one
3+
# or more contributor license agreements. See the NOTICE file
4+
# distributed with this work for additional information
5+
# regarding copyright ownership. The ASF licenses this file
6+
# to you under the Apache License, Version 2.0 (the
7+
# "License"); you may not use this file except in compliance
8+
# with the License. You may obtain a copy of the License at
9+
#
10+
# http://www.apache.org/licenses/LICENSE-2.0
11+
#
12+
# Unless required by applicable law or agreed to in writing, software
13+
# distributed under the License is distributed on an "AS IS" BASIS,
14+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
# See the License for the specific language governing permissions and
16+
# limitations under the License.
17+
#
18+
19+
# Set root logger level to OFF to not flood build logs
20+
# set manually to INFO for debugging purposes
21+
rootLogger.level = OFF
22+
rootLogger.appenderRef.test.ref = TestLogger
23+
24+
appender.testlogger.name = TestLogger
25+
appender.testlogger.type = CONSOLE
26+
appender.testlogger.target = SYSTEM_ERR
27+
appender.testlogger.layout.type = PatternLayout
28+
appender.testlogger.layout.pattern = %-4r [%t] %-5p %c %x - %m%n
29+
30+
# suppress the duplicated logger extension
31+
logger.flink.name = org.apache.flink.util.TestLoggerExtension
32+
logger.flink.level = OFF

fluss-rpc/src/main/java/org/apache/fluss/rpc/protocol/Errors.java

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020
import org.apache.fluss.exception.ApiException;
2121
import org.apache.fluss.exception.AuthenticationException;
2222
import org.apache.fluss.exception.AuthorizationException;
23-
import org.apache.fluss.exception.CacheMissException;
2423
import org.apache.fluss.exception.CorruptMessageException;
2524
import org.apache.fluss.exception.CorruptRecordException;
2625
import org.apache.fluss.exception.DatabaseAlreadyExistException;
@@ -225,9 +224,6 @@ public enum Errors {
225224
55,
226225
"The new ISR contains at least one ineligible replica.",
227226
IneligibleReplicaException::new),
228-
CACHE_MISS(
229-
56, "The requested metadata is not available in local cache.", CacheMissException::new);
230-
IneligibleReplicaException::new),
231227
INVALID_ALTER_TABLE_EXCEPTION(
232228
56, "The alter table is invalid.", InvalidAlterTableException::new);
233229

fluss-rpc/src/main/proto/FlussApi.proto

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -176,10 +176,6 @@ message MetadataRequest {
176176
// metadata request
177177
// todo: we won't need the assumption after we introduce metadata cache in server
178178
repeated int64 partitions_id = 3 [packed = true];
179-
180-
// if cache_only is true, return immediately if metadata is not in local cache
181-
// instead of waiting for ZooKeeper access to complete
182-
optional bool cache_only = 4;
183179
}
184180

185181
message MetadataResponse {

0 commit comments

Comments
 (0)