Skip to content

Commit b85de0f

Browse files
global secondary index
1 parent 32003fc commit b85de0f

File tree

205 files changed

+26517
-508
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

205 files changed

+26517
-508
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ target
1717
build-target
1818
*.ipr
1919
*.iws
20+
.qoder/
2021
dependency-reduced-pom.xml
2122

2223
### VS Code ###

docs/global-secondary-index-design.md

Lines changed: 567 additions & 0 deletions
Large diffs are not rendered by default.

docs/state-manager-design.md

Lines changed: 341 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,341 @@
1+
# Bucket级别持久化状态管理器设计方案
2+
3+
## 1. 概述
4+
5+
本文档描述了在fluss项目中实现bucket级别持久化状态管理器的设计方案。该状态管理器提供两层存储抽象:底层的bytes模型KV存储和上层的强类型KV存储,与LogTablet有机结合,实现WAL能力和checkpoint机制。
6+
7+
## 2. 架构设计
8+
9+
### 2.1 整体架构
10+
11+
```
12+
┌─────────────────────────────────────────────────┐
13+
│ 强类型KV存储抽象层 │
14+
├─────────────────────────────────────────────────┤
15+
│ KV存储抽象层 │
16+
├─────────────────────────────────────────────────┤
17+
│ 存储实现层 (FileKv/LSMTree/RocksDB) │
18+
├─────────────────────────────────────────────────┤
19+
│ WAL & Checkpoint │
20+
│ (集成到LogSegment) │
21+
└─────────────────────────────────────────────────┘
22+
```
23+
24+
### 2.2 包结构设计
25+
26+
```
27+
org.apache.fluss.server.log.state/
28+
├── BucketStateManager.java // Bucket级别的状态管理器接口
29+
├── encoding/
30+
│ ├── StateKeyEncoder.java // 状态Key编码器
31+
│ ├── StateValueEncoder.java // 状态Value编码器
32+
│ └── VersionedValueWrapper.java // 支持版本演进的Value包装器
33+
├── storage/
34+
│ ├── StateStorage.java // 存储引擎抽象接口
35+
│ └── DefaultFileStateStorage.java // 简单文件存储实现
36+
├── wal/
37+
│ ├── StateChangeRecord.java // 状态变更记录
38+
│ └── StateWALWriter.java // 状态WAL写入器
39+
├── checkpoint/
40+
│ ├── StateCheckpoint.java // 状态检查点
41+
│ ├── StateCheckpointManager.java // 检查点管理器
42+
│ └── StateRecoveryManager.java // 状态恢复管理器
43+
└── recovery/
44+
└── StateReplayer.java // 状态重放引擎
45+
```
46+
47+
## 3. 核心接口设计
48+
49+
### 3.1 状态管理器接口
50+
51+
```java
52+
// StateManager.java - 状态管理器主接口
53+
public interface BucketStateManager {
54+
<K, V> TypedStateStore<K, V> getOrCreateStore(StateDescriptor<K, V> descriptor);
55+
void checkpoint(long logOffset) throws IOException;
56+
void recover(long fromOffset, long toOffset) throws IOException;
57+
void close() throws IOException;
58+
}
59+
```
60+
61+
### 3.2 存储层接口
62+
63+
```java
64+
// RawStateStore.java - 底层bytes KV存储
65+
public interface RawStateStore {
66+
void put(byte[] key, byte[] value) throws IOException;
67+
byte[] get(byte[] key) throws IOException;
68+
void delete(byte[] key) throws IOException;
69+
CloseableIterator<Entry<byte[], byte[]>> iterator() throws IOException;
70+
}
71+
72+
// TypedStateStore.java - 强类型KV存储
73+
public interface TypedStateStore<K, V> {
74+
void put(K key, V value) throws IOException;
75+
V get(K key) throws IOException;
76+
void delete(K key) throws IOException;
77+
CloseableIterator<Entry<K, V>> iterator() throws IOException;
78+
}
79+
```
80+
81+
### 3.3 存储引擎接口
82+
83+
```java
84+
// StateStorageEngine.java - 存储引擎抽象
85+
public interface StateStorage extends RawStateStore {
86+
void flush() throws IOException;
87+
void compact() throws IOException;
88+
long estimateSize();
89+
StateSnapshot createSnapshot() throws IOException;
90+
void restoreFromSnapshot(StateSnapshot snapshot) throws IOException;
91+
}
92+
```
93+
94+
## 4. WAL集成设计
95+
96+
### 4.1 LogSegment文件格式扩展
97+
98+
在现有LogSegment文件格式基础上,追加State变更日志存储区域:
99+
100+
```
101+
LogSegment文件结构:
102+
┌─────────────────────┐
103+
│ LogRecordBatch 1 │ <- 现有数据区域
104+
├─────────────────────┤
105+
│ LogRecordBatch 2 │
106+
├─────────────────────┤
107+
│ ... │
108+
├─────────────────────┤
109+
│ StateChangeLog │ <- 新增状态变更日志区域
110+
│ - Batch Index │ - 记录所属的Batch索引
111+
│ - Offset Index │ - 记录在Batch中的offset
112+
│ - State Changes │ - 具体的状态变更内容
113+
└─────────────────────┘
114+
```
115+
116+
### 4.2 状态变更日志格式
117+
118+
```java
119+
// StateChangeRecord.java - 状态变更记录
120+
public class StateChangeRecord {
121+
private final long batchIndex; // 关联的LogRecordBatch索引
122+
private final int offsetInBatch; // 在Batch中的offset位置
123+
private final String namespace; // 命名空间
124+
private final byte[] key; // 状态key
125+
private final byte[] value; // 状态value(null表示删除)
126+
private final long timestamp; // 变更时间戳
127+
private final byte operationType; // 操作类型:PUT/DELETE
128+
}
129+
```
130+
131+
### 4.3 WAL写入流程
132+
133+
```java
134+
// StateWALWriter.java - 状态WAL写入器
135+
public class StateWALWriter {
136+
// 与LogSegment写入同步,确保一致性
137+
public void writeStateChange(
138+
long batchIndex,
139+
int offsetInBatch,
140+
StateChangeRecord record
141+
) throws IOException;
142+
143+
// 在LogSegment rolling时触发
144+
public void flush() throws IOException;
145+
}
146+
```
147+
148+
## 5. 编码方案设计
149+
150+
### 5.1 Key编码格式
151+
152+
```
153+
State Key格式:
154+
┌──────────────┬──────────────┬──────────────┬──────────────┐
155+
│ Namespace │ Version(1B) │ Key Length │ User Key │
156+
│ (8B) │ │ (4B) │ (Variable) │
157+
└──────────────┴──────────────┴──────────────┴──────────────┘
158+
```
159+
160+
### 5.2 Value编码格式
161+
162+
```
163+
State Value格式:
164+
┌──────────────┬──────────────┬──────────────┬──────────────┐
165+
│ Version │ Schema ID │ Value Length │ User Value │
166+
│ (2B) │ (2B) │ (4B) │ (Variable) │
167+
└──────────────┴──────────────┴──────────────┴──────────────┘
168+
```
169+
170+
### 5.3 命名空间设计
171+
172+
```java
173+
// NamespaceEncoder.java - 命名空间编码
174+
public class NamespaceEncoder {
175+
// 支持层次化命名空间:table.bucket.operator.state
176+
public static long encodeNamespace(
177+
long tableId,
178+
int bucketId,
179+
String operatorName,
180+
String stateName
181+
);
182+
183+
public static NamespaceInfo decodeNamespace(long encodedNamespace);
184+
}
185+
```
186+
187+
## 6. Checkpoint机制设计
188+
189+
### 6.1 Checkpoint触发时机
190+
191+
- LogSegment rolling时自动触发
192+
- 手动触发(管理接口)
193+
194+
### 6.2 Checkpoint文件格式
195+
196+
```
197+
StateCheckpoint文件结构:
198+
┌─────────────────────┐
199+
│ Header │
200+
│ - Version: 4B │
201+
│ - Offset: 8B │
202+
│ - Timestamp: 8B │
203+
│ - CRC: 4B │
204+
├─────────────────────┤
205+
│ Metadata │
206+
│ - Namespace Count │
207+
│ - Total Key Count │
208+
├─────────────────────┤
209+
│ Data Blocks │
210+
│ - Namespace 1 │
211+
│ - Key-Value... │
212+
│ - Namespace 2 │
213+
│ - Key-Value... │
214+
└─────────────────────┘
215+
```
216+
217+
### 6.3 恢复流程
218+
219+
1. 加载最新的StateCheckpoint
220+
2. 从checkpoint对应的LogSegment开始重放WAL
221+
3. 应用状态变更到内存状态
222+
4. 重建索引和缓存
223+
224+
## 7. 存储实现方案
225+
226+
### 7.1 SimpleFileStateStorage(简单文件存储)
227+
228+
- 基于文件的简单KV存储
229+
- 适用于小规模状态
230+
- 实现简单,性能一般
231+
232+
233+
## 8. 配置参数设计
234+
235+
```properties
236+
# 状态管理器配置
237+
fluss.bucket.state.storage.engine=file # 存储引擎选择
238+
```
239+
240+
## 9. 与LogTablet集成
241+
242+
### 9.1 生命周期管理
243+
244+
```java
245+
// 在LogTablet中集成StateManager
246+
public class LogTablet {
247+
private final BucketStateManager stateManager;
248+
249+
// LogTablet创建时初始化StateManager
250+
public static LogTablet create(...) {
251+
BucketStateManager stateManager = new BucketStateManager(...);
252+
// ... 其他初始化逻辑
253+
}
254+
255+
// 在LogSegment rolling时触发checkpoint
256+
private void rollSegment() {
257+
// ... 现有rolling逻辑
258+
stateManager.checkpoint(getCurrentLogEndOffset());
259+
}
260+
}
261+
```
262+
263+
### 9.2 故障恢复集成
264+
265+
```java
266+
// 在LogTablet恢复过程中恢复状态
267+
public void recover(long fromOffset, long toOffset) {
268+
// ... 现有恢复逻辑
269+
stateManager.recover(fromOffset, toOffset);
270+
}
271+
```
272+
273+
## 10. 监控和度量
274+
275+
### 10.1 关键指标
276+
277+
- 状态存储大小
278+
- 读写QPS和延迟
279+
- Checkpoint频率和耗时
280+
- WAL写入量
281+
- 恢复时间
282+
283+
### 10.2 JMX指标
284+
285+
```java
286+
// 在BucketMetricGroup中添加状态管理器指标
287+
public class BucketMetricGroup {
288+
private void registerStateMetrics(BucketStateManager stateManager) {
289+
// 注册状态相关的监控指标
290+
}
291+
}
292+
```
293+
294+
## 11. 实现计划
295+
296+
### Phase 1: 核心框架
297+
1. 实现基础接口定义
298+
2. 实现SimpleFileStateStorage
299+
3. 实现基础的编码/解码功能
300+
4. 基础单元测试
301+
302+
### Phase 2: WAL集成
303+
1. 扩展LogSegment文件格式
304+
2. 实现StateWALWriter
305+
3. 集成到LogTablet生命周期
306+
4. 集成测试
307+
308+
### Phase 3: 高级功能
309+
1. 实现LSMTreeStateStorage或RocksDBStateStorage
310+
2. 完善Checkpoint机制
311+
3. 实现状态恢复功能
312+
4. 性能优化
313+
314+
### Phase 4: 生产就绪
315+
1. 完善监控和度量
316+
2. 压力测试和性能调优
317+
3. 文档完善
318+
4. 生产环境验证
319+
320+
## 12. 注意事项
321+
322+
1. **线程安全**: 所有公共接口必须是线程安全的
323+
2. **资源管理**: 正确管理文件句柄、内存等资源
324+
3. **错误处理**: 完善的异常处理和错误恢复机制
325+
4. **向后兼容**: 编码格式支持版本演进
326+
5. **性能考虑**: 避免在关键路径上引入性能瓶颈
327+
6. **配置灵活性**: 支持不同场景下的配置调优
328+
329+
## 13. 风险评估
330+
331+
1. **复杂性风险**: 设计较为复杂,需要careful实现
332+
2. **性能风险**: WAL写入可能影响写入性能
333+
3. **兼容性风险**: 文件格式变更需要向后兼容
334+
4. **稳定性风险**: 新增组件可能引入稳定性问题
335+
336+
## 14. 缓解方案
337+
338+
1. 分阶段实现,逐步验证
339+
2. 充分的单元测试和集成测试
340+
3. 性能基准测试和压力测试
341+
4. 提供配置开关,支持渐进式部署

fluss-client/src/main/java/org/apache/fluss/client/FlussConnection.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,7 @@ public Table getTable(TablePath tablePath) {
101101
// force to update the table info from server to avoid stale data in cache
102102
metadataUpdater.updateTableOrPartitionMetadata(tablePath, null);
103103
TableInfo tableInfo = metadataUpdater.getTableInfoOrElseThrow(tablePath);
104-
return new FlussTable(this, tablePath, tableInfo);
104+
return new FlussTable(this, tablePath, tableInfo, conf);
105105
}
106106

107107
public RpcClient getRpcClient() {
@@ -133,7 +133,7 @@ public LookupClient getOrCreateLookupClient() {
133133
if (lookupClient == null) {
134134
synchronized (this) {
135135
if (lookupClient == null) {
136-
lookupClient = new LookupClient(conf, metadataUpdater);
136+
lookupClient = new LookupClient(conf, metadataUpdater, clientMetricGroup);
137137
}
138138
}
139139
}

0 commit comments

Comments
 (0)