Skip to content

Commit 4d2067c

Browse files
committed
add garbage collection for ssh connections
1 parent 703d49b commit 4d2067c

7 files changed

Lines changed: 170 additions & 60 deletions

File tree

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

README.md

Lines changed: 25 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ Table of contents
1717
* [Logging in](#logging-in)
1818
* [CLI](#cli)
1919
* [Terminal UI](#tui)
20+
* [SSH](#ssh)
2021
* [coman.toml config file](#comantoml-config-file)
2122
* [Editing the config](#editing-the-config)
2223
* [Development](#development)
@@ -161,7 +162,8 @@ To execute a job on CSCS, run a command like
161162
coman cscs job submit -i ubuntu:latest -- echo test
162163
```
163164
This will run the command `echo test` using the `ubuntu:latest` docker image and default settings.
164-
See `coman cscs job submit -h` for more options.
165+
See `coman cscs job submit -h` for more options. This will also automatically set up an ssh connection for
166+
the job (use `--no-ssh` to prevent this). See the [SSH](#ssh) section for more details.
165167

166168
You can list your jobs using
167169

@@ -270,7 +272,7 @@ name = "myproject" # the name of the project, used to generate job names
270272
current_system = "daint" # what system/cluster to execute commands on
271273
current_platform = "HPC" # what platform to execute commands on (valid: HPC, ML or CW)
272274
account = "..." # the project/group account to use on cscs
273-
275+
ssh_key = "path/to/ssh/public/key.pub" # To use a different public key for SSH connections, other than the default auto-detected id_dsa, id_rsa or id_ecdsa
274276

275277
image = "ubuntu" # default docker image to use
276278

@@ -336,6 +338,27 @@ coman config get cscs.current_system
336338
coman config set cscs.current_system "daint"
337339
```
338340

341+
### SSH
342+
343+
`coman cscs job submit` will automatically create an SSH connection for the job. It will search for an
344+
`id_dsa.pub`, `id_rsa.pub` or `id_ecdsa.pub` file in your `.ssh` folder and use that for the connection,
345+
unless you specify another key using the `--ssh-key` argument or the `cscs.ssh_key` setting in the config file.
346+
347+
Creating the ssh connection involves several steps, all handled by coman:
348+
349+
- Uploading your ssh public key into the remote coman project folder
350+
- Setting the public key in the [CSCS SSH hook](https://docs.cscs.ch/software/container-engine/resource-hook/#ssh-hook)
351+
- Uploading a squash file containing the coman executable to the remote coman project folder
352+
- Mounting the coman squash file into the container so the coman executable is available in the container
353+
- Creating an [iroh](https://github.com/n0-computer/iroh) secret key to use for the [QUIC tunnel](https://en.wikipedia.org/wiki/QUIC)
354+
- Using the coman executable as the entrypoint of the container (wrapping the original command), which
355+
allows coman to create an iroh/QUIC tunnel for remote connections, as well as properly handling pid1
356+
process signals in the container
357+
- Creating a local SSH config in the coman data dir (`~/.local/share/coman` by default) containing connection
358+
information and the correct iroh proxy command
359+
- Including the SSH config in `.ssh/config` so it's accessible in other tools
360+
- Garbage collecting old SSH connections for jobs that are not running anymore
361+
339362
## Development
340363

341364
### Prerequisites

coman/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,7 @@ base64 = "0.22.1"
8282
dirs = "6.0.0"
8383
iroh = "0.95.1"
8484
rand = "0.9.2"
85+
regex = "1.12.2"
8586

8687
[build-dependencies]
8788
anyhow = "1.0.90"

coman/src/cli.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ pub enum CliCommands {
7373
command: Vec<String>,
7474
},
7575
#[clap(hide = true)]
76-
Proxy { job_id: i64 },
76+
Proxy { system: String, job_id: i64 },
7777
}
7878

7979
#[derive(Subcommand, Debug)]
@@ -449,9 +449,9 @@ pub(crate) async fn cli_exec_command(command: Vec<String>) -> Result<()> {
449449
}
450450

451451
/// Thin wrapper around iroh proxy
452-
pub(crate) async fn cli_proxy_command(job_id: i64) -> Result<()> {
452+
pub(crate) async fn cli_proxy_command(system: String, job_id: i64) -> Result<()> {
453453
let data_dir = get_data_dir();
454-
let endpoint_id = std::fs::read_to_string(data_dir.join(format!("{}.endpoint", job_id)))?;
454+
let endpoint_id = std::fs::read_to_string(data_dir.join(format!("{}_{}.endpoint", system, job_id)))?;
455455
println!("{}", endpoint_id);
456456
iroh_ssh::api::proxy_mode(iroh_ssh::ProxyArgs { node_id: endpoint_id })
457457
.await

coman/src/config.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,8 @@ pub struct CscsConfig {
6262
#[serde(default)]
6363
pub edf_file_template: String,
6464
#[serde(default)]
65+
pub ssh_key: Option<PathBuf>,
66+
#[serde(default)]
6567
pub command: Vec<String>,
6668

6769
#[serde(default)]
@@ -111,9 +113,8 @@ impl Layer {
111113
let root = self.data.as_item();
112114
let item = lookup_entry(key_path_parsed, root)?;
113115
let item = item
114-
.map(|i| i.clone().into_value())
116+
.map(|i| i.clone().into_value().map_err(|e| eyre!("{:?}", e)))
115117
.transpose()
116-
.map_err(|e| eyre!(format!("{:?}", e)))
117118
.wrap_err("couldn't convert config item to value")?;
118119

119120
Ok(item.map(|val| match val {

coman/src/cscs/handlers.rs

Lines changed: 136 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ use std::os::unix::fs::MetadataExt;
33
#[cfg(target_family = "windows")]
44
use std::os::windows::fs::MetadataExt;
55
use std::{
6-
collections::HashMap,
6+
collections::{HashMap, HashSet},
77
io::{BufWriter, Read, Write},
88
path::{Path, PathBuf},
99
};
@@ -14,6 +14,7 @@ use eyre::Context;
1414
use futures::StreamExt;
1515
use iroh::SecretKey;
1616
use itertools::Itertools;
17+
use regex::Regex;
1718
use reqwest::Url;
1819
use tokio::{fs::File, io::AsyncWriteExt};
1920

@@ -23,7 +24,9 @@ use crate::{
2324
cscs::{
2425
api_client::{
2526
client::{CscsApi, JobStartOptions},
26-
types::{FileStat, FileSystemType, Job, JobDetail, PathEntry, PathType, S3Upload, System, UserInfo},
27+
types::{
28+
FileStat, FileSystemType, Job, JobDetail, JobStatus, PathEntry, PathType, S3Upload, System, UserInfo,
29+
},
2730
},
2831
cli::upload_chunk,
2932
oauth2::{
@@ -172,13 +175,14 @@ async fn setup_ssh(
172175
base_path: &Path,
173176
current_system: &str,
174177
options: &JobStartOptions,
178+
config: &Config,
175179
) -> Result<Option<(PathBuf, SecretKey)>> {
176180
if options.no_ssh {
177181
return Ok(None);
178182
}
179183
let secret = SecretKey::generate(&mut rand::rng());
180184

181-
let ssh_key = if let Some(path) = options.ssh_key.clone() {
185+
let ssh_key = if let Some(path) = options.ssh_key.clone().or(config.values.cscs.ssh_key.clone()) {
182186
path.canonicalize().map(Some).wrap_err("couldn't get ssh key path")?
183187
} else {
184188
// try to figure our ssh key
@@ -202,13 +206,119 @@ async fn setup_ssh(
202206

203207
api_client
204208
.upload(current_system, remote_path.clone(), public_key.into_bytes())
205-
.await?;
209+
.await
210+
.wrap_err(eyre!("couldn't upload ssh public key"))?;
206211
Ok(Some((remote_path, secret)))
207212
}
208213
None => Err(eyre!("couldn't find ssh public key, use `--ssh_key` to specify it")),
209214
}
210215
}
211216

217+
async fn garbage_collect_ssh(api_client: &CscsApi, current_system: &str) -> Result<()> {
218+
let data_dir = get_data_dir();
219+
if !data_dir.exists() {
220+
return Ok(());
221+
}
222+
let jobs = api_client.list_jobs(current_system, None).await?;
223+
let job_entries: HashSet<_> = jobs
224+
.iter()
225+
.filter(|j| j.status == JobStatus::Pending || j.status == JobStatus::Running)
226+
.map(|j| format!("{}_{}", current_system, j.id))
227+
.collect();
228+
let outdated_endpoints: Vec<_> = std::fs::read_dir(&data_dir)?
229+
.filter(|d| {
230+
d.as_ref().is_ok_and(|e| {
231+
e.path().is_file()
232+
&& e.file_name().to_string_lossy().ends_with(".endpoint")
233+
&& e.file_name().to_string_lossy().starts_with(current_system)
234+
&& !job_entries.contains(e.file_name().to_string_lossy().split_once('.').unwrap().0)
235+
})
236+
})
237+
.map(|d| d.unwrap())
238+
.collect();
239+
240+
// delete connection files
241+
for d in outdated_endpoints.iter() {
242+
std::fs::remove_file(d.path())?;
243+
}
244+
245+
// cleanup ssh config
246+
let coman_ssh_config_path = data_dir.join("ssh_config");
247+
if !coman_ssh_config_path.exists() {
248+
return Ok(());
249+
}
250+
let mut ssh_content = std::fs::read_to_string(&coman_ssh_config_path)?;
251+
for d in outdated_endpoints {
252+
let re = Regex::new(
253+
format!(
254+
r"(?ms)#Start {0}_[^\s]_{1}.*?#End {0}_[^\s]_{1}\n",
255+
current_system,
256+
d.file_name()
257+
.to_string_lossy()
258+
.split_once('.')
259+
.unwrap()
260+
.0
261+
.rsplit('_')
262+
.next()
263+
.unwrap()
264+
)
265+
.as_str(),
266+
)?;
267+
ssh_content = re.replace(&ssh_content, "").to_string();
268+
}
269+
270+
std::fs::write(coman_ssh_config_path, ssh_content)?;
271+
272+
Ok(())
273+
}
274+
275+
async fn store_ssh_information(
276+
current_system: &str,
277+
user_info: &UserInfo,
278+
job_id: &i64,
279+
job_name: &str,
280+
secret_key: &SecretKey,
281+
) -> Result<String> {
282+
let data_dir = get_data_dir();
283+
std::fs::write(
284+
data_dir.join(format!("{}_{}.endpoint", current_system, job_id)),
285+
format!("{}", secret_key.public()),
286+
)?;
287+
let coman_ssh_config_path = data_dir.join("ssh_config");
288+
let coman_ssh_config = std::fs::OpenOptions::new()
289+
.create(true)
290+
.append(true)
291+
.open(coman_ssh_config_path.clone())?;
292+
let connection_name = format!("{}-{}-{}", current_system, job_name, job_id);
293+
let mut writer = BufWriter::new(coman_ssh_config);
294+
write!(
295+
writer,
296+
"\n#Start {0}\nHost {0}\n Hostname {1}\n User {2}\n ProxyCommand coman proxy {3}{4}\n#End {0}",
297+
connection_name,
298+
secret_key.public(),
299+
user_info.name,
300+
current_system,
301+
job_id
302+
)?;
303+
let ssh_dir = dirs::home_dir().ok_or(eyre!("couldn't find home dir"))?.join(".ssh");
304+
let ssh_config_path = ssh_dir.join("config");
305+
let mut ssh_config = std::fs::OpenOptions::new()
306+
.read(true)
307+
.append(true)
308+
.open(ssh_config_path)?;
309+
let mut content = String::new();
310+
ssh_config.read_to_string(&mut content)?;
311+
if !content.contains(&format!("Include {}", coman_ssh_config_path.clone().display())) {
312+
let mut writer = BufWriter::new(ssh_config);
313+
write!(
314+
writer,
315+
"\n\n#coman include\nMatch all\nInclude {}",
316+
coman_ssh_config_path.display()
317+
)?;
318+
}
319+
Ok(connection_name)
320+
}
321+
212322
async fn inject_coman_squash(
213323
api_client: &CscsApi,
214324
base_path: &Path,
@@ -272,10 +382,19 @@ async fn inject_coman_squash(
272382
#[cfg(target_family = "windows")]
273383
let size = file_meta.file_size() as usize;
274384

385+
let existing = api_client.list_path(current_system, target.clone()).await?;
386+
if !existing.is_empty() {
387+
//squash file already present on remote, don't upload if it's the same
388+
let entry = existing.first().unwrap();
389+
if entry.size.unwrap_or_default() == size {
390+
return Ok(Some(target));
391+
}
392+
}
275393
//upload squash file
276394
let transfer_data = api_client
277395
.transfer_upload(current_system, config.values.cscs.account, target.clone(), size as i64)
278-
.await?;
396+
.await
397+
.wrap_err(eyre!("couldn't upload coman squash file"))?;
279398
let mut etags: Vec<String> = Vec::new();
280399
let client = reqwest::Client::new();
281400
let num_parts = transfer_data.1.num_parts;
@@ -452,11 +571,11 @@ pub async fn cscs_job_start(
452571
Ok(access_token) => {
453572
let api_client = CscsApi::new(access_token.0, platform).unwrap();
454573
let config = Config::new()?;
455-
let current_system = &system.unwrap_or(config.values.cscs.current_system);
456-
let account = account.or(config.values.cscs.account);
574+
let current_system = &system.unwrap_or(config.values.cscs.current_system.clone());
575+
let account = account.or(config.values.cscs.account.clone());
457576
let user_info = api_client.get_userinfo(current_system).await?;
458577
let job_name = name
459-
.or(config.values.name)
578+
.or(config.values.name.clone())
460579
.unwrap_or(format!("{}-coman", user_info.name));
461580
let current_system_info = api_client.get_system(current_system).await?;
462581
let scratch = match current_system_info {
@@ -476,15 +595,16 @@ pub async fn cscs_job_start(
476595
let container_workdir = options
477596
.container_workdir
478597
.clone()
479-
.unwrap_or(config.values.cscs.workdir.unwrap_or("/scratch".to_owned()));
598+
.unwrap_or(config.values.cscs.workdir.clone().unwrap_or("/scratch".to_owned()));
480599
let base_path = scratch.join(user_info.name.clone()).join(&job_name);
481600

482601
let mut envvars = config.values.cscs.env.clone();
483602
envvars.extend(options.env.clone());
484603

485-
let (ssh_public_key_path, secret_key) = setup_ssh(&api_client, &base_path, current_system, &options)
486-
.await?
487-
.unzip();
604+
let (ssh_public_key_path, secret_key) =
605+
setup_ssh(&api_client, &base_path, current_system, &options, &config)
606+
.await?
607+
.unzip();
488608
let coman_squash = inject_coman_squash(&api_client, &base_path, current_system, &options).await?;
489609

490610
let environment_path = handle_edf(
@@ -520,46 +640,10 @@ pub async fn cscs_job_start(
520640

521641
if let Some(secret_key) = secret_key {
522642
// store connection information in data dir and set up ssh connection
523-
let data_dir = get_data_dir();
524-
std::fs::write(
525-
data_dir.join(format!("{}.endpoint", job_id)),
526-
format!("{}", secret_key.public()),
527-
)?;
528-
let coman_ssh_config_path = data_dir.join("ssh_config");
529-
let coman_ssh_config = std::fs::OpenOptions::new()
530-
.create(true)
531-
.append(true)
532-
.open(coman_ssh_config_path.clone())?;
533-
let mut writer = BufWriter::new(coman_ssh_config);
534-
write!(
535-
writer,
536-
"\nHost {}-{}\n Hostname {}\n User {}\n ProxyCommand coman proxy {}\n",
537-
job_name,
538-
job_id,
539-
secret_key.public(),
540-
user_info.name,
541-
job_id
542-
)?;
543-
let ssh_dir = dirs::home_dir().ok_or(eyre!("couldn't find home dir"))?.join(".ssh");
544-
let ssh_config_path = ssh_dir.join("config");
545-
let mut ssh_config = std::fs::OpenOptions::new()
546-
.read(true)
547-
.append(true)
548-
.open(ssh_config_path)?;
549-
let mut content = String::new();
550-
ssh_config.read_to_string(&mut content)?;
551-
if !content.contains(&format!("Include {}", coman_ssh_config_path.clone().display())) {
552-
let mut writer = BufWriter::new(ssh_config);
553-
write!(
554-
writer,
555-
"\n\n#coman include\nMatch all\nInclude {}",
556-
coman_ssh_config_path.display()
557-
)?;
558-
}
559-
println!(
560-
"Use ssh {}@{}-{} to connect to the job",
561-
user_info.name, job_name, job_id
562-
);
643+
garbage_collect_ssh(&api_client, current_system).await?;
644+
let connection_name =
645+
store_ssh_information(current_system, &user_info, &job_id, &job_name, &secret_key).await?;
646+
println!("Use ssh {}@{} to connect to the job", user_info.name, connection_name);
563647
}
564648

565649
Ok(())

coman/src/main.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -142,7 +142,7 @@ async fn main() -> Result<()> {
142142
},
143143
cli::CliCommands::Init { destination, name } => Config::create_project_config(destination, name)?,
144144
cli::CliCommands::Exec { command } => cli_exec_command(command).await?,
145-
cli::CliCommands::Proxy { job_id } => cli_proxy_command(job_id).await?,
145+
cli::CliCommands::Proxy { system, job_id } => cli_proxy_command(system, job_id).await?,
146146
},
147147
None => run_tui(args.tick_rate)?,
148148
}

0 commit comments

Comments
 (0)