Skip to content

Commit 754036d

Browse files
committed
Remove hacky sleep when port forwarding
- Instead of waiting until port forwarding, we read from stderr until we know for sure port forwarding has started - Renamed `create_kubectl_port_forwarder` to `create_pg_wire_port_forwarder` since we'll be port forwarding more than once - Removed the retry mechanism given we don't see the port forwarding disconnecting too often and we can't do it AND initial acknowledgement together easily - Tie child process to a struct instead of a tokio spawn.
1 parent a7bb9a2 commit 754036d

File tree

2 files changed

+85
-98
lines changed

2 files changed

+85
-98
lines changed

src/mz-debug/src/kubectl_port_forwarder.rs

Lines changed: 61 additions & 79 deletions
Original file line numberDiff line numberDiff line change
@@ -19,14 +19,12 @@ use anyhow::Result;
1919
use k8s_openapi::api::core::v1::Service;
2020
use kube::api::ListParams;
2121
use kube::{Api, Client};
22+
use tokio::io::AsyncBufReadExt;
2223

23-
use std::time::Duration;
24-
25-
use mz_ore::retry::{self, RetryResult};
26-
use tracing::{info, warn};
24+
use tracing::info;
2725

2826
use crate::SelfManagedDebugMode;
29-
#[derive(Debug, Clone)]
27+
#[derive(Debug)]
3028
pub struct KubectlPortForwarder {
3129
pub namespace: String,
3230
pub service_name: String,
@@ -36,89 +34,73 @@ pub struct KubectlPortForwarder {
3634
pub context: Option<String>,
3735
}
3836

39-
impl KubectlPortForwarder {
40-
/// Port forwards a given k8s service via Kubectl.
41-
/// The process will retry if the port-forwarding fails and
42-
/// will terminate once the port forwarding reaches the max number of retries.
43-
/// We retry since kubectl port-forward is flaky.
44-
pub async fn port_forward(&self) {
45-
if let Err(err) = retry::Retry::default()
46-
.max_duration(Duration::from_secs(60))
47-
.retry_async(|retry_state| {
48-
let k8s_context = self.context.clone();
49-
let namespace = self.namespace.clone();
50-
let service_name = self.service_name.clone();
51-
let local_address = self.local_address.clone();
52-
let local_port = self.local_port;
53-
let target_port = self.target_port;
54-
55-
info!(
56-
"Spawning port forwarding process for {} from ports {}:{} -> {}",
57-
service_name, local_address, local_port, target_port
58-
);
37+
pub struct PortForwardConnection {
38+
// tokio process that's killed on drop
39+
pub _port_forward_process: tokio::process::Child,
40+
}
5941

60-
async move {
61-
let port_arg_str = format!("{}:{}", &local_port, &target_port);
62-
let service_name_arg_str = format!("services/{}", &service_name);
63-
let mut args = vec![
64-
"port-forward",
65-
&service_name_arg_str,
66-
&port_arg_str,
67-
"-n",
68-
&namespace,
69-
"--address",
70-
&local_address,
71-
];
72-
73-
if let Some(k8s_context) = &k8s_context {
74-
args.extend(["--context", k8s_context]);
75-
}
42+
impl KubectlPortForwarder {
43+
/// Spawns a port forwarding process that resolves when
44+
/// the port forward is established.
45+
pub async fn spawn_port_forward(&mut self) -> Result<PortForwardConnection, anyhow::Error> {
46+
let port_arg_str = format!("{}:{}", &self.local_port, &self.target_port);
47+
let service_name_arg_str = format!("services/{}", &self.service_name);
48+
let mut args = vec![
49+
"port-forward",
50+
&service_name_arg_str,
51+
&port_arg_str,
52+
"-n",
53+
&self.namespace,
54+
"--address",
55+
&self.local_address,
56+
];
57+
58+
if let Some(k8s_context) = &self.context {
59+
args.extend(["--context", k8s_context]);
60+
}
7661

77-
match tokio::process::Command::new("kubectl")
78-
.args(args)
79-
// Silence stdout/stderr
80-
.stdout(std::process::Stdio::null())
81-
.stderr(std::process::Stdio::null())
82-
.kill_on_drop(true)
83-
.output()
84-
.await
85-
{
86-
Ok(output) => {
87-
if !output.status.success() {
88-
let retry_err_msg = format!(
89-
"Failed to port-forward{}: {}",
90-
retry_state.next_backoff.map_or_else(
91-
|| "".to_string(),
92-
|d| format!(", retrying in {:?}", d)
93-
),
94-
String::from_utf8_lossy(&output.stderr)
95-
);
96-
warn!("{}", retry_err_msg);
97-
98-
return RetryResult::RetryableErr(anyhow::anyhow!(retry_err_msg));
99-
}
100-
}
101-
Err(err) => {
102-
return RetryResult::RetryableErr(anyhow::anyhow!(
103-
"Failed to port-forward: {}",
104-
err
105-
));
62+
let child = tokio::process::Command::new("kubectl")
63+
.args(args)
64+
// Silence stdout
65+
.stdout(std::process::Stdio::null())
66+
.stderr(std::process::Stdio::piped())
67+
.kill_on_drop(true)
68+
.spawn();
69+
70+
if let Ok(mut child) = child {
71+
if let Some(stderr) = child.stderr.take() {
72+
let stderr_reader = tokio::io::BufReader::new(stderr);
73+
// Wait until we know port forwarding is established
74+
let timeout = tokio::time::timeout(std::time::Duration::from_secs(5), async {
75+
let mut lines = stderr_reader.lines();
76+
while let Ok(Some(line)) = lines.next_line().await {
77+
if line.contains("Forwarding from") {
78+
break;
10679
}
10780
}
108-
// The kubectl subprocess's future will only resolve on error, thus the
109-
// code here is unreachable. We return RetryResult::Ok to satisfy
110-
// the type checker.
111-
RetryResult::Ok(())
81+
})
82+
.await;
83+
84+
if timeout.is_err() {
85+
return Err(anyhow::anyhow!("Port forwarding timed out after 5 seconds"));
11286
}
113-
})
114-
.await
115-
{
116-
warn!("{}", err);
87+
88+
info!(
89+
"Port forwarding established for {} from ports {}:{} -> {}",
90+
&self.service_name, &self.local_address, &self.local_port, &self.target_port
91+
);
92+
93+
return Ok(PortForwardConnection {
94+
_port_forward_process: child,
95+
});
96+
}
11797
}
98+
Err(anyhow::anyhow!("Failed to spawn port forwarding process"))
11899
}
119100
}
120101

121-
pub async fn create_kubectl_port_forwarder(
102+
/// Creates a port forwarder for the external pg wire port of balancerd.
103+
pub async fn create_pg_wire_port_forwarder(
122104
client: &Client,
123105
args: &SelfManagedDebugMode,
124106
) -> Result<KubectlPortForwarder, anyhow::Error> {

src/mz-debug/src/main.rs

Lines changed: 24 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -19,15 +19,14 @@ use kube::{Client, Config};
1919
use mz_build_info::{BuildInfo, build_info};
2020
use mz_ore::cli::{self, CliConfig};
2121
use mz_ore::error::ErrorExt;
22-
use mz_ore::task;
2322
use tracing::{error, info, warn};
2423
use tracing_subscriber::EnvFilter;
2524
use tracing_subscriber::layer::SubscriberExt;
2625
use tracing_subscriber::util::SubscriberInitExt;
2726

2827
use crate::docker_dumper::DockerDumper;
2928
use crate::k8s_dumper::K8sDumper;
30-
use crate::kubectl_port_forwarder::create_kubectl_port_forwarder;
29+
use crate::kubectl_port_forwarder::create_pg_wire_port_forwarder;
3130
use crate::utils::{
3231
create_tracing_log_file, format_base_path, validate_pg_connection_string, zip_debug_folder,
3332
};
@@ -63,6 +62,8 @@ pub struct SelfManagedDebugMode {
6362
/// If true, the tool will dump the values of secrets in the Kubernetes cluster.
6463
#[clap(long, default_value = "false", action = clap::ArgAction::Set)]
6564
k8s_dump_secret_values: bool,
65+
// TODO (debug_tool1): Convert port forwarding variables into a map since we'll be
66+
// portforwarding multiple times
6667
/// If true, the tool will automatically port-forward the external SQL port in the Kubernetes cluster.
6768
/// If dump_k8s is false however, we will not automatically port-forward.
6869
#[clap(long, default_value = "true", action = clap::ArgAction::Set)]
@@ -221,8 +222,9 @@ async fn main() {
221222

222223
async fn run(context: Context, args: Args) -> Result<(), anyhow::Error> {
223224
// Depending on if the user is debugging either a k8s environments or docker environment,
224-
// dump the respective system's resources.
225-
let container_system_dumper = match &args.debug_mode {
225+
// dump the respective system's resources and spin up a port forwarder if auto port forwarding.
226+
// We'd like to keep the port forwarder alive until the end of the program.
227+
let (container_system_dumper, _pg_wire_port_forward_connection) = match &args.debug_mode {
226228
DebugMode::SelfManaged(args) => {
227229
if args.dump_k8s {
228230
let client = match create_k8s_client(args.k8s_context.clone()).await {
@@ -232,27 +234,30 @@ async fn run(context: Context, args: Args) -> Result<(), anyhow::Error> {
232234
return Err(e);
233235
}
234236
};
235-
236-
if args.auto_port_forward {
237-
let port_forwarder = create_kubectl_port_forwarder(&client, args).await?;
238-
task::spawn(|| "port-forwarding", async move {
239-
port_forwarder.port_forward().await;
240-
});
241-
// There may be a delay between when the port forwarding process starts and when it's ready
242-
// to use. We wait a few seconds to ensure that port forwarding is ready.
243-
tokio::time::sleep(std::time::Duration::from_secs(2)).await;
244-
}
245-
237+
let port_forward_connection = if args.auto_port_forward {
238+
let mut port_forwarder = create_pg_wire_port_forwarder(&client, args).await?;
239+
240+
match port_forwarder.spawn_port_forward().await {
241+
Ok(connection) => Some(connection),
242+
Err(err) => {
243+
warn!("{}", err);
244+
None
245+
}
246+
}
247+
} else {
248+
None
249+
};
250+
// Parker: Return it as a tuple (dumper, port_forwarder)
246251
let dumper = ContainerServiceDumper::new_k8s_dumper(
247252
&context,
248253
client,
249254
args.k8s_namespaces.clone(),
250255
args.k8s_context.clone(),
251256
args.k8s_dump_secret_values,
252257
);
253-
Some(dumper)
258+
(Some(dumper), port_forward_connection)
254259
} else {
255-
None
260+
(None, None)
256261
}
257262
}
258263
DebugMode::Emulator(args) => {
@@ -263,9 +268,9 @@ async fn run(context: Context, args: Args) -> Result<(), anyhow::Error> {
263268
.expect("docker_container_id is required");
264269
let dumper =
265270
ContainerServiceDumper::new_docker_dumper(&context, docker_container_id);
266-
Some(dumper)
271+
(Some(dumper), None)
267272
} else {
268-
None
273+
(None, None)
269274
}
270275
}
271276
};

0 commit comments

Comments
 (0)