forked from krypdkat/qubicbob
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathQubicServer.cpp
More file actions
203 lines (171 loc) · 6.52 KB
/
QubicServer.cpp
File metadata and controls
203 lines (171 loc) · 6.52 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <unistd.h>
#include <fcntl.h>

#include <atomic>
#include <cerrno>
#include <chrono>
#include <cstdint>
#include <cstring>
#include <memory>
#include <mutex>
#include <thread>
#include <vector>

#include "Logger.h"
#include "connection/connection.h"
#include "shim.h"
// Forward declaration; per the original note the definition lives in IOProcessor.cpp.
// It is invoked below on each accepted connection's own thread, receiving that
// connection, a trusted-node flag, and the per-client stop flag.
void connReceiver(QCPtr& conn, const bool isTrustedNode, std::atomic_bool& stopFlag);
namespace {
class QubicServer {
public:
static QubicServer& instance() {
static QubicServer inst;
return inst;
}
bool start(uint16_t port = 21842) {
std::lock_guard<std::mutex> lk(m_);
if (running_) return true;
listen_fd_ = ::socket(AF_INET, SOCK_STREAM, 0);
if (listen_fd_ < 0) {
Logger::get()->critical("QubicServer: socket() failed (errno={})", errno);
return false;
}
int yes = 1;
::setsockopt(listen_fd_, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes));
#ifdef SO_REUSEPORT
::setsockopt(listen_fd_, SOL_SOCKET, SO_REUSEPORT, &yes, sizeof(yes));
#endif
sockaddr_in addr{};
addr.sin_family = AF_INET;
addr.sin_addr.s_addr = htonl(INADDR_ANY);
addr.sin_port = htons(port);
if (::bind(listen_fd_, reinterpret_cast<sockaddr*>(&addr), sizeof(addr)) < 0) {
Logger::get()->critical("QubicServer: bind() failed on port {} (errno={})", port, errno);
::close(listen_fd_);
listen_fd_ = -1;
return false;
}
if (::listen(listen_fd_, 128) < 0) {
Logger::get()->critical("QubicServer: listen() failed (errno={})", errno);
::close(listen_fd_);
listen_fd_ = -1;
return false;
}
running_ = true;
accept_thread_ = std::thread(&QubicServer::acceptLoop, this);
Logger::get()->info("QubicServer: listening on port {}", port);
return true;
}
void stop() {
std::lock_guard<std::mutex> lk(m_);
if (!running_) return;
running_ = false;
if (listen_fd_ >= 0) {
::shutdown(listen_fd_, SHUT_RDWR);
::close(listen_fd_);
listen_fd_ = -1;
}
if (accept_thread_.joinable()) {
accept_thread_.join();
}
// Signal all client handlers to stop and disconnect sockets to break I/O.
std::vector<std::shared_ptr<ClientCtx>> local_clients;
{
std::lock_guard<std::mutex> lk2(clients_m_);
local_clients = clients_; // copy list to operate without holding the mutex
for (auto& c : local_clients) {
c->stopFlag.store(true, std::memory_order_relaxed);
if (c->conn) {
c->conn->disconnect();
}
if (c->fd >= 0) {
::shutdown(c->fd, SHUT_RDWR);
::close(c->fd);
c->fd = -1;
}
}
}
// Join all client threads without holding clients_m_ to avoid deadlock
for (auto& c : local_clients) {
if (c->th.joinable()) c->th.join();
}
// Now clear the shared list
{
std::lock_guard<std::mutex> lk2(clients_m_);
clients_.clear();
}
Logger::get()->info("QubicServer: stopped");
}
private:
struct ClientCtx {
std::atomic_bool stopFlag{false};
QCPtr conn;
std::thread th;
int fd{-1};
};
QubicServer() = default;
~QubicServer() { stop(); }
void acceptLoop() {
while (running_) {
sockaddr_in cli{};
socklen_t len = sizeof(cli);
int cfd = ::accept(listen_fd_, reinterpret_cast<sockaddr*>(&cli), &len);
if (cfd < 0) {
if (!running_) break;
// EAGAIN/EINTR acceptable during shutdown or transient
continue;
}
// Basic socket tuning
int one = 1;
::setsockopt(cfd, IPPROTO_TCP, TCP_NODELAY, &one, sizeof(one));
#ifdef SO_KEEPALIVE
::setsockopt(cfd, SOL_SOCKET, SO_KEEPALIVE, &one, sizeof(one));
#endif
auto ctx = std::make_shared<ClientCtx>();
ctx->fd = cfd;
// Wrap the accepted socket into QCPtr (NON-reconnectable as per connection.h)
ctx->conn = make_qc_by_socket(cfd);
{
std::lock_guard<std::mutex> lk(clients_m_);
clients_.push_back(ctx);
}
// Non-trusted connections
const bool isTrustedNode = false;
// Launch per-connection receiver thread
ctx->th = std::thread([this, ctx, isTrustedNode]() {
try {
ctx->conn->doHandshake();
connReceiver(ctx->conn, isTrustedNode, ctx->stopFlag);
} catch (...) {
Logger::get()->warn("QubicServer: connReceiver crashed for a client");
}
// Cleanup when receiver exits
if (ctx->conn) ctx->conn->disconnect();
if (ctx->fd >= 0) {
::shutdown(ctx->fd, SHUT_RDWR);
::close(ctx->fd);
ctx->fd = -1;
}
ctx->conn.reset();
// Note: Do NOT detach or modify clients_ here to avoid races/deadlocks with stop().
// The stop() method is responsible for joining threads and clearing the list.
});
}
}
private:
std::mutex m_;
std::atomic_bool running_{false};
int listen_fd_{-1};
std::thread accept_thread_;
std::mutex clients_m_;
std::vector<std::shared_ptr<ClientCtx>> clients_;
};
} // namespace
// Public helpers to control the server
bool StartQubicServer(uint16_t port = 21842) {
return QubicServer::instance().start(port);
}
/// Stops the singleton QubicServer, then logs that the stop call has returned.
void StopQubicServer() {
    auto& server = QubicServer::instance();
    server.stop();
    Logger::get()->info("Stop qubic server");
}