Skip to content

Commit ea91f26

Browse files
committed
add garbage collection for ssh connections
1 parent 703d49b commit ea91f26

7 files changed

Lines changed: 162 additions & 57 deletions

File tree

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

README.md

Lines changed: 25 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ Table of contents
1717
* [Logging in](#logging-in)
1818
* [CLI](#cli)
1919
* [Terminal UI](#tui)
20+
* [SSH](#ssh)
2021
* [coman.toml config file](#comantoml-config-file)
2122
* [Editing the config](#editing-the-config)
2223
* [Development](#development)
@@ -161,7 +162,8 @@ To execute a job on CSCS, run a command like
161162
coman cscs job submit -i ubuntu:latest -- echo test
162163
```
163164
This will run the command `echo test` using the `ubuntu:latest` docker image and default settings.
164-
See `coman cscs job submit -h` for more options.
165+
See `coman cscs job submit -h` for more options. This will also automatically set up an ssh connection for
166+
the job (use `--no-ssh` to prevent this), see the [SSH](#ssh) section for more details.
165167

166168
You can list your jobs using
167169

@@ -270,7 +272,7 @@ name = "myproject" # the name of the project, used to generate job names
270272
current_system = "daint" # what system/cluster to execute commands on
271273
current_platform = "HPC" # what platform to execute commands on (valid: HPC, ML or CW)
272274
account = "..." # the project/group account to use on cscs
273-
275+
ssh_key = "path/to/ssh/public/key.pub" # To use a different public key for SSH connections, other than the default auto-detected id_dsa, id_rsa or id_ecdsa
274276

275277
image = "ubuntu" # default docker image to use
276278

@@ -336,6 +338,27 @@ coman config get cscs.current_system
336338
coman config set cscs.current_system "daint"
337339
```
338340

341+
### SSH
342+
343+
`coman cscs job submit` will automatically create an SSH connection for the job. It will search for an
344+
`id_dsa.pub`, `id_rsa.pub` or `id_ecdsa.pub` file in your `.ssh` folder and use that for the connection,
345+
unless you specify another key using the `--ssh-key` argument or the `cscs.ssh_key` setting in the config file.
346+
347+
Creating the ssh connection involves several steps, all handled by coman:
348+
349+
- Uploading your ssh public key into the remote coman project folder
350+
- Setting the public key in the [CSCS SSH hook](https://docs.cscs.ch/software/container-engine/resource-hook/#ssh-hook)
351+
- Uploading a squash file containing the coman executable to the remote coman project folder
352+
- Mounting the coman squash file into the container so the coman executable is available in the container
353+
- Creating an [iroh](https://github.com/n0-computer/iroh) secret key to use for the [QUIC tunnel](https://en.wikipedia.org/wiki/QUIC)
354+
- Using the coman executable as the entrypoint of the container (wrapping the original command), which
355+
allows coman to create an iroh/QUIC tunnel for remote connections, as well as properly handling pid1
356+
process signals in the container
357+
- Creating a local SSH config in the coman data dir (`~/.local/share/coman` by default) containing connection
358+
information and the correct iroh proxy command
359+
- Including the SSH config in `.ssh/config` so it's accessible in other tools
360+
- Garbage collecting old SSH connections for jobs that are not running anymore
361+
339362
## Development
340363

341364
### Prerequisites

coman/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,7 @@ base64 = "0.22.1"
8282
dirs = "6.0.0"
8383
iroh = "0.95.1"
8484
rand = "0.9.2"
85+
regex = "1.12.2"
8586

8687
[build-dependencies]
8788
anyhow = "1.0.90"

coman/src/cli.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ pub enum CliCommands {
7373
command: Vec<String>,
7474
},
7575
#[clap(hide = true)]
76-
Proxy { job_id: i64 },
76+
Proxy { system: String, job_id: i64 },
7777
}
7878

7979
#[derive(Subcommand, Debug)]
@@ -449,9 +449,9 @@ pub(crate) async fn cli_exec_command(command: Vec<String>) -> Result<()> {
449449
}
450450

451451
/// Thin wrapper around iroh proxy
452-
pub(crate) async fn cli_proxy_command(job_id: i64) -> Result<()> {
452+
pub(crate) async fn cli_proxy_command(system: String, job_id: i64) -> Result<()> {
453453
let data_dir = get_data_dir();
454-
let endpoint_id = std::fs::read_to_string(data_dir.join(format!("{}.endpoint", job_id)))?;
454+
let endpoint_id = std::fs::read_to_string(data_dir.join(format!("{}_{}.endpoint", system, job_id)))?;
455455
println!("{}", endpoint_id);
456456
iroh_ssh::api::proxy_mode(iroh_ssh::ProxyArgs { node_id: endpoint_id })
457457
.await

coman/src/config.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,8 @@ pub struct CscsConfig {
6262
#[serde(default)]
6363
pub edf_file_template: String,
6464
#[serde(default)]
65+
pub ssh_key: Option<PathBuf>,
66+
#[serde(default)]
6567
pub command: Vec<String>,
6668

6769
#[serde(default)]

coman/src/cscs/handlers.rs

Lines changed: 129 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ use std::os::unix::fs::MetadataExt;
33
#[cfg(target_family = "windows")]
44
use std::os::windows::fs::MetadataExt;
55
use std::{
6-
collections::HashMap,
6+
collections::{HashMap, HashSet},
77
io::{BufWriter, Read, Write},
88
path::{Path, PathBuf},
99
};
@@ -14,6 +14,7 @@ use eyre::Context;
1414
use futures::StreamExt;
1515
use iroh::SecretKey;
1616
use itertools::Itertools;
17+
use regex::Regex;
1718
use reqwest::Url;
1819
use tokio::{fs::File, io::AsyncWriteExt};
1920

@@ -172,13 +173,14 @@ async fn setup_ssh(
172173
base_path: &Path,
173174
current_system: &str,
174175
options: &JobStartOptions,
176+
config: &Config,
175177
) -> Result<Option<(PathBuf, SecretKey)>> {
176178
if options.no_ssh {
177179
return Ok(None);
178180
}
179181
let secret = SecretKey::generate(&mut rand::rng());
180182

181-
let ssh_key = if let Some(path) = options.ssh_key.clone() {
183+
let ssh_key = if let Some(path) = options.ssh_key.clone().or(config.values.cscs.ssh_key.clone()) {
182184
path.canonicalize().map(Some).wrap_err("couldn't get ssh key path")?
183185
} else {
184186
// try to figure our ssh key
@@ -202,13 +204,115 @@ async fn setup_ssh(
202204

203205
api_client
204206
.upload(current_system, remote_path.clone(), public_key.into_bytes())
205-
.await?;
207+
.await
208+
.wrap_err(eyre!("couldn't upload ssh public key"))?;
206209
Ok(Some((remote_path, secret)))
207210
}
208211
None => Err(eyre!("couldn't find ssh public key, use `--ssh_key` to specify it")),
209212
}
210213
}
211214

215+
async fn garbage_collect_ssh(api_client: &CscsApi, current_system: &str) -> Result<()> {
216+
let data_dir = get_data_dir();
217+
if !data_dir.exists() {
218+
return Ok(());
219+
}
220+
let jobs = api_client.list_jobs(current_system, None).await?;
221+
let job_entries: HashSet<_> = jobs.iter().map(|j| format!("{}_{}", current_system, j.id)).collect();
222+
let outdated_endpoints: Vec<_> = std::fs::read_dir(&data_dir)?
223+
.filter(|d| {
224+
d.as_ref().is_ok_and(|e| {
225+
e.path().is_file()
226+
&& e.file_name().to_string_lossy().ends_with(".endpoint")
227+
&& e.file_name().to_string_lossy().starts_with(current_system)
228+
&& !job_entries.contains(e.file_name().to_string_lossy().split_once('.').unwrap().0)
229+
})
230+
})
231+
.map(|d| d.unwrap())
232+
.collect();
233+
234+
// delete connection files
235+
for d in outdated_endpoints.iter() {
236+
std::fs::remove_file(d.path())?;
237+
}
238+
239+
// cleanup ssh config
240+
let coman_ssh_config_path = data_dir.join("ssh_config");
241+
if !coman_ssh_config_path.exists() {
242+
return Ok(());
243+
}
244+
let mut ssh_content = std::fs::read_to_string(&coman_ssh_config_path)?;
245+
for d in outdated_endpoints {
246+
let re = Regex::new(
247+
format!(
248+
r"(?ms)#Start {0}_[^\s]_{1}.*?#End {0}_[^\s]_{1}\n",
249+
current_system,
250+
d.file_name()
251+
.to_string_lossy()
252+
.split_once('.')
253+
.unwrap()
254+
.0
255+
.rsplit('_')
256+
.next()
257+
.unwrap()
258+
)
259+
.as_str(),
260+
)?;
261+
ssh_content = re.replace(&ssh_content, "").to_string();
262+
}
263+
264+
std::fs::write(coman_ssh_config_path, ssh_content)?;
265+
266+
Ok(())
267+
}
268+
269+
async fn store_ssh_information(
270+
current_system: &str,
271+
user_info: &UserInfo,
272+
job_id: &i64,
273+
job_name: &str,
274+
secret_key: &SecretKey,
275+
) -> Result<String> {
276+
let data_dir = get_data_dir();
277+
std::fs::write(
278+
data_dir.join(format!("{}_{}.endpoint", current_system, job_id)),
279+
format!("{}", secret_key.public()),
280+
)?;
281+
let coman_ssh_config_path = data_dir.join("ssh_config");
282+
let coman_ssh_config = std::fs::OpenOptions::new()
283+
.create(true)
284+
.append(true)
285+
.open(coman_ssh_config_path.clone())?;
286+
let connection_name = format!("{}-{}-{}", current_system, job_name, job_id);
287+
let mut writer = BufWriter::new(coman_ssh_config);
288+
write!(
289+
writer,
290+
"\n#Start {0}\nHost {0}\n Hostname {1}\n User {2}\n ProxyCommand coman proxy {3}{4}\n#End {0}",
291+
connection_name,
292+
secret_key.public(),
293+
user_info.name,
294+
current_system,
295+
job_id
296+
)?;
297+
let ssh_dir = dirs::home_dir().ok_or(eyre!("couldn't find home dir"))?.join(".ssh");
298+
let ssh_config_path = ssh_dir.join("config");
299+
let mut ssh_config = std::fs::OpenOptions::new()
300+
.read(true)
301+
.append(true)
302+
.open(ssh_config_path)?;
303+
let mut content = String::new();
304+
ssh_config.read_to_string(&mut content)?;
305+
if !content.contains(&format!("Include {}", coman_ssh_config_path.clone().display())) {
306+
let mut writer = BufWriter::new(ssh_config);
307+
write!(
308+
writer,
309+
"\n\n#coman include\nMatch all\nInclude {}",
310+
coman_ssh_config_path.display()
311+
)?;
312+
}
313+
Ok(connection_name)
314+
}
315+
212316
async fn inject_coman_squash(
213317
api_client: &CscsApi,
214318
base_path: &Path,
@@ -272,10 +376,19 @@ async fn inject_coman_squash(
272376
#[cfg(target_family = "windows")]
273377
let size = file_meta.file_size() as usize;
274378

379+
let existing = api_client.list_path(current_system, target.clone()).await?;
380+
if !existing.is_empty() {
381+
//squash file already present on remote, don't upload if it's the same
382+
let entry = existing.first().unwrap();
383+
if entry.size.unwrap_or_default() == size {
384+
return Ok(Some(target));
385+
}
386+
}
275387
//upload squash file
276388
let transfer_data = api_client
277389
.transfer_upload(current_system, config.values.cscs.account, target.clone(), size as i64)
278-
.await?;
390+
.await
391+
.wrap_err(eyre!("couldn't upload coman squash file"))?;
279392
let mut etags: Vec<String> = Vec::new();
280393
let client = reqwest::Client::new();
281394
let num_parts = transfer_data.1.num_parts;
@@ -452,11 +565,11 @@ pub async fn cscs_job_start(
452565
Ok(access_token) => {
453566
let api_client = CscsApi::new(access_token.0, platform).unwrap();
454567
let config = Config::new()?;
455-
let current_system = &system.unwrap_or(config.values.cscs.current_system);
456-
let account = account.or(config.values.cscs.account);
568+
let current_system = &system.unwrap_or(config.values.cscs.current_system.clone());
569+
let account = account.or(config.values.cscs.account.clone());
457570
let user_info = api_client.get_userinfo(current_system).await?;
458571
let job_name = name
459-
.or(config.values.name)
572+
.or(config.values.name.clone())
460573
.unwrap_or(format!("{}-coman", user_info.name));
461574
let current_system_info = api_client.get_system(current_system).await?;
462575
let scratch = match current_system_info {
@@ -476,15 +589,16 @@ pub async fn cscs_job_start(
476589
let container_workdir = options
477590
.container_workdir
478591
.clone()
479-
.unwrap_or(config.values.cscs.workdir.unwrap_or("/scratch".to_owned()));
592+
.unwrap_or(config.values.cscs.workdir.clone().unwrap_or("/scratch".to_owned()));
480593
let base_path = scratch.join(user_info.name.clone()).join(&job_name);
481594

482595
let mut envvars = config.values.cscs.env.clone();
483596
envvars.extend(options.env.clone());
484597

485-
let (ssh_public_key_path, secret_key) = setup_ssh(&api_client, &base_path, current_system, &options)
486-
.await?
487-
.unzip();
598+
let (ssh_public_key_path, secret_key) =
599+
setup_ssh(&api_client, &base_path, current_system, &options, &config)
600+
.await?
601+
.unzip();
488602
let coman_squash = inject_coman_squash(&api_client, &base_path, current_system, &options).await?;
489603

490604
let environment_path = handle_edf(
@@ -520,46 +634,10 @@ pub async fn cscs_job_start(
520634

521635
if let Some(secret_key) = secret_key {
522636
// store connection information in data dir and set up ssh connection
523-
let data_dir = get_data_dir();
524-
std::fs::write(
525-
data_dir.join(format!("{}.endpoint", job_id)),
526-
format!("{}", secret_key.public()),
527-
)?;
528-
let coman_ssh_config_path = data_dir.join("ssh_config");
529-
let coman_ssh_config = std::fs::OpenOptions::new()
530-
.create(true)
531-
.append(true)
532-
.open(coman_ssh_config_path.clone())?;
533-
let mut writer = BufWriter::new(coman_ssh_config);
534-
write!(
535-
writer,
536-
"\nHost {}-{}\n Hostname {}\n User {}\n ProxyCommand coman proxy {}\n",
537-
job_name,
538-
job_id,
539-
secret_key.public(),
540-
user_info.name,
541-
job_id
542-
)?;
543-
let ssh_dir = dirs::home_dir().ok_or(eyre!("couldn't find home dir"))?.join(".ssh");
544-
let ssh_config_path = ssh_dir.join("config");
545-
let mut ssh_config = std::fs::OpenOptions::new()
546-
.read(true)
547-
.append(true)
548-
.open(ssh_config_path)?;
549-
let mut content = String::new();
550-
ssh_config.read_to_string(&mut content)?;
551-
if !content.contains(&format!("Include {}", coman_ssh_config_path.clone().display())) {
552-
let mut writer = BufWriter::new(ssh_config);
553-
write!(
554-
writer,
555-
"\n\n#coman include\nMatch all\nInclude {}",
556-
coman_ssh_config_path.display()
557-
)?;
558-
}
559-
println!(
560-
"Use ssh {}@{}-{} to connect to the job",
561-
user_info.name, job_name, job_id
562-
);
637+
garbage_collect_ssh(&api_client, current_system).await?;
638+
let connection_name =
639+
store_ssh_information(current_system, &user_info, &job_id, &job_name, &secret_key).await?;
640+
println!("Use ssh {}@{} to connect to the job", user_info.name, connection_name);
563641
}
564642

565643
Ok(())

coman/src/main.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -142,7 +142,7 @@ async fn main() -> Result<()> {
142142
},
143143
cli::CliCommands::Init { destination, name } => Config::create_project_config(destination, name)?,
144144
cli::CliCommands::Exec { command } => cli_exec_command(command).await?,
145-
cli::CliCommands::Proxy { job_id } => cli_proxy_command(job_id).await?,
145+
cli::CliCommands::Proxy { system, job_id } => cli_proxy_command(system, job_id).await?,
146146
},
147147
None => run_tui(args.tick_rate)?,
148148
}

0 commit comments

Comments
 (0)