Skip to content

Commit cf73e49

Browse files
committed
feat(kafka-storage-plugin): 添加 Kafka 存储插件核心功能
- 实现 PluginConfig 类用于管理插件配置,包括 Kafka、存储、指标和主题配置
- 开发 Kafka 协议解析器,支持 Kafka 请求帧解码和协议处理
- 构建 Kafka 服务器实现,支持 METADATA、PRODUCE、FETCH 等核心 API
- 实现消费者偏移量存储和管理功能
- 添加 Kafka 存储指标收集和监控能力
- 集成 smart-socket 框架提供 Kafka 协议接入层
- 实现消息持久化存储和分区管理功能
1 parent d7f6e23 commit cf73e49

23 files changed

Lines changed: 2742 additions & 1 deletion
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
<!-- Maven build for the kafka-storage-plugin module: a smart-mqtt plugin that
     exposes a Kafka-compatible storage/protocol layer. -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <!-- Inherits versions and plugin management from the smart-mqtt plugins aggregator. -->
    <parent>
        <groupId>tech.smartboot.mqtt</groupId>
        <artifactId>plugins</artifactId>
        <version>1.5.5</version>
    </parent>

    <artifactId>kafka-storage-plugin</artifactId>

    <dependencies>
        <!-- Official Kafka client library (compile scope): protocol constants / record formats. -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>3.5.1</version>
        </dependency>

        <!-- Broker and client from this project, test-only: used for end-to-end plugin tests. -->
        <dependency>
            <groupId>tech.smartboot.mqtt</groupId>
            <artifactId>smart-mqtt-broker</artifactId>
            <version>${project.version}</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>tech.smartboot.mqtt</groupId>
            <artifactId>smart-mqtt-client</artifactId>
            <version>${project.version}</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.junit.jupiter</groupId>
            <artifactId>junit-jupiter</artifactId>
            <version>5.10.0</version>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-surefire-plugin</artifactId>
                <version>3.2.5</version>
                <configuration>
                    <!-- Run tests on the classpath, not the module path: the plugin is not a JPMS module. -->
                    <useModulePath>false</useModulePath>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
Lines changed: 163 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,163 @@
1+
package tech.smartboot.mqtt.plugin.kafka.storage;
2+
3+
import tech.smartboot.mqtt.plugin.kafka.storage.config.PluginConfig;
4+
import tech.smartboot.mqtt.plugin.kafka.storage.kafka.server.KafkaServer;
5+
import tech.smartboot.mqtt.plugin.kafka.storage.metrics.KafkaStorageMetrics;
6+
import tech.smartboot.mqtt.plugin.kafka.storage.store.PersistentMessageStore;
7+
import tech.smartboot.mqtt.plugin.spec.BrokerContext;
8+
import tech.smartboot.mqtt.plugin.spec.Options;
9+
import tech.smartboot.mqtt.plugin.spec.Plugin;
10+
import tech.smartboot.mqtt.plugin.spec.schema.Enum;
11+
import tech.smartboot.mqtt.plugin.spec.schema.Item;
12+
import tech.smartboot.mqtt.plugin.spec.schema.Schema;
13+
14+
import java.io.File;
15+
import java.nio.file.Path;
16+
import java.util.concurrent.TimeUnit;
17+
18+
public class KafkaStoragePlugin extends Plugin {
19+
private PluginConfig config;
20+
private KafkaStorageMetrics metrics;
21+
private PersistentMessageStore store;
22+
private KafkaServer kafkaServer;
23+
24+
@Override
25+
protected void initPlugin(BrokerContext brokerContext) throws Throwable {
26+
File pluginStorage;
27+
try {
28+
pluginStorage = storage();
29+
} catch (IllegalStateException e) {
30+
log("storage directory is not assigned, skip kafka-storage-plugin bootstrap for classpath-only loading");
31+
return;
32+
}
33+
config = loadPluginConfig(PluginConfig.class);
34+
if (config == null) {
35+
config = new PluginConfig();
36+
}
37+
metrics = new KafkaStorageMetrics();
38+
File dataDir = new File(pluginStorage, config.getStorage().getDataPath());
39+
Path root = dataDir.toPath();
40+
store = new PersistentMessageStore(root, config.getStorage(), config.getTopics(), metrics);
41+
store.start();
42+
43+
kafkaServer = new KafkaServer(config.getKafka(), brokerContext, store, metrics);
44+
kafkaServer.start();
45+
addUsagePort(config.getKafka().getPort(), "kafka protocol port");
46+
47+
consumer((session, message) -> {
48+
try {
49+
store.appendMqtt(
50+
message.getTopic().getTopic(),
51+
message.getPayload(),
52+
(short) message.getQos().value(),
53+
message.isRetained(),
54+
session == null ? null : session.getClientId()
55+
);
56+
} catch (Exception e) {
57+
metrics.markError();
58+
e.printStackTrace();
59+
}
60+
});
61+
62+
timer().scheduleWithFixedDelay(() -> {
63+
try {
64+
store.flushDue();
65+
} catch (Exception e) {
66+
metrics.markError();
67+
e.printStackTrace();
68+
}
69+
}, config.getStorage().getFlushIntervalMs(), TimeUnit.MILLISECONDS);
70+
71+
timer().scheduleWithFixedDelay(() -> {
72+
try {
73+
store.cleanupExpired();
74+
} catch (Exception e) {
75+
metrics.markError();
76+
e.printStackTrace();
77+
}
78+
}, config.getStorage().getCleanupIntervalMs(), TimeUnit.MILLISECONDS);
79+
80+
timer().scheduleWithFixedDelay(() -> {
81+
KafkaStorageMetrics.Snapshot snapshot = metrics.snapshot(store.topicCount(), store.consumerGroupCount());
82+
log("[kafka-storage-plugin] " + snapshot.toString());
83+
}, config.getMetrics().getLogIntervalMs(), TimeUnit.MILLISECONDS);
84+
85+
log("kafka-storage-plugin started, kafka port: " + config.getKafka().getPort());
86+
}
87+
88+
@Override
89+
protected void destroyPlugin() {
90+
try {
91+
if (kafkaServer != null) {
92+
kafkaServer.shutdown();
93+
}
94+
if (store != null) {
95+
store.flushDue();
96+
store.close();
97+
}
98+
} catch (Exception e) {
99+
e.printStackTrace();
100+
}
101+
}
102+
103+
@Override
104+
public String getVersion() {
105+
return Options.VERSION;
106+
}
107+
108+
@Override
109+
public String getVendor() {
110+
return Options.VENDOR;
111+
}
112+
113+
@Override
114+
public String pluginName() {
115+
return "kafka-storage-plugin";
116+
}
117+
118+
@Override
119+
public Schema schema() {
120+
Schema schema = new Schema();
121+
122+
Item kafka = Item.Object("kafka", "Kafka 服务配置").col(12);
123+
kafka.addItems(
124+
Item.String("host", "监听地址").col(4),
125+
Item.Int("port", "监听端口").col(4),
126+
Item.Int("broker_id", "Broker ID").col(4),
127+
Item.String("cluster_id", "Cluster ID").col(4),
128+
Item.String("advertised_host", "对外地址").col(4),
129+
Item.Int("advertised_port", "对外端口").col(4),
130+
Item.Int("request_max_bytes", "请求最大字节数").col(4)
131+
);
132+
schema.addItem(kafka);
133+
134+
Item storage = Item.Object("storage", "存储配置").col(12);
135+
storage.addItems(
136+
Item.String("data_path", "数据目录").col(4),
137+
Item.Switch("auto_create_topics", "自动创建 topic").col(4),
138+
Item.Int("default_partition_count", "默认分区数").col(4),
139+
Item.Int("segment_bytes", "分段大小").col(4),
140+
Item.Int("retention_bytes", "容量上限").col(4),
141+
Item.Int("retention_hours", "保留小时数").col(4),
142+
Item.Int("cleanup_interval_ms", "清理周期(ms)").col(4),
143+
Item.Int("flush_interval_ms", "刷盘周期(ms)").col(4),
144+
Item.Switch("flush_on_every_write", "每次写入立即刷盘").col(4),
145+
Item.String("mqtt_partition_strategy", "MQTT 分区策略")
146+
.col(4)
147+
.addEnums(Enum.of("round_robin", "round_robin"), Enum.of("topic_hash", "topic_hash"))
148+
);
149+
schema.addItem(storage);
150+
151+
Item metricsItem = Item.Object("metrics", "指标配置").col(12);
152+
metricsItem.addItems(Item.Int("log_interval_ms", "日志输出周期(ms)").col(4));
153+
schema.addItem(metricsItem);
154+
155+
Item topics = Item.ItemArray("topics", "预创建 topic").col(12);
156+
topics.addItems(
157+
Item.String("name", "topic 名称").col(8),
158+
Item.Int("partitions", "分区数").col(4)
159+
);
160+
schema.addItem(topics);
161+
return schema;
162+
}
163+
}

0 commit comments

Comments (0)