diff --git a/examples/watch_pods.rs b/examples/watch_pods.rs index 98d3918b..71e32f5b 100644 --- a/examples/watch_pods.rs +++ b/examples/watch_pods.rs @@ -12,17 +12,8 @@ use tracing::Instrument; #[derive(Parser)] #[clap(version)] struct Args { - /// The tracing filter used for logs - #[clap( - long, - env = "KUBERT_EXAMPLE_LOG", - default_value = "watch_pods=info,warn" - )] - log_level: kubert::LogFilter, - - /// The logging format - #[clap(long, default_value = "plain")] - log_format: kubert::LogFormat, + #[clap(flatten)] + log: kubert::LogArgs, #[clap(flatten)] client: kubert::ClientArgs, @@ -46,8 +37,7 @@ struct Args { #[tokio::main] async fn main() -> Result<()> { let Args { - log_level, - log_format, + log, client, admin, exit, @@ -62,7 +52,7 @@ async fn main() -> Result<()> { // - an admin server with /live and /ready endpoints // - a tracing (logging) subscriber let rt = kubert::Runtime::builder() - .with_log(log_level, log_format) + .with_log_args(log) .with_admin(admin) .with_client(client); let mut runtime = match time::timeout_at(deadline, rt.build()).await { diff --git a/kubert/Cargo.toml b/kubert/Cargo.toml index c985ebe4..153e779a 100644 --- a/kubert/Cargo.toml +++ b/kubert/Cargo.toml @@ -23,7 +23,8 @@ index = [ "tracing", ] initialized = ["futures-core", "futures-util", "pin-project-lite", "tokio/sync"] -log = ["thiserror", "tracing", "tracing-subscriber"] +log = ["thiserror", "tracing", "tracing-subscriber", "once_cell"] +tokio-console = ["tracing-subscriber", "console-subscriber"] requeue = ["futures-core", "tokio/macros", "tokio/sync", "tokio-util/time", "tracing"] runtime = [ "admin", @@ -80,6 +81,7 @@ features = [ [dependencies] ahash = { version = "0.7", optional = true } +console-subscriber = { version = "0.1", optional = true } drain = { version = "0.1.1", optional = true, default-features = false } futures-core = { version = "0.3", optional = true, default-features = false } futures-util = { version = "0.3", optional = true, default-features = false } @@ -94,12 +96,13 @@ tokio-util = { version = "0.7", optional = true, default-features = false } tokio-rustls = { version = "0.23.2", optional = true, default-features = false } tower-service = { version = "0.3.1", optional = true } tracing = { version = "0.1.31", optional = true } +once_cell = { version = "1.10", optional = true } [dependencies.clap] version = "3.1.0" optional = true default-features = false -features = ["derive", "std"] +features = ["derive", "std", "env"] # Not used directly, but required to ensure that the k8s-openapi dependency is considered part of # the "deps" graph rather than just the "dev-deps" graph @@ -125,7 +128,7 @@ optional = true default-features = false [dependencies.tracing-subscriber] -version = "0.3.9" +version = "0.3.11" optional = true default-features = false features = [ @@ -146,4 +149,4 @@ tracing-subscriber = { version = "0.3", features = ["ansi"] } [dev-dependencies.tokio] version = "1.17" default-features = false -features = ["macros", "test-util"] +features = ["macros", "test-util", "rt-multi-thread"] diff --git a/kubert/src/admin.rs b/kubert/src/admin.rs index 0be205e8..b8fc43db 100644 --- a/kubert/src/admin.rs +++ b/kubert/src/admin.rs @@ -150,7 +150,8 @@ impl Bound { )) })); - let task = tokio::spawn( + let task = crate::spawn_named( + "kubert::admin", async move { debug!("Serving"); server.await diff --git a/kubert/src/lib.rs b/kubert/src/lib.rs index 4bc9abd1..b917ba3e 100644 --- a/kubert/src/lib.rs +++ b/kubert/src/lib.rs @@ -56,10 +56,34 @@ pub use self::client::ClientArgs; pub use self::initialized::Initialized; #[cfg(all(feature = "log"))] -pub use self::log::{LogFilter, LogFormat, LogInitError}; +pub use self::log::{LogArgs, LogFilter, LogFormat, LogInitError}; #[cfg(all(feature = "runtime"))] pub use self::runtime::Runtime; #[cfg(all(feature = "server"))] pub use self::server::ServerArgs; + +#[cfg(all(tokio_unstable, feature = "tokio-console"))] +#[track_caller] +pub(crate) fn spawn_named( + name: &'static str, + f: impl std::future::Future + Send + 'static, +) -> tokio::task::JoinHandle +where + T: Send + 'static, +{ + tokio::task::Builder::new().name(name).spawn(f) +} + +#[cfg(not(all(tokio_unstable, feature = "tokio-console")))] +#[track_caller] +pub(crate) fn spawn_named( + _: &'static str, + f: impl std::future::Future + Send + 'static, +) -> tokio::task::JoinHandle +where + T: Send + 'static, +{ + tokio::spawn(f) +} diff --git a/kubert/src/log.rs b/kubert/src/log.rs index e69da814..1128f8a2 100644 --- a/kubert/src/log.rs +++ b/kubert/src/log.rs @@ -4,10 +4,106 @@ use thiserror::Error; pub use tracing_subscriber::{util::TryInitError as LogInitError, EnvFilter as LogFilter}; +#[cfg(feature = "clap")] +use clap::{Arg, ArgEnum, ArgMatches, Args, Command, FromArgMatches}; +#[cfg(feature = "clap")] +use once_cell::sync::OnceCell; + +/// Configures logging settings. +/// +/// This type may be parsed from the command line using `clap`, or configured +/// manually. In some cases, it may be preferable to not use the default `clap` +/// implementation for `LogArgs`, so that environment variables, default log +/// targets, etc, may be overridden. +/// +/// # Examples +/// +/// If the default environment variable (`KUBERT_LOG`) and default value for the +/// log filter (`=info,warn`) are desired, the +/// `clap::FromArgMatches` impl for this type can be used directly: +/// +/// ```rust +/// use clap::Parser; +/// +/// #[derive(Parser)] +/// struct MyAppArgs { +/// #[clap(flatten)] +/// log: kubert::LogArgs, +/// // ... +/// } +/// +/// #[tokio::main] +/// async fn main() { +/// let args = MyAppArgs::parse(); +/// +/// let rt = kubert::Runtime::builder() +/// .with_log_args(args.log) +/// // ... +/// .build(); +/// # drop(rt); +/// } +/// ``` +/// +/// Alternatively, a `LogArgs` instance may be constructed from user-defined values: +/// +/// ```rust +/// use clap::Parser; +/// use kubert::log::{LogFilter, LogFormat}; +/// +/// #[derive(Parser)] +/// struct MyAppArgs { +/// // Use a different environment variable and default value than +/// // those provided by the `FromArgMatches` impl for `LogArgs` +/// #[clap(long, env = "MY_APP_LOG", default_value = "trace")] +/// log_filter: LogFilter, +/// +/// #[clap(long, env = "MY_APP_LOG_FORMAT", default_value = "json")] +/// log_format: LogFormat, +/// +/// } +/// +/// #[tokio::main] +/// async fn main() { +/// let args = MyAppArgs::parse(); +/// +/// // Construct a `LogArgs` from the values we parsed using our +/// // custom configuration: +/// let log_args = kubert::LogArgs::default() +/// .with_log_format(args.log_format) +/// .with_log_level(args.log_filter); +/// +/// let rt = kubert::Runtime::builder() +/// .with_log_args(log_args) +/// // ... +/// .build(); +/// # drop(rt); +/// } +/// ``` +#[derive(Debug)] +#[cfg_attr(docsrs, doc(cfg(feature = "log")))] +#[must_use] +pub struct LogArgs { + /// The log format to use. + pub log_format: LogFormat, + + /// The filter that determines what tracing spans and events are enabled. + pub log_level: LogFilter, + + /// Enables tokio-console support. + /// + /// If this is set, `kubert` must be compiled with the `tokio-console` cargo + /// feature enabled and `RUSTFLAGS="--cfg tokio_unstable"` must be set. + #[cfg(all(tokio_unstable, feature = "tokio-console"))] + pub tokio_console: bool, + + pub(crate) _p: (), +} + /// Configures whether logs should be emitted in plaintext (the default) or as JSON-encoded /// messages #[derive(Clone, Debug)] #[cfg_attr(docsrs, doc(cfg(feature = "log")))] +#[cfg_attr(feature = "clap", derive(ArgEnum))] pub enum LogFormat { /// The default plaintext format Plain, @@ -43,18 +139,86 @@ impl std::str::FromStr for LogFormat { } impl LogFormat { - /// Attempts to configure the global default tracing subscriber in the current scope, returning + /// Attempts to configure the global default `tracing` subscriber in the current scope, returning /// an error if one is already set /// - /// This method returns an error if a global default subscriber has already been set, or if a + /// This method returns an error if [a global default subscriber has already been set][set], or if a /// `log` logger has already been set. + /// + /// [set]: https://docs.rs/tracing-subscriber + #[deprecated(since = "0.6.1", note = "use `LogArgs::try_init` instead")] pub fn try_init(self, filter: LogFilter) -> Result<(), LogInitError> { + LogArgs { + log_format: self, + log_level: filter, + + #[cfg(all(tokio_unstable, feature = "tokio-console"))] + tokio_console: false, + _p: (), + } + .try_init() + } +} + +// === impl LogArgs === + +impl LogArgs { + /// Sets the [`LogFormat`] used by this `LogArgs`. + /// + /// This can be used for "builder-style" configuration of `LogArgs`, or to + /// override values parsed from the command line using `clap`. + /// + /// # Examples + /// + /// ``` + /// let log_format = // ... + /// # Default::default(); + /// + /// let log_args = kubert::LogArgs::default() + /// .with_log_format(log_format); + /// # drop(log_args); + /// ``` + pub fn with_log_format(self, log_format: LogFormat) -> Self { + Self { log_format, ..self } + } + + /// Sets the [`LogLevel`] used by this `LogArgs`. + /// + /// This can be used for "builder-style" configuration of `LogArgs`, or to + /// override values parsed from the command line using `clap`. + /// + /// # Examples + /// + /// ``` + /// let filter = // ... + /// # Default::default(); + /// + /// let log_args = kubert::LogArgs::default() + /// .with_log_level(filter); + /// # drop(log_args); + /// ``` + pub fn with_log_level(self, log_level: LogFilter) -> Self { + Self { log_level, ..self } + } + + /// Attempts to configure the global default `tracing` subscriber in the current scope, returning + /// an error if one is already set + /// + /// This method returns an error if [a global default subscriber has already been set][set], or if a + /// `log` logger has already been set. + pub fn try_init(self) -> Result<(), LogInitError> { use tracing_subscriber::prelude::*; - let registry = tracing_subscriber::registry().with(filter); + let registry = tracing_subscriber::registry(); + + // TODO(eliza): can we serve the tokio console server on the Admin server? + #[cfg(all(tokio_unstable, feature = "tokio-console"))] + let registry = registry.with(self.tokio_console.then(console_subscriber::spawn)); - match self { - LogFormat::Plain => registry.with(tracing_subscriber::fmt::layer()).try_init()?, + match self.log_format { + LogFormat::Plain => registry + .with(tracing_subscriber::fmt::layer().with_filter(self.log_level)) + .try_init()?, LogFormat::Json => { let event_fmt = tracing_subscriber::fmt::format() @@ -69,7 +233,8 @@ impl LogFormat { // Use the JSON event formatter and the JSON field formatter. let fmt = tracing_subscriber::fmt::layer() .event_format(event_fmt) - .fmt_fields(tracing_subscriber::fmt::format::JsonFields::default()); + .fmt_fields(tracing_subscriber::fmt::format::JsonFields::default()) + .with_filter(self.log_level); registry.with(fmt).try_init()? } @@ -78,3 +243,114 @@ impl LogFormat { Ok(()) } } + +// This hand-implements `Args` in order to generate some values based +// on the command's name. +impl Args for LogArgs { + fn augment_args(cmd: Command<'_>) -> Command<'_> { + let level = Arg::new("log-level") + .long("log-level") + .takes_value(true) + .env("KUBERT_LOG") + .help("The filter that determines what tracing spans and events are enabled") + .long_help( + // XXX(eliza): had to use tinyurl because `clap` would line-wrap the docs.rs URL :( + "The filter that determines what tracing spans and events are enabled.\n\n\ + See here for details on the accepted syntax for tracing filters:\n\ + https://tinyurl.com/envfilter-directives", + ) + .default_value(default_log_filter(&cmd)); + let format = Arg::new("log-format") + .long("log-format") + .takes_value(true) + .help("Which log format to use.") + .possible_values( + LogFormat::value_variants() + .iter() + .filter_map(LogFormat::to_possible_value), + ) + .default_value("plain"); + + let cmd = cmd.arg(level).arg(format); + #[cfg(all(tokio_unstable, feature = "tokio-console"))] + let cmd = cmd.arg( + Arg::new("tokio-console") + .long("tokio-console") + .takes_value(false) + .help("Enables `tokio-console` instrumentation") + .long_help( + "Enables `tokio-console` instrumentation.\n\n\ + If this is set, `kubert` must be compiled with the \ + `tokio-console` cargo feature enabled, and \ + `RUSTFLAGS=\"--cfg tokio_unstable\"` must be set.", + ), + ); + cmd + } + fn augment_args_for_update(cmd: Command<'_>) -> Command<'_> { + Self::augment_args(cmd) + } +} + +impl FromArgMatches for LogArgs { + fn from_arg_matches(matches: &ArgMatches) -> Result { + // The `log_level` and `log_format` arguments both have default values, + // so we expect they will always be present. + let log_level = matches.value_of_t::("log-level")?; + let log_format = matches.value_of_t::("log-format")?; + + #[cfg(all(tokio_unstable, feature = "tokio-console"))] + let tokio_console = matches.is_present("tokio-console"); + + Ok(Self { + log_level, + log_format, + + #[cfg(all(tokio_unstable, feature = "tokio-console"))] + tokio_console, + _p: (), + }) + } + + fn update_from_arg_matches(&mut self, matches: &ArgMatches) -> Result<(), clap::Error> { + self.log_level = matches.value_of_t::("log-level")?; + self.log_format = matches.value_of_t::("log-format")?; + + #[cfg(all(tokio_unstable, feature = "tokio-console"))] + { + self.tokio_console = matches.is_present("tokio-console"); + } + + Ok(()) + } +} + +impl Default for LogArgs { + fn default() -> Self { + Self { + log_format: LogFormat::Plain, + log_level: DEFAULT_FILTER + .get() + .and_then(|default| default.parse().ok()) + .unwrap_or_else(|| { + LogFilter::default() + .add_directive(tracing_subscriber::filter::LevelFilter::WARN.into()) + }), + + #[cfg(all(tokio_unstable, feature = "tokio-console"))] + tokio_console: false, + _p: (), + } + } +} + +static DEFAULT_FILTER: OnceCell = OnceCell::new(); + +fn default_log_filter(cmd: &Command<'_>) -> &'static str { + DEFAULT_FILTER.get_or_init(|| { + let name = cmd.get_bin_name().unwrap_or_else(|| cmd.get_name()); + let mut filter = name.replace('-', "_"); + filter.push_str("=info,warn"); + filter + }) +} diff --git a/kubert/src/runtime.rs b/kubert/src/runtime.rs index 1d980208..94aaffc0 100644 --- a/kubert/src/runtime.rs +++ b/kubert/src/runtime.rs @@ -7,7 +7,7 @@ use crate::{ client::{self, Client, ClientArgs}, errors, initialized::{self, Initialized}, - shutdown, LogFilter, LogFormat, LogInitError, + shutdown, LogArgs, LogFilter, LogFormat, LogInitError, }; use futures_core::Stream; use kube_core::{params::ListParams, Resource}; @@ -28,7 +28,7 @@ pub struct Builder { admin: Option, client: Option, error_delay: Option, - log: Option, + log: Option, #[cfg(feature = "server")] server: S, @@ -91,12 +91,6 @@ pub enum BuildError { Signal(#[from] shutdown::RegisterError), } -#[derive(Debug)] -struct LogSettings { - filter: LogFilter, - format: LogFormat, -} - // === impl Builder === impl Builder { @@ -114,9 +108,23 @@ impl Builder { self } - /// Configures the runtime to use the given logging configuration + /// Configures the runtime to use the given logging configuration. + #[deprecated(since = "0.6.1", note = "use `Builder::with_log_args` instead.")] pub fn with_log(mut self, filter: LogFilter, format: LogFormat) -> Self { - self.log = Some(LogSettings { filter, format }); + self.log = Some(LogArgs { + log_level: filter, + log_format: format, + + #[cfg(all(tokio_unstable, feature = "tokio-console"))] + tokio_console: false, + _p: (), + }); + self + } + + /// Configures the runtime to use the given logging configuration + pub fn with_log_args(mut self, log_args: LogArgs) -> Self { + self.log = Some(log_args); self } @@ -500,7 +508,7 @@ impl Runtime { // Set the admin readiness to succeed once all initilization handles have been released. let ready = admin.readiness(); - tokio::spawn(async move { + crate::spawn_named("kubert::ready", async move { initialized.initialized().await; ready.set(true); tracing::debug!("initialized"); @@ -515,20 +523,3 @@ impl Runtime { Ok(()) } } - -// === impl LogSettings === - -impl Default for LogSettings { - fn default() -> Self { - Self { - filter: LogFilter::from_default_env(), - format: LogFormat::default(), - } - } -} - -impl LogSettings { - fn try_init(self) -> Result<(), LogInitError> { - self.format.try_init(self.filter) - } -} diff --git a/kubert/src/server.rs b/kubert/src/server.rs index af6edf38..4c40a609 100644 --- a/kubert/src/server.rs +++ b/kubert/src/server.rs @@ -148,7 +148,8 @@ impl Bound { tls_certs, } = self; - let task = tokio::spawn( + let task = crate::spawn_named( + "kubert::server", accept_loop(tcp, drain, service, tls_key, tls_certs) .instrument(info_span!("server", port = %local_addr.port())), ); @@ -219,7 +220,8 @@ async fn accept_loop( } }; - tokio::spawn( + crate::spawn_named( + "kubert::conn", serve_conn( socket, drain.clone(),