From 4127dd1078333b8829fd2f7245d47d49e06ee575 Mon Sep 17 00:00:00 2001
From: Sang Jun Bak
Date: Tue, 6 May 2025 13:29:22 -0400
Subject: [PATCH 01/11] Remove hacky sleep when port forwarding

- Instead of waiting a fixed amount of time for port forwarding to start, we read from stderr until we know for sure it has started
- Renamed `create_kubectl_port_forwarder` to `create_pg_wire_port_forwarder` since we'll be port forwarding more than once
- Removed the retry mechanism, since we rarely see port forwarding disconnect and it is hard to combine retries with the initial acknowledgement
- Tied the child process to a struct instead of a tokio spawn.
---
 src/mz-debug/src/kubectl_port_forwarder.rs | 140 +++++++++------------
 src/mz-debug/src/main.rs                   |  43 ++++---
 2 files changed, 85 insertions(+), 98 deletions(-)

diff --git a/src/mz-debug/src/kubectl_port_forwarder.rs b/src/mz-debug/src/kubectl_port_forwarder.rs
index fd52139400ffd..d27a6d4d1c05f 100644
--- a/src/mz-debug/src/kubectl_port_forwarder.rs
+++ b/src/mz-debug/src/kubectl_port_forwarder.rs
@@ -19,14 +19,12 @@ use anyhow::Result;
 use k8s_openapi::api::core::v1::Service;
 use kube::api::ListParams;
 use kube::{Api, Client};
+use tokio::io::AsyncBufReadExt;
-use std::time::Duration;
-
-use mz_ore::retry::{self, RetryResult};
-use tracing::{info, warn};
+use tracing::info;
 
 use crate::SelfManagedDebugMode;
 
-#[derive(Debug, Clone)]
+#[derive(Debug)]
 pub struct KubectlPortForwarder {
     pub namespace: String,
     pub service_name: String,
@@ -36,89 +34,73 @@ pub struct KubectlPortForwarder {
     pub context: Option<String>,
 }
 
-impl KubectlPortForwarder {
-    /// Port forwards a given k8s service via Kubectl.
-    /// The process will retry if the port-forwarding fails and
-    /// will terminate once the port forwarding reaches the max number of retries.
-    /// We retry since kubectl port-forward is flaky.
-    pub async fn port_forward(&self) {
-        if let Err(err) = retry::Retry::default()
-            .max_duration(Duration::from_secs(60))
-            .retry_async(|retry_state| {
-                let k8s_context = self.context.clone();
-                let namespace = self.namespace.clone();
-                let service_name = self.service_name.clone();
-                let local_address = self.local_address.clone();
-                let local_port = self.local_port;
-                let target_port = self.target_port;
-
-                info!(
-                    "Spawning port forwarding process for {} from ports {}:{} -> {}",
-                    service_name, local_address, local_port, target_port
-                );
+pub struct PortForwardConnection {
+    // tokio process that's killed on drop
+    pub _port_forward_process: tokio::process::Child,
+}
 
-                async move {
-                    let port_arg_str = format!("{}:{}", &local_port, &target_port);
-                    let service_name_arg_str = format!("services/{}", &service_name);
-                    let mut args = vec![
-                        "port-forward",
-                        &service_name_arg_str,
-                        &port_arg_str,
-                        "-n",
-                        &namespace,
-                        "--address",
-                        &local_address,
-                    ];
-
-                    if let Some(k8s_context) = &k8s_context {
-                        args.extend(["--context", k8s_context]);
-                    }
+impl KubectlPortForwarder {
+    /// Spawns a port forwarding process that resolves when
+    /// the port forward is established.
+ pub async fn spawn_port_forward(&self) -> Result { + let port_arg_str = format!("{}:{}", &self.local_port, &self.target_port); + let service_name_arg_str = format!("services/{}", &self.service_name); + let mut args = vec![ + "port-forward", + &service_name_arg_str, + &port_arg_str, + "-n", + &self.namespace, + "--address", + &self.local_address, + ]; + + if let Some(k8s_context) = &self.context { + args.extend(["--context", k8s_context]); + } - match tokio::process::Command::new("kubectl") - .args(args) - // Silence stdout/stderr - .stdout(std::process::Stdio::null()) - .stderr(std::process::Stdio::null()) - .kill_on_drop(true) - .output() - .await - { - Ok(output) => { - if !output.status.success() { - let retry_err_msg = format!( - "Failed to port-forward{}: {}", - retry_state.next_backoff.map_or_else( - || "".to_string(), - |d| format!(", retrying in {:?}", d) - ), - String::from_utf8_lossy(&output.stderr) - ); - warn!("{}", retry_err_msg); - - return RetryResult::RetryableErr(anyhow::anyhow!(retry_err_msg)); - } - } - Err(err) => { - return RetryResult::RetryableErr(anyhow::anyhow!( - "Failed to port-forward: {}", - err - )); + let child = tokio::process::Command::new("kubectl") + .args(args) + // Silence stdout + .stdout(std::process::Stdio::null()) + .stderr(std::process::Stdio::piped()) + .kill_on_drop(true) + .spawn(); + + if let Ok(mut child) = child { + if let Some(stderr) = child.stderr.take() { + let stderr_reader = tokio::io::BufReader::new(stderr); + // Wait until we know port forwarding is established + let timeout = tokio::time::timeout(std::time::Duration::from_secs(5), async { + let mut lines = stderr_reader.lines(); + while let Ok(Some(line)) = lines.next_line().await { + if line.contains("Forwarding from") { + break; } } - // The kubectl subprocess's future will only resolve on error, thus the - // code here is unreachable. We return RetryResult::Ok to satisfy - // the type checker. - RetryResult::Ok(()) + }) + .await; + + if timeout.is_err() { + return Err(anyhow::anyhow!("Port forwarding timed out after 5 seconds")); } - }) - .await - { - warn!("{}", err); + + info!( + "Port forwarding established for {} from ports {}:{} -> {}", + &self.service_name, &self.local_address, &self.local_port, &self.target_port + ); + + return Ok(PortForwardConnection { + _port_forward_process: child, + }); + } } + Err(anyhow::anyhow!("Failed to spawn port forwarding process")) } } -pub async fn create_kubectl_port_forwarder( +/// Creates a port forwarder for the external pg wire port of balancerd. 
+pub async fn create_pg_wire_port_forwarder( client: &Client, args: &SelfManagedDebugMode, ) -> Result { diff --git a/src/mz-debug/src/main.rs b/src/mz-debug/src/main.rs index cc3c8ebd7b248..a0a1892ddd005 100644 --- a/src/mz-debug/src/main.rs +++ b/src/mz-debug/src/main.rs @@ -19,7 +19,6 @@ use kube::{Client, Config}; use mz_build_info::{BuildInfo, build_info}; use mz_ore::cli::{self, CliConfig}; use mz_ore::error::ErrorExt; -use mz_ore::task; use tracing::{error, info, warn}; use tracing_subscriber::EnvFilter; use tracing_subscriber::layer::SubscriberExt; @@ -27,7 +26,7 @@ use tracing_subscriber::util::SubscriberInitExt; use crate::docker_dumper::DockerDumper; use crate::k8s_dumper::K8sDumper; -use crate::kubectl_port_forwarder::create_kubectl_port_forwarder; +use crate::kubectl_port_forwarder::create_pg_wire_port_forwarder; use crate::utils::{ create_tracing_log_file, format_base_path, validate_pg_connection_string, zip_debug_folder, }; @@ -63,6 +62,8 @@ pub struct SelfManagedDebugMode { /// If true, the tool will dump the values of secrets in the Kubernetes cluster. #[clap(long, default_value = "false", action = clap::ArgAction::Set)] k8s_dump_secret_values: bool, + // TODO (debug_tool1): Convert port forwarding variables into a map since we'll be + // portforwarding multiple times /// If true, the tool will automatically port-forward the external SQL port in the Kubernetes cluster. /// If dump_k8s is false however, we will not automatically port-forward. #[clap(long, default_value = "true", action = clap::ArgAction::Set)] @@ -221,8 +222,9 @@ async fn main() { async fn run(context: Context, args: Args) -> Result<(), anyhow::Error> { // Depending on if the user is debugging either a k8s environments or docker environment, - // dump the respective system's resources. - let container_system_dumper = match &args.debug_mode { + // dump the respective system's resources and spin up a port forwarder if auto port forwarding. + // We'd like to keep the port forwarder alive until the end of the program. + let (container_system_dumper, _pg_wire_port_forward_connection) = match &args.debug_mode { DebugMode::SelfManaged(args) => { if args.dump_k8s { let client = match create_k8s_client(args.k8s_context.clone()).await { @@ -232,17 +234,20 @@ async fn run(context: Context, args: Args) -> Result<(), anyhow::Error> { return Err(e); } }; - - if args.auto_port_forward { - let port_forwarder = create_kubectl_port_forwarder(&client, args).await?; - task::spawn(|| "port-forwarding", async move { - port_forwarder.port_forward().await; - }); - // There may be a delay between when the port forwarding process starts and when it's ready - // to use. We wait a few seconds to ensure that port forwarding is ready. 
- tokio::time::sleep(std::time::Duration::from_secs(2)).await; - } - + let port_forward_connection = if args.auto_port_forward { + let port_forwarder = create_pg_wire_port_forwarder(&client, args).await?; + + match port_forwarder.spawn_port_forward().await { + Ok(connection) => Some(connection), + Err(err) => { + warn!("{}", err); + None + } + } + } else { + None + }; + // Parker: Return it as a tuple (dumper, port_forwarder) let dumper = ContainerServiceDumper::new_k8s_dumper( &context, client, @@ -250,9 +255,9 @@ async fn run(context: Context, args: Args) -> Result<(), anyhow::Error> { args.k8s_context.clone(), args.k8s_dump_secret_values, ); - Some(dumper) + (Some(dumper), port_forward_connection) } else { - None + (None, None) } } DebugMode::Emulator(args) => { @@ -263,9 +268,9 @@ async fn run(context: Context, args: Args) -> Result<(), anyhow::Error> { .expect("docker_container_id is required"); let dumper = ContainerServiceDumper::new_docker_dumper(&context, docker_container_id); - Some(dumper) + (Some(dumper), None) } else { - None + (None, None) } } }; From af6ed6a36920f6bb056f6603c3bca173a8dd7092 Mon Sep 17 00:00:00 2001 From: Sang Jun Bak Date: Wed, 7 May 2025 16:08:21 -0400 Subject: [PATCH 02/11] Preprocess args into context We create an initialization function that preprocesses as much as possible and maps args to context values. --- src/mz-debug/src/kubectl_port_forwarder.rs | 14 +- src/mz-debug/src/main.rs | 199 ++++++++++++++------- src/mz-debug/src/system_catalog_dumper.rs | 10 +- 3 files changed, 146 insertions(+), 77 deletions(-) diff --git a/src/mz-debug/src/kubectl_port_forwarder.rs b/src/mz-debug/src/kubectl_port_forwarder.rs index d27a6d4d1c05f..e645da286ce1e 100644 --- a/src/mz-debug/src/kubectl_port_forwarder.rs +++ b/src/mz-debug/src/kubectl_port_forwarder.rs @@ -23,7 +23,6 @@ use tokio::io::AsyncBufReadExt; use tracing::info; -use crate::SelfManagedDebugMode; #[derive(Debug)] pub struct KubectlPortForwarder { pub namespace: String, @@ -102,9 +101,12 @@ impl KubectlPortForwarder { /// Creates a port forwarder for the external pg wire port of balancerd. pub async fn create_pg_wire_port_forwarder( client: &Client, - args: &SelfManagedDebugMode, + k8s_context: &Option, + k8s_namespaces: &Vec, + port_forward_local_address: &String, + port_forward_local_port: i32, ) -> Result { - for namespace in &args.k8s_namespaces { + for namespace in k8s_namespaces { let services: Api = Api::namespaced(client.clone(), namespace); let services = services .list(&ListParams::default().labels("materialize.cloud/mz-resource-id")) @@ -131,12 +133,12 @@ pub async fn create_pg_wire_port_forwarder( // We want to find the external SQL port and not the internal one if port_name.to_lowercase().contains("pgwire") { return Some(KubectlPortForwarder { - context: args.k8s_context.clone(), + context: k8s_context.clone(), namespace: namespace.clone(), service_name: service_name.to_owned(), target_port: port_info.port, - local_address: args.port_forward_local_address.clone(), - local_port: args.port_forward_local_port, + local_address: port_forward_local_address.clone(), + local_port: port_forward_local_port, }); } } diff --git a/src/mz-debug/src/main.rs b/src/mz-debug/src/main.rs index a0a1892ddd005..6fd51cd6dd70c 100644 --- a/src/mz-debug/src/main.rs +++ b/src/mz-debug/src/main.rs @@ -10,12 +10,12 @@ //! Debug tool for self managed environments. 
use std::path::PathBuf; use std::process; -use std::sync::LazyLock; +use std::sync::{Arc, LazyLock}; use chrono::{DateTime, Utc}; use clap::Parser; use kube::config::KubeConfigOptions; -use kube::{Client, Config}; +use kube::{Client as KubernetesClient, Config}; use mz_build_info::{BuildInfo, build_info}; use mz_ore::cli::{self, CliConfig}; use mz_ore::error::ErrorExt; @@ -26,12 +26,13 @@ use tracing_subscriber::util::SubscriberInitExt; use crate::docker_dumper::DockerDumper; use crate::k8s_dumper::K8sDumper; -use crate::kubectl_port_forwarder::create_pg_wire_port_forwarder; +use crate::kubectl_port_forwarder::{PortForwardConnection, create_pg_wire_port_forwarder}; use crate::utils::{ create_tracing_log_file, format_base_path, validate_pg_connection_string, zip_debug_folder, }; mod docker_dumper; +mod internal_http_dumper; mod k8s_dumper; mod kubectl_port_forwarder; mod system_catalog_dumper; @@ -42,7 +43,7 @@ static VERSION: LazyLock = LazyLock::new(|| BUILD_INFO.human_version(Non static ENV_FILTER: &str = "mz_debug=info"; #[derive(Parser, Debug, Clone)] -pub struct SelfManagedDebugMode { +pub struct SelfManagedDebugModeArgs { // === Kubernetes options. === /// If true, the tool will dump debug information in Kubernetes cluster such as logs, pod describes, etc. #[clap(long, default_value = "true", action = clap::ArgAction::Set)] @@ -87,18 +88,13 @@ pub struct SelfManagedDebugMode { } #[derive(Parser, Debug, Clone)] -pub struct EmulatorDebugMode { +pub struct EmulatorDebugModeArgs { /// If true, the tool will dump debug information of the docker container. #[clap(long, default_value = "true", action = clap::ArgAction::Set)] dump_docker: bool, /// The ID of the docker container to dump. - #[clap( - long, - // We require both `require`s because `required_if_eq` doesn't work for default values. - required_unless_present = "dump_docker", - required_if_eq("dump_docker", "true") - )] - docker_container_id: Option, + #[clap(long)] + docker_container_id: String, /// The URL of the Materialize SQL connection used to dump the system catalog. /// An example URL is `postgres://root@127.0.0.1:6875/materialize?sslmode=disable`. // TODO(debug_tool3): Allow users to specify the pgconfig via separate variables @@ -113,18 +109,18 @@ pub struct EmulatorDebugMode { } #[derive(Parser, Debug, Clone)] -pub enum DebugMode { +pub enum DebugModeArgs { /// Debug self-managed environments - SelfManaged(SelfManagedDebugMode), + SelfManaged(SelfManagedDebugModeArgs), /// Debug emulator environments - Emulator(EmulatorDebugMode), + Emulator(EmulatorDebugModeArgs), } #[derive(Parser, Debug, Clone)] #[clap(name = "mz-debug", next_line_help = true, version = VERSION.as_str())] pub struct Args { #[clap(subcommand)] - debug_mode: DebugMode, + debug_mode_args: DebugModeArgs, /// If true, the tool will dump the system catalog in Materialize. 
#[clap(long, default_value = "true", action = clap::ArgAction::Set, global = true)] dump_system_catalog: bool, @@ -141,7 +137,7 @@ pub enum ContainerServiceDumper<'n> { impl<'n> ContainerServiceDumper<'n> { fn new_k8s_dumper( context: &'n Context, - client: Client, + client: KubernetesClient, k8s_namespaces: Vec, k8s_context: Option, k8s_dump_secret_values: bool, @@ -168,10 +164,35 @@ impl<'n> ContainerDumper for ContainerServiceDumper<'n> { } } } +#[derive(Clone)] +struct SelfManagedContext { + dump_k8s: bool, + k8s_client: KubernetesClient, + k8s_context: Option, + k8s_namespaces: Vec, + k8s_dump_secret_values: bool, + + _mz_port_forward_process: Arc>, +} +#[derive(Clone)] +struct EmulatorContext { + dump_docker: bool, + docker_container_id: String, +} + +#[derive(Clone)] +enum DebugModeContext { + SelfManaged(SelfManagedContext), + Emulator(EmulatorContext), +} #[derive(Clone)] pub struct Context { start_time: DateTime, + debug_mode_context: DebugModeContext, + + mz_connection_url: String, + dump_system_catalog: bool, } #[tokio::main] @@ -208,9 +229,13 @@ async fn main() { .try_init(); } - let context = Context { start_time }; + let initialize_then_run = async move { + // Preprocess args into contexts + let context = initialize_context(args, start_time).await?; + run(context).await + }; - if let Err(err) = run(context, args).await { + if let Err(err) = initialize_then_run.await { error!( "mz-debug: fatal: {}\nbacktrace: {}", err.display_with_causes(), @@ -220,25 +245,32 @@ async fn main() { } } -async fn run(context: Context, args: Args) -> Result<(), anyhow::Error> { - // Depending on if the user is debugging either a k8s environments or docker environment, - // dump the respective system's resources and spin up a port forwarder if auto port forwarding. - // We'd like to keep the port forwarder alive until the end of the program. 
- let (container_system_dumper, _pg_wire_port_forward_connection) = match &args.debug_mode { - DebugMode::SelfManaged(args) => { - if args.dump_k8s { - let client = match create_k8s_client(args.k8s_context.clone()).await { - Ok(client) => client, +async fn initialize_context( + args: Args, + start_time: DateTime, +) -> Result { + let (debug_mode_context, mz_connection_url) = match &args.debug_mode_args { + DebugModeArgs::SelfManaged(args) => { + let k8s_client = match create_k8s_client(args.k8s_context.clone()).await { + Ok(k8s_client) => k8s_client, Err(e) => { error!("Failed to create k8s client: {}", e); return Err(e); } }; - let port_forward_connection = if args.auto_port_forward { - let port_forwarder = create_pg_wire_port_forwarder(&client, args).await?; + + let _mz_port_forward_process = if args.auto_port_forward { + let port_forwarder = create_pg_wire_port_forwarder( + &k8s_client, + &args.k8s_context, + &args.k8s_namespaces, + &args.port_forward_local_address, + args.port_forward_local_port, + ) + .await?; match port_forwarder.spawn_port_forward().await { - Ok(connection) => Some(connection), + Ok(process) => Some(process), Err(err) => { warn!("{}", err); None @@ -247,54 +279,85 @@ async fn run(context: Context, args: Args) -> Result<(), anyhow::Error> { } else { None }; - // Parker: Return it as a tuple (dumper, port_forwarder) - let dumper = ContainerServiceDumper::new_k8s_dumper( + let mz_connection_url = kubectl_port_forwarder::create_mz_connection_url( + args.port_forward_local_address.clone(), + args.port_forward_local_port, + args.mz_connection_url.clone(), + ); + ( + DebugModeContext::SelfManaged(SelfManagedContext { + dump_k8s: args.dump_k8s, + k8s_client, + k8s_context: args.k8s_context.clone(), + k8s_namespaces: args.k8s_namespaces.clone(), + k8s_dump_secret_values: args.k8s_dump_secret_values, + _mz_port_forward_process: Arc::new(_mz_port_forward_process), + }), + mz_connection_url, + ) + } + DebugModeArgs::Emulator(args) => ( + DebugModeContext::Emulator(EmulatorContext { + dump_docker: args.dump_docker, + docker_container_id: args.docker_container_id.clone(), + }), + args.mz_connection_url.clone(), + ), + }; + + Ok(Context { + start_time, + debug_mode_context, + mz_connection_url, + dump_system_catalog: args.dump_system_catalog, + }) +} + +async fn run(context: Context) -> Result<(), anyhow::Error> { + // Depending on if the user is debugging either a k8s environments or docker environment, + // dump the respective system's resources + let container_system_dumper = match &context.debug_mode_context { + DebugModeContext::SelfManaged(SelfManagedContext { + k8s_client, + dump_k8s, + k8s_context, + k8s_namespaces, + k8s_dump_secret_values, + .. 
+ }) => { + if *dump_k8s { + Some(ContainerServiceDumper::new_k8s_dumper( &context, - client, - args.k8s_namespaces.clone(), - args.k8s_context.clone(), - args.k8s_dump_secret_values, - ); - (Some(dumper), port_forward_connection) + k8s_client.clone(), + k8s_namespaces.clone(), + k8s_context.clone(), + *k8s_dump_secret_values, + )) } else { - (None, None) + None } } - DebugMode::Emulator(args) => { - if args.dump_docker { - let docker_container_id = args - .docker_container_id - .clone() - .expect("docker_container_id is required"); - let dumper = - ContainerServiceDumper::new_docker_dumper(&context, docker_container_id); - (Some(dumper), None) + DebugModeContext::Emulator(EmulatorContext { + dump_docker, + docker_container_id, + }) => { + if *dump_docker { + Some(ContainerServiceDumper::new_docker_dumper( + &context, + docker_container_id.clone(), + )) } else { - (None, None) + None } } }; - if let Some(dumper) = container_system_dumper { dumper.dump_container_resources().await; } - let connection_url = match &args.debug_mode { - DebugMode::SelfManaged(args) => kubectl_port_forwarder::create_mz_connection_url( - args.port_forward_local_address.clone(), - args.port_forward_local_port, - args.mz_connection_url.clone(), - ), - DebugMode::Emulator(args) => args.mz_connection_url.clone(), - }; - if args.dump_system_catalog { + if context.dump_system_catalog { // Dump the system catalog. - let catalog_dumper = match system_catalog_dumper::SystemCatalogDumper::new( - &context, - &connection_url, - ) - .await - { + let catalog_dumper = match system_catalog_dumper::SystemCatalogDumper::new(&context).await { Ok(dumper) => Some(dumper), Err(e) => { warn!("Failed to dump system catalog: {}", e); @@ -323,7 +386,7 @@ async fn run(context: Context, args: Args) -> Result<(), anyhow::Error> { } /// Creates a k8s client given a context. If no context is provided, the default context is used. 
-async fn create_k8s_client(k8s_context: Option) -> Result { +async fn create_k8s_client(k8s_context: Option) -> Result { let kubeconfig_options = KubeConfigOptions { context: k8s_context, ..Default::default() @@ -331,7 +394,7 @@ async fn create_k8s_client(k8s_context: Option) -> Result SystemCatalogDumper<'n> { - pub async fn new(context: &'n Context, connection_string: &str) -> Result { - let (pg_client, pg_conn, pg_tls) = create_postgres_connection(connection_string).await?; + pub async fn new(context: &'n Context) -> Result { + let (pg_client, pg_conn, pg_tls) = + create_postgres_connection(&context.mz_connection_url).await?; - info!("Connected to PostgreSQL server at {}...", connection_string); + info!( + "Connected to PostgreSQL server at {}", + context.mz_connection_url + ); let handle = task::spawn(|| "postgres-connection", pg_conn); From 01d54c2d5575e20a74f47b20d5ecccb0ed9ad298 Mon Sep 17 00:00:00 2001 From: Sang Jun Bak Date: Wed, 7 May 2025 16:16:29 -0400 Subject: [PATCH 03/11] Remove redundant enum I kept the trait but realized the enum is a bit redundant, especially since we have the Context enum --- src/mz-debug/src/main.rs | 57 +++++----------------------------------- 1 file changed, 7 insertions(+), 50 deletions(-) diff --git a/src/mz-debug/src/main.rs b/src/mz-debug/src/main.rs index 6fd51cd6dd70c..9102c0fbfc6ad 100644 --- a/src/mz-debug/src/main.rs +++ b/src/mz-debug/src/main.rs @@ -129,41 +129,6 @@ pub struct Args { pub trait ContainerDumper { fn dump_container_resources(&self) -> impl std::future::Future; } -pub enum ContainerServiceDumper<'n> { - K8s(K8sDumper<'n>), - Docker(DockerDumper), -} - -impl<'n> ContainerServiceDumper<'n> { - fn new_k8s_dumper( - context: &'n Context, - client: KubernetesClient, - k8s_namespaces: Vec, - k8s_context: Option, - k8s_dump_secret_values: bool, - ) -> Self { - Self::K8s(K8sDumper::new( - context, - client, - k8s_namespaces, - k8s_context, - k8s_dump_secret_values, - )) - } - - fn new_docker_dumper(context: &'n Context, docker_container_id: String) -> Self { - Self::Docker(DockerDumper::new(context, docker_container_id)) - } -} - -impl<'n> ContainerDumper for ContainerServiceDumper<'n> { - async fn dump_container_resources(&self) { - match self { - ContainerServiceDumper::K8s(dumper) => dumper.dump_container_resources().await, - ContainerServiceDumper::Docker(dumper) => dumper.dump_container_resources().await, - } - } -} #[derive(Clone)] struct SelfManagedContext { dump_k8s: bool, @@ -316,7 +281,7 @@ async fn initialize_context( async fn run(context: Context) -> Result<(), anyhow::Error> { // Depending on if the user is debugging either a k8s environments or docker environment, // dump the respective system's resources - let container_system_dumper = match &context.debug_mode_context { + match &context.debug_mode_context { DebugModeContext::SelfManaged(SelfManagedContext { k8s_client, dump_k8s, @@ -326,15 +291,14 @@ async fn run(context: Context) -> Result<(), anyhow::Error> { .. 
}) => { if *dump_k8s { - Some(ContainerServiceDumper::new_k8s_dumper( + let dumper = K8sDumper::new( &context, k8s_client.clone(), k8s_namespaces.clone(), k8s_context.clone(), *k8s_dump_secret_values, - )) - } else { - None + ); + dumper.dump_container_resources().await; } } DebugModeContext::Emulator(EmulatorContext { @@ -342,18 +306,11 @@ async fn run(context: Context) -> Result<(), anyhow::Error> { docker_container_id, }) => { if *dump_docker { - Some(ContainerServiceDumper::new_docker_dumper( - &context, - docker_container_id.clone(), - )) - } else { - None - } - } - }; - if let Some(dumper) = container_system_dumper { + let dumper = DockerDumper::new(&context, docker_container_id.clone()); dumper.dump_container_resources().await; } + } + }; if context.dump_system_catalog { // Dump the system catalog. From 5e1a0f08d6bbdd6dca824976f2defccaa7bae712 Mon Sep 17 00:00:00 2001 From: Sang Jun Bak Date: Thu, 8 May 2025 00:26:10 -0400 Subject: [PATCH 04/11] Save base_path to context Given we're always creating a PathBuf from start time, it makes sense to just pass the PathBuf itself --- src/mz-debug/src/docker_dumper.rs | 5 +---- src/mz-debug/src/k8s_dumper.rs | 18 ++++++++++-------- src/mz-debug/src/main.rs | 20 ++++++++------------ src/mz-debug/src/system_catalog_dumper.rs | 20 +++++++++----------- src/mz-debug/src/utils.rs | 3 +-- 5 files changed, 29 insertions(+), 37 deletions(-) diff --git a/src/mz-debug/src/docker_dumper.rs b/src/mz-debug/src/docker_dumper.rs index 73f11f686a877..5f48d9171b912 100644 --- a/src/mz-debug/src/docker_dumper.rs +++ b/src/mz-debug/src/docker_dumper.rs @@ -23,7 +23,6 @@ use std::time::Duration; use mz_ore::retry::{self, RetryResult}; use tracing::{info, warn}; -use crate::utils::format_base_path; use crate::{ContainerDumper, Context}; static DOCKER_RESOURCE_DUMP_TIMEOUT: Duration = Duration::from_secs(30); @@ -36,9 +35,7 @@ pub struct DockerDumper { impl DockerDumper { pub fn new(context: &Context, container_id: String) -> Self { Self { - directory_path: format_base_path(context.start_time) - .join("docker") - .join(&container_id), + directory_path: context.base_path.join("docker").join(&container_id), container_id, } } diff --git a/src/mz-debug/src/k8s_dumper.rs b/src/mz-debug/src/k8s_dumper.rs index bfcd19b72ff80..cba72dc7fd8e5 100644 --- a/src/mz-debug/src/k8s_dumper.rs +++ b/src/mz-debug/src/k8s_dumper.rs @@ -22,7 +22,6 @@ use std::io::Write; use std::path::PathBuf; use std::pin::Pin; -use chrono::{DateTime, Utc}; use futures::future::join_all; use k8s_openapi::NamespaceResourceScope; use k8s_openapi::api::admissionregistration::v1::{ @@ -45,7 +44,6 @@ use mz_cloud_resources::crd::materialize::v1alpha1::Materialize; use serde::{Serialize, de::DeserializeOwned}; use tracing::{info, warn}; -use crate::utils::format_base_path; use crate::{ContainerDumper, Context}; struct K8sResourceDumper<'n, K> { @@ -100,7 +98,7 @@ where return Ok(()); } let file_path = format_resource_path( - self.context.start_time, + self.context.base_path.clone(), self.resource_type.as_str(), self.namespace.as_ref(), ); @@ -207,8 +205,11 @@ impl<'n> K8sDumper<'n> { return Ok(()); } - let file_path = - format_resource_path(self.context.start_time, resource_type.as_str(), namespace); + let file_path = format_resource_path( + self.context.base_path.clone(), + resource_type.as_str(), + namespace, + ); let file_name = file_path.join("describe.txt"); create_dir_all(&file_path)?; let mut file = File::create(&file_name)?; @@ -290,7 +291,8 @@ impl<'n> K8sDumper<'n> { } async fn 
_dump_k8s_pod_logs(&self, namespace: &String) -> Result<(), anyhow::Error> { - let file_path = format_resource_path(self.context.start_time, "logs", Some(namespace)); + let file_path = + format_resource_path(self.context.base_path.clone(), "logs", Some(namespace)); create_dir_all(&file_path)?; let pods: Api = Api::::namespaced(self.client.clone(), namespace); @@ -566,11 +568,11 @@ impl<'n> ContainerDumper for K8sDumper<'n> { } fn format_resource_path( - date_time: DateTime, + base_path: PathBuf, resource_type: &str, namespace: Option<&String>, ) -> PathBuf { - let mut path = format_base_path(date_time).join(resource_type); + let mut path = base_path.join(resource_type); if let Some(namespace) = namespace { path = path.join(namespace); diff --git a/src/mz-debug/src/main.rs b/src/mz-debug/src/main.rs index 9102c0fbfc6ad..38d5fe29e1e5e 100644 --- a/src/mz-debug/src/main.rs +++ b/src/mz-debug/src/main.rs @@ -12,7 +12,7 @@ use std::path::PathBuf; use std::process; use std::sync::{Arc, LazyLock}; -use chrono::{DateTime, Utc}; +use chrono::Utc; use clap::Parser; use kube::config::KubeConfigOptions; use kube::{Client as KubernetesClient, Config}; @@ -170,6 +170,7 @@ async fn main() { }); let start_time = Utc::now(); + let base_path = format_base_path(start_time); // We use tracing_subscriber to display the output of tracing to stdout // and log to a file included in the debug zip. @@ -177,7 +178,7 @@ async fn main() { .with_target(false) .without_time(); - if let Ok(file) = create_tracing_log_file(start_time) { + if let Ok(file) = create_tracing_log_file(base_path.clone()) { let file_layer = tracing_subscriber::fmt::layer() .with_writer(file) .with_ansi(false); @@ -196,7 +197,7 @@ async fn main() { let initialize_then_run = async move { // Preprocess args into contexts - let context = initialize_context(args, start_time).await?; + let context = initialize_context(args, base_path).await?; run(context).await }; @@ -210,10 +211,7 @@ async fn main() { } } -async fn initialize_context( - args: Args, - start_time: DateTime, -) -> Result { +async fn initialize_context(args: Args, base_path: PathBuf) -> Result { let (debug_mode_context, mz_connection_url) = match &args.debug_mode_args { DebugModeArgs::SelfManaged(args) => { let k8s_client = match create_k8s_client(args.k8s_context.clone()).await { @@ -271,7 +269,7 @@ async fn initialize_context( }; Ok(Context { - start_time, + base_path, debug_mode_context, mz_connection_url, dump_system_catalog: args.dump_system_catalog, @@ -329,11 +327,9 @@ async fn run(context: Context) -> Result<(), anyhow::Error> { info!("Zipping debug directory"); - let base_path = format_base_path(context.start_time); - - let zip_file_name = format!("{}.zip", &base_path.display()); + let zip_file_name = format!("{}.zip", &context.base_path.display()); - if let Err(e) = zip_debug_folder(PathBuf::from(&zip_file_name), &base_path) { + if let Err(e) = zip_debug_folder(PathBuf::from(&zip_file_name), &context.base_path) { warn!("Failed to zip debug directory: {}", e); } else { info!("Created zip debug at {}", &zip_file_name); diff --git a/src/mz-debug/src/system_catalog_dumper.rs b/src/mz-debug/src/system_catalog_dumper.rs index d88a011cfb1e0..36aad2541d56a 100644 --- a/src/mz-debug/src/system_catalog_dumper.rs +++ b/src/mz-debug/src/system_catalog_dumper.rs @@ -19,7 +19,6 @@ //! cleaning up / aborting queries much easier. 
use anyhow::{Context as _, Result}; -use chrono::{DateTime, Utc}; use csv_async::AsyncSerializer; use futures::TryStreamExt; use mz_tls_util::make_tls; @@ -42,7 +41,6 @@ use postgres_openssl::{MakeTlsConnector, TlsStream}; use tracing::{info, warn}; use crate::Context; -use crate::utils::format_base_path; #[derive(Debug, Clone)] pub enum RelationCategory { @@ -619,7 +617,7 @@ pub async fn query_column_names( pub async fn query_relation( transaction: &Transaction<'_>, - start_time: DateTime, + base_path: PathBuf, relation: &Relation, column_names: &Vec, cluster_replica: Option<&ClusterReplica>, @@ -656,14 +654,14 @@ pub async fn query_relation( match relation_category { RelationCategory::Basic => { - let file_path = format_file_path(start_time, None); + let file_path = format_file_path(base_path, None); let file_path_name = file_path.join(relation_name).with_extension("csv"); tokio::fs::create_dir_all(&file_path).await?; copy_relation_to_csv(transaction, file_path_name, column_names, relation).await?; } RelationCategory::Introspection => { - let file_path = format_file_path(start_time, cluster_replica); + let file_path = format_file_path(base_path, cluster_replica); tokio::fs::create_dir_all(&file_path).await?; let file_path_name = file_path.join(relation_name).with_extension("csv"); @@ -672,7 +670,7 @@ pub async fn query_relation( } RelationCategory::Retained => { // Copy the current state and retained subscribe state - let file_path = format_file_path(start_time, None); + let file_path = format_file_path(base_path, None); let file_path_name = file_path .join(format!("{}_subscribe", relation_name)) .with_extension("csv"); @@ -751,7 +749,7 @@ impl<'n> SystemCatalogDumper<'n> { cluster_replica.map_or_else(|| "".to_string(), |replica| format!(" in {}", replica)) ); - let start_time = self.context.start_time; + let base_path = self.context.base_path.clone(); let pg_client = &self.pg_client; let relation_name = relation.name.to_string(); @@ -760,7 +758,7 @@ impl<'n> SystemCatalogDumper<'n> { .max_duration(PG_QUERY_TIMEOUT) .initial_backoff(Duration::from_secs(2)) .retry_async_canceling(|_| { - let start_time = start_time.clone(); + let base_path = base_path.clone(); let relation_name = relation.name; let cluster_replica = cluster_replica.clone(); @@ -776,7 +774,7 @@ impl<'n> SystemCatalogDumper<'n> { let transaction = pg_client.transaction().await?; query_relation( &transaction, - start_time, + base_path, relation, &column_names, cluster_replica, @@ -936,8 +934,8 @@ fn format_catalog_dump_error_message( ) } -fn format_file_path(date_time: DateTime, cluster_replica: Option<&ClusterReplica>) -> PathBuf { - let path = format_base_path(date_time).join("system-catalog"); +fn format_file_path(base_path: PathBuf, cluster_replica: Option<&ClusterReplica>) -> PathBuf { + let path = base_path.join("system-catalog"); if let Some(cluster_replica) = cluster_replica { path.join(cluster_replica.cluster_name.as_str()) .join(cluster_replica.replica_name.as_str()) diff --git a/src/mz-debug/src/utils.rs b/src/mz-debug/src/utils.rs index b13ebc02009b6..356ce9c7a8e38 100644 --- a/src/mz-debug/src/utils.rs +++ b/src/mz-debug/src/utils.rs @@ -33,8 +33,7 @@ pub fn validate_pg_connection_string(connection_string: &str) -> Result) -> Result { - let dir = format_base_path(date_time); +pub fn create_tracing_log_file(dir: PathBuf) -> Result { let log_file = dir.join("tracing.log"); if log_file.exists() { remove_dir_all(&log_file)?; From 70bd3ba2f090b0e0d6c4aba414d703040ac87889 Mon Sep 17 00:00:00 2001 From: Sang Jun Bak 
Date: Tue, 13 May 2025 00:54:38 -0400 Subject: [PATCH 05/11] Refactor kubectl port forwarder - Use stdout instead of stderr - Remove sharing of context - Fix bug where process was being killed on connecting to port forward --- src/mz-debug/src/kubectl_port_forwarder.rs | 16 ++++++++++------ src/mz-debug/src/main.rs | 19 +++++++++++-------- src/mz-debug/src/system_catalog_dumper.rs | 20 ++++++++------------ 3 files changed, 29 insertions(+), 26 deletions(-) diff --git a/src/mz-debug/src/kubectl_port_forwarder.rs b/src/mz-debug/src/kubectl_port_forwarder.rs index e645da286ce1e..0ef0c64676370 100644 --- a/src/mz-debug/src/kubectl_port_forwarder.rs +++ b/src/mz-debug/src/kubectl_port_forwarder.rs @@ -36,6 +36,9 @@ pub struct KubectlPortForwarder { pub struct PortForwardConnection { // tokio process that's killed on drop pub _port_forward_process: tokio::process::Child, + // We need to keep the lines otherwise the process will be killed when new lines + // are added to the stdout. + pub _lines: tokio::io::Lines>, } impl KubectlPortForwarder { @@ -60,18 +63,18 @@ impl KubectlPortForwarder { let child = tokio::process::Command::new("kubectl") .args(args) - // Silence stdout - .stdout(std::process::Stdio::null()) - .stderr(std::process::Stdio::piped()) + // Silence stderr + .stdout(std::process::Stdio::piped()) + .stderr(std::process::Stdio::null()) .kill_on_drop(true) .spawn(); if let Ok(mut child) = child { - if let Some(stderr) = child.stderr.take() { - let stderr_reader = tokio::io::BufReader::new(stderr); + if let Some(stdout) = child.stdout.take() { + let stdout_reader = tokio::io::BufReader::new(stdout); + let mut lines = stdout_reader.lines(); // Wait until we know port forwarding is established let timeout = tokio::time::timeout(std::time::Duration::from_secs(5), async { - let mut lines = stderr_reader.lines(); while let Ok(Some(line)) = lines.next_line().await { if line.contains("Forwarding from") { break; @@ -90,6 +93,7 @@ impl KubectlPortForwarder { ); return Ok(PortForwardConnection { + _lines: lines, _port_forward_process: child, }); } diff --git a/src/mz-debug/src/main.rs b/src/mz-debug/src/main.rs index 38d5fe29e1e5e..9934c9586e8c2 100644 --- a/src/mz-debug/src/main.rs +++ b/src/mz-debug/src/main.rs @@ -129,7 +129,7 @@ pub struct Args { pub trait ContainerDumper { fn dump_container_resources(&self) -> impl std::future::Future; } -#[derive(Clone)] + struct SelfManagedContext { dump_k8s: bool, k8s_client: KubernetesClient, @@ -137,7 +137,7 @@ struct SelfManagedContext { k8s_namespaces: Vec, k8s_dump_secret_values: bool, - _mz_port_forward_process: Arc>, + _mz_port_forward_process: Option, } #[derive(Clone)] struct EmulatorContext { @@ -145,15 +145,13 @@ struct EmulatorContext { docker_container_id: String, } -#[derive(Clone)] enum DebugModeContext { SelfManaged(SelfManagedContext), Emulator(EmulatorContext), } -#[derive(Clone)] pub struct Context { - start_time: DateTime, + base_path: PathBuf, debug_mode_context: DebugModeContext, mz_connection_url: String, @@ -222,7 +220,7 @@ async fn initialize_context(args: Args, base_path: PathBuf) -> Result Result Result<(), anyhow::Error> { if context.dump_system_catalog { // Dump the system catalog. 
- let catalog_dumper = match system_catalog_dumper::SystemCatalogDumper::new(&context).await { + let catalog_dumper = match system_catalog_dumper::SystemCatalogDumper::new( + &context.mz_connection_url, + context.base_path.clone(), + ) + .await + { Ok(dumper) => Some(dumper), Err(e) => { warn!("Failed to dump system catalog: {}", e); diff --git a/src/mz-debug/src/system_catalog_dumper.rs b/src/mz-debug/src/system_catalog_dumper.rs index 36aad2541d56a..77119f9fb1ae7 100644 --- a/src/mz-debug/src/system_catalog_dumper.rs +++ b/src/mz-debug/src/system_catalog_dumper.rs @@ -474,8 +474,8 @@ impl fmt::Display for ClusterReplica { } } -pub struct SystemCatalogDumper<'n> { - context: &'n Context, +pub struct SystemCatalogDumper { + base_path: PathBuf, pg_client: Arc>, pg_tls: MakeTlsConnector, cluster_replicas: Vec, @@ -682,15 +682,11 @@ pub async fn query_relation( Ok::<(), anyhow::Error>(()) } -impl<'n> SystemCatalogDumper<'n> { - pub async fn new(context: &'n Context) -> Result { - let (pg_client, pg_conn, pg_tls) = - create_postgres_connection(&context.mz_connection_url).await?; +impl SystemCatalogDumper { + pub async fn new(connection_url: &str, base_path: PathBuf) -> Result { + let (pg_client, pg_conn, pg_tls) = create_postgres_connection(connection_url).await?; - info!( - "Connected to PostgreSQL server at {}", - context.mz_connection_url - ); + info!("Connected to PostgreSQL server at {}", connection_url); let handle = task::spawn(|| "postgres-connection", pg_conn); @@ -726,7 +722,7 @@ impl<'n> SystemCatalogDumper<'n> { }; Ok(Self { - context, + base_path, pg_client: Arc::new(Mutex::new(pg_client)), pg_tls, cluster_replicas, @@ -749,7 +745,7 @@ impl<'n> SystemCatalogDumper<'n> { cluster_replica.map_or_else(|| "".to_string(), |replica| format!(" in {}", replica)) ); - let base_path = self.context.base_path.clone(); + let base_path = self.base_path.clone(); let pg_client = &self.pg_client; let relation_name = relation.name.to_string(); From af5efea99cff581216e7b53572bffbd6583ebd54 Mon Sep 17 00:00:00 2001 From: Sang Jun Bak Date: Tue, 13 May 2025 02:27:04 -0400 Subject: [PATCH 06/11] Implement internal HTTP dumper - Introduced InternalHttpDumpClient for downloading and saving heap profile data and Prometheus metrics from internal HTTP endpoints. - Changes service for port forwarding from balancerd to environmentd for better consistency with metrics - Refactor "find" methods for port forwarding to be more modular. 
Separates out port finding / port forwarder creation from service finding --- Cargo.lock | 1 + src/mz-debug/Cargo.toml | 1 + src/mz-debug/src/internal_http_dumper.rs | 296 +++++++++++++++++++++ src/mz-debug/src/kubectl_port_forwarder.rs | 156 ++++++++--- src/mz-debug/src/main.rs | 58 ++-- src/mz-debug/src/system_catalog_dumper.rs | 2 - 6 files changed, 451 insertions(+), 63 deletions(-) create mode 100644 src/mz-debug/src/internal_http_dumper.rs diff --git a/Cargo.lock b/Cargo.lock index 3ade705b6b7b9..133c417b57111 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5562,6 +5562,7 @@ dependencies = [ "mz-ore", "mz-tls-util", "postgres-openssl", + "reqwest 0.11.24", "serde", "serde_yaml", "tokio", diff --git a/src/mz-debug/Cargo.toml b/src/mz-debug/Cargo.toml index 0b2cc5b2eef16..4227749884fa7 100644 --- a/src/mz-debug/Cargo.toml +++ b/src/mz-debug/Cargo.toml @@ -22,6 +22,7 @@ mz-cloud-resources = { path = "../cloud-resources"} mz-ore = { path = "../ore", features = ["cli", "test"] } mz-tls-util = { path = "../tls-util" } postgres-openssl = { version = "0.5.0" } +reqwest = { version = "0.11", features = ["stream"] } serde = "1.0.219" serde_yaml = "0.9.34" tokio = "1.44.1" diff --git a/src/mz-debug/src/internal_http_dumper.rs b/src/mz-debug/src/internal_http_dumper.rs new file mode 100644 index 0000000000000..9021e52a2d73a --- /dev/null +++ b/src/mz-debug/src/internal_http_dumper.rs @@ -0,0 +1,296 @@ +use anyhow::{Context as AnyhowContext, Result}; +use futures::StreamExt; +use k8s_openapi::api::core::v1::ServicePort; +use reqwest::header::{HeaderMap, HeaderValue}; +use std::path::Path; +use tokio::fs::{File, create_dir_all}; +use tokio::io::AsyncWriteExt; +use tracing::{info, warn}; + +use crate::kubectl_port_forwarder::{ + KubectlPortForwarder, find_cluster_services, find_environmentd_service, +}; +use crate::{Context, SelfManagedContext}; + +static PROM_METRICS_ENDPOINT: &str = "metrics"; +static ENVD_HEAP_PROFILE_ENDPOINT: &str = "prof/heap"; +static CLUSTERD_HEAP_PROFILE_ENDPOINT: &str = "heap"; + +/// A struct that handles downloading and saving profile data from HTTP endpoints +pub struct InternalHttpDumpClient<'n> { + context: &'n Context, + http_client: &'n reqwest::Client, +} + +/// A struct that handles downloading and exporting data from our internal HTTP endpoints. 
+impl<'n> InternalHttpDumpClient<'n> { + pub fn new(context: &'n Context, http_client: &'n reqwest::Client) -> Self { + Self { + context, + http_client, + } + } + + async fn dump_request_to_file( + &self, + relative_url: &str, + headers: HeaderMap, + output_path: &Path, + ) -> Result<(), anyhow::Error> { + // Try HTTPS first, then fall back to HTTP if that fails + let mut url = format!("https://{}", relative_url); + let mut response = self + .http_client + .get(&url) + .headers(headers.clone()) + .send() + .await; + + if response.is_err() { + // Fall back to HTTP if HTTPS fails + url = format!("http://{}", relative_url); + response = self.http_client.get(&url).headers(headers).send().await; + } + + let response = response.with_context(|| format!("Failed to send request to {}", url))?; + + if !response.status().is_success() { + return Err(anyhow::anyhow!( + "Failed to get response from {}: {}", + url, + response.status() + )); + } + + let mut file = File::create(output_path) + .await + .with_context(|| format!("Failed to create file: {}", output_path.display()))?; + + let mut stream = response.bytes_stream(); + while let Some(chunk) = stream.next().await { + let chunk = chunk.with_context(|| "Failed to read chunk from response")?; + file.write_all(&chunk) + .await + .with_context(|| "Failed to write chunk to file")?; + } + + file.flush() + .await + .with_context(|| "Failed to flush file contents")?; + + Ok(()) + } + + /// Downloads and saves heap profile data + pub async fn dump_heap_profile(&self, relative_url: &str, service_name: &str) -> Result<()> { + let output_dir = self.context.base_path.join("profiles"); + create_dir_all(&output_dir).await.with_context(|| { + format!( + "Failed to create output directory: {}", + output_dir.display() + ) + })?; + let output_path = output_dir.join(format!("{}.memprof.pprof", service_name)); + + self.dump_request_to_file( + &relative_url, + { + let mut headers = HeaderMap::new(); + headers.insert( + "Accept", + HeaderValue::from_static("application/octet-stream"), + ); + headers + }, + &output_path, + ) + .await + .with_context(|| format!("Failed to dump heap profile to {}", output_path.display()))?; + + Ok(()) + } + + pub async fn dump_prometheus_metrics( + &self, + relative_url: &str, + service_name: &str, + ) -> Result<()> { + let output_dir = self.context.base_path.join("prom_metrics"); + create_dir_all(&output_dir).await.with_context(|| { + format!( + "Failed to create output directory: {}", + output_dir.display() + ) + })?; + + let output_path = output_dir.join(format!("{}.metrics.txt", service_name)); + self.dump_request_to_file( + &relative_url, + { + let mut headers = HeaderMap::new(); + headers.insert("Accept", HeaderValue::from_static("text/plain")); + headers + }, + &output_path, + ) + .await?; + + Ok(()) + } +} + +// TODO (debug_tool3): Scrape cluster profiles through a proxy when (database-issues#8942) is implemented +pub async fn dump_emulator_http_resources(context: &Context) -> Result<()> { + let http_client = reqwest::Client::new(); + let dump_task = InternalHttpDumpClient::new(context, &http_client); + + if context.dump_heap_profiles { + let resource_name = "environmentd".to_string(); + + // We assume the emulator is exposed on the local network and uses port 6878. 
+ // TODO (debug_tool1): Figure out the correct IP address from the docker container + if let Err(e) = dump_task + .dump_heap_profile( + &format!("127.0.0.1:6878/{}", ENVD_HEAP_PROFILE_ENDPOINT), + &resource_name, + ) + .await + { + warn!("Failed to dump heap profile: {}", e); + } + } + + if context.dump_prometheus_metrics { + let resource_name = "environmentd".to_string(); + + let dump_task = InternalHttpDumpClient::new(context, &http_client); + + if let Err(e) = dump_task + .dump_prometheus_metrics( + &format!("127.0.0.1:6878/{}", PROM_METRICS_ENDPOINT), + &resource_name, + ) + .await + { + warn!("Failed to dump prometheus metrics: {}", e); + } + } + + Ok(()) +} + +pub async fn dump_self_managed_http_resources( + context: &Context, + self_managed_context: &SelfManagedContext, +) -> Result<()> { + let http_client = reqwest::Client::new(); + // TODO (debug_tool3): Allow user to override temporary local address for http port forwarding + let local_address = "127.0.0.1"; + let local_port = 6878; + + let cluster_services = find_cluster_services( + &self_managed_context.k8s_client, + &self_managed_context.k8s_namespaces, + ) + .await + .with_context(|| "Failed to find cluster services")?; + + let environmentd_service = find_environmentd_service( + &self_managed_context.k8s_client, + &self_managed_context.k8s_namespaces, + ) + .await + .with_context(|| "Failed to find environmentd service")?; + + let prom_metrics_endpoint = format!( + "{}:{}/{}", + &local_address, local_port, PROM_METRICS_ENDPOINT + ); + + let clusterd_heap_profile_endpoint = format!( + "{}:{}/{}", + &local_address, local_port, CLUSTERD_HEAP_PROFILE_ENDPOINT + ); + + let envd_heap_profile_endpoint = format!( + "{}:{}/{}", + &local_address, local_port, ENVD_HEAP_PROFILE_ENDPOINT + ); + + let services = cluster_services + .iter() + .map(|service| (service, &clusterd_heap_profile_endpoint)) + .chain(std::iter::once(( + &environmentd_service, + &envd_heap_profile_endpoint, + ))); + + // Scrape each service + for (service_info, profiling_endpoint) in services { + let maybe_internal_http_port = service_info + .service_ports + .iter() + .find_map(find_internal_http_port); + + if let Some(internal_http_port) = maybe_internal_http_port { + let port_forwarder = KubectlPortForwarder { + context: self_managed_context.k8s_context.clone(), + namespace: service_info.namespace.clone(), + service_name: service_info.service_name.clone(), + target_port: internal_http_port.port, + local_address: local_address.to_string(), + local_port, + }; + + let _internal_http_connection = + port_forwarder.spawn_port_forward().await.with_context(|| { + format!( + "Failed to spawn port forwarder for service {}", + service_info.service_name + ) + })?; + + let dump_task = InternalHttpDumpClient::new(context, &http_client); + + if context.dump_heap_profiles { + info!( + "Dumping heap profile for service {}", + service_info.service_name + ); + dump_task + .dump_heap_profile(profiling_endpoint, &service_info.service_name) + .await + .with_context(|| { + format!( + "Failed to dump heap profile for service {}", + service_info.service_name + ) + })?; + } + + if context.dump_prometheus_metrics { + dump_task + .dump_prometheus_metrics(&prom_metrics_endpoint, &service_info.service_name) + .await + .with_context(|| { + format!( + "Failed to dump prometheus metrics for service {}", + service_info.service_name + ) + })?; + } + } + } + + Ok(()) +} + +fn find_internal_http_port(port_info: &ServicePort) -> Option<&ServicePort> { + if let Some(port_name) = &port_info.name { + let 
port_name = port_name.to_lowercase(); + if port_name == "internal-http" { + return Some(port_info); + } + } + None +} diff --git a/src/mz-debug/src/kubectl_port_forwarder.rs b/src/mz-debug/src/kubectl_port_forwarder.rs index 0ef0c64676370..eb23950632109 100644 --- a/src/mz-debug/src/kubectl_port_forwarder.rs +++ b/src/mz-debug/src/kubectl_port_forwarder.rs @@ -15,8 +15,8 @@ //! Port forwards k8s service via Kubectl -use anyhow::Result; -use k8s_openapi::api::core::v1::Service; +use anyhow::{Context, Result}; +use k8s_openapi::api::core::v1::{Service, ServicePort}; use kube::api::ListParams; use kube::{Api, Client}; use tokio::io::AsyncBufReadExt; @@ -102,62 +102,132 @@ impl KubectlPortForwarder { } } -/// Creates a port forwarder for the external pg wire port of balancerd. -pub async fn create_pg_wire_port_forwarder( +#[derive(Debug)] +pub struct ServiceInfo { + pub service_name: String, + pub service_ports: Vec, + pub namespace: String, +} + +/// Returns ServiceInfo for balancerd +pub async fn find_environmentd_service( client: &Client, - k8s_context: &Option, k8s_namespaces: &Vec, - port_forward_local_address: &String, - port_forward_local_port: i32, -) -> Result { +) -> Result { for namespace in k8s_namespaces { let services: Api = Api::namespaced(client.clone(), namespace); let services = services .list(&ListParams::default().labels("materialize.cloud/mz-resource-id")) - .await?; - // Finds the sql service that contains a port with name "sql" - let maybe_port_info = services + .await + .with_context(|| format!("Failed to list services in namespace {}", namespace))?; + + // Find the first sql service that contains balancerd + let maybe_service = + services + .iter() + .find_map(|service| match (&service.metadata.name, &service.spec) { + (Some(service_name), Some(spec)) => { + if !service_name.to_lowercase().contains("environmentd") { + return None; + } + + if let Some(ports) = &spec.ports { + Some(ServiceInfo { + service_name: service_name.clone(), + service_ports: ports.clone(), + namespace: namespace.clone(), + }) + } else { + None + } + } + _ => None, + }); + + if let Some(service) = maybe_service { + return Ok(service); + } + } + + Err(anyhow::anyhow!("Could not find environmentd service")) +} + +/// Returns Vec<(service_name, ports)> for cluster services +pub async fn find_cluster_services( + client: &Client, + k8s_namespaces: &Vec, +) -> Result> { + for namespace in k8s_namespaces { + let services: Api = Api::namespaced(client.clone(), namespace); + let services = services + .list(&ListParams::default()) + .await + .with_context(|| format!("Failed to list services in namespace {}", namespace))?; + let cluster_services: Vec = services .iter() .filter_map(|service| { - let spec = service.spec.as_ref()?; - let service_name = service.metadata.name.as_ref()?; - if !service_name.to_lowercase().contains("balancerd") { + let name = service.metadata.name.clone()?; + let spec = service.spec.clone()?; + let selector = spec.selector?; + let ports = spec.ports?; + + // Check if this is a cluster service + if selector.get("environmentd.materialize.cloud/namespace")? 
!= "cluster" { return None; } - Some((spec, service_name)) - }) - .flat_map(|(spec, service_name)| { - spec.ports - .iter() - .flatten() - .map(move |port| (port, service_name)) - }) - .find_map(|(port_info, service_name)| { - if let Some(port_name) = &port_info.name { - // We want to find the external SQL port and not the internal one - if port_name.to_lowercase().contains("pgwire") { - return Some(KubectlPortForwarder { - context: k8s_context.clone(), - namespace: namespace.clone(), - service_name: service_name.to_owned(), - target_port: port_info.port, - local_address: port_forward_local_address.clone(), - local_port: port_forward_local_port, - }); - } - } - None - }); + Some(ServiceInfo { + service_name: name, + service_ports: ports, + namespace: namespace.clone(), + }) + }) + .collect(); - if let Some(port_info) = maybe_port_info { - return Ok(port_info); + if !cluster_services.is_empty() { + return Ok(cluster_services); } } - Err(anyhow::anyhow!( - "No SQL port forwarding info found. Set --auto-port-forward to false and point --mz-connection-url to a Materialize instance." - )) + Err(anyhow::anyhow!("Could not find cluster services")) +} + +/// Creates a port forwarder for the external pg wire port of balancerd. +pub async fn create_pg_wire_port_forwarder( + client: &Client, + k8s_context: &Option, + k8s_namespaces: &Vec, + port_forward_local_address: &String, + port_forward_local_port: i32, +) -> Result { + let service_info = find_environmentd_service(client, k8s_namespaces) + .await + .with_context(|| format!("Cannot find ports for environmentd service"))?; + + let maybe_internal_sql_port = service_info.service_ports.iter().find_map(|port_info| { + if let Some(port_name) = &port_info.name { + let port_name = port_name.to_lowercase(); + if port_name == "sql" { + return Some(port_info); + } + } + None + }); + + if let Some(internal_sql_port) = maybe_internal_sql_port { + Ok(KubectlPortForwarder { + context: k8s_context.clone(), + namespace: service_info.namespace, + service_name: service_info.service_name, + target_port: internal_sql_port.port, + local_address: port_forward_local_address.clone(), + local_port: port_forward_local_port, + }) + } else { + Err(anyhow::anyhow!( + "No SQL port forwarding info found. Set --auto-port-forward to false and point --mz-connection-url to a Materialize instance." + )) + } } pub fn create_mz_connection_url( diff --git a/src/mz-debug/src/main.rs b/src/mz-debug/src/main.rs index 9934c9586e8c2..e046782f3204e 100644 --- a/src/mz-debug/src/main.rs +++ b/src/mz-debug/src/main.rs @@ -10,7 +10,7 @@ //! Debug tool for self managed environments. use std::path::PathBuf; use std::process; -use std::sync::{Arc, LazyLock}; +use std::sync::LazyLock; use chrono::Utc; use clap::Parser; @@ -25,6 +25,7 @@ use tracing_subscriber::layer::SubscriberExt; use tracing_subscriber::util::SubscriberInitExt; use crate::docker_dumper::DockerDumper; +use crate::internal_http_dumper::{dump_emulator_http_resources, dump_self_managed_http_resources}; use crate::k8s_dumper::K8sDumper; use crate::kubectl_port_forwarder::{PortForwardConnection, create_pg_wire_port_forwarder}; use crate::utils::{ @@ -63,8 +64,6 @@ pub struct SelfManagedDebugModeArgs { /// If true, the tool will dump the values of secrets in the Kubernetes cluster. 
#[clap(long, default_value = "false", action = clap::ArgAction::Set)] k8s_dump_secret_values: bool, - // TODO (debug_tool1): Convert port forwarding variables into a map since we'll be - // portforwarding multiple times /// If true, the tool will automatically port-forward the external SQL port in the Kubernetes cluster. /// If dump_k8s is false however, we will not automatically port-forward. #[clap(long, default_value = "true", action = clap::ArgAction::Set)] @@ -124,6 +123,12 @@ pub struct Args { /// If true, the tool will dump the system catalog in Materialize. #[clap(long, default_value = "true", action = clap::ArgAction::Set, global = true)] dump_system_catalog: bool, + /// If true, the tool will dump the heap profiles in Materialize. + #[clap(long, default_value = "true", action = clap::ArgAction::Set, global = true)] + dump_heap_profiles: bool, + /// If true, the tool will dump the prometheus metrics in Materialize. + #[clap(long, default_value = "true", action = clap::ArgAction::Set, global = true)] + dump_prometheus_metrics: bool, } pub trait ContainerDumper { @@ -156,6 +161,8 @@ pub struct Context { mz_connection_url: String, dump_system_catalog: bool, + dump_heap_profiles: bool, + dump_prometheus_metrics: bool, } #[tokio::main] @@ -214,11 +221,11 @@ async fn initialize_context(args: Args, base_path: PathBuf) -> Result { let k8s_client = match create_k8s_client(args.k8s_context.clone()).await { Ok(k8s_client) => k8s_client, - Err(e) => { - error!("Failed to create k8s client: {}", e); - return Err(e); - } - }; + Err(e) => { + error!("Failed to create k8s client: {}", e); + return Err(e); + } + }; let mz_port_forward_process = if args.auto_port_forward { let port_forwarder = create_pg_wire_port_forwarder( @@ -230,16 +237,16 @@ async fn initialize_context(args: Args, base_path: PathBuf) -> Result Some(process), - Err(err) => { - warn!("{}", err); - None - } + Err(err) => { + warn!("{}", err); + None } - } else { - None - }; + } + } else { + None + }; let mz_connection_url = kubectl_port_forwarder::create_mz_connection_url( args.port_forward_local_address.clone(), args.port_forward_local_port, @@ -271,6 +278,8 @@ async fn initialize_context(args: Args, base_path: PathBuf) -> Result Result<(), anyhow::Error> { }) => { if *dump_docker { let dumper = DockerDumper::new(&context, docker_container_id.clone()); - dumper.dump_container_resources().await; - } + dumper.dump_container_resources().await; + } + } + }; + + match &context.debug_mode_context { + DebugModeContext::SelfManaged(self_managed_context) => { + if let Err(e) = dump_self_managed_http_resources(&context, self_managed_context).await { + warn!("Failed to dump self-managed http resources: {}", e); + } + } + DebugModeContext::Emulator(_) => { + if let Err(e) = dump_emulator_http_resources(&context).await { + warn!("Failed to dump emulator http resources: {}", e); + } } }; diff --git a/src/mz-debug/src/system_catalog_dumper.rs b/src/mz-debug/src/system_catalog_dumper.rs index 77119f9fb1ae7..f0d6a0a0ecbb7 100644 --- a/src/mz-debug/src/system_catalog_dumper.rs +++ b/src/mz-debug/src/system_catalog_dumper.rs @@ -40,8 +40,6 @@ use mz_ore::task::{self, JoinHandle}; use postgres_openssl::{MakeTlsConnector, TlsStream}; use tracing::{info, warn}; -use crate::Context; - #[derive(Debug, Clone)] pub enum RelationCategory { /// For relations that belong in the `mz_introspection` schema. 
From 7a8efaf7e19bed36804de6fbf1112d80f1b70f66 Mon Sep 17 00:00:00 2001 From: Sang Jun Bak Date: Tue, 13 May 2025 02:56:44 -0400 Subject: [PATCH 07/11] Run emulator debug using Docker container IP retrieval - Introduced a new asynchronous function `get_container_ip` to fetch the IP address of a Docker container using its ID. Doing this means we don't need the user to map ports when doing `DOCKER RUN` - Changes mz_connection_url for the emulator to an optional argument and global now that we have the container's IP --- src/mz-debug/src/docker_dumper.rs | 32 +++++++ src/mz-debug/src/internal_http_dumper.rs | 35 +++++--- src/mz-debug/src/kubectl_port_forwarder.rs | 11 --- src/mz-debug/src/main.rs | 100 +++++++++++++-------- 4 files changed, 116 insertions(+), 62 deletions(-) diff --git a/src/mz-debug/src/docker_dumper.rs b/src/mz-debug/src/docker_dumper.rs index 5f48d9171b912..8c1046fdd4010 100644 --- a/src/mz-debug/src/docker_dumper.rs +++ b/src/mz-debug/src/docker_dumper.rs @@ -20,6 +20,7 @@ use std::io::Write; use std::path::PathBuf; use std::time::Duration; +use anyhow::Context as AnyhowContext; use mz_ore::retry::{self, RetryResult}; use tracing::{info, warn}; @@ -146,3 +147,34 @@ fn write_output( info!("Exported {}", file_path.display()); Ok(()) } + +/// Gets the IP address of a Docker container using the container ID. +pub async fn get_container_ip(container_id: &str) -> Result { + let output = tokio::process::Command::new("docker") + .args([ + "inspect", + "-f", + "{{range .NetworkSettings.Networks}}{{.IPAddress}}{{end}}", + container_id, + ]) + .output() + .await + .with_context(|| format!("Failed to get container IP address for {}", container_id))?; + + if !output.status.success() { + return Err(anyhow::anyhow!( + "Docker command failed: {}", + String::from_utf8_lossy(&output.stderr) + )); + } + + let ip = String::from_utf8(output.stdout) + .with_context(|| "Failed to convert container IP address to string")? 
+ .trim() + .to_string(); + if ip.is_empty() { + return Err(anyhow::anyhow!("Container IP address not found")); + } + + Ok(ip) +} diff --git a/src/mz-debug/src/internal_http_dumper.rs b/src/mz-debug/src/internal_http_dumper.rs index 9021e52a2d73a..7c75ee9145432 100644 --- a/src/mz-debug/src/internal_http_dumper.rs +++ b/src/mz-debug/src/internal_http_dumper.rs @@ -10,11 +10,13 @@ use tracing::{info, warn}; use crate::kubectl_port_forwarder::{ KubectlPortForwarder, find_cluster_services, find_environmentd_service, }; -use crate::{Context, SelfManagedContext}; +use crate::{Context, EmulatorContext, SelfManagedContext}; static PROM_METRICS_ENDPOINT: &str = "metrics"; static ENVD_HEAP_PROFILE_ENDPOINT: &str = "prof/heap"; static CLUSTERD_HEAP_PROFILE_ENDPOINT: &str = "heap"; +static INTERNAL_HOST_ADDRESS: &str = "127.0.0.1"; +static INTERNAL_HTTP_PORT: i32 = 6878; /// A struct that handles downloading and saving profile data from HTTP endpoints pub struct InternalHttpDumpClient<'n> { @@ -140,7 +142,10 @@ impl<'n> InternalHttpDumpClient<'n> { } // TODO (debug_tool3): Scrape cluster profiles through a proxy when (database-issues#8942) is implemented -pub async fn dump_emulator_http_resources(context: &Context) -> Result<()> { +pub async fn dump_emulator_http_resources( + context: &Context, + emulator_context: &EmulatorContext, +) -> Result<()> { let http_client = reqwest::Client::new(); let dump_task = InternalHttpDumpClient::new(context, &http_client); @@ -148,10 +153,12 @@ pub async fn dump_emulator_http_resources(context: &Context) -> Result<()> { let resource_name = "environmentd".to_string(); // We assume the emulator is exposed on the local network and uses port 6878. - // TODO (debug_tool1): Figure out the correct IP address from the docker container if let Err(e) = dump_task .dump_heap_profile( - &format!("127.0.0.1:6878/{}", ENVD_HEAP_PROFILE_ENDPOINT), + &format!( + "{}:{}/{}", + emulator_context.container_ip, INTERNAL_HTTP_PORT, ENVD_HEAP_PROFILE_ENDPOINT + ), &resource_name, ) .await @@ -167,7 +174,10 @@ pub async fn dump_emulator_http_resources(context: &Context) -> Result<()> { if let Err(e) = dump_task .dump_prometheus_metrics( - &format!("127.0.0.1:6878/{}", PROM_METRICS_ENDPOINT), + &format!( + "{}:{}/{}", + emulator_context.container_ip, INTERNAL_HTTP_PORT, PROM_METRICS_ENDPOINT + ), &resource_name, ) .await @@ -184,9 +194,6 @@ pub async fn dump_self_managed_http_resources( self_managed_context: &SelfManagedContext, ) -> Result<()> { let http_client = reqwest::Client::new(); - // TODO (debug_tool3): Allow user to override temporary local address for http port forwarding - let local_address = "127.0.0.1"; - let local_port = 6878; let cluster_services = find_cluster_services( &self_managed_context.k8s_client, @@ -201,20 +208,20 @@ pub async fn dump_self_managed_http_resources( ) .await .with_context(|| "Failed to find environmentd service")?; - + // TODO (debug_tool3): Allow user to override temporary local address for http port forwarding let prom_metrics_endpoint = format!( "{}:{}/{}", - &local_address, local_port, PROM_METRICS_ENDPOINT + INTERNAL_HOST_ADDRESS, INTERNAL_HTTP_PORT, PROM_METRICS_ENDPOINT ); let clusterd_heap_profile_endpoint = format!( "{}:{}/{}", - &local_address, local_port, CLUSTERD_HEAP_PROFILE_ENDPOINT + INTERNAL_HOST_ADDRESS, INTERNAL_HTTP_PORT, CLUSTERD_HEAP_PROFILE_ENDPOINT ); let envd_heap_profile_endpoint = format!( "{}:{}/{}", - &local_address, local_port, ENVD_HEAP_PROFILE_ENDPOINT + INTERNAL_HOST_ADDRESS, INTERNAL_HTTP_PORT, 
ENVD_HEAP_PROFILE_ENDPOINT ); let services = cluster_services @@ -238,8 +245,8 @@ pub async fn dump_self_managed_http_resources( namespace: service_info.namespace.clone(), service_name: service_info.service_name.clone(), target_port: internal_http_port.port, - local_address: local_address.to_string(), - local_port, + local_address: INTERNAL_HOST_ADDRESS.to_string(), + local_port: INTERNAL_HTTP_PORT, }; let _internal_http_connection = diff --git a/src/mz-debug/src/kubectl_port_forwarder.rs b/src/mz-debug/src/kubectl_port_forwarder.rs index eb23950632109..e81bb073559ca 100644 --- a/src/mz-debug/src/kubectl_port_forwarder.rs +++ b/src/mz-debug/src/kubectl_port_forwarder.rs @@ -229,14 +229,3 @@ pub async fn create_pg_wire_port_forwarder( )) } } - -pub fn create_mz_connection_url( - local_address: String, - local_port: i32, - connection_url_override: Option, -) -> String { - if let Some(connection_url_override) = connection_url_override { - return connection_url_override; - } - format!("postgres://{}:{}?sslmode=prefer", local_address, local_port) -} diff --git a/src/mz-debug/src/main.rs b/src/mz-debug/src/main.rs index e046782f3204e..bb6afe8179783 100644 --- a/src/mz-debug/src/main.rs +++ b/src/mz-debug/src/main.rs @@ -12,6 +12,7 @@ use std::path::PathBuf; use std::process; use std::sync::LazyLock; +use anyhow::Context as AnyhowContext; use chrono::Utc; use clap::Parser; use kube::config::KubeConfigOptions; @@ -74,16 +75,6 @@ pub struct SelfManagedDebugModeArgs { /// The port to listen on for the port-forward. #[clap(long, default_value = "6875")] port_forward_local_port: i32, - /// The URL of the Materialize SQL connection used to dump the system catalog. - /// An example URL is `postgres://root@127.0.0.1:6875/materialize?sslmode=disable`. - /// By default, we will create a connection URL based on `port_forward_local_address` and `port_forward_local_port`. - // TODO(debug_tool3): Allow users to specify the pgconfig via separate variables - #[clap( - long, - env = "MZ_CONNECTION_URL", - value_parser = validate_pg_connection_string, - )] - mz_connection_url: Option, } #[derive(Parser, Debug, Clone)] @@ -94,17 +85,6 @@ pub struct EmulatorDebugModeArgs { /// The ID of the docker container to dump. #[clap(long)] docker_container_id: String, - /// The URL of the Materialize SQL connection used to dump the system catalog. - /// An example URL is `postgres://root@127.0.0.1:6875/materialize?sslmode=disable`. - // TODO(debug_tool3): Allow users to specify the pgconfig via separate variables - #[clap( - long, - env = "MZ_CONNECTION_URL", - // We assume that the emulator is running on the default port. - default_value = "postgres://127.0.0.1:6875/materialize?sslmode=prefer", - value_parser = validate_pg_connection_string, - )] - mz_connection_url: String, } #[derive(Parser, Debug, Clone)] @@ -129,6 +109,18 @@ pub struct Args { /// If true, the tool will dump the prometheus metrics in Materialize. #[clap(long, default_value = "true", action = clap::ArgAction::Set, global = true)] dump_prometheus_metrics: bool, + /// The URL of the Materialize SQL connection used to dump the system catalog. + /// An example URL is `postgres://root@127.0.0.1:6875/materialize?sslmode=disable`. + /// By default, we will create a connection URL based on `port_forward_local_address:port_forward_local_port` for self-managed + /// or `docker_container_ip:6875` for the emulator. 
+ // TODO(debug_tool3): Allow users to specify the pgconfig via separate variables + #[clap( + long, + env = "MZ_CONNECTION_URL", + value_parser = validate_pg_connection_string, + global = true + )] + mz_connection_url: Option, } pub trait ContainerDumper { @@ -148,6 +140,7 @@ struct SelfManagedContext { struct EmulatorContext { dump_docker: bool, docker_container_id: String, + container_ip: String, } enum DebugModeContext { @@ -216,8 +209,22 @@ async fn main() { } } -async fn initialize_context(args: Args, base_path: PathBuf) -> Result { - let (debug_mode_context, mz_connection_url) = match &args.debug_mode_args { +fn create_mz_connection_url( + local_address: String, + local_port: i32, + connection_url_override: Option, +) -> String { + if let Some(connection_url_override) = connection_url_override { + return connection_url_override; + } + format!("postgres://{}:{}?sslmode=prefer", local_address, local_port) +} + +async fn initialize_context( + global_args: Args, + base_path: PathBuf, +) -> Result { + let (debug_mode_context, mz_connection_url) = match &global_args.debug_mode_args { DebugModeArgs::SelfManaged(args) => { let k8s_client = match create_k8s_client(args.k8s_context.clone()).await { Ok(k8s_client) => k8s_client, @@ -247,10 +254,10 @@ async fn initialize_context(args: Args, base_path: PathBuf) -> Result Result ( - DebugModeContext::Emulator(EmulatorContext { - dump_docker: args.dump_docker, - docker_container_id: args.docker_container_id.clone(), - }), - args.mz_connection_url.clone(), - ), + DebugModeArgs::Emulator(args) => { + let container_ip = docker_dumper::get_container_ip(&args.docker_container_id) + .await + .with_context(|| { + format!( + "Failed to get IP for container {}", + args.docker_container_id + ) + })?; + + let mz_connection_url = create_mz_connection_url( + container_ip.clone(), + 6875, + global_args.mz_connection_url.clone(), + ); + + ( + DebugModeContext::Emulator(EmulatorContext { + dump_docker: args.dump_docker, + docker_container_id: args.docker_container_id.clone(), + container_ip, + }), + mz_connection_url, + ) + } }; Ok(Context { base_path, debug_mode_context, mz_connection_url, - dump_system_catalog: args.dump_system_catalog, - dump_heap_profiles: args.dump_heap_profiles, - dump_prometheus_metrics: args.dump_prometheus_metrics, + dump_system_catalog: global_args.dump_system_catalog, + dump_heap_profiles: global_args.dump_heap_profiles, + dump_prometheus_metrics: global_args.dump_prometheus_metrics, }) } @@ -309,6 +334,7 @@ async fn run(context: Context) -> Result<(), anyhow::Error> { DebugModeContext::Emulator(EmulatorContext { dump_docker, docker_container_id, + .. 
}) => { if *dump_docker { let dumper = DockerDumper::new(&context, docker_container_id.clone()); @@ -323,8 +349,8 @@ async fn run(context: Context) -> Result<(), anyhow::Error> { warn!("Failed to dump self-managed http resources: {}", e); } } - DebugModeContext::Emulator(_) => { - if let Err(e) = dump_emulator_http_resources(&context).await { + DebugModeContext::Emulator(emulator_context) => { + if let Err(e) = dump_emulator_http_resources(&context, emulator_context).await { warn!("Failed to dump emulator http resources: {}", e); } } From a79040c3124830ab5bd2868fede85a411e43521a Mon Sep 17 00:00:00 2001 From: Sang Jun Bak Date: Tue, 13 May 2025 03:08:59 -0400 Subject: [PATCH 08/11] Fix clippy and documentation - Update CLI flags in docs and clippy/lint errors --- Cargo.lock | 2 +- doc/user/data/mz-debug/emulator_options.yml | 16 +++--------- doc/user/data/mz-debug/mz_debug_option.yml | 26 +++++++++++++++++++ .../data/mz-debug/self_managed_options.yml | 12 --------- src/mz-debug/Cargo.toml | 2 +- src/mz-debug/src/internal_http_dumper.rs | 14 ++++++++-- src/mz-debug/src/kubectl_port_forwarder.rs | 2 +- 7 files changed, 45 insertions(+), 29 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 133c417b57111..16fccf45b0ab6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5548,7 +5548,7 @@ dependencies = [ [[package]] name = "mz-debug" -version = "0.1.0" +version = "0.2.0" dependencies = [ "anyhow", "chrono", diff --git a/doc/user/data/mz-debug/emulator_options.yml b/doc/user/data/mz-debug/emulator_options.yml index c04188df43954..1a86f4e3394db 100644 --- a/doc/user/data/mz-debug/emulator_options.yml +++ b/doc/user/data/mz-debug/emulator_options.yml @@ -2,24 +2,16 @@ columns: - column: "Option" - column: "Description" rows: - - Option: "`--dump-docker `" - Description: | - - If `true`, dump debug information from the Docker container. - - Defaults to `true`. - - Option: "`--docker-container-id `" Description: | The Docker container to dump. - Required if [`--dump-docker`](#dump-docker) is true. + Required. - - Option: "`--mz-connection-url `" + - Option: "`--dump-docker `" Description: | - The URL of the Materialize's SQL - connection. + If `true`, dump debug information from the Docker container. - Defaults to `postgres://127.0.0.1:6875/materialize?sslmode=prefer`. + Defaults to `true`. diff --git a/doc/user/data/mz-debug/mz_debug_option.yml b/doc/user/data/mz-debug/mz_debug_option.yml index f854110c56933..21b13c477fa3d 100644 --- a/doc/user/data/mz-debug/mz_debug_option.yml +++ b/doc/user/data/mz-debug/mz_debug_option.yml @@ -2,6 +2,22 @@ columns: - column: "Option" - column: "Description" rows: + - Option: "`--dump-heap-profiles `" + Description: | + + If `true`, dump heap profiles (.pprof) from + your Materialize instance. + + Defaults to `true`. + - Option: "`--dump-prometheus-metrics `" + Description: | + + If `true`, dump prometheus metrics from + your Materialize instance. + + Defaults to `true`. + + - Option: "`--dump-system-catalog `" Description: | @@ -9,3 +25,13 @@ rows: your Materialize instance. Defaults to `true`. + + + - Option: "`--mz-connection-url `" + Description: | + + The Materialize instance's [PostgreSQL + connection + URL](https://www.postgresql.org/docs/14/libpq-connect.html#LIBPQ-CONNSTRING). + + Defaults to `postgres://127.0.0.1:6875/materialize?sslmode=prefer`. 
\ No newline at end of file diff --git a/doc/user/data/mz-debug/self_managed_options.yml b/doc/user/data/mz-debug/self_managed_options.yml index 0fe278ffa47e7..34680491f0235 100644 --- a/doc/user/data/mz-debug/self_managed_options.yml +++ b/doc/user/data/mz-debug/self_managed_options.yml @@ -57,18 +57,6 @@ rows: Defaults to `6875`. - - Option: "`--mz-connection-url `" - Description: | - - The Materialize instance's [PostgreSQL - connection - URL](https://www.postgresql.org/docs/14/libpq-connect.html#LIBPQ-CONNSTRING). - - Defaults to a connection URL constructed from: - - [`--port-forward-local-address`](#port-forward-local-address) and - [`--port-forward-local-port`](#port-forward-local-port) values. - - Option: "`-h`, `--help`" Description: | diff --git a/src/mz-debug/Cargo.toml b/src/mz-debug/Cargo.toml index 4227749884fa7..27052e865e62a 100644 --- a/src/mz-debug/Cargo.toml +++ b/src/mz-debug/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "mz-debug" description = "Debug tool for self-managed Materialize." -version = "0.1.0" +version = "0.2.0" edition.workspace = true rust-version.workspace = true publish = false diff --git a/src/mz-debug/src/internal_http_dumper.rs b/src/mz-debug/src/internal_http_dumper.rs index 7c75ee9145432..d960d4a9163c5 100644 --- a/src/mz-debug/src/internal_http_dumper.rs +++ b/src/mz-debug/src/internal_http_dumper.rs @@ -1,3 +1,13 @@ +// Copyright Materialize, Inc. and contributors. All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +//! Dumps internal http debug information to files. 
use anyhow::{Context as AnyhowContext, Result}; use futures::StreamExt; use k8s_openapi::api::core::v1::ServicePort; @@ -95,7 +105,7 @@ impl<'n> InternalHttpDumpClient<'n> { let output_path = output_dir.join(format!("{}.memprof.pprof", service_name)); self.dump_request_to_file( - &relative_url, + relative_url, { let mut headers = HeaderMap::new(); headers.insert( @@ -127,7 +137,7 @@ impl<'n> InternalHttpDumpClient<'n> { let output_path = output_dir.join(format!("{}.metrics.txt", service_name)); self.dump_request_to_file( - &relative_url, + relative_url, { let mut headers = HeaderMap::new(); headers.insert("Accept", HeaderValue::from_static("text/plain")); diff --git a/src/mz-debug/src/kubectl_port_forwarder.rs b/src/mz-debug/src/kubectl_port_forwarder.rs index e81bb073559ca..8473a4d544637 100644 --- a/src/mz-debug/src/kubectl_port_forwarder.rs +++ b/src/mz-debug/src/kubectl_port_forwarder.rs @@ -202,7 +202,7 @@ pub async fn create_pg_wire_port_forwarder( ) -> Result { let service_info = find_environmentd_service(client, k8s_namespaces) .await - .with_context(|| format!("Cannot find ports for environmentd service"))?; + .with_context(|| "Cannot find ports for environmentd service")?; let maybe_internal_sql_port = service_info.service_ports.iter().find_map(|port_info| { if let Some(port_name) = &port_info.name { From d83447d56c608706a97ec461446caba60bfd3c43 Mon Sep 17 00:00:00 2001 From: Sang Jun Bak Date: Wed, 14 May 2025 23:26:57 -0400 Subject: [PATCH 09/11] Fix lint errors --- doc/user/data/mz-debug/mz_debug_option.yml | 4 ++-- src/mz-debug/BUILD.bazel | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/doc/user/data/mz-debug/mz_debug_option.yml b/doc/user/data/mz-debug/mz_debug_option.yml index 21b13c477fa3d..a1e200c8d9658 100644 --- a/doc/user/data/mz-debug/mz_debug_option.yml +++ b/doc/user/data/mz-debug/mz_debug_option.yml @@ -5,7 +5,7 @@ rows: - Option: "`--dump-heap-profiles `" Description: | - If `true`, dump heap profiles (.pprof) from + If `true`, dump heap profiles (.pprof.gz) from your Materialize instance. Defaults to `true`. @@ -34,4 +34,4 @@ rows: connection URL](https://www.postgresql.org/docs/14/libpq-connect.html#LIBPQ-CONNSTRING). - Defaults to `postgres://127.0.0.1:6875/materialize?sslmode=prefer`. \ No newline at end of file + Defaults to `postgres://127.0.0.1:6875/materialize?sslmode=prefer`. 
diff --git a/src/mz-debug/BUILD.bazel b/src/mz-debug/BUILD.bazel index 8c9a7b23b8dbf..6848d696a6247 100644 --- a/src/mz-debug/BUILD.bazel +++ b/src/mz-debug/BUILD.bazel @@ -34,7 +34,7 @@ rust_binary( "@//misc/bazel/platforms:xlang_lto_enabled": ["-Clinker-plugin-lto"], "//conditions:default": [], }), - version = "0.1.0", + version = "0.2.0", deps = [ "//src/build-info:mz_build_info", "//src/cloud-resources:mz_cloud_resources", From 013aeaf21ea92dd54d5adea2e3598514fec7f1ad Mon Sep 17 00:00:00 2001 From: Sang Jun Bak Date: Wed, 14 May 2025 23:30:14 -0400 Subject: [PATCH 10/11] Parameterize relative directory paths in debug tool - Introduced constants for directory paths in `docker_dumper.rs`, `internal_http_dumper.rs`, and `system_catalog_dumper.rs` for readability - Updated the output file naming convention for heap profiles in `internal_http_dumper.rs` to use a `.gz` extension since they're really gzip files --- src/mz-debug/src/docker_dumper.rs | 3 ++- src/mz-debug/src/internal_http_dumper.rs | 10 ++++++---- src/mz-debug/src/system_catalog_dumper.rs | 3 ++- 3 files changed, 10 insertions(+), 6 deletions(-) diff --git a/src/mz-debug/src/docker_dumper.rs b/src/mz-debug/src/docker_dumper.rs index 8c1046fdd4010..3dd84f243101b 100644 --- a/src/mz-debug/src/docker_dumper.rs +++ b/src/mz-debug/src/docker_dumper.rs @@ -26,6 +26,7 @@ use tracing::{info, warn}; use crate::{ContainerDumper, Context}; +static DOCKER_DUMP_DIR: &str = "docker"; static DOCKER_RESOURCE_DUMP_TIMEOUT: Duration = Duration::from_secs(30); pub struct DockerDumper { @@ -36,7 +37,7 @@ pub struct DockerDumper { impl DockerDumper { pub fn new(context: &Context, container_id: String) -> Self { Self { - directory_path: context.base_path.join("docker").join(&container_id), + directory_path: context.base_path.join(DOCKER_DUMP_DIR).join(&container_id), container_id, } } diff --git a/src/mz-debug/src/internal_http_dumper.rs b/src/mz-debug/src/internal_http_dumper.rs index d960d4a9163c5..d0bc894f08dcc 100644 --- a/src/mz-debug/src/internal_http_dumper.rs +++ b/src/mz-debug/src/internal_http_dumper.rs @@ -22,13 +22,15 @@ use crate::kubectl_port_forwarder::{ }; use crate::{Context, EmulatorContext, SelfManagedContext}; +static HEAP_PROFILES_DIR: &str = "profiles"; +static PROM_METRICS_DIR: &str = "prom_metrics"; static PROM_METRICS_ENDPOINT: &str = "metrics"; static ENVD_HEAP_PROFILE_ENDPOINT: &str = "prof/heap"; static CLUSTERD_HEAP_PROFILE_ENDPOINT: &str = "heap"; static INTERNAL_HOST_ADDRESS: &str = "127.0.0.1"; static INTERNAL_HTTP_PORT: i32 = 6878; -/// A struct that handles downloading and saving profile data from HTTP endpoints +/// A struct that handles downloading and saving profile data from HTTP endpoints. 
pub struct InternalHttpDumpClient<'n> { context: &'n Context, http_client: &'n reqwest::Client, @@ -95,14 +97,14 @@ impl<'n> InternalHttpDumpClient<'n> { /// Downloads and saves heap profile data pub async fn dump_heap_profile(&self, relative_url: &str, service_name: &str) -> Result<()> { - let output_dir = self.context.base_path.join("profiles"); + let output_dir = self.context.base_path.join(HEAP_PROFILES_DIR); create_dir_all(&output_dir).await.with_context(|| { format!( "Failed to create output directory: {}", output_dir.display() ) })?; - let output_path = output_dir.join(format!("{}.memprof.pprof", service_name)); + let output_path = output_dir.join(format!("{}.memprof.pprof.gz", service_name)); self.dump_request_to_file( relative_url, @@ -127,7 +129,7 @@ impl<'n> InternalHttpDumpClient<'n> { relative_url: &str, service_name: &str, ) -> Result<()> { - let output_dir = self.context.base_path.join("prom_metrics"); + let output_dir = self.context.base_path.join(PROM_METRICS_DIR); create_dir_all(&output_dir).await.with_context(|| { format!( "Failed to create output directory: {}", diff --git a/src/mz-debug/src/system_catalog_dumper.rs b/src/mz-debug/src/system_catalog_dumper.rs index f0d6a0a0ecbb7..e0b9d6b7e646d 100644 --- a/src/mz-debug/src/system_catalog_dumper.rs +++ b/src/mz-debug/src/system_catalog_dumper.rs @@ -57,6 +57,7 @@ pub struct Relation { pub category: RelationCategory, } +static SYSTEM_CATALOG_DUMP_DIR: &str = "system_catalog"; /// This list is used to determine which relations to dump. /// The relations are grouped and delimited by their category (i.e. Basic object information) static RELATIONS: &[Relation] = &[ @@ -929,7 +930,7 @@ fn format_catalog_dump_error_message( } fn format_file_path(base_path: PathBuf, cluster_replica: Option<&ClusterReplica>) -> PathBuf { - let path = base_path.join("system-catalog"); + let path = base_path.join(SYSTEM_CATALOG_DUMP_DIR); if let Some(cluster_replica) = cluster_replica { path.join(cluster_replica.cluster_name.as_str()) .join(cluster_replica.replica_name.as_str()) From 09454c5b2aa7a08b44c956174f3982c995518ee2 Mon Sep 17 00:00:00 2001 From: Sang Jun Bak Date: Thu, 15 May 2025 00:08:04 -0400 Subject: [PATCH 11/11] Add `url` dependency and update error logging format - Added `url` crate for URL parsing. 
- Updated warn messages in to print context for improved debugging --- Cargo.lock | 1 + src/mz-debug/Cargo.toml | 1 + src/mz-debug/src/internal_http_dumper.rs | 21 +++++++++++++++------ src/mz-debug/src/main.rs | 12 ++++++------ 4 files changed, 23 insertions(+), 12 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 16fccf45b0ab6..7358f3a92ff26 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5570,6 +5570,7 @@ dependencies = [ "tokio-util", "tracing", "tracing-subscriber", + "url", "walkdir", "workspace-hack", "zip", diff --git a/src/mz-debug/Cargo.toml b/src/mz-debug/Cargo.toml index 27052e865e62a..ff417c92987f5 100644 --- a/src/mz-debug/Cargo.toml +++ b/src/mz-debug/Cargo.toml @@ -30,6 +30,7 @@ tokio-postgres = { version = "0.7.8" } tokio-util = { version = "0.7.15", features = ["io"] } tracing = "0.1.37" tracing-subscriber = { version = "0.3.19", default-features = false, features = ["env-filter", "fmt"] } +url = { version = "2.3.1", features = ["serde"] } walkdir = "2.5" workspace-hack = { version = "0.0.0", path = "../workspace-hack", optional = true } zip = { version = "2.6", default-features = false, features=["deflate-flate2", "flate2"]} diff --git a/src/mz-debug/src/internal_http_dumper.rs b/src/mz-debug/src/internal_http_dumper.rs index d0bc894f08dcc..19de3df21ddcc 100644 --- a/src/mz-debug/src/internal_http_dumper.rs +++ b/src/mz-debug/src/internal_http_dumper.rs @@ -16,6 +16,7 @@ use std::path::Path; use tokio::fs::{File, create_dir_all}; use tokio::io::AsyncWriteExt; use tracing::{info, warn}; +use url::Url; use crate::kubectl_port_forwarder::{ KubectlPortForwarder, find_cluster_services, find_environmentd_service, @@ -52,18 +53,26 @@ impl<'n> InternalHttpDumpClient<'n> { output_path: &Path, ) -> Result<(), anyhow::Error> { // Try HTTPS first, then fall back to HTTP if that fails - let mut url = format!("https://{}", relative_url); + let mut url = Url::parse(&format!("https://{}", relative_url)) + .with_context(|| format!("Failed to parse URL: https://{}", relative_url))?; + let mut response = self .http_client - .get(&url) + .get(url.to_string()) .headers(headers.clone()) .send() .await; if response.is_err() { // Fall back to HTTP if HTTPS fails - url = format!("http://{}", relative_url); - response = self.http_client.get(&url).headers(headers).send().await; + let _ = url.set_scheme("http"); + + response = self + .http_client + .get(url.to_string()) + .headers(headers) + .send() + .await; } let response = response.with_context(|| format!("Failed to send request to {}", url))?; @@ -175,7 +184,7 @@ pub async fn dump_emulator_http_resources( ) .await { - warn!("Failed to dump heap profile: {}", e); + warn!("Failed to dump heap profile: {:#}", e); } } @@ -194,7 +203,7 @@ pub async fn dump_emulator_http_resources( ) .await { - warn!("Failed to dump prometheus metrics: {}", e); + warn!("Failed to dump prometheus metrics: {:#}", e); } } diff --git a/src/mz-debug/src/main.rs b/src/mz-debug/src/main.rs index bb6afe8179783..7abbfd910af40 100644 --- a/src/mz-debug/src/main.rs +++ b/src/mz-debug/src/main.rs @@ -247,7 +247,7 @@ async fn initialize_context( match port_forwarder.spawn_port_forward().await { Ok(process) => Some(process), Err(err) => { - warn!("{}", err); + warn!("{:#}", err); None } } @@ -346,12 +346,12 @@ async fn run(context: Context) -> Result<(), anyhow::Error> { match &context.debug_mode_context { DebugModeContext::SelfManaged(self_managed_context) => { if let Err(e) = dump_self_managed_http_resources(&context, self_managed_context).await { - warn!("Failed to dump 
self-managed http resources: {}", e); + warn!("Failed to dump self-managed http resources: {:#}", e); } } DebugModeContext::Emulator(emulator_context) => { if let Err(e) = dump_emulator_http_resources(&context, emulator_context).await { - warn!("Failed to dump emulator http resources: {}", e); + warn!("Failed to dump emulator http resources: {:#}", e); } } }; @@ -366,7 +366,7 @@ async fn run(context: Context) -> Result<(), anyhow::Error> { { Ok(dumper) => Some(dumper), Err(e) => { - warn!("Failed to dump system catalog: {}", e); + warn!("Failed to dump system catalog: {:#}", e); None } }; @@ -381,9 +381,9 @@ async fn run(context: Context) -> Result<(), anyhow::Error> { let zip_file_name = format!("{}.zip", &context.base_path.display()); if let Err(e) = zip_debug_folder(PathBuf::from(&zip_file_name), &context.base_path) { - warn!("Failed to zip debug directory: {}", e); + warn!("Failed to zip debug directory: {:#}", e); } else { - info!("Created zip debug at {}", &zip_file_name); + info!("Created zip debug at {:#}", &zip_file_name); } Ok(())