为emqx-go项目的集群功能部署3个实例的集群,并进行e2e测试,发现并修复bugs。
测试结果: ✅ 成功! 跨节点消息路由已验证工作正常
- ✅ Docker Compose配置 (
docker-compose-cluster.yaml): 创建了3节点集群的Docker配置 - ✅ 本地脚本部署 (
scripts/start-cluster.sh,scripts/stop-cluster.sh): 创建了本地3进程集群启动/停止脚本 - ✅ 环境变量支持 (
cmd/emqx-go/main.go): 添加了从环境变量读取配置的支持- NODE_ID, MQTT_PORT, GRPC_PORT, METRICS_PORT
- PEER_NODES: 支持静态peer节点配置
- ✅ 连接peer函数 (
cmd/emqx-go/main.go:connectToPeers): 实现了手动连接peer节点的功能
创建了Go语言的集群测试工具 (cmd/cluster-test/main.go):
测试流程:
- 连接subscriber到node2 (port 1884)
- 在node2上订阅主题 'cluster/test'
- 等待路由信息传播到集群
- 连接publisher到node1 (port 1883)
- 从node1发布消息到 'cluster/test'
- 验证node2的subscriber是否收到消息
测试结果:
✓✓✓ SUCCESS! Cross-node messaging works!
✓ Message 'Hello from Node1 to Node2!' successfully routed from node1 to node2
位置: cmd/emqx-go/main.go:124-125
问题描述:
节点在创建cluster manager时,使用nodeID作为地址的主机名部分,例如"node2:8083"。在本地多进程部署场景下,这些主机名无法解析,导致节点间无法建立连接。
日志证据:
Received Join request from node node2 at node2:8083
Cluster Manager: Attempting to connect to peer node2 at node2:8083
Failed to connect to peer node2: dial tcp: lookup node2: no such host
根本原因:
// 错误代码 (修复前):
clusterMgr := cluster.NewManager(nodeID, fmt.Sprintf("%s%s", nodeID, cfg.Broker.GRPCPort), ...)
// 当nodeID="node2", GRPCPort=":8083"时
// 生成的地址: "node2:8083" - 在本地环境无法解析修复方案: 对于本地部署,使用"localhost"作为地址主机名:
// 修复后:
nodeAddress := fmt.Sprintf("localhost%s", cfg.Broker.GRPCPort)
clusterMgr := cluster.NewManager(nodeID, nodeAddress, ...)
// 生成的地址: "localhost:8083" - 可以正确解析影响: 此bug导致所有节点间连接失败,是集群功能完全无法工作的根本原因之一。
注意: 在生产环境或Kubernetes部署中,应该使用实际的pod hostname或service name。
位置: pkg/cluster/server.go:55-57
问题描述: 当节点A收到节点B的Join请求后,节点A尝试建立到节点B的反向连接以实现双向通信。但由于使用了gRPC请求的context,该context在RPC调用返回后立即被取消,导致反向连接建立失败。
日志证据:
2025/10/10 21:06:14 Received Join request from node node2 at localhost:8083
2025/10/10 21:06:14 Cluster Manager: Attempting to connect to peer node2 at localhost:8083
2025/10/10 21:06:14 Failed to connect to peer node2: context canceled
根本原因:
// 错误代码 (修复前):
func (s *Server) Join(ctx context.Context, req *clusterpb.JoinRequest) (*clusterpb.JoinResponse, error) {
log.Printf("Received Join request from node %s at %s", req.Node.NodeId, req.Node.Address)
// ❌ 使用请求的ctx - 当RPC返回时此context会被取消
go s.manager.AddPeer(ctx, req.Node.NodeId, req.Node.Address)
return &clusterpb.JoinResponse{...}, nil
}gRPC的请求context是短生命周期的,仅在RPC调用期间有效。当Join RPC返回后,context被取消,导致正在进行的AddPeer操作也被取消。
修复方案:
使用context.Background()代替请求context,使反向连接可以独立于RPC调用周期:
// 修复后:
func (s *Server) Join(ctx context.Context, req *clusterpb.JoinRequest) (*clusterpb.JoinResponse, error) {
log.Printf("Received Join request from node %s at %s", req.Node.NodeId, req.Node.Address)
// ✅ 使用background context - 连接建立独立于RPC生命周期
go s.manager.AddPeer(context.Background(), req.Node.NodeId, req.Node.Address)
return &clusterpb.JoinResponse{
Success: true,
ClusterId: "cluster-1",
Message: "Welcome to the cluster!",
ClusterNodes: []*clusterpb.NodeInfo{{NodeId: s.NodeID}},
}, nil
}影响: 此bug导致双向peer连接无法建立。即使节点B成功连接到节点A,节点A也无法向节点B转发消息,因为A的peers map中没有B的连接。
这两个bug共同导致了跨节点消息路由的完全失败:
- Bug #1 导致节点无法建立任何连接(地址无法解析)
- 修复Bug #1后,Bug #2 导致反向连接失败(context过早取消)
- 即使节点B→A的连接成功,由于A→B的反向连接失败,A无法转发消息给B
消息路由失败的完整流程:
1. Node2发起Join请求到Node1
2. Node1收到Join请求
3. Node1尝试建立反向连接到Node2 (Bug #2导致失败)
4. Node2的subscriber订阅topic
5. Node2广播路由更新到Node1
6. Node1记录: Node2有订阅者
7. Node1的publisher发布消息
8. Node1尝试转发消息到Node2
9. ❌ 失败: peers map中没有Node2的连接
10. 日志: "Cannot forward publish: no peer client for node node2"
修复后的成功流程:
1. Node2发起Join请求到Node1
2. Node1收到Join请求
3. ✅ Node1成功建立反向连接到Node2
4. ✅ Node1的peers map中添加Node2
5. Node2的subscriber订阅topic
6. Node2广播路由更新到Node1
7. Node1记录: Node2有订阅者
8. Node1的publisher发布消息
9. ✅ Node1成功转发消息到Node2
10. ✅ Node2的subscriber收到消息
只有同时修复这两个bug,集群才能正常工作。
docker-compose-cluster.yaml- Docker集群配置scripts/start-cluster.sh- 本地集群启动脚本scripts/stop-cluster.sh- 本地集群停止脚本cmd/cluster-test/main.go- Go语言集群测试工具scripts/test-cluster.py- Python测试脚本(未使用,paho-mqtt问题)scripts/simple-cluster-test.py- 简化Python测试(未使用)CLUSTER_E2E_TEST_REPORT.md- 本报告
-
cmd/emqx-go/main.go- 添加环境变量配置支持 (行95-107)
- 修复节点地址使用localhost (行124-130)
- 添加
connectToPeers函数 (行228-274)
-
pkg/cluster/server.go- 修复Join handler使用background context (行55-57)
- 添加ClusterId到响应 (行61)
节点连接状态 (来自日志):
Node1:
- 收到node2 Join请求,成功建立连接 ✓
- 收到node3 Join请求,成功建立连接 ✓
Node2:
- 连接到node1成功 ✓
- 收到node1反向连接 ✓
- 收到node3 Join请求,成功建立连接 ✓
Node3:
- 连接到node1成功 ✓
- 连接到node2成功 ✓
- 收到node1和node2的反向连接 ✓
路由同步验证 (来自日志):
Node2 订阅 'cluster/test':
2025/10/10 21:07:25 [INFO] Client test-subscriber successfully subscribed to 'cluster/test' with QoS 1
2025/10/10 21:07:25 [DEBUG] Broadcasting 1 new routes to cluster peers
Node1 收到路由更新:
2025/10/10 21:07:25 Received BatchUpdateRoutes request from node node2
2025/10/10 21:07:25 Adding remote route: Topic=cluster/test, Node=node2
消息转发验证 (来自日志):
Node1 转发消息:
2025/10/10 21:07:27 Forwarding message for topic 'cluster/test' to remote node node2
Node2 接收并路由:
2025/10/10 21:07:27 Received ForwardPublish request for topic 'cluster/test' from node node1
2025/10/10 21:07:27 Routing message on topic 'cluster/test' to 1 local subscribers
============================================================
EMQX-Go Cluster Cross-Node Messaging Test
============================================================
1. Creating subscriber connecting to node2 (port 1884)...
✓ Subscriber connected to node2
2. Subscribing to 'cluster/test'...
✓ Subscribed successfully
3. Waiting for route propagation (2 seconds)...
4. Creating publisher connecting to node1 (port 1883)...
✓ Publisher connected to node1
5. Publishing message from node1 to 'cluster/test'...
✓ Message published
6. Waiting for cross-node delivery (3 seconds)...
✓ RECEIVED: 'Hello from Node1 to Node2!' on topic 'cluster/test'
============================================================
Test Results:
============================================================
✓✓✓ SUCCESS! Cross-node messaging works!
✓ Message 'Hello from Node1 to Node2!' successfully routed from node1 to node2
============================================================
-
连接管理优化:
- 实现连接池和自动重连机制
- 添加心跳检测和健康检查
- 增加连接状态监控和metrics
-
架构改进:
- 考虑使用双向gRPC流来复用连接
- 实现更智能的节点发现机制
- 添加集群状态管理API
- 支持动态节点加入/离开
-
测试增强:
- 添加节点故障恢复测试
- 添加网络分区测试
- 添加大规模消息吞吐测试
- 添加多topic并发测试
-
生产环境配置:
- 添加配置选项区分local/k8s部署
- 实现基于环境变量的自动地址发现
- 添加TLS支持用于节点间通信
通过本次e2e测试,成功发现并修复了2个关键bug,使得集群的核心功能——跨节点消息路由得以正常工作。
主要成果:
- ✅ 3节点集群成功部署和运行
- ✅ 节点间连接和双向通信正常
- ✅ 路由表同步机制工作正常
- ✅ 跨节点消息转发功能验证通过
- ✅ 完整的测试工具和脚本
集群基本功能现在可以正常工作,为后续的性能优化和功能增强奠定了坚实基础。