Skip to content

Commit d34aba6

Browse files
background thread v1
v3
1 parent 809e22f commit d34aba6

3 files changed

Lines changed: 267 additions & 0 deletions

File tree

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
# Copyright (c) 2026 VillageSQL Contributors
2+
#
3+
# This program is free software; you can redistribute it and/or modify
4+
# it under the terms of the GNU General Public License, version 2.0,
5+
# as published by the Free Software Foundation.
6+
#
7+
# This program is designed to work with certain software (including
8+
# but not limited to OpenSSL) that is licensed under separate terms,
9+
# as designated in a particular file or component or in included license
10+
# documentation. The authors of MySQL hereby grant you an additional
11+
# permission to link the program and your derivative works with the
12+
# separately licensed software that they have either included with
13+
# the program or referenced in the documentation.
14+
#
15+
# This program is distributed in the hope that it will be useful,
16+
# but WITHOUT ANY WARRANTY; without even the implied warranty of
17+
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18+
# GNU General Public License, version 2.0, for more details.
19+
#
20+
# You should have received a copy of the GNU General Public License
21+
# along with this program; if not, write to the Free Software
22+
# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
23+
24+
# Quick start (standalone):
25+
# mkdir build && cd build
26+
# cmake .. -DCMAKE_PREFIX_PATH=/path/to/villagesql-extension-sdk
27+
# make
28+
# make install DESTDIR=/path/to/villagesql/lib/veb
29+
30+
cmake_minimum_required(VERSION 3.16)
31+
project(vsql_http_monitor_extension)
32+
33+
set(CMAKE_CXX_STANDARD 17)
34+
set(CMAKE_CXX_STANDARD_REQUIRED ON)
35+
36+
set(EXTENSION_NAME "vsql_http_monitor")
37+
38+
find_package(VillageSQLExtensionFramework REQUIRED)
39+
40+
include_directories(${VillageSQLExtensionFramework_INCLUDE_DIR})
41+
42+
add_library(extension SHARED
43+
src/extension.cc
44+
)
45+
46+
target_compile_options(extension PRIVATE -fPIC)
47+
48+
VEF_CREATE_VEB(
49+
NAME ${EXTENSION_NAME}
50+
LIBRARY_TARGET extension
51+
MANIFEST ${CMAKE_CURRENT_SOURCE_DIR}/manifest.json
52+
)
53+
54+
install(FILES ${VEB_OUTPUT} DESTINATION ".")
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
{
2+
"name": "vsql_http_monitor",
3+
"version": "0.0.1",
4+
"description": "Minimal HTTP monitoring endpoint example using on_load/on_unload and poll_fd",
5+
"author": "VillageSQL Community",
6+
"license": "GPL-2.0"
7+
}
Lines changed: 206 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,206 @@
1+
// Copyright (c) 2026 VillageSQL Contributors
2+
//
3+
// This program is free software; you can redistribute it and/or modify
4+
// it under the terms of the GNU General Public License, version 2.0,
5+
// as published by the Free Software Foundation.
6+
//
7+
// This program is distributed in the hope that it will be useful,
8+
// but WITHOUT ANY WARRANTY; without even the implied warranty of
9+
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10+
// GNU General Public License, version 2.0, for more details.
11+
//
12+
// You should have received a copy of the GNU General Public License
13+
// along with this program; if not, write to the Free Software
14+
// Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
15+
16+
// vsql-http-monitor: minimal HTTP monitoring endpoint example.
17+
//
18+
// Demonstrates:
19+
// - preview_thread_worker with poll_fd for instant wakeup on connections
20+
// - sys_var for configuring port and bind address
21+
// - Socket lifecycle managed via VEF_WAKEUP_ENABLE / VEF_WAKEUP_DISABLE
22+
// - Simple HTTP response (placeholder body; extend in on_serve())
23+
//
24+
// Usage:
25+
// INSTALL EXTENSION vsql_http_monitor SONAME 'vsql_http_monitor.veb';
26+
// SET GLOBAL vsql_http_monitor.port = 9200;
27+
// SET GLOBAL vsql_http_monitor.bind_address = '0.0.0.0';
28+
// SET GLOBAL vsql_http_monitor.http.enabled = ON;
29+
//
30+
// Then: curl http://127.0.0.1:9200/
31+
32+
#include <arpa/inet.h>
33+
#include <netinet/in.h>
34+
#include <sys/socket.h>
35+
#include <unistd.h>
36+
37+
#include <atomic>
38+
#include <cstring>
39+
#include <string>
40+
41+
#include <villagesql/preview/thread_worker.h>
42+
#include <villagesql/vsql.h>
43+
44+
using namespace vsql;
45+
using vsql::preview::preview_thread_worker;
46+
47+
// ---- Configuration sys vars ------------------------------------------------
48+
49+
static long long g_port = 9200;
50+
static char *g_bind_address = nullptr;
51+
52+
// ---- Listener state --------------------------------------------------------
53+
54+
// The listening socket fd. -1 when the listener is not running.
55+
// Exposed to the framework via .poll_fd(&g_listen_fd) so the worker thread
56+
// wakes immediately when a new connection arrives.
57+
static int g_listen_fd = -1;
58+
59+
static std::atomic<long long> g_requests_total{0};
60+
61+
static auto g_thread_worker = vsql::preview::thread_worker::make_capability();
62+
63+
// ---- Socket helpers --------------------------------------------------------
64+
65+
static int setup_listen_socket(const char *bind_addr, int port) {
66+
if (bind_addr == nullptr || *bind_addr == '\0') return -1;
67+
68+
int fd = socket(AF_INET, SOCK_STREAM, 0);
69+
if (fd < 0) return -1;
70+
71+
int reuse = 1;
72+
setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse));
73+
74+
struct sockaddr_in addr;
75+
memset(&addr, 0, sizeof(addr));
76+
addr.sin_family = AF_INET;
77+
addr.sin_port = htons(static_cast<uint16_t>(port));
78+
if (inet_pton(AF_INET, bind_addr, &addr.sin_addr) != 1) {
79+
close(fd);
80+
return -1;
81+
}
82+
83+
if (bind(fd, reinterpret_cast<struct sockaddr *>(&addr), sizeof(addr)) < 0) {
84+
close(fd);
85+
return -1;
86+
}
87+
88+
if (listen(fd, 5) < 0) {
89+
close(fd);
90+
return -1;
91+
}
92+
93+
return fd;
94+
}
95+
96+
static void write_full(int fd, const char *buf, size_t len) {
97+
size_t written = 0;
98+
while (written < len) {
99+
ssize_t n = send(fd, buf + written, len - written, MSG_NOSIGNAL);
100+
if (n < 0) {
101+
if (errno == EINTR) continue;
102+
break;
103+
}
104+
if (n == 0) break;
105+
written += static_cast<size_t>(n);
106+
}
107+
}
108+
109+
static ssize_t read_http_request(int fd, char *buf, size_t max_len) {
110+
size_t total = 0;
111+
while (total < max_len - 1) {
112+
ssize_t n = recv(fd, buf + total, max_len - 1 - total, 0);
113+
if (n < 0) {
114+
if (errno == EINTR) continue;
115+
return -1;
116+
}
117+
if (n == 0) break;
118+
total += static_cast<size_t>(n);
119+
buf[total] = '\0';
120+
if (strstr(buf, "\r\n\r\n") != nullptr) break;
121+
}
122+
buf[total] = '\0';
123+
return static_cast<ssize_t>(total);
124+
}
125+
126+
static std::string on_serve() {
127+
return "requests_total " + std::to_string(g_requests_total.load()) + "\n";
128+
}
129+
130+
static void handle_client(int client_fd) {
131+
struct timeval tv;
132+
tv.tv_sec = 5;
133+
tv.tv_usec = 0;
134+
setsockopt(client_fd, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv));
135+
setsockopt(client_fd, SOL_SOCKET, SO_SNDTIMEO, &tv, sizeof(tv));
136+
137+
char req_buf[4096];
138+
read_http_request(client_fd, req_buf, sizeof(req_buf));
139+
140+
std::string body = on_serve();
141+
std::string response =
142+
"HTTP/1.1 200 OK\r\n"
143+
"Content-Type: text/plain\r\n"
144+
"Content-Length: " +
145+
std::to_string(body.size()) +
146+
"\r\n"
147+
"Connection: close\r\n"
148+
"\r\n" +
149+
body;
150+
151+
write_full(client_fd, response.c_str(), response.size());
152+
close(client_fd);
153+
154+
g_requests_total.fetch_add(1, std::memory_order_relaxed);
155+
}
156+
157+
// ---- Worker thread ---------------------------------------------------------
158+
159+
static vef_next_wakeup_t serve_tick(vef_wakeup_reason_t reason,
160+
struct vef_thread_handle_t *, void *) {
161+
if (reason == VEF_WAKEUP_ENABLE) {
162+
const char *bind_addr =
163+
(g_bind_address != nullptr && *g_bind_address != '\0') ? g_bind_address
164+
: "127.0.0.1";
165+
g_listen_fd = setup_listen_socket(bind_addr, static_cast<int>(g_port));
166+
return {0, g_listen_fd};
167+
}
168+
169+
if (reason == VEF_WAKEUP_DISABLE) {
170+
if (g_listen_fd >= 0) {
171+
close(g_listen_fd);
172+
g_listen_fd = -1;
173+
}
174+
return {};
175+
}
176+
177+
if (g_listen_fd < 0) return {};
178+
179+
struct sockaddr_in client_addr;
180+
socklen_t addr_len = sizeof(client_addr);
181+
int client_fd =
182+
accept(g_listen_fd, reinterpret_cast<struct sockaddr *>(&client_addr),
183+
&addr_len);
184+
if (client_fd >= 0) handle_client(client_fd);
185+
return {};
186+
}
187+
188+
// ---- VDF: request count ----------------------------------------------------
189+
190+
void requests_total(IntResult out) {
191+
out.set(g_requests_total.load(std::memory_order_relaxed));
192+
}
193+
194+
// ---- Extension registration ------------------------------------------------
195+
196+
VEF_GENERATE_ENTRY_POINTS(
197+
make_extension()
198+
.sys_var(make_sys_var_int("port", "TCP port for the HTTP endpoint",
199+
&g_port, 9200, 1024, 65535))
200+
.sys_var(make_sys_var_str("bind_address",
201+
"IP address to bind the HTTP endpoint",
202+
&g_bind_address, "127.0.0.1"))
203+
.status_var(make_status_var_int("requests_total", &g_requests_total))
204+
.func(make_func<&requests_total>("requests_total").returns(INT).build())
205+
.with<preview_thread_worker<g_thread_worker>>(
206+
make_thread_worker<&serve_tick>("http")))

0 commit comments

Comments
 (0)