Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 16 additions & 4 deletions src/datum_api.c
Original file line number Diff line number Diff line change
Expand Up @@ -1128,6 +1128,20 @@ enum MHD_Result datum_api_answer(void *cls, struct MHD_Connection *connection, c
return ret;
}

static struct MHD_Daemon *datum_api_try_start(unsigned int flags) {
flags |= MHD_USE_AUTO; // event loop API
flags |= MHD_USE_INTERNAL_POLLING_THREAD;
return MHD_start_daemon(
flags,
datum_config.api_listen_port,
NULL, NULL, // accept policy filter
&datum_api_answer, NULL, // default URI handler
MHD_OPTION_CONNECTION_LIMIT, 128,
MHD_OPTION_NOTIFY_COMPLETED, datum_api_request_completed, NULL,
MHD_OPTION_LISTENING_ADDRESS_REUSE, (unsigned int)1,
MHD_OPTION_END);
}

void *datum_api_thread(void *ptr) {
struct MHD_Daemon *daemon;

Expand All @@ -1136,10 +1150,8 @@ void *datum_api_thread(void *ptr) {
return NULL;
}

daemon = MHD_start_daemon(MHD_USE_AUTO | MHD_USE_INTERNAL_POLLING_THREAD, datum_config.api_listen_port, NULL, NULL, &datum_api_answer, NULL,
MHD_OPTION_CONNECTION_LIMIT, 128,
MHD_OPTION_NOTIFY_COMPLETED, datum_api_request_completed, NULL,
MHD_OPTION_END);
daemon = datum_api_try_start(MHD_USE_DUAL_STACK);
if (!daemon) daemon = datum_api_try_start(0);

if (!daemon) {
DLOG_FATAL("Unable to start daemon for API");
Expand Down
97 changes: 60 additions & 37 deletions src/datum_sockets.c
Original file line number Diff line number Diff line change
Expand Up @@ -537,18 +537,39 @@ int assign_to_thread(T_DATUM_SOCKET_APP *app, int fd) {
return 1;
}

const char *datum_sockets_setup_listen_sock(const int listen_sock, const struct sockaddr * const sa, const size_t sa_len) {
if (-1 == listen_sock) {
return "Could not create listening socket";
}

datum_socket_setoptions(listen_sock);

static const int reuse = 1;
if (setsockopt(listen_sock, SOL_SOCKET, SO_REUSEADDR, (const char*)&reuse, sizeof(reuse)) < 0) {
return "setsockopt(SO_REUSEADDR) failed";
}

if (bind(listen_sock, sa, sa_len) < 0) {
return "bind failed";
}

if (listen(listen_sock, 10) < 0) {
return "listen failed";
}

return NULL;
}

void *datum_gateway_listener_thread(void *arg) {
struct sockaddr_in serveraddr;
int i, ret;
int reuse = 1;
bool rejecting_now = false;
uint64_t last_reject_msg_tsms = 0, curtime_tsms = 0;
uint64_t reject_count = 0;

T_DATUM_SOCKET_APP *app = (T_DATUM_SOCKET_APP *)arg;

struct epoll_event ev, events[MAX_EVENTS];
int listen_sock, conn_sock, nfds, epollfd;
int listen_socks[2], conn_sock, nfds, epollfd;

if (!app) {
DLOG_FATAL("Called without application data structure. :(");
Expand Down Expand Up @@ -577,37 +598,36 @@ void *datum_gateway_listener_thread(void *arg) {

app->datum_active_threads = 0;

listen_sock = socket(AF_INET, SOCK_STREAM, 0);
if (!listen_sock) {
DLOG_FATAL("Could get socket: %s", strerror(errno));
panic_from_thread(__LINE__);
return NULL;
}

datum_socket_setoptions(listen_sock);
memset(&serveraddr, 0, sizeof(serveraddr));
serveraddr.sin_family = AF_INET;
serveraddr.sin_port = htons(app->listen_port);

// TODO: Add option to bind to specific IP per configuration!
serveraddr.sin_addr.s_addr = INADDR_ANY;

if (setsockopt(listen_sock, SOL_SOCKET, SO_REUSEADDR, (const char*)&reuse, sizeof(reuse)) < 0) {
DLOG_FATAL("setsockopt(SO_REUSEADDR) failed: %s", strerror(errno));
panic_from_thread(__LINE__);
return NULL;
const struct sockaddr_in6 anyaddr6 = {
.sin6_family = AF_INET6,
.sin6_port = htons(app->listen_port),
.sin6_addr = IN6ADDR_ANY_INIT,
};
listen_socks[0] = socket(AF_INET6, SOCK_STREAM, 0);
const char * const errstr6 = datum_sockets_setup_listen_sock(listen_socks[0], (const struct sockaddr *)&anyaddr6, sizeof(anyaddr6));
const int errno6 = errno;
if (errstr6 && listen_socks[0] != -1) {
close(listen_socks[0]);
listen_socks[0] = -1;
}

if(bind(listen_sock, (struct sockaddr*)&serveraddr, sizeof(serveraddr)) < 0) {
DLOG_FATAL("bind failed: %s", strerror(errno));
const struct sockaddr_in anyaddr4 = {
.sin_family = AF_INET,
.sin_port = htons(app->listen_port),
.sin_addr.s_addr = INADDR_ANY,
};
listen_socks[1] = socket(AF_INET, SOCK_STREAM, 0);
const char *errstr = datum_sockets_setup_listen_sock(listen_socks[1], (const struct sockaddr *)&anyaddr4, sizeof(anyaddr4));
if (errstr && errstr6) {
const int errno4 = errno;
DLOG_FATAL("%s (IPv6): %s", errstr6, strerror(errno6));
DLOG_FATAL("%s (IPv4): %s", errstr, strerror(errno4));
panic_from_thread(__LINE__);
return NULL;
}

if (listen(listen_sock, 10) < 0) {
DLOG_FATAL("listen failed: %s", strerror(errno));
panic_from_thread(__LINE__);
return NULL;
if (errstr && listen_socks[1] != -1) {
close(listen_socks[1]);
listen_socks[1] = -1;
}

epollfd = epoll_create1(0);
Expand All @@ -617,12 +637,15 @@ void *datum_gateway_listener_thread(void *arg) {
return NULL;
}

ev.events = EPOLLIN;
ev.data.fd = listen_sock;
if (epoll_ctl(epollfd, EPOLL_CTL_ADD, listen_sock, &ev)<0) {
DLOG_FATAL("epoll_ctl failed: %s", strerror(errno));
panic_from_thread(__LINE__);
return NULL;
for (i = 0; i < 2; ++i) {
if (listen_socks[i] == -1) continue;
ev.events = EPOLLIN;
ev.data.fd = listen_socks[i];
if (epoll_ctl(epollfd, EPOLL_CTL_ADD, ev.data.fd, &ev) < 0) {
DLOG_FATAL("epoll_ctl failed: %s", strerror(errno));
panic_from_thread(__LINE__);
return NULL;
}
}

DLOG_INFO("DATUM Socket listener thread active for '%s'", app->name);
Expand All @@ -641,8 +664,8 @@ void *datum_gateway_listener_thread(void *arg) {
}
}
for (int n = 0; n < nfds; ++n) {
if (events[n].data.fd == listen_sock) {
conn_sock = accept(listen_sock, NULL, NULL);
if (events[n].data.fd == listen_socks[0] || events[n].data.fd == listen_socks[1]) {
conn_sock = accept(events[n].data.fd, NULL, NULL);
if (conn_sock < 0) {
DLOG_ERROR("accept failed: %s", strerror(errno));
continue;
Expand Down