Skip to content

Commit 90fc979

Browse files
committed
feat: refactor main event loop and add cgi fds to epoll watchlist
1 parent 37c992a commit 90fc979

19 files changed

+694
-303
lines changed

Makefile

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ CONFIG ?= debug
44

55
# Compiler settings
66
CXX = clang++
7-
CXXFLAGS = -std=c++98 -Wall -Wextra -Wshadow -Werror
7+
CXXFLAGS = -std=c++98 -Wall -Wextra -Werror
88
CPPFLAGS = -Isrc -Ithird_party/utest -MMD -MP
99

1010
# Build specific options
@@ -37,6 +37,8 @@ srcs = \
3737
src/core/server_defaults.hpp \
3838
src/core/signals.cpp \
3939
src/core/signals.hpp \
40+
src/core/virtual_server.cpp \
41+
src/core/virtual_server.hpp \
4042
src/handler/cgi_handler.cpp \
4143
src/handler/cgi_handler.hpp \
4244
src/handler/delete_handler.cpp \
@@ -63,15 +65,15 @@ srcs = \
6365
src/router/router.cpp \
6466
src/router/router.hpp \
6567
src/util/itoa.cpp \
66-
src/util/to_string.cpp \
67-
src/util/to_string.hpp \
6868
src/util/log_message.cpp \
6969
src/util/log_message.hpp \
7070
src/util/str_split.cpp \
7171
src/util/str_trim.cpp \
7272
src/util/string.hpp \
7373
src/util/syscall_error.cpp \
7474
src/util/syscall_error.hpp \
75+
src/util/to_string.cpp \
76+
src/util/to_string.hpp \
7577

7678
cpps = $(filter %.cpp,$(srcs))
7779
hpps = $(filter %.hpp,$(srcs))

src/core/client.cpp

Lines changed: 317 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,333 @@
11
#include "core/client.hpp"
22

3+
#include "core/virtual_server.hpp"
4+
#include "http/http_parser.hpp"
5+
#include "util/log_message.hpp"
6+
37
#include <fcntl.h>
8+
#include <sys/epoll.h>
49
#include <unistd.h>
510

6-
Client::Client(int fd)
7-
: state_(kReceivingHeaders),
8-
fd_(fd),
11+
#include <cerrno>
12+
#include <cstring>
13+
#include <iostream>
14+
15+
Client::Client(int fd, const VirtualServer& vs)
16+
: sockfd_(fd),
17+
vserv_(vs),
18+
state_(Client::kReceivingHeaders),
919
handler_(NULL)
1020
{
21+
epoll_fds_[sockfd_] = EPOLLIN | EPOLLRDHUP;
22+
pipefd_[0] = -1;
23+
pipefd_[1] = -1;
1124
}
1225

1326
Client::~Client()
1427
{
15-
if (fd_ != -1) {
16-
close(fd_);
28+
if (sockfd_ != -1) {
29+
close(sockfd_);
1730
}
1831
if (handler_) {
1932
delete handler_;
2033
}
2134
}
35+
36+
size_t Client::sendbuf_available_size() const
37+
{
38+
return WEBSERV_MAX_SENDBUF_SIZE - sendbuf_.size();
39+
}
40+
41+
void Client::want_read(int fd)
42+
{
43+
epoll_fds_[fd] |= EPOLLIN;
44+
}
45+
46+
void Client::want_write(int fd)
47+
{
48+
epoll_fds_[fd] |= EPOLLOUT;
49+
}
50+
51+
void Client::stop_read(int fd)
52+
{
53+
epoll_fds_[fd] &= ~EPOLLIN;
54+
}
55+
56+
void Client::stop_write(int fd)
57+
{
58+
epoll_fds_[fd] &= ~EPOLLOUT;
59+
}
60+
61+
void Client::on_epollin(int fd)
62+
{
63+
if (fd == sockfd_) {
64+
read_from_socket();
65+
maybe_create_request_handler();
66+
maybe_write_to_regular_file();
67+
}
68+
else if (fd == pipefd_[0]) {
69+
read_from_pipe();
70+
}
71+
refresh_interest_list();
72+
}
73+
74+
void Client::on_epollout(int fd)
75+
{
76+
if (fd == sockfd_) {
77+
maybe_read_from_regular_file();
78+
write_to_socket();
79+
}
80+
else if (fd == pipefd_[1]) {
81+
write_to_pipe();
82+
}
83+
refresh_interest_list();
84+
}
85+
86+
void Client::on_sync_complete()
87+
{
88+
if (state_ == Client::kPreparingNextRequest) {
89+
parser_.reset_for_next_request();
90+
91+
delete handler_;
92+
handler_ = NULL;
93+
94+
epoll_fds_.clear();
95+
epoll_fds_[sockfd_] = EPOLLIN | EPOLLRDHUP;
96+
pipefd_[0] = -1;
97+
pipefd_[1] = -1;
98+
99+
state_ = Client::kReceivingHeaders;
100+
}
101+
}
102+
103+
// TODO: get rid of this
104+
static void print_raw_data(const char* s, size_t n)
105+
{
106+
std::cout << "< ";
107+
108+
for (size_t i = 0; i < n; ++i) {
109+
char c = s[i];
110+
if (c == '\r') {
111+
std::cout << "\\r";
112+
}
113+
else if (c == '\n') {
114+
std::cout << "\\n\n";
115+
if (i < n - 1) {
116+
std::cout << "< ";
117+
}
118+
}
119+
else {
120+
std::cout.put(c);
121+
}
122+
}
123+
std::cout.flush();
124+
}
125+
126+
void Client::read_from_socket()
127+
{
128+
if (state_ != Client::kReceivingHeaders && state_ != Client::kProcessingRequest)
129+
return;
130+
131+
char buf[4096];
132+
133+
while (!parser_.has_error() && !parser_.is_done() && parser_.available_size() > 0) {
134+
size_t max = std::min(parser_.available_size(), sizeof(buf));
135+
136+
ssize_t n = recv(sockfd_, buf, max, 0);
137+
if (n == -1) {
138+
if (errno == EAGAIN || errno == EWOULDBLOCK) {
139+
LOG(DEBUG) << "Client #" << sockfd_ << ": recv blocked, retrying";
140+
return;
141+
}
142+
LOG(ERROR) << "Client #" << sockfd_ << ": recv failed: " << strerror(errno);
143+
state_ = Client::kClosingConnection;
144+
return;
145+
}
146+
if (n == 0) {
147+
LOG(WARN) << "Client #" << sockfd_ << ": Connection closed by remote peer";
148+
state_ = Client::kClosingConnection;
149+
return;
150+
}
151+
152+
parser_.append_data(buf, n);
153+
print_raw_data(buf, n);
154+
}
155+
156+
if (parser_.has_error()) {
157+
LOG(ERROR) << "Client #" << sockfd_ << ": Invalid HTTP request";
158+
state_ = Client::kClosingConnection;
159+
return;
160+
}
161+
}
162+
163+
void Client::write_to_socket()
164+
{
165+
if (state_ != Client::kProcessingRequest)
166+
return;
167+
168+
if (sendbuf_.empty())
169+
return;
170+
171+
ssize_t n = send(sockfd_, sendbuf_.data(), sendbuf_.size(), 0);
172+
if (n == -1) {
173+
if (errno == EAGAIN || errno == EWOULDBLOCK) {
174+
LOG(WARN) << "Client #" << sockfd_ << ": send blocked, retrying";
175+
return;
176+
}
177+
LOG(ERROR) << "Client #" << sockfd_ << ": send failed: " << strerror(errno);
178+
state_ = Client::kClosingConnection;
179+
return;
180+
}
181+
if (n == 0) {
182+
LOG(WARN) << "Client #" << sockfd_ << "Failed to send any bytes!";
183+
return;
184+
}
185+
186+
sendbuf_.erase(0, n);
187+
188+
if (handler_->is_done() && sendbuf_.empty()) {
189+
if (keep_alive()) {
190+
state_ = Client::kPreparingNextRequest;
191+
}
192+
else {
193+
state_ = Client::kClosingConnection;
194+
}
195+
}
196+
}
197+
198+
static void log_request_info(int fd, const HttpRequest& request)
199+
{
200+
LOG(INFO) << "Client #" << fd << ": " << request.method << " " << request.path << " "
201+
<< http_version_to_string(request.http_version);
202+
}
203+
204+
void Client::maybe_create_request_handler()
205+
{
206+
if (state_ != Client::kReceivingHeaders)
207+
return;
208+
209+
if (!parser_.did_parse_headers())
210+
return;
211+
212+
const HttpRequest& req = parser_.request();
213+
log_request_info(sockfd_, req);
214+
215+
handler_ = vserv_.router().handle_request(req);
216+
pipefd_[0] = handler_->cgi_read_fd();
217+
pipefd_[1] = handler_->cgi_write_fd();
218+
219+
if (pipefd_[0] != -1) {
220+
epoll_fds_[pipefd_[0]] = EPOLLIN;
221+
}
222+
if (pipefd_[1] != -1) {
223+
epoll_fds_[pipefd_[1]] = EPOLLOUT;
224+
}
225+
226+
state_ = Client::kProcessingRequest;
227+
}
228+
229+
void Client::maybe_write_to_regular_file()
230+
{
231+
if (state_ == Client::kProcessingRequest) {
232+
assert(handler_ != NULL);
233+
234+
if (handler_->is_regular_file()) {
235+
write_to_virtual_file();
236+
}
237+
}
238+
}
239+
240+
void Client::maybe_read_from_regular_file()
241+
{
242+
if (state_ == Client::kProcessingRequest) {
243+
assert(handler_ != NULL);
244+
245+
if (handler_->is_regular_file()) {
246+
read_from_virtual_file();
247+
}
248+
}
249+
}
250+
251+
void Client::read_from_pipe()
252+
{
253+
assert(state_ == Client::kProcessingRequest);
254+
assert(handler_ != NULL);
255+
assert(handler_->cgi_read_fd() != -1);
256+
257+
read_from_virtual_file();
258+
}
259+
260+
void Client::write_to_pipe()
261+
{
262+
assert(state_ == Client::kProcessingRequest);
263+
assert(handler_ != NULL);
264+
assert(handler_->cgi_write_fd() != -1);
265+
266+
write_to_virtual_file();
267+
}
268+
269+
void Client::read_from_virtual_file()
270+
{
271+
char buf[4096];
272+
273+
while (!handler_->is_done() && handler_->has_output() && sendbuf_available_size() > 0) {
274+
size_t max = std::min(sizeof(buf), sendbuf_available_size());
275+
size_t n = handler_->read_output(buf, max);
276+
if (n == 0) {
277+
break;
278+
}
279+
sendbuf_.append(buf, n);
280+
}
281+
}
282+
283+
void Client::write_to_virtual_file()
284+
{
285+
char buf[4096];
286+
287+
while (!handler_->is_done() && handler_->needs_input() && parser_.has_body_chunk()) {
288+
size_t n = parser_.read_next_body_chunk(buf, sizeof(buf));
289+
size_t written = handler_->write_input(buf, n);
290+
parser_.consume_body_chunk(written);
291+
if (written < n) {
292+
break;
293+
}
294+
}
295+
}
296+
297+
void Client::refresh_interest_list()
298+
{
299+
for (std::map<int, uint32_t>::iterator it = epoll_fds_.begin(); it != epoll_fds_.end(); ++it) {
300+
it->second = 0;
301+
}
302+
303+
if (state_ == Client::kClosingConnection) {
304+
return;
305+
}
306+
else if (state_ == Client::kReceivingHeaders) {
307+
want_read(sockfd_);
308+
}
309+
else if (state_ == Client::kProcessingRequest) {
310+
assert(handler_ != NULL);
311+
312+
if (handler_->needs_input()) {
313+
want_read(sockfd_);
314+
}
315+
if (handler_->has_output()) {
316+
want_write(sockfd_);
317+
}
318+
if (pipefd_[0] != -1 && handler_->has_output()) {
319+
want_read(pipefd_[0]);
320+
}
321+
if (pipefd_[1] != -1 && handler_->needs_input()) {
322+
want_write(pipefd_[1]);
323+
}
324+
}
325+
else if (state_ == Client::kPreparingNextRequest) {
326+
want_read(sockfd_);
327+
}
328+
else {
329+
assert(0 && "UNREACHABLE");
330+
}
331+
332+
epoll_fds_[sockfd_] |= EPOLLRDHUP;
333+
}

0 commit comments

Comments
 (0)