Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

20 changes: 20 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,8 @@ command = ["sleep", "1"] # command to execute within the container, i.e. the job

workdir = "/scratch" # working directory within container

port_forward = [12345, 8080] # ports to open in container for port-forwarding

# the sbatch script you want to execute
# this gets templated with values specified in the {{}} and {% %} expressions (see https://keats.github.io/tera/docs/#templates for
# more information on the template language). Note, this can also just be hardcoded without any template parameters.
Expand Down Expand Up @@ -358,6 +360,24 @@ Creating the ssh connection involves several steps, all handled by coman:
- Including the SSH config in `.ssh/config` so it's accessible in other tools
- Garbage collecting old SSH connections for jobs that are not running anymore

### Port Forwarding

Port forwarding consists of two steps:
- configuring ports in the container that can be forwarded to
- forwarding a local port to one of the configured ports

To configure forwardable ports in the container, either use the `-P <port>` flag or the `cscs.port_forward` config value.

To forward a local port, use the `coman cscs port-forward` command.

Example:
```shell
coman cscs job submit -i python -P 32100 -n myjob -- python3 -m http.server 32100 # run python built-in http server and add port for forwarding.
coman cscs port-forward -s 32100 -d 32100 myjob # forward local 32100 to remote 32100 for job `myjob`

# open http://localhost:32100 in your browser, you should see a file listing
```

## Development

### Prerequisites
Expand Down
1 change: 1 addition & 0 deletions coman/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ dirs = "6.0.0"
iroh = "0.95.1"
rand = "0.9.2"
regex = "1.12.2"
sha2 = "0.10.9"

[build-dependencies]
anyhow = "1.0.90"
Expand Down
2 changes: 2 additions & 0 deletions coman/src/cli/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -471,6 +471,8 @@ pub enum CscsSystemCommands {
},
}

pub const COMAN_VERSION: &str = env!("CARGO_PKG_VERSION");

const VERSION_MESSAGE: &str = concat!(
env!("CARGO_PKG_VERSION"),
"-",
Expand Down
95 changes: 52 additions & 43 deletions coman/src/cli/exec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,16 @@ use base64::prelude::*;
use color_eyre::Result;
use iroh::{
Endpoint, SecretKey,
endpoint::ConnectionError,
protocol::{ProtocolHandler, Router},
};
use iroh_ssh::IrohSsh;
use pid1::Pid1Settings;
use rust_supervisor::{ChildType, Supervisor, SupervisorConfig};
use tokio::{net::TcpStream, task::JoinSet};
use tokio::{io::AsyncWriteExt, net::TcpStream};

const SECRET_KEY_ENV: &str = "COMAN_IROH_SECRET";
const PORT_FORWARD_ENV: &str = "COMAN_FORWARDED_PORTS";
const SSH_PORT: u16 = 15263;

fn get_secret_key() -> Option<Vec<u8>> {
if let Ok(secret) = std::env::var(SECRET_KEY_ENV) {
Expand All @@ -23,19 +24,6 @@ fn get_secret_key() -> Option<Vec<u8>> {
}
}

#[tokio::main]
async fn run_ssh() -> Result<()> {
let mut builder = IrohSsh::builder().accept_incoming(true).accept_port(15263);
if let Some(secret_key) = get_secret_key() {
let secret_key: &[u8; 32] = secret_key[0..32].try_into().unwrap();
builder = builder.secret_key(secret_key);
}
let server = builder.build().await.expect("couldn't create iroh server");
println!("{}@{}", whoami::username(), server.node_id());
tokio::signal::ctrl_c().await?;
Ok(())
}

#[derive(Debug)]
struct PortForwardHandler {
port: u16,
Expand All @@ -56,7 +44,15 @@ impl ProtocolHandler for PortForwardHandler {

let (mut local_read, mut local_write) = output_stream.split();

let a_to_b = async move { tokio::io::copy(&mut local_read, &mut iroh_send).await };
let a_to_b = async move {
let res = tokio::io::copy(&mut local_read, &mut iroh_send).await;
if res.is_ok() {
iroh_send.flush().await.expect("couldn't flush stream");
iroh_send.finish().expect("couldn't finish stream");
iroh_send.stopped().await.expect("stream not properly stopped");
}
res
};
let b_to_a = async move { tokio::io::copy(&mut iroh_recv, &mut local_write).await };

tokio::select! {
Expand All @@ -67,6 +63,19 @@ impl ProtocolHandler for PortForwardHandler {
println!("Iroh->{port} stream ended: {result:?}");
},
};
// wait for client to close connection so we don't close prematurely
let res = tokio::time::timeout(Duration::from_secs(3), async move {
let closed = connection.closed().await;
if !matches!(closed, ConnectionError::ApplicationClosed(_)) {
println!("endpoint disconnected witn an error: {closed:#}");
} else {
println!("connection closed");
}
})
.await;
if res.is_err() {
println!("endpoint did not disconnect within 3 seconds");
}
}
Err(e) => {
println!("Failed to connect to local server {port}: {e}");
Expand All @@ -88,30 +97,35 @@ async fn port_forward() -> Result<()> {
};
let secret_key: &[u8; 32] = secret_key[0..32].try_into().unwrap();
let secret_key = SecretKey::from_bytes(secret_key);
if let Ok(forwarded_ports) = std::env::var(PORT_FORWARD_ENV) {
println!("setting up port forwarding...");
let mut join_set = JoinSet::new();
for port in forwarded_ports.split(',') {
let alpn: Vec<u8> = format!("/coman/{port}").into_bytes();
let endpoint = Endpoint::builder()
.secret_key(secret_key.clone())
.alpns(vec![alpn.clone()])
.bind()
.await?;

let port = port.to_owned();
join_set.spawn(async move {
let handler = PortForwardHandler {
port: port.parse::<u16>().expect("couldn't parse port"),
};
Router::builder(endpoint.clone()).accept(&alpn, handler).spawn();
});
}
while let Some(res) = join_set.join_next().await {
println!("Task joined: {res:?}");
}
let mut forwarded_ports = vec!["ssh".to_owned()];
if let Ok(env_ports) = std::env::var(PORT_FORWARD_ENV) {
forwarded_ports.extend(env_ports.split(',').map(|p| p.to_owned()).collect::<Vec<String>>());
}
let endpoint = Endpoint::builder().secret_key(secret_key.clone()).bind().await?;
let id = endpoint.id();
println!("endpoint: {id}");

println!("setting up port forwarding...");
let mut builder = Router::builder(endpoint.clone());
for port in forwarded_ports {
let (port, alpn) = if port == "ssh" {
(SSH_PORT, "/iroh/ssh".to_string())
} else {
(
port.parse::<u16>().expect("couldn't parse port"),
format!("/coman/{port}"),
)
};

let handler = PortForwardHandler { port };
builder = builder.accept(alpn.clone().into_bytes(), handler);
println!("set up port forwarding for port {port} ({alpn})");
}
let _router = builder.spawn();
println!("port forwarding started");

let _ = tokio::signal::ctrl_c().await;
println!("port forwarding stopped");
Ok(())
}

Expand All @@ -125,11 +139,6 @@ pub(crate) async fn cli_exec_command(command: Vec<String>) -> Result<()> {
.expect("Launch failed");

let mut supervisor = Supervisor::new(SupervisorConfig::default());
supervisor.add_process("iroh-ssh", ChildType::Permanent, || {
thread::spawn(|| {
let _ = run_ssh();
})
});
supervisor.add_process("port-forward", ChildType::Permanent, || {
thread::spawn(|| {
let _ = port_forward();
Expand Down
1 change: 0 additions & 1 deletion coman/src/cli/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
pub mod app;
pub mod exec;
pub mod port_forward;
pub mod proxy;
1 change: 0 additions & 1 deletion coman/src/cli/port_forward.rs

This file was deleted.

9 changes: 7 additions & 2 deletions coman/src/cscs/api_client/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ use firecrest_client::{
get_compute_system_jobs, post_compute_system_job,
},
filesystem_api::{
delete_filesystem_ops_rm, get_filesystem_ops_download, get_filesystem_ops_ls, get_filesystem_ops_stat,
get_filesystem_ops_tail, post_filesystem_ops_mkdir, post_filesystem_ops_upload,
delete_filesystem_ops_rm, get_filesystem_ops_checksum, get_filesystem_ops_download, get_filesystem_ops_ls,
get_filesystem_ops_stat, get_filesystem_ops_tail, post_filesystem_ops_mkdir, post_filesystem_ops_upload,
post_filesystem_transfer_download, post_filesystem_transfer_upload, put_filesystem_ops_chmod,
},
status_api::{get_status_systems, get_status_userinfo},
Expand Down Expand Up @@ -231,6 +231,11 @@ impl CscsApi {
None => Ok(vec![]),
}
}
pub async fn checksum(&self, system_name: &str, path: PathBuf) -> Result<Option<String>> {
get_filesystem_ops_checksum(&self.client, system_name, path)
.await
.wrap_err("couldn't stat file")
}
pub async fn stat_path(&self, system_name: &str, path: PathBuf) -> Result<Option<FileStat>> {
let result = get_filesystem_ops_stat(&self.client, system_name, path)
.await
Expand Down
1 change: 1 addition & 0 deletions coman/src/cscs/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ pub(crate) async fn cli_cscs_port_forward(
platform: Option<ComputePlatform>,
) -> Result<()> {
let job_id = maybe_job_id_from_name(job, system.clone(), platform.clone()).await?;
println!("running port forward for job {job_id}");
cscs_port_forward(job_id, source_port, destination_port, system).await
}

Expand Down
Loading