diff --git a/examples/axum-app/Cargo.toml b/examples/axum-app/Cargo.toml index fde58513..165b3874 100644 --- a/examples/axum-app/Cargo.toml +++ b/examples/axum-app/Cargo.toml @@ -9,7 +9,7 @@ publish = false # and axum as your web-framework. [dependencies] axum = "0.8.1" -rinja = { version = "0.3.5", path = "../../rinja" } +rinja = { version = "0.3.5", path = "../../rinja", features = ["dynamic"] } tokio = { version = "1.43.0", features = ["macros", "rt-multi-thread"] } # serde and strum are used to parse (deserialize) and generate (serialize) information diff --git a/examples/axum-app/src/main.rs b/examples/axum-app/src/main.rs index a9112e35..defc2b16 100644 --- a/examples/axum-app/src/main.rs +++ b/examples/axum-app/src/main.rs @@ -1,13 +1,16 @@ +use std::borrow::Cow; + use axum::extract::{Path, Query}; use axum::http::StatusCode; use axum::response::{Html, IntoResponse, Redirect, Response}; use axum::routing::get; use axum::{Router, serve}; use rinja::Template; -use serde::Deserialize; +use serde::{Deserialize, Serialize}; use tower_http::trace::TraceLayer; use tracing::{Level, info}; +#[rinja::main] #[tokio::main] async fn main() -> Result<(), Error> { tracing_subscriber::fmt() @@ -52,7 +55,7 @@ enum Error { /// * `PartialEq` so that we can use the type in comparisons with `==` or `!=`. /// * `serde::Deserialize` so that axum can parse the type in incoming URLs. /// * `strum::Display` so that rinja can write the value in templates. -#[derive(Default, Debug, Clone, Copy, PartialEq, Deserialize, strum::Display)] +#[derive(Default, Debug, Clone, Copy, PartialEq, Deserialize, Serialize, strum::Display)] #[allow(non_camel_case_types)] enum Lang { #[default] @@ -130,8 +133,8 @@ async fn index_handler( // In `IndexHandlerQuery` we annotated the field with `#[serde(default)]`, so if the value is // absent, an empty string is selected by default, which is visible to the user an empty // `` element. - #[derive(Debug, Template)] - #[template(path = "index.html")] + #[derive(Debug, Template, Serialize, Deserialize)] + #[template(path = "index.html", dynamic = true)] struct Tmpl { lang: Lang, name: String, @@ -158,16 +161,17 @@ async fn greeting_handler( Path((lang,)): Path<(Lang,)>, Query(query): Query, ) -> Result { - #[derive(Debug, Template)] - #[template(path = "greet.html")] - struct Tmpl { + #[derive(Debug, Template, Serialize, Deserialize)] + #[template(path = "greet.html", dynamic = true, print = "code")] + struct Tmpl<'a> { lang: Lang, - name: String, + #[serde(borrow)] + name: Cow<'a, str>, } let template = Tmpl { lang, - name: query.name, + name: query.name.into(), }; Ok(Html(template.render()?)) } diff --git a/rinja/Cargo.toml b/rinja/Cargo.toml index efc71169..6001e454 100644 --- a/rinja/Cargo.toml +++ b/rinja/Cargo.toml @@ -28,6 +28,12 @@ rinja_derive = { version = "=0.3.5", path = "../rinja_derive" } itoa = "1.0.11" +# needed by feature "dynamic" +linkme = { version = "0.3.31", optional = true } +notify = { version = "8.0.0", optional = true } +parking_lot = { version = "0.12.3", optional = true, features = ["arc_lock", "send_guard"] } +tokio = { version = "1.43.0", optional = true, features = ["macros", "io-util", "net", "process", "rt", "sync", "time"] } + # needed by feature "serde_json" serde = { version = "1.0", optional = true, default-features = false } serde_json = { version = "1.0", optional = true, default-features = false, features = [] } @@ -43,7 +49,7 @@ criterion = "0.5" maintenance = { status = "actively-developed" } [features] -default = ["config", "std", "urlencode"] +default = ["config", "std", "urlencode", "dynamic"] full = ["default", "code-in-doc", "serde_json"] alloc = [ @@ -54,6 +60,16 @@ alloc = [ ] code-in-doc = ["rinja_derive/code-in-doc"] config = ["rinja_derive/config"] +dynamic = [ + "std", + "rinja_derive/dynamic", + "serde/derive", + "dep:linkme", + "dep:notify", + "dep:parking_lot", + "dep:serde_json", + "dep:tokio", +] serde_json = ["rinja_derive/serde_json", "dep:serde", "dep:serde_json"] std = [ "alloc", diff --git a/rinja/src/dynamic/child.rs b/rinja/src/dynamic/child.rs new file mode 100644 index 00000000..94b1d205 --- /dev/null +++ b/rinja/src/dynamic/child.rs @@ -0,0 +1,236 @@ +use std::borrow::Cow; +use std::env::args; +use std::fmt::Write; +use std::io::ErrorKind; +use std::net::SocketAddr; +use std::process::exit; +use std::string::String; +use std::sync::Arc; +use std::time::Duration; +use std::vec::Vec; +use std::{eprintln, format}; + +use linkme::distributed_slice; +use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader}; +use tokio::net::TcpStream; +use tokio::net::tcp::{OwnedReadHalf, OwnedWriteHalf}; +use tokio::spawn; +use tokio::sync::{Mutex, oneshot}; + +use super::{DYNAMIC_ENVIRON_KEY, MainRequest, MainResponse, Outcome}; + +const PROCESSORS: usize = 4; + +#[inline(never)] +pub(crate) fn run_dynamic_main() { + std::env::set_var(DYNAMIC_ENVIRON_KEY, "-"); + + let mut entries: Vec<_> = DYNAMIC_TEMPLATES.iter().map(|entry| entry.name()).collect(); + entries.sort_unstable(); + eprintln!("templates implemented by subprocess: {entries:?}"); + for window in entries.windows(2) { + if let &[a, b] = window { + if a == b { + eprintln!("duplicated dynamic template {a:?}"); + } + } + } + + let sock_addr: SocketAddr = { + let mut args = args().fuse(); + let (_, Some("--__rinja_dynamic"), Some(sock_addr), None) = ( + args.next(), + args.next().as_deref(), + args.next(), + args.next(), + ) else { + eprintln!("child process got unexpected arguments"); + exit(1); + }; + match serde_json::from_str(&sock_addr) { + Ok(sock_addr) => sock_addr, + Err(err) => { + eprintln!("subprocess could not interpret socket addr: {err}"); + exit(1); + } + } + }; + + let rt = match tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + { + Ok(rt) => rt, + Err(err) => { + eprintln!("could not start tokio runtime: {err}"); + exit(1); + } + }; + let _ = rt.block_on(async move { + let sock = match TcpStream::connect(sock_addr).await { + Ok(sock) => sock, + Err(err) => { + eprintln!("subprocess could not connect to parent process: {err}"); + exit(1); + } + }; + let _: Result<(), std::io::Error> = sock.set_linger(None); + let _: Result<(), std::io::Error> = sock.set_nodelay(true); + let (read, write) = sock.into_split(); + + let stdout = Arc::new(Mutex::new(write)); + let stdin = Arc::new(Mutex::new(BufReader::new(read))); + let (done_tx, done_rx) = oneshot::channel(); + let done = Arc::new(Mutex::new(Some(done_tx))); + + let mut threads = Vec::with_capacity(PROCESSORS); + for _ in 0..PROCESSORS { + threads.push(spawn(dynamic_processor( + Arc::clone(&stdout), + Arc::clone(&stdin), + Arc::clone(&done), + ))); + } + + done_rx.await.map_err(|err| { + std::io::Error::new(ErrorKind::BrokenPipe, format!("lost result channel: {err}")); + }) + }); + rt.shutdown_timeout(Duration::from_secs(5)); + exit(0) +} + +async fn dynamic_processor( + stdout: Arc>, + stdin: Arc>>, + done: Arc>>>>, +) { + let done = move |result: Result<(), std::io::Error>| { + let done = Arc::clone(&done); + async move { + let mut lock = done.lock().await; + if let Some(done) = lock.take() { + let _: Result<_, _> = done.send(result); + } + } + }; + + let mut line_buf = String::new(); + let mut response_buf = String::new(); + loop { + line_buf.clear(); + match stdin.lock().await.read_line(&mut line_buf).await { + Ok(n) if n > 0 => {} + result => return done(result.map(|_| ())).await, + } + let line = line_buf.trim_ascii(); + if line.is_empty() { + continue; + } + + let MainRequest { callid, name, data } = match serde_json::from_str(line) { + Ok(req) => req, + Err(err) => { + let err = format!("could not deserialize request: {err}"); + return done(Err(std::io::Error::new(ErrorKind::InvalidData, err))).await; + } + }; + response_buf.clear(); + + let mut outcome = Outcome::NotFound; + for entry in DYNAMIC_TEMPLATES { + if entry.name() == name { + outcome = entry.dynamic_render(&mut response_buf, &data); + break; + } + } + + // SAFETY: `serde_json` writes valid UTF-8 data + let mut line = unsafe { line_buf.as_mut_vec() }; + + line.clear(); + if let Err(err) = serde_json::to_writer(&mut line, &MainResponse { callid, outcome }) { + let err = format!("could not serialize response: {err}"); + return done(Err(std::io::Error::new(ErrorKind::InvalidData, err))).await; + } + line.push(b'\n'); + + let is_done = { + let mut stdout = stdout.lock().await; + stdout.write_all(line).await.is_err() || stdout.flush().await.is_err() + }; + if is_done { + return done(Ok(())).await; + } + } +} + +/// Used by [`Template`][rinja_derive::Template] to register a template for dynamic processing. +#[macro_export] +macro_rules! register_dynamic_template { + ( + name: $Name:ty, + type: $Type:ty, + ) => { + const _: () = { + #[$crate::helpers::linkme::distributed_slice($crate::helpers::DYNAMIC_TEMPLATES)] + #[linkme(crate = $crate::helpers::linkme)] + static DYNAMIC_TEMPLATES: &'static dyn $crate::helpers::DynamicTemplate = &Dynamic; + + struct Dynamic; + + impl $crate::helpers::DynamicTemplate for Dynamic { + #[inline] + fn name(&self) -> &$crate::helpers::core::primitive::str { + $crate::helpers::core::any::type_name::<$Name>() + } + + fn dynamic_render<'a>( + &self, + buf: &'a mut rinja::helpers::alloc::string::String, + value: &rinja::helpers::core::primitive::str, + ) -> rinja::helpers::Outcome<'a> { + let result = rinja::helpers::from_json::<$Type>(value).map(|tmpl| { + buf.clear(); + let _ = buf.try_reserve(::SIZE_HINT); + tmpl.render_into(buf) + }); + $crate::helpers::use_dynamic_render_result(buf, result) + } + } + }; + }; +} + +/// Convert the result of [`serde::from_json()`] → [`Template::render()`] to an [`Outcome`]. +pub fn use_dynamic_render_result( + buf: &mut String, + result: Result, serde_json::Error>, +) -> Outcome<'_> { + let result = match &result { + Ok(Ok(())) => return Outcome::Success(Cow::Borrowed(buf)), + Ok(Err(err)) => Ok(err), + Err(err) => Err(err), + }; + + buf.clear(); + let result = match result { + Ok(e) => write!(buf, "{e}").map(|_| Outcome::Render(Cow::Borrowed(buf))), + Err(e) => write!(buf, "{e}").map(|_| Outcome::Deserialize(Cow::Borrowed(buf))), + }; + result.unwrap_or(Outcome::Fmt) +} + +/// List of implemented dynamic templates. Filled through +/// [`register_dynamic_template!`][crate::register_dynamic_template]. +#[distributed_slice] +pub static DYNAMIC_TEMPLATES: [&'static dyn DynamicTemplate]; + +/// A dynamic template implementation +pub trait DynamicTemplate: Send + Sync { + /// The type name of the template. + fn name(&self) -> &str; + + /// Take a JSON `value` to to render the template into `buf`. + fn dynamic_render<'a>(&self, buf: &'a mut String, value: &str) -> Outcome<'a>; +} diff --git a/rinja/src/dynamic/mod.rs b/rinja/src/dynamic/mod.rs new file mode 100644 index 00000000..1f14bf38 --- /dev/null +++ b/rinja/src/dynamic/mod.rs @@ -0,0 +1,102 @@ +pub(crate) mod child; +pub(crate) mod parent; + +use std::borrow::Cow; +use std::env::var_os; +use std::process::exit; +use std::sync::OnceLock; +use std::{eprintln, fmt}; + +use serde::{Deserialize, Serialize}; + +const DYNAMIC_ENVIRON_KEY: &str = "__rinja_dynamic"; +const DYNAMIC_ENVIRON_VALUE: &str = env!("CARGO_PKG_VERSION"); + +#[derive(Debug, Serialize, Deserialize)] +struct MainRequest<'a> { + callid: u64, + #[serde(borrow)] + name: Cow<'a, str>, + data: Cow<'a, str>, +} + +#[derive(Debug, Serialize, Deserialize)] +struct MainResponse<'a> { + callid: u64, + #[serde(borrow, flatten)] + outcome: Outcome<'a>, +} + +/// The outcome of a dynamic template call. +#[derive(Debug, Serialize, Deserialize)] +pub enum Outcome<'a> { + /// The template was rendered correctly. + #[serde(borrow)] + Success(Cow<'a, str>), + /// The JSON serialized template could not be deserialized. + #[serde(borrow)] + Deserialize(Cow<'a, str>), + /// The template was not rendered correctly. + #[serde(borrow)] + Render(Cow<'a, str>), + /// The template's type name was not known to the subprocess. + NotFound, + /// An error occurred but the error could not be printed. + Fmt, +} + +impl std::error::Error for Outcome<'_> {} + +impl fmt::Display for Outcome<'_> { + fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { + match self { + Outcome::Success(_) => write!(f, "not an error"), + Outcome::Deserialize(err) => write!(f, "could not deserialize: {err}"), + Outcome::Render(err) => write!(f, "could not render: {err}"), + Outcome::NotFound => write!(f, "template not found"), + Outcome::Fmt => write!(f, "could not format error message"), + } + } +} + +/// True if the current process is a dynamic subprocess. +#[inline] +#[track_caller] +fn am_dynamic_child() -> bool { + #[inline(never)] + #[cold] + #[track_caller] + fn uninitialized() -> bool { + unreachable!("init_am_dynamic_child() was never called"); + } + + *AM_DYNAMIC_CHILD.get_or_init(uninitialized) +} + +pub(crate) fn init_am_dynamic_child() -> bool { + let value = if let Some(var) = var_os(DYNAMIC_ENVIRON_KEY) { + let Some(var) = var.to_str() else { + eprintln!("Environment variable {DYNAMIC_ENVIRON_KEY} does not contain UTF-8 data"); + exit(1); + }; + match var { + DYNAMIC_ENVIRON_VALUE => true, + "" => false, + var => { + eprintln!( + "\ + Environment variable {DYNAMIC_ENVIRON_KEY} contains wrong value. \ + Expected: {DYNAMIC_ENVIRON_VALUE:?}, actual: {var:?}" + ); + exit(1); + } + } + } else { + false + }; + + AM_DYNAMIC_CHILD.set(value).unwrap(); + value +} + +static AM_DYNAMIC_CHILD: OnceLock = OnceLock::new(); diff --git a/rinja/src/dynamic/parent.rs b/rinja/src/dynamic/parent.rs new file mode 100644 index 00000000..76462abc --- /dev/null +++ b/rinja/src/dynamic/parent.rs @@ -0,0 +1,423 @@ +use std::boxed::Box; +use std::collections::BTreeMap; +use std::convert::Infallible; +use std::env::current_exe; +use std::io::ErrorKind; +use std::net::Ipv4Addr; +use std::ops::ControlFlow; +use std::process::{Stdio, exit}; +use std::string::{String, ToString}; +use std::sync::Arc; +use std::sync::atomic::{AtomicU64, Ordering}; +use std::time::Duration; +use std::{eprintln, format}; + +use notify::{Watcher, recommended_watcher}; +use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader}; +use tokio::net::TcpListener; +use tokio::net::tcp::OwnedReadHalf; +use tokio::process::Command; +use tokio::runtime::Handle; +use tokio::sync::{Mutex, mpsc, oneshot}; +use tokio::time::{Instant, sleep, sleep_until, timeout, timeout_at}; +use tokio::{select, try_join}; + +use super::{Outcome, am_dynamic_child}; +use crate::dynamic::{DYNAMIC_ENVIRON_KEY, DYNAMIC_ENVIRON_VALUE, MainRequest, MainResponse}; +use crate::{Error, Values}; + +static QUEUE: Queue = Queue::new(); +static RUNTIME: std::sync::Mutex> = std::sync::Mutex::new(None); + +const QUEUE_SIZE: usize = 4; +const TIMEOUT: Duration = Duration::from_secs(10); + +/// Renders a template dynamically we are inside the parent process. +#[inline] +pub fn maybe_render_dynamic_into( + name: &dyn Fn() -> &'static str, + serialize: &dyn Fn() -> Result, + dest: &mut (impl std::fmt::Write + ?Sized), + values: &dyn Values, +) -> Result, Error> { + if am_dynamic_child() { + return Ok(ControlFlow::Continue(())); + } + + let _ = values; // TODO + let data = request(name, serialize)?; + dest.write_str(&data)?; + Ok(ControlFlow::Break(())) +} + +#[inline] +fn request( + name: &dyn Fn() -> &'static str, + serialize: &dyn Fn() -> Result, +) -> Result { + let callid = QUEUE.callid.fetch_add(1, Ordering::Relaxed); + let request = MainRequest { + callid, + name: std::borrow::Cow::Borrowed(name()), + data: serialize() + .map_err(|err| Error::custom(format!("could not serialize template: {err}")))? + .into(), + }; + let mut request = serde_json::to_string(&{ request }) + .map_err(|err| Error::custom(format!("could not serialize request: {err}")))?; + request.push('\n'); + + let deadline = Instant::now() + TIMEOUT; + let (response_tx, response_rx) = oneshot::channel(); + let request = Box::new(Request { + callid, + request, + response: response_tx, + }); + + let result_arc = Arc::new(parking_lot::Mutex::new(None)); + + let runtime = RUNTIME.lock().unwrap().clone().unwrap(); + runtime.spawn({ + let mut result_guard = result_arc.try_lock_arc().unwrap(); + async move { + *result_guard = Some(handle_request(deadline, request, response_rx).await); + } + }); + + let mut result_guard = result_arc.lock(); + match result_guard.take() { + Some(result) => result, + None => Err(Error::custom("lost connection to handler thread (panic?)")), + } +} + +async fn handle_request( + deadline: Instant, + request: Box, + response_rx: oneshot::Receiver>, +) -> Result { + let insert = async { + QUEUE + .get_channel() + .await + .0 + .lock() + .await + .send(request) + .await + .map_err(|_| Error::custom("channel was closed unexpectedly (impossible)")) + }; + let response = async { + match response_rx.await { + Ok(response) => response, + Err(_) => Err(Error::custom("request got lost")), + } + }; + let write_and_read = async { + let (_, data) = try_join!(insert, response)?; + Ok(data) + }; + + if let Ok(result) = timeout_at(deadline, write_and_read).await { + result + } else { + Err(Error::custom("deadline expired")) + } +} + +#[inline(never)] +pub(crate) fn run_dynamic_main() { + struct Bomb; + + impl Drop for Bomb { + fn drop(&mut self) { + exit(1); + } + } + + std::thread::spawn(move || { + let _bomb = Bomb; + match tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + { + Ok(rt) => { + *RUNTIME.try_lock().unwrap() = Some(rt.handle().clone()); + match rt.block_on(dynamic_dispatcher()) { + Err(err) => eprintln!("dynamic dispatcher execution stopped: {err}"), + } + } + Err(err) => { + eprintln!("could not start tokio runtime: {err}"); + } + }; + }); +} + +async fn dynamic_dispatcher() -> std::io::Result { + let (stdout_tx, stdout_rx) = mpsc::channel(1); + let pending_responses: Arc>> = Arc::default(); + + let read_loop = read_loop(stdout_rx, Arc::clone(&pending_responses)); + let respawn_loop = respawn_loop(stdout_tx, pending_responses); + + #[allow(unreachable_code)] // The `Ok(!)` path cannot be entered. + match try_join!(read_loop, respawn_loop) { + Err(err) => Err(err), + } +} + +type PendingResponses = Arc>>>>; + +async fn read_loop( + mut stdout_rx: mpsc::Receiver, + pending_responses: PendingResponses, +) -> Result { + let mut line_buf = String::new(); + 'read_loop: loop { + let read_sock = stdout_rx.recv().await.unwrap(); + let mut read_sock = BufReader::new(read_sock); + loop { + line_buf.clear(); + match read_sock.read_line(&mut line_buf).await { + Ok(0) => { + eprintln!("stdin closed"); + continue 'read_loop; + } + Ok(_) => {} + Err(err) => { + eprintln!("stdin broken: {err}"); + continue 'read_loop; + } + } + let line = line_buf.trim_ascii(); + if line.is_empty() { + continue; + } + + let MainResponse { callid, outcome } = match serde_json::from_str(line) { + Ok(outcome) => outcome, + Err(err) => { + let err = format!("could not deserialize response: {err}"); + return Err(std::io::Error::new(ErrorKind::InvalidData, err)); + } + }; + let request_response = pending_responses.lock().await.remove(&callid); + let result = if let Outcome::Success(data) = outcome { + Ok(data.into_owned()) + } else { + Err(Error::custom(outcome.to_string())) + }; + if let Some(request_response) = request_response { + let _: Result<(), _> = request_response.send(result); + } + } + } +} + +async fn respawn_loop( + stdout_tx: mpsc::Sender, + pending_responses: PendingResponses, +) -> Result { + let exe_path = current_exe().map_err(|err| { + std::io::Error::new( + ErrorKind::NotFound, + format!("could not find current executable's path: {err}"), + ) + })?; + + loop { + let exe_time = match exe_path.metadata().and_then(|m| m.modified()) { + Ok(exe_hash) => exe_hash, + Err(err) => { + eprintln!( + "could not get modification timestamp of current exe file, retrying: {err}" + ); + sleep(Duration::from_millis(1000)).await; + continue; + } + }; + + let listener = TcpListener::bind((Ipv4Addr::LOCALHOST, 0)).await?; + let addr = listener.local_addr()?; + let addr = serde_json::to_string(&addr)?; + + let (ev_tx, mut ev_rx) = mpsc::channel(1); + let ev_tx = Arc::new(ev_tx); + let handle = Handle::current(); + let on_event = move |_| { + let ev_tx = Arc::clone(&ev_tx); + handle.spawn(async move { + let _ = ev_tx.send(()).await; + }); + }; + let _watcher = match recommended_watcher(on_event).and_then(|mut watcher| { + watcher.watch(&exe_path, notify::RecursiveMode::NonRecursive)?; + Ok(watcher) + }) { + Ok(watcher) => watcher, + Err(err) => { + eprintln!("cannot monitor subprocess exe path, retrying: {err}"); + sleep(Duration::from_millis(1000)).await; + continue; + } + }; + let notify_loop = async { + loop { + if ev_rx.recv().await.is_none() { + break; + } else if let Ok(ts) = exe_path.metadata().and_then(|m| m.modified()) { + if ts != exe_time { + break; + } + } else { + break; + } + } + }; + + let child = Command::new(&exe_path) + .arg("--__rinja_dynamic") + .arg(addr) + .env(DYNAMIC_ENVIRON_KEY, DYNAMIC_ENVIRON_VALUE) + .stdin(Stdio::null()) + .stdout(Stdio::inherit()) + .stderr(Stdio::inherit()) + .spawn(); + pending_responses.lock().await.clear(); + let mut child = match child { + Ok(child) => child, + Err(err) => { + eprintln!("could not start child process, retrying: {err}"); + sleep(Duration::from_millis(1000)).await; + continue; + } + }; + + let sock = match timeout(Duration::from_secs(5), listener.accept()).await { + Ok(Ok((sock, _))) => Ok(sock), + Ok(Err(err)) => { + eprintln!("could not accept connection of subprocess: {err}"); + Err(()) + } + Err(_) => { + eprintln!("timeout while waiting for subprocess connection"); + Err(()) + } + }; + if let Ok(sock) = sock { + let sock = sock; + let _: Result<(), std::io::Error> = sock.set_linger(None); + let _: Result<(), std::io::Error> = sock.set_nodelay(true); + let (sock_read, mut sock_write) = sock.into_split(); + let _: Result<(), _> = stdout_tx.send(sock_read).await; + + let pending_responses_ = Arc::clone(&pending_responses); + let read_loop = async move { + let pending_responses = pending_responses_; + loop { + let Request { + callid, + request, + response, + .. + } = *QUEUE + .get_channel() + .await + .1 + .lock() + .await + .recv() + .await + .unwrap(); + pending_responses.lock().await.insert(callid, response); + + if let Err(err) = sock_write.write_all(request.as_bytes()).await { + return err; + } + if let Err(err) = sock_write.flush().await { + return err; + } + } + }; + + eprintln!("subprocess ready"); + select! { + biased; + exit_code = child.wait() => match exit_code { + Ok(exit_status) => { + eprintln!("subprocess died with {exit_status}, restarting"); + }, + Err(err) => { + eprintln!("could not query subprocess exit status, restarting: {err}"); + } + }, + err = read_loop => { + eprintln!("could not read from subprocess, restarting: {err}"); + }, + _ = notify_loop => { + eprintln!("subprocess exe was changed, restarting"); + } + } + }; + + eprintln!("stopping subprocess"); + let deadline = Instant::now() + Duration::from_millis(250); + if !matches!(child.try_wait(), Ok(Some(_))) { + let _ = child.start_kill(); + if !matches!(timeout_at(deadline, child.wait()).await, Ok(Ok(_))) { + let _ = child.kill().await; + let _ = child.wait().await; + } + } + sleep_until(deadline).await; + } +} + +type RequestChannel = Mutex< + Option< + Arc<( + Mutex>>, + Mutex>>, + )>, + >, +>; + +struct Queue { + callid: AtomicU64, + channel: RequestChannel, +} + +struct Request { + callid: u64, + request: String, + response: RequestResponse, +} + +type RequestResponse = oneshot::Sender>; + +impl Queue { + const fn new() -> Self { + Self { + callid: AtomicU64::new(0), + channel: Mutex::const_new(None), + } + } + + async fn get_channel( + &self, + ) -> Arc<( + Mutex>>, + Mutex>>, + )> { + Arc::clone(match &mut *self.channel.lock().await { + Some(channel) => channel, + channel @ None => { + let (tx, rx) = mpsc::channel(QUEUE_SIZE); + *channel = Some(Arc::new((Mutex::new(tx), Mutex::new(rx)))); + channel.as_ref().unwrap() + } + }) + } +} diff --git a/rinja/src/helpers.rs b/rinja/src/helpers.rs index 9d543663..e9d00661 100644 --- a/rinja/src/helpers.rs +++ b/rinja/src/helpers.rs @@ -12,6 +12,17 @@ use core::iter::{Enumerate, Peekable}; use core::ops::Deref; use core::pin::Pin; +#[cfg(feature = "dynamic")] +pub use { + crate::dynamic::{ + Outcome, + child::{DYNAMIC_TEMPLATES, DynamicTemplate, use_dynamic_render_result}, + parent::maybe_render_dynamic_into, + }, + linkme, + serde_json::{from_str as from_json, to_string as to_json}, +}; + pub use crate::error::{ErrorMarker, ResultConverter}; use crate::filters::FastWritable; pub use crate::values::get_value; diff --git a/rinja/src/lib.rs b/rinja/src/lib.rs index eecca307..dc623d07 100644 --- a/rinja/src/lib.rs +++ b/rinja/src/lib.rs @@ -65,6 +65,8 @@ extern crate alloc; #[cfg(feature = "std")] extern crate std; +#[cfg(feature = "dynamic")] +mod dynamic; mod error; pub mod filters; #[doc(hidden)] @@ -78,7 +80,7 @@ use core::fmt; #[cfg(feature = "std")] use std::io; -pub use rinja_derive::Template; +pub use rinja_derive::{Template, main}; #[doc(hidden)] pub use crate as shared; @@ -370,6 +372,19 @@ macro_rules! impl_for_ref { pub(crate) use impl_for_ref; +/// Used by [`#[rinja::main]`][crate::main] to run a subprocess for dynamic rendering. +/// +/// If manually invoked in `fn main()`, you need to place the call at the very top of the function. +/// Only initializations for e.g. `log` or `tracing` can come before. +pub fn run_dynamic_main() { + #[cfg(feature = "dynamic")] + if dynamic::init_am_dynamic_child() { + dynamic::child::run_dynamic_main(); + } else { + dynamic::parent::run_dynamic_main(); + } +} + #[cfg(all(test, feature = "alloc"))] mod tests { use std::fmt; diff --git a/rinja_derive/Cargo.toml b/rinja_derive/Cargo.toml index 4306ab63..715730bb 100644 --- a/rinja_derive/Cargo.toml +++ b/rinja_derive/Cargo.toml @@ -41,6 +41,7 @@ syn = { version = "2.0.3", features = ["full"] } alloc = [] code-in-doc = ["dep:pulldown-cmark"] config = ["dep:serde", "dep:basic-toml", "parser/config"] +dynamic = ["syn/full"] urlencode = [] serde_json = [] std = ["alloc"] diff --git a/rinja_derive/src/generator/node.rs b/rinja_derive/src/generator/node.rs index d3417b8d..420a4071 100644 --- a/rinja_derive/src/generator/node.rs +++ b/rinja_derive/src/generator/node.rs @@ -25,6 +25,87 @@ impl<'a> Generator<'a, '_> { ctx: &Context<'a>, buf: &mut Buffer, ) -> Result { + #[cfg(feature = "dynamic")] + use std::fmt::{self, Write}; + + #[cfg(feature = "dynamic")] + #[derive(Debug, Clone, Copy, Default)] + struct LifetimesTimesN(usize, &'static str); + + #[cfg(feature = "dynamic")] + impl fmt::Display for LifetimesTimesN { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + if self.0 > 0 { + f.write_char('<')?; + for i in 0..self.0 { + if i > 0 { + f.write_char(',')?; + } + f.write_str(self.1)?; + } + f.write_char('>')?; + } + Ok(()) + } + } + + #[cfg(feature = "dynamic")] + let lt = if self.input.dynamic { + let item = match &self.input.ast.data { + syn::Data::Struct(_) => "struct", + syn::Data::Enum(_) => "enum", + syn::Data::Union(_) => "union", + }; + + let (lt, typ, cnst) = self.input.ast.generics.params.iter().fold( + (0, 0, 0), + |(lt, typ, cnst), p| match p { + syn::GenericParam::Lifetime(_) => (lt + 1, typ, cnst), + syn::GenericParam::Type(_) => (lt, typ + 1, cnst), + syn::GenericParam::Const(_) => (lt, typ, cnst + 1), + }, + ); + if typ > 0 || cnst > 0 { + return Err(CompileError::new( + format_args!( + "`dynamic` templates are not implemented for `{item}`s with generics" + ), + None, + )); + } + + buf.write(format_args!( + "\ + rinja::register_dynamic_template! {{\ + name: {ident}{lt_static},\ + type: {ident}{lt_any},\ + }}", + ident = &self.input.ast.ident, + lt_static = LifetimesTimesN(lt, "'static"), + lt_any = LifetimesTimesN(lt, "'_"), + )); + lt + } else { + 0 + }; + + #[cfg(feature = "dynamic")] + if self.input.dynamic { + buf.write(format_args!( + "\ + if rinja::helpers::maybe_render_dynamic_into(\ + &rinja::helpers::core::any::type_name::<{ident}{lt_static}>,\ + &|| rinja::helpers::to_json(self),\ + __rinja_writer,\ + __rinja_values\ + )?.is_break() {{\ + return rinja::Result::Ok(());\ + }}", + ident = &self.input.ast.ident, + lt_static = LifetimesTimesN(lt, "'static"), + )); + } + buf.set_discard(self.buf_writable.discard); let size_hint = if let Some(heritage) = self.heritage { self.handle(heritage.root, heritage.root.nodes, buf, AstLevel::Top) diff --git a/rinja_derive/src/input.rs b/rinja_derive/src/input.rs index 0243133a..170ea453 100644 --- a/rinja_derive/src/input.rs +++ b/rinja_derive/src/input.rs @@ -26,6 +26,8 @@ pub(crate) struct TemplateInput<'a> { pub(crate) block: Option<&'a str>, pub(crate) print: Print, pub(crate) escaper: &'a str, + #[cfg(feature = "dynamic")] + pub(crate) dynamic: bool, pub(crate) path: Arc, pub(crate) fields: Vec, } @@ -47,6 +49,8 @@ impl TemplateInput<'_> { ext, ext_span, syntax, + #[cfg(feature = "dynamic")] + dynamic, .. } = args; @@ -135,6 +139,8 @@ impl TemplateInput<'_> { escaper, path, fields, + #[cfg(feature = "dynamic")] + dynamic: dynamic.unwrap_or_default(), }) } @@ -338,6 +344,8 @@ pub(crate) struct TemplateArgs { ext_span: Option, syntax: Option, config: Option, + #[cfg(feature = "dynamic")] + dynamic: Option, pub(crate) whitespace: Option, pub(crate) template_span: Option, pub(crate) config_span: Option, @@ -386,6 +394,8 @@ impl TemplateArgs { ext_span: args.ext.as_ref().map(|value| value.span()), syntax: args.syntax.map(|value| value.value()), config: args.config.as_ref().map(|value| value.value()), + #[cfg(feature = "dynamic")] + dynamic: args.dynamic.map(|value| value.value()), whitespace: args.whitespace, template_span: Some(args.template.span()), config_span: args.config.as_ref().map(|value| value.span()), @@ -402,6 +412,8 @@ impl TemplateArgs { ext_span: None, syntax: None, config: None, + #[cfg(feature = "dynamic")] + dynamic: None, whitespace: None, template_span: None, config_span: None, @@ -673,6 +685,7 @@ pub(crate) struct PartialTemplateArgs { pub(crate) syntax: Option, pub(crate) config: Option, pub(crate) whitespace: Option, + pub(crate) dynamic: Option, } #[derive(Clone)] @@ -732,6 +745,7 @@ const _: () = { syntax: None, config: None, whitespace: None, + dynamic: None, }; let mut has_data = false; @@ -817,6 +831,8 @@ const _: () = { set_strlit_pair(ident, value, &mut this.config)?; } else if ident == "whitespace" { set_parseable_string(ident, value, &mut this.whitespace)?; + } else if ident == "dynamic" { + set_boollit_pair(ident, value, &mut this.dynamic)?; } else { return Err(CompileError::no_file_info( format!("unsupported template attribute `{ident}` found"), @@ -851,6 +867,16 @@ const _: () = { Ok(()) } + fn set_boollit_pair( + name: &Ident, + value: ExprLit, + dest: &mut Option, + ) -> Result<(), CompileError> { + ensure_only_once(name, dest)?; + *dest = Some(get_boollit(name, value)?); + Ok(()) + } + fn set_parseable_string>( name: &Ident, value: ExprLit, diff --git a/rinja_derive/src/lib.rs b/rinja_derive/src/lib.rs index fe1b2ece..4bd05935 100644 --- a/rinja_derive/src/lib.rs +++ b/rinja_derive/src/lib.rs @@ -142,6 +142,40 @@ pub fn derive_template(input: TokenStream12) -> TokenStream12 { } } +#[allow(clippy::useless_conversion)] // To be compatible with both `TokenStream`s +#[cfg_attr(not(feature = "__standalone"), proc_macro_attribute)] +#[must_use] +pub fn main(_attr: TokenStream12, item: TokenStream12) -> TokenStream12 { + main_impl(item.into()).into() +} + +#[cfg(not(feature = "dynamic"))] +fn main_impl(item: TokenStream) -> TokenStream { + item +} + +#[cfg(feature = "dynamic")] +fn main_impl(item: TokenStream) -> TokenStream { + let func: syn::ItemFn = match syn::parse2(item) { + Ok(func) => func, + Err(err) => { + return compile_error(std::iter::once(err.to_string()), Span::call_site()); + } + }; + let output = &func.sig.output; + syn::parse_quote_spanned! { + func.sig.ident.span() => + fn main() #output { + { + extern crate rinja as rinja; + rinja::run_dynamic_main(); + return main(); + } + #func + } + } +} + fn compile_error(msgs: impl Iterator, span: Span) -> TokenStream { quote_spanned! { span => diff --git a/rinja_derive_standalone/Cargo.toml b/rinja_derive_standalone/Cargo.toml index 63686383..e206ab8a 100644 --- a/rinja_derive_standalone/Cargo.toml +++ b/rinja_derive_standalone/Cargo.toml @@ -48,6 +48,7 @@ __standalone = [] code-in-doc = ["dep:pulldown-cmark"] config = ["dep:serde", "dep:basic-toml", "parser/config"] +dynamic = [] urlencode = [] serde_json = []