Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,14 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/),
and this project adheres to [Semantic Versioning](https://semver.org/).

# 0.9.3

## Added
* `ShutdownHandle` — programmatic graceful shutdown that composes with the built-in OS signal handler. Construct via `ShutdownHandle::new()` or `ShutdownHandle::from_token(token)` / `From<CancellationToken>`. Trigger with `handle.shutdown()`; observe with `handle.is_shutdown_requested()` and `handle.cancelled()`.
* `App::with_shutdown()` — returns `(App, ShutdownHandle)` for the common case where the framework owns the handle.
* `App::with_shutdown_signal(handle)` — registers an externally-owned `ShutdownHandle` on an existing `App`.
* `App::shutdown_on(future)` — chains async triggers (e.g. an external watchdog future) that fire a graceful shutdown when they resolve. Composes with the OS signal handler and any `ShutdownHandle` already registered, and is safe to call before a Tokio runtime exists.

# 0.9.2

## Added
Expand Down
12 changes: 6 additions & 6 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ exclude = ["fuzz"]
resolver = "2"

[workspace.package]
version = "0.9.2"
version = "0.9.3"
license = "MIT"
edition = "2024"
rust-version = "1.90.0"
Expand All @@ -21,11 +21,11 @@ categories = [
]

[workspace.dependencies]
volga-dev-cert = { path = "volga-dev-cert", version = "0.9.2" }
volga-di = { path = "volga-di", version = "0.9.2" }
volga-macros = { path = "volga-macros", version = "0.9.2" }
volga-open-api = { path = "volga-open-api", version = "0.9.2" }
volga-rate-limiter = { path = "volga-rate-limiter", version = "0.9.2" }
volga-dev-cert = { path = "volga-dev-cert", version = "0.9.3" }
volga-di = { path = "volga-di", version = "0.9.3" }
volga-macros = { path = "volga-macros", version = "0.9.3" }
volga-open-api = { path = "volga-open-api", version = "0.9.3" }
volga-rate-limiter = { path = "volga-rate-limiter", version = "0.9.3" }

[workspace.lints.rust]
unsafe_code = "forbid"
Expand Down
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ Fast, simple, and high-performance web framework for Rust, built on top of
Volga is designed to make building HTTP services straightforward and explicit,
while keeping performance predictable and overhead minimal.

[![latest](https://img.shields.io/badge/latest-0.9.2-blue)](https://crates.io/crates/volga)
[![latest](https://img.shields.io/badge/latest-0.9.3-blue)](https://crates.io/crates/volga)
[![latest](https://img.shields.io/badge/rustc-1.90+-964B00)](https://releases.rs/docs/1.90.0/)
[![License: MIT](https://img.shields.io/badge/License-MIT-violet.svg)](https://github.com/RomanEmreis/volga/blob/main/LICENSE)
[![Build](https://github.com/RomanEmreis/volga/actions/workflows/rust.yml/badge.svg)](https://github.com/RomanEmreis/volga/actions/workflows/rust.yml)
Expand Down Expand Up @@ -46,7 +46,7 @@ Volga is a good fit if you:
### Dependencies
```toml
[dependencies]
volga = "0.9.2"
volga = "0.9.3"
tokio = { version = "1", features = ["full"] }
```
### Simple request handler
Expand Down
2 changes: 1 addition & 1 deletion volga/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ memchr = "2.8.0"
mime = "0.3.17"
mime_guess = "2.0.5"
pin-project-lite = "0.2.17"
tokio = { version = "1.52.1", features = ["fs", "io-util", "macros", "net", "rt", "rt-multi-thread", "signal", "sync", "time"] }
tokio = { version = "1.52.3", features = ["fs", "io-util", "macros", "net", "rt", "rt-multi-thread", "signal", "sync", "time"] }
tokio-util = "0.7.18"
serde = "1.0.228"
serde_json = "1.0.149"
Expand Down
169 changes: 161 additions & 8 deletions volga/src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@ use connection::Connection;
use hyper_util::rt::TokioIo;

use std::{
fmt,
future::Future,
pin::Pin,
sync::{Arc, Weak},
};

Expand Down Expand Up @@ -66,6 +68,7 @@ pub use crate::limits::Http2Limits;
#[cfg(feature = "tls")]
pub(crate) use app_env::GRACEFUL_SHUTDOWN_TIMEOUT;

use crate::app::shutdown::ShutdownHandle;
pub(crate) use app_env::AppEnv;

mod app_env;
Expand All @@ -76,6 +79,7 @@ mod host_env;
pub(crate) mod pipeline;
pub mod router;
pub(crate) mod scope;
pub(crate) mod shutdown;

/// The main entry point for building and running a Volga application.
///
Expand Down Expand Up @@ -199,6 +203,28 @@ pub struct App {
/// Pre-built config store (populated by `with_config`/`with_default_config`, passed to `AppEnv`)
#[cfg(feature = "config")]
pub(crate) config_store: Option<Arc<crate::config::ConfigStore>>,

/// Optional user-provided shutdown handle that composes with the
/// built-in OS signal handler.
shutdown_handle: Option<ShutdownHandle>,

/// Async triggers registered via [`App::shutdown_on`]. Drained and
/// spawned by [`App::run_internal`] once a Tokio runtime is active.
shutdown_triggers: ShutdownTriggers,
}

/// Async triggers queued via [`App::shutdown_on`]. Wraps a vector of
/// boxed futures with a manual [`fmt::Debug`] impl, since trait objects
/// don't implement [`Debug`].
#[derive(Default)]
struct ShutdownTriggers(Vec<Pin<Box<dyn Future<Output = ()> + Send + 'static>>>);

impl fmt::Debug for ShutdownTriggers {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("ShutdownTriggers")
.field("len", &self.0.len())
.finish()
}
}

impl Default for App {
Expand Down Expand Up @@ -258,7 +284,99 @@ impl App {
openapi: Default::default(),
#[cfg(feature = "config")]
config_store: None,
shutdown_handle: None,
shutdown_triggers: ShutdownTriggers::default(),
}
}

/// Creates a new [`App`] paired with a fresh [`ShutdownHandle`].
///
/// The returned handle can be cloned freely. Calling
/// [`ShutdownHandle::shutdown`] from anywhere triggers a graceful
/// server shutdown that composes with the built-in OS signal
/// handler (Ctrl+C, SIGTERM).
///
/// # Example
///
/// ```no_run
/// use volga::App;
///
/// #[tokio::main]
/// async fn main() -> std::io::Result<()> {
/// let (app, shutdown) = App::with_shutdown();
/// tokio::spawn(async move {
/// tokio::time::sleep(std::time::Duration::from_secs(60)).await;
/// shutdown.shutdown();
/// });
/// app.run().await
/// }
/// ```
pub fn with_shutdown() -> (Self, ShutdownHandle) {
let handle = ShutdownHandle::new();
(Self::new().with_shutdown_signal(handle.clone()), handle)
Comment thread
RomanEmreis marked this conversation as resolved.
}

/// Registers an external shutdown signal.
///
/// The handle is composed with the built-in OS signal handler
/// (Ctrl+C, SIGTERM on Unix, or the equivalents on Windows) —
/// whichever fires first triggers a graceful shutdown.
///
/// # Example
///
/// ```no_run
/// use volga::{App, ShutdownHandle};
///
/// let handle = ShutdownHandle::new();
/// let app = App::new().with_shutdown_signal(handle.clone());
/// // Later: handle.shutdown() triggers a graceful shutdown.
/// # let _ = app;
/// # let _ = handle;
/// ```
pub fn with_shutdown_signal(mut self, handle: ShutdownHandle) -> Self {
self.shutdown_handle = Some(handle);
self
}

/// Registers an async trigger that fires a graceful shutdown when
/// the given future resolves.
///
/// Multiple `shutdown_on` calls compose — any of the registered
/// futures resolving will trigger shutdown. The trigger composes
/// with both the OS signal handler and any [`ShutdownHandle`]
/// registered via [`App::with_shutdown`] or
/// [`App::with_shutdown_signal`]. If no handle has been registered,
/// a fresh internal one is created automatically.
///
/// The future is spawned onto the Tokio runtime when the app starts
/// running, so this method is safe to call before any runtime exists
/// (e.g. before [`App::run_blocking`]).
///
/// # Example
///
/// ```no_run
/// use volga::App;
///
/// #[tokio::main]
/// async fn main() -> std::io::Result<()> {
/// let (tx, rx) = tokio::sync::oneshot::channel::<()>();
/// let app = App::new()
/// .bind("127.0.0.1:7878")
/// .shutdown_on(async move { let _ = rx.await; });
/// // Sending on `tx` later triggers a graceful shutdown.
/// # let _ = tx;
/// app.run().await
/// }
/// ```
pub fn shutdown_on<F>(mut self, future: F) -> Self
where
F: Future<Output = ()> + Send + 'static,
{
if self.shutdown_handle.is_none() {
self.shutdown_handle = Some(ShutdownHandle::new());
}
self.shutdown_triggers.0.push(Box::pin(future));
self
}

/// Returns the current bound socket address.
Expand Down Expand Up @@ -676,7 +794,7 @@ impl App {
}

#[inline]
async fn run_internal(self, tcp_listener: TcpListener) -> io::Result<()> {
async fn run_internal(mut self, tcp_listener: TcpListener) -> io::Result<()> {
#[cfg(all(debug_assertions, feature = "openapi"))]
if self.openapi.is_configure_but_not_exposed() {
#[cfg(feature = "tracing")]
Expand Down Expand Up @@ -705,7 +823,26 @@ impl App {

let (shutdown_tx, shutdown_rx) = watch::channel::<()>(());
let shutdown_tx = Arc::new(shutdown_tx);
Self::shutdown_signal(shutdown_rx);

// Spawn any async triggers registered via `App::shutdown_on`.
// Each trigger cancels the handle's token when it resolves, and
// exits early if another arm cancels the token first — otherwise
// an unresolved watchdog future would leak its task after shutdown.
if !self.shutdown_triggers.0.is_empty() {
let handle = self.shutdown_handle.get_or_insert_with(ShutdownHandle::new);
let token = handle.token();
for trigger in self.shutdown_triggers.0.drain(..) {
let token = token.clone();
tokio::spawn(async move {
tokio::select! {
_ = trigger => token.cancel(),
_ = token.cancelled() => {}
}
});
}
}

Self::shutdown_signal(shutdown_rx, self.shutdown_handle.clone());

#[cfg(feature = "tls")]
let redirection_config = self
Expand Down Expand Up @@ -784,15 +921,31 @@ impl App {
}

#[inline]
fn shutdown_signal(shutdown_rx: watch::Receiver<()>) {
fn shutdown_signal(shutdown_rx: watch::Receiver<()>, handle: Option<ShutdownHandle>) {
let token = handle.map(|h| h.token()).unwrap_or_default();

// OS signal listener — spawned as its own task so it lives
// independently of the manual-shutdown path. Tokio's process-wide
// signal handlers are installed on first poll and never removed
// (see `tokio::signal::ctrl_c` docs), so dropping the polled
// signal future when the manual-shutdown path wins would leave
// future SIGINT/SIGTERM with no Volga listener. Keeping the
// listener alive ensures the next signal is still consumed and
// (idempotently) cancels the shared token.
let os_token = token.clone();
tokio::spawn(async move {
match Self::wait_for_shutdown_signal().await {
Ok(_) => (),
if let Err(_err) = Self::wait_for_shutdown_signal().await {
#[cfg(feature = "tracing")]
Err(err) => tracing::error!("unable to listen for shutdown signal: {err:#}"),
#[cfg(not(feature = "tracing"))]
Err(_) => (),
tracing::error!("unable to listen for shutdown signal: {_err:#}");
}
os_token.cancel();
});

// Watch the shared token: any source (OS signal, `ShutdownHandle`,
// or `App::shutdown_on` trigger) cancels it, and we then drop
// `shutdown_rx` to start the accept loop's drain.
tokio::spawn(async move {
token.cancelled_owned().await;
#[cfg(feature = "tracing")]
tracing::trace!("shutdown signal received, not accepting new requests");
drop(shutdown_rx);
Expand Down
Loading
Loading