Skip to content

Commit fe75966

Browse files
authored
Merge pull request #8 from Icingworld/dev
Fix a bug and complete readme
2 parents ae7925b + e43300d commit fe75966

File tree

9 files changed

+158
-9
lines changed

9 files changed

+158
-9
lines changed

README.md

Lines changed: 140 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,141 @@
11
# WW-RaftKV
2-
基于Raft共识算法的分布式KV存储
2+
3+
基于Raft共识算法的分布式 KV 存储
4+
5+
## 一、简介
6+
7+
该项目是一个基于`Raft`共识算法实现的分布式`KV`存储系统,支持节点容错、高可用部署和一致性日志复制。是`WW`系列中学习分布式系统知识后关于`Raft`的简易实现。
8+
9+
## 二、依赖
10+
11+
+ 系统: `Linux` (for `muduo`)
12+
+ 第三方库: `Protobuf3`
13+
14+
## 三、模块
15+
16+
该实现包含了几个核心模块: `KVStore` `RaftRpc` `Raft`
17+
18+
### 1. KVStore
19+
20+
该模块位于`raftkv/kvstore/`,用于提供存分布式存储系统所需要的`KV`存储功能,它的底层实现是跳表。
21+
22+
### 2. RaftRpc
23+
24+
该模块用于提供`Raft`实现中所需要的`Rpc`功能,模块是基于`Google Rpc`框架的扩展。在该框架中,服务端由`Dispatcher``ServiceImpl`组成,并用`Server`封装,`ServiceImpl`提供具体方法实现,`Dispatcher`提供`TcpServer`和服务方法表,处理客户端请求;客户端由`Channel`组成,并用`Client`封装,`Channel`提供具体的`TcpClient`连接实现。
25+
26+
#### 服务端
27+
28+
服务端由以下组件组成:
29+
30+
+ `ServiceImpl`: 封装`Raft`协议中各类`RPC`方法的具体处理逻辑,包括`RequestVote``AppendEntries``InstallSnapshot`,以及客户端操作`KV`存储所需要的`RaftOperate`方法;
31+
+ `Dispatcher`: 维护服务方法注册表,并内置一个`TcpServer`,用于监听连接并分发请求;
32+
+ `Server`: 对外提供统一的服务端启动接口,整合`Dispatcher``ServiceImpl`
33+
34+
#### 客户端
35+
36+
客户端由以下组件组成:
37+
38+
+ `Channel`: 封装对某个`Raft`节点的连接与通信逻辑,内部使用`TcpClient`实现持久连接和异步消息发送;
39+
+ `Client`: 管理`Channel`实例,提供面向`Raft`协议调用的高层封装,便于发送`RequestVote``AppendEntries``InstallSnapshot`请求。
40+
41+
### 3. Raft
42+
43+
该模块位于`raftkv/raft-core`,是关于`Raft`的核心算法实现,它采用状态机模式,不关心网络行为,通过上下文与应用层(`RaftClerk`)通信。它负责处理节点间的选举、日志复制、日志提交以及状态同步等关键流程。该模块遵循`Raft`协议的规范设计,具备良好的可读性与可维护性,核心功能包括:
44+
45+
+ 节点状态管理: 支持`Follower``Candidate``Leader`三种角色的状态切换;
46+
+ 选举机制: 实现了基于任期的投票逻辑,支持随机超时选举触发与投票限制;
47+
+ 日志复制: `Leader`将客户端请求转化为日志条目并同步给其他节点;
48+
+ 日志一致性保障: 通过`prevLogIndex``prevLogTerm`保证日志匹配性;
49+
+ 日志提交与应用: 日志在被大多数节点确认后提交,并推送给状态机应用;
50+
+ 心跳机制: `Leader`定期发送心跳保持领导地位并触发空日志复制;
51+
+ 任期与投票状态持久化: 防止重启后错误行为;
52+
+ 快照与日志压缩支持: 适用于长时间运行场景下的日志膨胀问题。
53+
54+
## 四、测试
55+
56+
测试代码位于`example/`中,包含了用于模拟启动集群的`raft_example`,使用方法:
57+
58+
```bash
59+
./raft_example n
60+
```
61+
62+
其中`n`为节点号,该集群中包含`0-8`共9个节点。`example/`中还有两个`shell`脚本,`run.sh``stop.sh`,其中,`run.sh`用于启动`2-8`共7个节点,其日志会重定向到`logs/`中;`stop.sh`强制关闭所有`raft_example`进程。
63+
64+
`example/`中还包含了用于模拟客户端操作请求的`raft_client`,使用方法:
65+
66+
```bash
67+
./raft_client put a b
68+
./raft_client get a
69+
```
70+
71+
`raft_client`默认连接`node 0`,确保在启动集群时,先启动该节点,再启动剩余节点,以保证`node 0`能够当选`Leader`,当`node 0`不为`Leader`时,`raft_client`请求会返回当前`Leader``IP`地址和端口用于重定向。
72+
73+
当前存储系统支持的操作有:
74+
75+
+ `put`: 不允许重复的插入键值对,使用方法为`put {key} {value}`
76+
+ `update`: 更新或插入键值对,使用方法为`update {key} {value}`
77+
+ `get`: 读取已存在的键值对,使用方法为`get {key}`
78+
+ `remove`: 删除已存在的键值对,使用方法为`remove {key}`
79+
80+
## 五、快速启动
81+
82+
这是一个快速使用`Raft`测试集群的示例。
83+
84+
### 1. 编译
85+
86+
```bash
87+
mkdir build && cd build
88+
cmake ..
89+
make -j4
90+
```
91+
92+
### 2. 启动集群
93+
94+
以下按顺序启动。
95+
96+
终端1,启动用于观察`Leader`日志的节点:
97+
98+
```bash
99+
cd build/example/
100+
./raft_example 0
101+
```
102+
103+
终端2,启动用于观察`Follower`日志的节点:
104+
105+
```bash
106+
cd build/example/
107+
./raft_example 1
108+
```
109+
110+
终端3,启动剩余节点:
111+
112+
```bash
113+
cd build/example/
114+
./run.sh
115+
```
116+
117+
终端4,进行想要的操作:
118+
119+
```bash
120+
cd build/example/
121+
./raft_client put hello ww-raftkv
122+
./raft_client get hello
123+
```
124+
125+
### 3. 运行示例
126+
127+
+ node 0
128+
129+
![node 0](doc/img/node%200.png)
130+
131+
+ node 1
132+
133+
![node 1](doc/img/node%201.png)
134+
135+
+ node 2-8
136+
137+
![node 2-8](doc/img/node%202-8.png)
138+
139+
+ client
140+
141+
![client](doc/img/client.png)

doc/img/client.png

13.9 KB
Loading

doc/img/node 0.png

103 KB
Loading

doc/img/node 1.png

60.5 KB
Loading

doc/img/node 2-8.png

31.5 KB
Loading

example/src/raft_client.cpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
#include <client_channel.h>
66
#include <RaftOperation.pb.h>
77
#include <RaftRpcClosure.h>
8+
#include <muduo/base/Logging.h>
89

910
void ParseResponse(WW::RaftOperationService_Stub* stub, const WW::RaftOperationRequest& request, const WW::RaftOperationResponse& response)
1011
{
@@ -84,6 +85,8 @@ int main(int argc, char** argv)
8485
return 1;
8586
}
8687

88+
muduo::Logger::setLogLevel(muduo::Logger::LogLevel::ERROR);
89+
8790
std::string ip = "127.0.0.1";
8891
std::string port = "4397";
8992

raftkv/raft-core/src/Raft.cpp

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -183,7 +183,7 @@ void Raft::_SendAppendEntries(bool _IsHearbeat)
183183
// DEBUG("prev_index:%d, base_index:%d, last_index:%d", prev_index, _Node.getBaseIndex(), _Node.getLastIndex());
184184
TermId prev_term = _Node.getTerm(prev_index);
185185

186-
if (prev_term == 0) {
186+
if (prev_term == 0 && prev_index != 0) {
187187
// 无论是心跳还是同步,都可以改为快照发送
188188
// 该位置可能非法或处于快照的范围内,改为发送 InstallSnapshot
189189
heartbeat_request.type = RaftMessage::MessageType::InstallSnapshotRequest;
@@ -392,7 +392,7 @@ void Raft::_HandleAppendEntriesRequest(const RaftMessage & _Message)
392392
if (!_Node.match(other_last_log_index, other_last_log_term)) {
393393
// 日志存在冲突,需要 Leader 进行回退
394394
DEBUG("log (index:%d , term:%zu) doesn't match (index:%d, term:%zu), refuse append entries",
395-
other_last_log_index, other_last_log_term, _Node.getLastIndex(), _Node.getLastTerm());
395+
other_last_log_index, other_last_log_term, other_last_log_index, _Node.getTerm(other_last_log_index));
396396
// 截断该索引之后的所有日志
397397
_Node.truncateAfter(other_last_log_index);
398398
// 告知自己的索引位置,冲突情况下不使用
@@ -843,6 +843,11 @@ bool Raft::load()
843843
_Node.setSnapshotIndex(persist_data.snapshot_index());
844844
_Node.setSnapshotTerm(persist_data.snapshot_term());
845845

846+
DEBUG("term: %zu", persist_data.term());
847+
DEBUG("voted for: node: %d", persist_data.voted_for());
848+
DEBUG("snapshot index: %d", persist_data.snapshot_index());
849+
DEBUG("snapshot term: %zu", persist_data.snapshot_term());
850+
846851
// 加载未快照日志
847852
for (const PersistLogEntry & persist_entry : persist_data.entries()) {
848853
RaftLogEntry entry(persist_entry.term(), persist_entry.command());

raftkv/raft-core/src/RaftLog.cpp

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,15 @@
33
#include <stdexcept>
44
#include <iostream>
55

6+
#include <RaftLogger.h>
7+
68
namespace WW
79
{
810

911
RaftLog::RaftLog()
1012
: _Logs()
1113
, _Base_index(1)
12-
, _Snapshot_index(1)
14+
, _Snapshot_index(0)
1315
, _Snapshot_term(0)
1416
{
1517
}

raftkv/raft/src/RaftClerk.cpp

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ RaftClerk::RaftClerk(NodeId _Id, const std::vector<RaftPeerNet> & _Peers)
3939
_Op_server = new RaftOperationServer(&_Loop, _Peers[_Id].getIp(), _Peers[_Id].getPort(), &_Op_service);
4040

4141
// 设置 muduo 日志等级
42-
// muduo::Logger::setLogLevel(muduo::Logger::LogLevel::ERROR);
42+
muduo::Logger::setLogLevel(muduo::Logger::LogLevel::ERROR);
4343

4444
// 从 Raft 持久化恢复
4545
if (_Raft->load()) {
@@ -61,27 +61,27 @@ void RaftClerk::_InstallSnapshotFromPersist()
6161
std::string file_path = "snapshot_" + std::to_string(_Raft->getId()) + ".snapshot";
6262
std::ifstream in(file_path, std::ios::binary);
6363
if (!in.is_open()) {
64-
ERROR("open snapshot file failed");
64+
ERROR("open persist snapshot file failed");
6565
return;
6666
}
6767

6868
// 反序列化快照数据
6969
SnapshotData snapshot;
7070
if (!snapshot.ParseFromIstream(&in)) {
71-
ERROR("parse snapshot file failed");
71+
ERROR("parse persist snapshot file failed");
7272
return;
7373
}
7474

7575
// 应用快照数据到状态机
7676
for (const SnapshotEntry & entry : snapshot.kvs()) {
7777
if (!_KVStore.update(entry.key(), entry.value())) {
78-
ERROR("install snapshot failed");
78+
ERROR("install persist snapshot failed");
7979
_KVStore.clear();
8080
return;
8181
}
8282
}
8383

84-
DEBUG("install snapshot success");
84+
DEBUG("install persist snapshot success");
8585
}
8686

8787
void RaftClerk::run()

0 commit comments

Comments
 (0)