-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathexample_muduo_tcp_server.cpp
More file actions
149 lines (121 loc) · 4.56 KB
/
Copy pathexample_muduo_tcp_server.cpp
File metadata and controls
149 lines (121 loc) · 4.56 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
#include "handle-proxy-protocol-v1-v2.hpp"
#include <muduo/net/TcpServer.h>
#include <muduo/net/EventLoop.h>
#include <muduo/base/Logging.h>
#include <unordered_map>
#include <mutex>
using namespace std;
using namespace muduo;
using namespace muduo::net;
// 经过代理的用户 : key为代理ip:port value为用户真实ip:port;
// 未经过代理的用户: key 和 value一样,都是用户的真实 ip:port;
unordered_map<std::string, std::string> m_ClientIpMap;
mutex m_ClientIpLock;
// 接收到网络模块用户的数据,业务模块进行回调处理的函数
int handleMessage(const TcpConnectionPtr &session, const void* buff, std::size_t buff_size)
{
int consume = 0;
std::string ip_port = session->peerAddress().toIpPort();//session->GetRemoteIp() + ":" + std::to_string(session->GetRemotePort());
bool is_in_client_map = false; // 第一次 recv 到数据包,判断是否为代理,存入m_ClientIpMap,标识该连接已处理完第一个包
std::string real_ip_port; // 从代理协议获取用户的真实 ip port
{
lock_guard<mutex> guard(m_ClientIpLock);
is_in_client_map = (m_ClientIpMap.find(ip_port) == m_ClientIpMap.end());
}
if (is_in_client_map) // 解析第一个数据包
{
int ret = HandleProxyProtocol(buff, buff_size, real_ip_port);
// ret > 0 表示经过 HAProxy 以及 proxy protocol 的长度;
cout << "HandleProxyProtocol return:" << ret << " real_ip_port:" << real_ip_port << endl;
if (ret == -1) // 异常连接
{
cout << "Client proxy error " << ip_port << " buffSize = " << buff_size << endl;
{
lock_guard<mutex> guard(m_ClientIpLock);
m_ClientIpMap.erase(ip_port);
}
session->shutdown();
return -1;
}
else if (ret == 0) // 未经过 HAProxy 代理
{
real_ip_port = ip_port;
}
consume = ret;
{
lock_guard<mutex> guard(m_ClientIpLock);
m_ClientIpMap.insert(make_pair(ip_port, real_ip_port));
}
}
// handle client data from index of consume...
if(buff_size > consume)
{
char *msg = (char*)buff+consume;
cout << "handle user data: [" << msg << "]" << endl;
session->send(msg);
}
}
/*
基于 muduo 网络库开发服务器程序
*/
class MTcpServer
{
public:
MTcpServer(EventLoop *loop, // 事件循环
const InetAddress &listenAddr, // Listen IP PORT
const string &nameArg) // 服务器名字
:_server(loop,listenAddr, nameArg),_loop(loop)
{
// 注册用户连接的创建与断开回调
_server.setConnectionCallback(std::bind(&MTcpServer::onConnection, this, _1));
// 注册用户的读写事件回调
_server.setMessageCallback(std::bind(&MTcpServer::onMessage, this, _1, _2, _3));
// 设置服务器端的线程数量n: 1个线程处理accept n-1个线程处理连接的读写事件
_server.setThreadNum(4);
}
// 开启事件循环
void start()
{
_server.start();
}
// 处理用户连接的创建和断开
void onConnection(const TcpConnectionPtr &conn)
{
std::string ip_port = conn->peerAddress().toIpPort();
LOG_INFO << "example_proxy_protocol - " << ip_port << " -> "
<< conn->localAddress().toIpPort() << " is "
<< (conn->connected() ? "UP" : "DOWN");
if(!conn->connected())
{
{
lock_guard<mutex> guard(m_ClientIpLock);
m_ClientIpMap.erase(ip_port);
}
conn->shutdown(); // close fd
}
}
// 处理用户的读写事件
void onMessage(const TcpConnectionPtr &conn, // 连接
Buffer *buf, // 缓冲区
Timestamp time) // 接收到数据的时间信息
{
muduo::string msg(buf->retrieveAllAsString());
LOG_INFO << conn->name() << " recv " << msg.size() << " bytes, "
<< "data received at " << time.toString();
// 在这里将网络模块的数据抛给业务模块进行处理
handleMessage(conn, msg.c_str(), msg.size());
}
private:
TcpServer _server;
EventLoop *_loop;
};
int main()
{
cout << "start" << endl;
EventLoop loop;
InetAddress addr("0.0.0.0", 6000);
MTcpServer server(&loop, addr, "example_proxy_protocol");
server.start();
loop.loop();
return 0;
}