Skip to content

Commit f5e8928

Browse files
authored
[server] Fix admin operation cannot be done error when bootstrap server config as one of TabletServer (alibaba#525)
1 parent cc6b9af commit f5e8928

File tree

7 files changed

+43
-9
lines changed

7 files changed

+43
-9
lines changed

fluss-client/src/test/java/com/alibaba/fluss/client/admin/FlussAdminITCase.java

+33
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,16 @@
1616

1717
package com.alibaba.fluss.client.admin;
1818

19+
import com.alibaba.fluss.client.Connection;
20+
import com.alibaba.fluss.client.ConnectionFactory;
1921
import com.alibaba.fluss.client.metadata.KvSnapshotMetadata;
2022
import com.alibaba.fluss.client.metadata.KvSnapshots;
2123
import com.alibaba.fluss.client.table.Table;
2224
import com.alibaba.fluss.client.table.writer.UpsertWriter;
2325
import com.alibaba.fluss.cluster.ServerNode;
2426
import com.alibaba.fluss.config.AutoPartitionTimeUnit;
2527
import com.alibaba.fluss.config.ConfigOptions;
28+
import com.alibaba.fluss.config.Configuration;
2629
import com.alibaba.fluss.exception.DatabaseAlreadyExistException;
2730
import com.alibaba.fluss.exception.DatabaseNotEmptyException;
2831
import com.alibaba.fluss.exception.DatabaseNotExistException;
@@ -742,6 +745,36 @@ tablePath, newPartitionSpec("pt", String.valueOf(currentYear + 1)), false)
742745
String.valueOf(currentYear)));
743746
}
744747

748+
@Test
749+
void testBootstrapServerConfigAsTabletServer() throws Exception {
750+
Configuration newConf = clientConf;
751+
ServerNode ts0 = FLUSS_CLUSTER_EXTENSION.getTabletServerNodes().get(0);
752+
newConf.set(
753+
ConfigOptions.BOOTSTRAP_SERVERS,
754+
Collections.singletonList(String.format("%s:%d", ts0.host(), ts0.port())));
755+
try (Connection conn = ConnectionFactory.createConnection(clientConf)) {
756+
Admin newAdmin = conn.getAdmin();
757+
String dbName = "test_bootstrap_server_t1";
758+
newAdmin.createDatabase(
759+
dbName,
760+
DatabaseDescriptor.builder().comment("test comment").build(),
761+
false)
762+
.get();
763+
newAdmin.createTable(
764+
TablePath.of(dbName, "test_table_1"),
765+
TableDescriptor.builder().schema(Schema.newBuilder().build()).build(),
766+
false)
767+
.get();
768+
assertThat(newAdmin.getDatabaseInfo(dbName).get().getDatabaseName()).isEqualTo(dbName);
769+
assertThat(
770+
newAdmin.getTableInfo(TablePath.of(dbName, "test_table_1"))
771+
.get()
772+
.getTablePath()
773+
.getTableName())
774+
.isEqualTo("test_table_1");
775+
}
776+
}
777+
745778
private void assertHasTabletServerNumber(int tabletServerNumber) {
746779
CoordinatorGateway coordinatorGateway = FLUSS_CLUSTER_EXTENSION.newCoordinatorClient();
747780
retry(

fluss-client/src/test/java/com/alibaba/fluss/client/write/RecordAccumulatorTest.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -557,7 +557,7 @@ private Cluster updateCluster(List<BucketLocation> bucketLocations) {
557557

558558
return new Cluster(
559559
aliveTabletServersById,
560-
new ServerNode(-1, "localhost", 89, ServerType.COORDINATOR),
560+
new ServerNode(0, "localhost", 89, ServerType.COORDINATOR),
561561
bucketsByPath,
562562
tableIdByPath,
563563
Collections.emptyMap(),

fluss-client/src/test/java/com/alibaba/fluss/client/write/StickyStaticBucketAssignerTest.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -224,7 +224,7 @@ private Cluster updateCluster(List<BucketLocation> bucketLocations) {
224224

225225
return new Cluster(
226226
aliveTabletServersById,
227-
new ServerNode(-1, "localhost", 89, ServerType.COORDINATOR),
227+
new ServerNode(0, "localhost", 89, ServerType.COORDINATOR),
228228
bucketsByPath,
229229
tableIdByPath,
230230
Collections.emptyMap(),

fluss-common/src/test/java/com/alibaba/fluss/cluster/ClusterTest.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@
4646
/** Test for {@link Cluster}. */
4747
class ClusterTest {
4848
private static final ServerNode COORDINATOR_SERVER =
49-
new ServerNode(-1, "localhost", 98, ServerType.COORDINATOR);
49+
new ServerNode(0, "localhost", 98, ServerType.COORDINATOR);
5050
private static final ServerNode[] NODES =
5151
new ServerNode[] {
5252
new ServerNode(0, "localhost", 99, ServerType.TABLET_SERVER),

fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/CoordinatorEventProcessor.java

+4-3
Original file line numberDiff line numberDiff line change
@@ -274,10 +274,11 @@ private ServerNode getCoordinatorServerNode() {
274274
.getCoordinatorAddress()
275275
.map(
276276
coordinatorAddress ->
277-
// we set id to -1 as the id for the id stored in zk
278-
// for coordinator server is an uuid now
277+
// TODO we set id to 0 as that CoordinatorServer don't support
278+
// HA, if we support HA, we need to set id to the config
279+
// CoordinatorServer id to avoid node drift.
279280
new ServerNode(
280-
-1,
281+
0,
281282
coordinatorAddress.getHost(),
282283
coordinatorAddress.getPort(),
283284
ServerType.COORDINATOR))

fluss-server/src/test/java/com/alibaba/fluss/server/replica/ReplicaTestBase.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -201,7 +201,7 @@ public void setup() throws Exception {
201201
private void initMetadataCache(ServerMetadataCache metadataCache) {
202202
metadataCache.updateMetadata(
203203
new ClusterMetadataInfo(
204-
Optional.of(new ServerNode(-1, "localhost", 1234, ServerType.COORDINATOR)),
204+
Optional.of(new ServerNode(0, "localhost", 1234, ServerType.COORDINATOR)),
205205
new HashSet<>(
206206
Arrays.asList(
207207
new ServerNode(

fluss-server/src/test/java/com/alibaba/fluss/server/testutils/FlussClusterExtension.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -222,9 +222,9 @@ public void startCoordinatorServer() throws Exception {
222222
coordinatorServer = new CoordinatorServer(conf);
223223
coordinatorServer.start();
224224
coordinatorServerNode =
225-
// we use -1 as coordinator server id
225+
// TODO, Currently, we use 0 as coordinator server id.
226226
new ServerNode(
227-
-1, HOST_ADDRESS, availablePort.getPort(), ServerType.COORDINATOR);
227+
0, HOST_ADDRESS, availablePort.getPort(), ServerType.COORDINATOR);
228228
}
229229
} else {
230230
// start the existing coordinator server

0 commit comments

Comments
 (0)