Skip to content

Commit a7e6141

Browse files
committed
feat: async phase handler
Add support for asynchronous phase handlers: - Introduce async phase handler framework - Add async request handling capabilities - Implement async/await support for nginx phases - Enable non-blocking request processing
1 parent a61402f commit a7e6141

5 files changed

Lines changed: 245 additions & 2 deletions

File tree

Cargo.lock

Lines changed: 32 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ targets = []
4848
[dependencies]
4949
allocator-api2 = { version = "0.4.0", default-features = false, features = ["fresh-rust"] }
5050
async-task = { version = "4.7.1", optional = true }
51+
futures-util = { version = "0.3", default-features = false }
5152
lock_api = "0.4.13"
5253
nginx-sys = { path = "nginx-sys", version = "0.5.0"}
5354
pin-project-lite = { version = "0.2.16", optional = true }

src/http/async_request.rs

Lines changed: 162 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,162 @@
1+
use core::fmt::Display;
2+
use core::future::Future;
3+
use core::pin::Pin;
4+
use core::task::{Context, Poll};
5+
6+
use crate::http::{HttpModule, HttpPhase, HttpRequestHandler, IntoHandlerStatus, Request};
7+
use crate::{async_ as ngx_async, ngx_log_debug_http};
8+
9+
use crate::ffi::{ngx_http_request_t, ngx_int_t, ngx_post_event, ngx_posted_events};
10+
11+
use futures_util::FutureExt;
12+
use pin_project_lite::*;
13+
14+
/// An asynchronous HTTP request handler trait.
15+
pub trait AsyncHandler {
16+
/// The phase in which the handler will be executed.
17+
const PHASE: HttpPhase;
18+
/// The associated HTTP module type.
19+
type Module: HttpModule;
20+
/// The return type of the asynchronous worker function.
21+
type Output: IntoHandlerStatus;
22+
/// The asynchronous worker function to be implemented.
23+
fn worker(request: &mut Request) -> impl Future<Output = Self::Output>;
24+
}
25+
26+
const fn async_phase(phase: HttpPhase) -> HttpPhase {
27+
assert!(
28+
!matches!(phase, HttpPhase::Content),
29+
"Content phase is not supported"
30+
);
31+
phase
32+
}
33+
34+
/// An error type for asynchronous handler operations.
35+
#[derive(Debug)]
36+
pub enum AsyncHandlerError {
37+
/// Indicates that the context creation failed.
38+
ContextCreationFailed,
39+
/// Indicates that there is no async launcher available.
40+
NoAsyncLauncher,
41+
/// Indicates that the context deletion failed.
42+
ContextDeletionFailed,
43+
}
44+
45+
impl Display for AsyncHandlerError {
46+
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
47+
match self {
48+
AsyncHandlerError::ContextCreationFailed => {
49+
write!(f, "AsyncHandler: Context creation failed")
50+
}
51+
AsyncHandlerError::NoAsyncLauncher => {
52+
write!(f, "AsyncHandler: No async launcher available")
53+
}
54+
AsyncHandlerError::ContextDeletionFailed => {
55+
write!(f, "AsyncHandler: Context deletion failed")
56+
}
57+
}
58+
}
59+
}
60+
61+
#[derive(Default)]
62+
struct AsyncRequestContext {
63+
launcher: Option<async_task::Task<ngx_int_t>>,
64+
}
65+
66+
impl<AH> HttpRequestHandler for AH
67+
where
68+
AH: AsyncHandler + 'static,
69+
{
70+
const PHASE: HttpPhase = async_phase(AH::PHASE);
71+
type Output = Result<ngx_int_t, AsyncHandlerError>;
72+
73+
fn handler(request: &mut Request) -> Self::Output {
74+
let mut pool = request.pool();
75+
76+
let ctx = pool
77+
.get_or_add_unique(|| {
78+
let request_ptr: *mut ngx_http_request_t = request.as_mut() as *mut _ as _;
79+
AsyncRequestContext {
80+
launcher: Some(ngx_async::spawn(handler_future::<AH>(request_ptr))),
81+
}
82+
})
83+
.ok_or(AsyncHandlerError::ContextCreationFailed)?;
84+
85+
match &ctx.launcher {
86+
None => Err(AsyncHandlerError::NoAsyncLauncher),
87+
Some(launcher) if launcher.is_finished() => {
88+
// task is finished, so both expect() should not panic
89+
let task = ctx
90+
.launcher
91+
.take()
92+
.expect("AsyncHandler: Task should be present");
93+
let rc = task
94+
.now_or_never()
95+
.expect("AsyncHandler: Task should be ready");
96+
ngx_log_debug_http!(request, "AsyncHandler: task joined; rc = {}", rc);
97+
pool.remove_unique::<AsyncRequestContext>()
98+
.ok_or(AsyncHandlerError::ContextDeletionFailed)?;
99+
Ok(rc)
100+
}
101+
Some(_) => {
102+
ngx_log_debug_http!(request, "AsyncHandler: running");
103+
Ok(nginx_sys::NGX_AGAIN as _)
104+
}
105+
}
106+
}
107+
}
108+
109+
pin_project! {
110+
struct HandlerFuture<Fut>
111+
where
112+
Fut: Future<Output = ngx_int_t>,
113+
{
114+
#[pin]
115+
worker_fut: Fut,
116+
request: *const ngx_http_request_t,
117+
}
118+
}
119+
120+
fn handler_future<AH>(request: *mut ngx_http_request_t) -> impl Future<Output = ngx_int_t>
121+
where
122+
AH: AsyncHandler,
123+
{
124+
let fut = async move {
125+
let request = unsafe { Request::from_ngx_http_request(request) };
126+
AH::worker(request).await.into_handler_status(request)
127+
};
128+
129+
HandlerFuture::<_> {
130+
worker_fut: fut,
131+
request,
132+
}
133+
}
134+
135+
impl<Fut> Future for HandlerFuture<Fut>
136+
where
137+
Fut: Future<Output = ngx_int_t>,
138+
{
139+
type Output = ngx_int_t;
140+
141+
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
142+
let this = self.project();
143+
let request = unsafe { Request::from_const_ngx_http_request(*this.request) };
144+
145+
match this.worker_fut.poll(cx) {
146+
Poll::Pending => {
147+
ngx_log_debug_http!(request, "HandlerFuture: pending");
148+
Poll::Pending
149+
}
150+
Poll::Ready(rc) => {
151+
unsafe {
152+
ngx_post_event(
153+
(*request.connection()).write,
154+
core::ptr::addr_of_mut!(ngx_posted_events),
155+
)
156+
};
157+
ngx_log_debug_http!(request, "HandlerFuture: ready");
158+
Poll::Ready(rc)
159+
}
160+
}
161+
}
162+
}

src/http/mod.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,16 @@
1+
#[cfg(feature = "async")]
2+
mod async_request;
3+
14
mod conf;
25
mod module;
36
mod request;
47
mod request_context;
58
mod status;
69
mod upstream;
710

11+
#[cfg(feature = "async")]
12+
pub use async_request::*;
13+
814
pub use conf::*;
915
pub use module::*;
1016
pub use request::*;

src/http/request.rs

Lines changed: 44 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ use crate::core::*;
99
use crate::ffi::*;
1010
use crate::http::HttpPhase;
1111
use crate::http::status::*;
12+
use crate::ngx_log_error;
1213

1314
/// Define a static request handler.
1415
///
@@ -85,7 +86,9 @@ macro_rules! http_variable_get {
8586
/// in the `into_handler_status` method.
8687
///
8788
/// There are predefined implementations for `ngx_int_t`, [`Status`], [`HTTPStatus`],
88-
/// [`Option`] with value type implementing [`IntoHandlerStatus`].
89+
/// [`Option`] with value type implementing [`IntoHandlerStatus`],
90+
/// and [`Result`] with value type implementing [`IntoHandlerStatus`]
91+
/// and error type implementing [`core::fmt::Display`].
8992
pub trait IntoHandlerStatus
9093
where
9194
Self: Sized,
@@ -105,6 +108,23 @@ where
105108
}
106109
}
107110

111+
impl<T, E> IntoHandlerStatus for Result<T, E>
112+
where
113+
T: IntoHandlerStatus,
114+
E: core::fmt::Display,
115+
{
116+
#[inline]
117+
fn into_handler_status(self, r: &Request) -> ngx_int_t {
118+
match self {
119+
Ok(val) => val.into_handler_status(r),
120+
Err(e) => {
121+
ngx_log_error!(NGX_LOG_ERR, r.log(), "{e}");
122+
NGX_ERROR as _
123+
}
124+
}
125+
}
126+
}
127+
108128
impl IntoHandlerStatus for ngx_int_t {
109129
#[inline]
110130
fn into_handler_status(self, _r: &Request) -> ngx_int_t {
@@ -196,6 +216,16 @@ impl Request {
196216
unsafe { &mut *r.cast::<Request>() }
197217
}
198218

219+
/// Create a const [`Request`] from a const [`ngx_http_request_t`].
220+
///
221+
/// # Safety
222+
///
223+
/// The caller has provided a valid non-null pointer to a valid `ngx_http_request_t`
224+
/// which shares the same representation as `Request`.
225+
pub unsafe fn from_const_ngx_http_request<'a>(r: *const ngx_http_request_t) -> &'a Request {
226+
unsafe { &*r.cast::<Request>() }
227+
}
228+
199229
/// Is this the main request (as opposed to a subrequest)?
200230
pub fn is_main(&self) -> bool {
201231
let main = self.0.main.cast();
@@ -301,6 +331,11 @@ impl Request {
301331
}
302332
}
303333

334+
/// Get HTTP status of response.
335+
pub fn get_status(&self) -> HTTPStatus {
336+
HTTPStatus::from_u16(self.0.headers_out.status as u16).unwrap_or(HTTPStatus(0))
337+
}
338+
304339
/// Set HTTP status of response.
305340
pub fn set_status(&mut self, status: HTTPStatus) {
306341
self.0.headers_out.status = status.into();
@@ -370,6 +405,14 @@ impl Request {
370405
unsafe { Status(ngx_http_output_filter(&raw mut self.0, body)) }
371406
}
372407

408+
/// Get the output chain buffer.
409+
pub fn get_out(&self) -> Option<&ngx_chain_t> {
410+
if self.0.out.is_null() {
411+
return None;
412+
}
413+
unsafe { Some(&*self.0.out) }
414+
}
415+
373416
/// Perform internal redirect to a location
374417
pub fn internal_redirect(&self, location: &str) -> Status {
375418
assert!(!location.is_empty(), "uri location is empty");

0 commit comments

Comments
 (0)