Skip to content

Commit 42653ac

Browse files
authored
add port forwarding (#53)
* refactor cli * add port forwarding on remote * add local port forwarding command
1 parent 98b5257 commit 42653ac

11 files changed

Lines changed: 323 additions & 119 deletions

File tree

coman/.config/config.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,9 @@ workdir = "{{container_workdir}}"
5050
{% if iroh_secret%}
5151
COMAN_IROH_SECRET="{{iroh_secret}}"
5252
{% endif %}
53+
{% if port_forward %}
54+
COMAN_FORWARDED_PORTS="{{port_forward}}"
55+
{% endif %}
5356
5457
[annotations]
5558
{% if ssh_public_key %}
Lines changed: 21 additions & 84 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,9 @@
1-
use std::{error::Error, path::PathBuf, str::FromStr, thread, time::Duration};
1+
use std::{error::Error, path::PathBuf, str::FromStr, thread};
22

3-
use base64::prelude::*;
43
use clap::{Args, Command, Parser, Subcommand, ValueHint, builder::TypedValueParser};
54
use clap_complete::{ArgValueCompleter, CompletionCandidate, Generator, Shell, generate};
6-
use color_eyre::{Report, Result, eyre::eyre};
7-
use iroh_ssh::IrohSsh;
5+
use color_eyre::{Report, Result};
86
use itertools::Itertools;
9-
use pid1::Pid1Settings;
10-
use rust_supervisor::{ChildType, Supervisor, SupervisorConfig};
117
use strum::VariantNames;
128
use tokio::sync::mpsc;
139

@@ -16,9 +12,9 @@ use crate::{
1612
cscs::{
1713
api_client::{
1814
client::{EdfSpec as EdfSpecEnum, ScriptSpec as ScriptSpecEnum},
19-
types::{JobStatus, PathType},
15+
types::PathType,
2016
},
21-
handlers::{cscs_file_list, cscs_job_details, cscs_job_list, file_system_roots},
17+
handlers::{cscs_file_list, cscs_job_list, file_system_roots},
2218
},
2319
util::types::DockerImageUrl,
2420
};
@@ -133,6 +129,18 @@ pub enum CscsCommands {
133129
#[command(subcommand)]
134130
command: CscsSystemCommands,
135131
},
132+
#[clap(
133+
alias("pf"),
134+
about = "Forward a local port to a remote port for a job. Note that the port needs to have been exposed with the -P flag on job submission [aliases: pf]"
135+
)]
136+
PortForward {
137+
#[arg(short, long, help = "Local port to forward from")]
138+
source_port: u16,
139+
#[arg(short, long, help = "Remote port to forward to")]
140+
destination_port: u16,
141+
#[arg(help="id or name of the job (name uses newest job of that name)", add = ArgValueCompleter::new(job_id_or_name_completer))]
142+
job: JobIdOrName,
143+
},
136144
}
137145

138146
#[derive(Args, Clone, Debug)]
@@ -257,6 +265,11 @@ pub enum CscsJobCommands {
257265
help="Environment variables to set in the container",
258266
value_hint=ValueHint::Other)]
259267
env: Vec<(String, String)>,
268+
#[clap(short='P',
269+
value_name="TARGET",
270+
help="Ports to forward from the container",
271+
value_hint=ValueHint::Other)]
272+
port_forward: Vec<u16>,
260273
#[clap(short='M',
261274
value_name="PATH:CONTAINER_PATH",
262275
value_parser=parse_key_val_colon::<String,String>,
@@ -543,79 +556,3 @@ fn is_bare_string(value_str: &str) -> bool {
543556
pub fn print_completions<G: Generator>(generator: G, cmd: &mut Command) {
544557
generate(generator, cmd, cmd.get_name().to_string(), &mut std::io::stdout());
545558
}
546-
547-
/// Runs a wrapped command in a container-safe way and potentially runs background processes like iroh-ssh
548-
pub(crate) async fn cli_exec_command(command: Vec<String>) -> Result<()> {
549-
// Pid1 takes care of proper terminating of processes and signal handling when running in a container
550-
Pid1Settings::new()
551-
.enable_log(true)
552-
.timeout(Duration::from_secs(2))
553-
.launch()
554-
.expect("Launch failed");
555-
556-
let mut supervisor = Supervisor::new(SupervisorConfig::default());
557-
supervisor.add_process("iroh-ssh", ChildType::Permanent, || {
558-
thread::spawn(|| {
559-
let rt = tokio::runtime::Builder::new_current_thread()
560-
.enable_all()
561-
.build()
562-
.expect("couldn't start tokio");
563-
564-
// Call the asynchronous connect method using the runtime.
565-
rt.block_on(async move {
566-
let mut builder = IrohSsh::builder().accept_incoming(true).accept_port(15263);
567-
if let Ok(secret) = std::env::var("COMAN_IROH_SECRET") {
568-
let secret_key = BASE64_STANDARD.decode(secret).unwrap();
569-
let secret_key: &[u8; 32] = secret_key[0..32].try_into().unwrap();
570-
builder = builder.secret_key(secret_key);
571-
}
572-
573-
let server = builder.build().await.expect("couldn't create iroh server");
574-
println!("{}@{}", whoami::username(), server.node_id());
575-
loop {
576-
tokio::time::sleep(Duration::from_secs(60)).await;
577-
}
578-
});
579-
})
580-
});
581-
supervisor.add_process("main-process", ChildType::Temporary, move || {
582-
let command = command.clone();
583-
thread::spawn(move || {
584-
let mut child = std::process::Command::new(command[0].clone())
585-
.args(&command[1..])
586-
.spawn()
587-
.expect("Failed to start compute job");
588-
child.wait().expect("Failed to wait on compute job");
589-
})
590-
});
591-
592-
let supervisor = supervisor.start_monitoring();
593-
loop {
594-
thread::sleep(Duration::from_secs(1));
595-
596-
if let Some(rust_supervisor::ProcessState::Failed | rust_supervisor::ProcessState::Stopped) =
597-
supervisor.get_process_state("main-process")
598-
{
599-
break;
600-
}
601-
}
602-
Ok(())
603-
}
604-
605-
/// Thin wrapper around iroh proxy
606-
pub(crate) async fn cli_proxy_command(system: String, job_id: i64) -> Result<()> {
607-
let data_dir = get_data_dir();
608-
let job_info = cscs_job_details(job_id, Some(system.clone()), None).await?;
609-
if job_info.is_none() {
610-
return Err(eyre!("remote job does not exist!"));
611-
} else if let Some(job_info) = job_info
612-
&& job_info.status != JobStatus::Running
613-
{
614-
return Err(eyre!("remote job is not in running state, connection not available"));
615-
}
616-
let endpoint_id = std::fs::read_to_string(data_dir.join(format!("{}_{}.endpoint", system, job_id)))?;
617-
println!("{}", endpoint_id);
618-
iroh_ssh::api::proxy_mode(iroh_ssh::ProxyArgs { node_id: endpoint_id })
619-
.await
620-
.map_err(|e| eyre!("couldn't proxy ssh connection: {:?}", e))
621-
}

coman/src/cli/exec.rs

Lines changed: 136 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,136 @@
1+
use std::{thread, time::Duration};
2+
3+
use base64::prelude::*;
4+
use color_eyre::Result;
5+
use iroh::{Endpoint, SecretKey};
6+
use iroh_ssh::IrohSsh;
7+
use pid1::Pid1Settings;
8+
use rust_supervisor::{ChildType, Supervisor, SupervisorConfig};
9+
use tokio::{net::TcpStream, task::JoinSet};
10+
11+
const SECRET_KEY_ENV: &str = "COMAN_IROH_SECRET";
12+
const PORT_FORWARD_ENV: &str = "COMAN_FORWARDED_PORTS";
13+
14+
fn get_secret_key() -> Option<Vec<u8>> {
15+
if let Ok(secret) = std::env::var(SECRET_KEY_ENV) {
16+
let secret_key = BASE64_STANDARD.decode(secret).unwrap();
17+
Some(secret_key)
18+
} else {
19+
None
20+
}
21+
}
22+
23+
#[tokio::main]
24+
async fn run_ssh() -> Result<()> {
25+
let mut builder = IrohSsh::builder().accept_incoming(true).accept_port(15263);
26+
if let Some(secret_key) = get_secret_key() {
27+
let secret_key: &[u8; 32] = secret_key[0..32].try_into().unwrap();
28+
builder = builder.secret_key(secret_key);
29+
}
30+
let server = builder.build().await.expect("couldn't create iroh server");
31+
println!("{}@{}", whoami::username(), server.node_id());
32+
loop {
33+
tokio::time::sleep(Duration::from_secs(60)).await;
34+
}
35+
}
36+
37+
#[tokio::main]
38+
async fn port_forward() -> Result<()> {
39+
let Some(secret_key) = get_secret_key() else {
40+
return Ok(());
41+
};
42+
let secret_key: &[u8; 32] = secret_key[0..32].try_into().unwrap();
43+
let secret_key = SecretKey::from_bytes(secret_key);
44+
if let Ok(forwarded_ports) = std::env::var(PORT_FORWARD_ENV) {
45+
let mut join_set = JoinSet::new();
46+
for port in forwarded_ports.split(',') {
47+
let alpn: Vec<u8> = format!("/coman/{port}").into_bytes();
48+
let endpoint = Endpoint::builder()
49+
.secret_key(secret_key.clone())
50+
.alpns(vec![alpn])
51+
.bind()
52+
.await?;
53+
let port = port.to_owned();
54+
join_set.spawn(async move {
55+
while let Some(incoming) = endpoint.accept().await {
56+
let connection = incoming.await.unwrap();
57+
match connection.accept_bi().await {
58+
Ok((mut iroh_send, mut iroh_recv)) => {
59+
match TcpStream::connect(format!("127.0.0.1:{port}")).await {
60+
Ok(mut stream) => {
61+
let (mut local_read, mut local_write) = stream.split();
62+
let a_to_b = async move { tokio::io::copy(&mut local_read, &mut iroh_send).await };
63+
let b_to_a = async move { tokio::io::copy(&mut iroh_recv, &mut local_write).await };
64+
65+
tokio::select! {
66+
result = a_to_b => {
67+
println!("{port}->Iroh stream ended: {result:?}");
68+
},
69+
result = b_to_a => {
70+
println!("Iroh->{port} stream ended: {result:?}");
71+
},
72+
};
73+
}
74+
Err(e) => {
75+
println!("Failed to connect to {port}: {e:?}");
76+
}
77+
}
78+
}
79+
Err(e) => {
80+
println!("Failed to accept stream to {port}: {e:?}");
81+
}
82+
}
83+
}
84+
});
85+
}
86+
while let Some(res) = join_set.join_next().await {
87+
println!("Task joined: {res:?}");
88+
}
89+
}
90+
91+
Ok(())
92+
}
93+
94+
/// Runs a wrapped command in a container-safe way and potentially runs background processes like iroh-ssh
95+
pub(crate) async fn cli_exec_command(command: Vec<String>) -> Result<()> {
96+
// Pid1 takes care of proper terminating of processes and signal handling when running in a container
97+
Pid1Settings::new()
98+
.enable_log(true)
99+
.timeout(Duration::from_secs(2))
100+
.launch()
101+
.expect("Launch failed");
102+
103+
let mut supervisor = Supervisor::new(SupervisorConfig::default());
104+
supervisor.add_process("iroh-ssh", ChildType::Permanent, || {
105+
thread::spawn(|| {
106+
let _ = run_ssh();
107+
})
108+
});
109+
supervisor.add_process("port-forward", ChildType::Permanent, || {
110+
thread::spawn(|| {
111+
let _ = port_forward();
112+
})
113+
});
114+
supervisor.add_process("main-process", ChildType::Temporary, move || {
115+
let command = command.clone();
116+
thread::spawn(move || {
117+
let mut child = std::process::Command::new(command[0].clone())
118+
.args(&command[1..])
119+
.spawn()
120+
.expect("Failed to start compute job");
121+
child.wait().expect("Failed to wait on compute job");
122+
})
123+
});
124+
125+
let supervisor = supervisor.start_monitoring();
126+
loop {
127+
thread::sleep(Duration::from_secs(1));
128+
129+
if let Some(rust_supervisor::ProcessState::Failed | rust_supervisor::ProcessState::Stopped) =
130+
supervisor.get_process_state("main-process")
131+
{
132+
break;
133+
}
134+
}
135+
Ok(())
136+
}

coman/src/cli/mod.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
pub mod app;
2+
pub mod exec;
3+
pub mod port_forward;
4+
pub mod proxy;

coman/src/cli/port_forward.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+

coman/src/cli/proxy.rs

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
use color_eyre::{Result, eyre::eyre};
2+
3+
use crate::{
4+
config::get_data_dir,
5+
cscs::{api_client::types::JobStatus, handlers::cscs_job_details},
6+
};
7+
8+
/// Thin wrapper around iroh proxy
9+
pub(crate) async fn cli_proxy_command(system: String, job_id: i64) -> Result<()> {
10+
let data_dir = get_data_dir();
11+
let job_info = cscs_job_details(job_id, Some(system.clone()), None).await?;
12+
if job_info.is_none() {
13+
return Err(eyre!("remote job does not exist!"));
14+
} else if let Some(job_info) = job_info
15+
&& job_info.status != JobStatus::Running
16+
{
17+
return Err(eyre!("remote job is not in running state, connection not available"));
18+
}
19+
let endpoint_id = std::fs::read_to_string(data_dir.join(format!("{}_{}.endpoint", system, job_id)))?;
20+
println!("{}", endpoint_id);
21+
iroh_ssh::api::proxy_mode(iroh_ssh::ProxyArgs { node_id: endpoint_id })
22+
.await
23+
.map_err(|e| eyre!("couldn't proxy ssh connection: {:?}", e))
24+
}

coman/src/config.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,8 @@ pub struct CscsConfig {
5858
#[serde(default)]
5959
pub env: HashMap<String, String>,
6060
#[serde(default)]
61+
pub port_forward: Vec<u16>,
62+
#[serde(default)]
6163
pub image: Option<String>,
6264
#[serde(default)]
6365
pub edf_file_template: String,

coman/src/cscs/api_client/client.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ pub struct JobStartOptions {
4747
pub stderr: Option<PathBuf>,
4848
pub container_workdir: Option<String>,
4949
pub env: Vec<(String, String)>,
50+
pub port_forward: Vec<u16>,
5051
pub mount: Vec<(String, String)>,
5152
pub edf_spec: EdfSpec,
5253
pub script_spec: ScriptSpec,

coman/src/cscs/cli.rs

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,14 @@ use tokio::{
1616
};
1717

1818
use crate::{
19-
cli::JobIdOrName,
19+
cli::app::JobIdOrName,
2020
config::ComputePlatform,
2121
cscs::{
2222
api_client::{client::JobStartOptions, types::JobStatus},
2323
handlers::{
2424
cscs_file_delete, cscs_file_download, cscs_file_list, cscs_file_upload, cscs_job_cancel, cscs_job_details,
25-
cscs_job_list, cscs_job_log, cscs_job_start, cscs_login, cscs_system_list, cscs_system_set,
25+
cscs_job_list, cscs_job_log, cscs_job_start, cscs_login, cscs_port_forward, cscs_system_list,
26+
cscs_system_set,
2627
},
2728
},
2829
};
@@ -123,6 +124,17 @@ pub(crate) async fn cli_cscs_job_log(
123124
}
124125
}
125126

127+
pub(crate) async fn cli_cscs_port_forward(
128+
source_port: u16,
129+
destination_port: u16,
130+
job: JobIdOrName,
131+
system: Option<String>,
132+
platform: Option<ComputePlatform>,
133+
) -> Result<()> {
134+
let job_id = maybe_job_id_from_name(job, system.clone(), platform.clone()).await?;
135+
cscs_port_forward(job_id, source_port, destination_port, system).await
136+
}
137+
126138
#[allow(clippy::too_many_arguments)]
127139
pub(crate) async fn cli_cscs_job_start(
128140
name: Option<String>,

0 commit comments

Comments
 (0)