Skip to content

Commit 03939fe

Browse files
authored
[kafka] Add Kafka protocol compatibility configs (#592)
1 parent 51094e9 commit 03939fe

File tree

5 files changed

+121
-33
lines changed

5 files changed

+121
-33
lines changed

fluss-common/src/main/java/com/alibaba/fluss/config/ConfigOptions.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1373,6 +1373,28 @@ public class ConfigOptions {
13731373
"The datalake format used by Fluss to be as lake storage, such as Paimon, Iceberg, Hudi. "
13741374
+ "Now, only support Paimon.");
13751375

1376+
// ------------------------------------------------------------------------
1377+
// ConfigOptions for fluss kafka
1378+
// ------------------------------------------------------------------------
1379+
public static final ConfigOption<Boolean> KAFKA_ENABLED =
1380+
key("kafka.enabled")
1381+
.booleanType()
1382+
.defaultValue(false)
1383+
.withDescription(
1384+
"Whether enable fluss kafka. Disabled by default. "
1385+
+ "When this option is set to true, the fluss kafka will be enabled.");
1386+
public static final ConfigOption<Integer> KAFKA_PORT =
1387+
key("kafka.port")
1388+
.intType()
1389+
.defaultValue(9092)
1390+
.withDescription("The port for fluss kafka. The default port is 9092.");
1391+
public static final ConfigOption<String> KAFKA_DATABASE =
1392+
key("kafka.database")
1393+
.stringType()
1394+
.defaultValue("_kafka")
1395+
.withDescription(
1396+
"The database for fluss kafka. The default database is '_kafka'.");
1397+
13761398
/**
13771399
* Compaction style for Fluss's kv, which is same to rocksdb's, but help use avoid including
13781400
* rocksdb dependency when only need include this common module.

fluss-dist/src/main/resources/server.yaml

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,3 +80,10 @@ tablet-server.id: 0
8080
# fs.oss.endpoint: xxx
8181
# fs.oss.accessKeyId: xxx
8282
# fs.oss.accessKeySecret: xxx
83+
84+
#==============================================================================
85+
# Fluss Kafka Configs
86+
#==============================================================================
87+
kafka.enabled: false
88+
kafka.port: 9092
89+
kafka.database: _kafka

fluss-kafka/pom.xml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,5 +46,11 @@
4646
<groupId>org.slf4j</groupId>
4747
<artifactId>slf4j-api</artifactId>
4848
</dependency>
49+
<dependency>
50+
<groupId>com.alibaba.fluss</groupId>
51+
<artifactId>fluss-common</artifactId>
52+
<version>${project.version}</version>
53+
<scope>test</scope>
54+
</dependency>
4955
</dependencies>
5056
</project>
Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
/*
2+
* Copyright (c) 2025 Alibaba Group Holding Ltd.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.alibaba.fluss.kafka;
18+
19+
import com.alibaba.fluss.config.ConfigOptions;
20+
import com.alibaba.fluss.config.Configuration;
21+
22+
import org.junit.jupiter.api.Test;
23+
24+
import java.util.HashMap;
25+
import java.util.Map;
26+
27+
import static org.assertj.core.api.Assertions.assertThat;
28+
29+
public class KafkaConfigsTest {
30+
@Test
31+
public void testFromMap() throws Exception {
32+
Map<String, String> map = new HashMap<>();
33+
map.put(ConfigOptions.KAFKA_ENABLED.key(), "true");
34+
map.put(ConfigOptions.KAFKA_PORT.key(), "9093");
35+
map.put(ConfigOptions.KAFKA_DATABASE.key(), "fluss");
36+
Configuration configuration = Configuration.fromMap(map);
37+
38+
assertThat(configuration.getBoolean(ConfigOptions.KAFKA_ENABLED)).isTrue();
39+
assertThat(configuration.getInt(ConfigOptions.KAFKA_PORT)).isEqualTo(9093);
40+
assertThat(configuration.getString(ConfigOptions.KAFKA_DATABASE)).isEqualTo("fluss");
41+
}
42+
43+
@Test
44+
public void testFromDefault() throws Exception {
45+
Configuration configuration = Configuration.fromMap(new HashMap<>());
46+
assertThat(configuration.getBoolean(ConfigOptions.KAFKA_ENABLED)).isFalse();
47+
assertThat(configuration.getInt(ConfigOptions.KAFKA_PORT)).isEqualTo(9092);
48+
assertThat(configuration.getString(ConfigOptions.KAFKA_DATABASE)).isEqualTo("_kafka");
49+
}
50+
}

0 commit comments

Comments
 (0)