Skip to content

Commit c52f440

Browse files
committed
add download and upload functionality on the cli
1 parent 12c35d1 commit c52f440

16 files changed

Lines changed: 1003 additions & 648 deletions

File tree

Cargo.lock

Lines changed: 814 additions & 583 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

coman/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ docker_credential = "1.3.2"
7070
chrono = "0.4.42"
7171
openssl = { version = "0.10.75", features = ["vendored"] }
7272
tui-realm-treeview = "3.0.0"
73-
rust-s3 = "0.37.0"
73+
aws-sdk-s3 = "1.115.0"
7474

7575
[build-dependencies]
7676
anyhow = "1.0.90"

coman/src/app/model.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
use color_eyre::owo_colors::OwoColorize;
21
use eyre::{Context, Report};
32
use tokio::sync::mpsc;
43
use tuirealm::{
@@ -19,7 +18,7 @@ use crate::{
1918
},
2019
components::{
2120
context_menu::ContextMenu, error_popup::ErrorPopup, info_popup::InfoPopup, login_popup::LoginPopup,
22-
system_select_popup::SystemSelectPopup, toolbar::Toolbar, workload_list::WorkloadList,
21+
system_select_popup::SystemSelectPopup, workload_list::WorkloadList,
2322
workload_log::WorkloadLog,
2423
},
2524
cscs::handlers::{cscs_login, cscs_system_set},

coman/src/cli.rs

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -80,9 +80,19 @@ pub enum CscsFileCommands {
8080
#[clap(alias("ls"))]
8181
List { path: PathBuf },
8282
#[clap(alias("dl"))]
83-
Download { remote: PathBuf, local: PathBuf },
83+
Download {
84+
#[clap(short, long, help = "account/project to use")]
85+
account: Option<String>,
86+
remote: PathBuf,
87+
local: PathBuf,
88+
},
8489
#[clap(alias("ul"))]
85-
Upload { local: PathBuf, remote: PathBuf },
90+
Upload {
91+
#[clap(short, long, help = "account/project to use")]
92+
account: Option<String>,
93+
local: PathBuf,
94+
remote: PathBuf,
95+
},
8696
}
8797

8898
#[derive(Subcommand, Debug)]

coman/src/components/context_menu.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ use tuirealm::{
88

99
use crate::app::{
1010
messages::{MenuMsg, Msg, View},
11-
user_events::{FileEvent, UserEvent},
11+
user_events::UserEvent,
1212
};
1313

1414
#[derive(MockComponent)]

coman/src/components/file_tree.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ impl FileTree {
8181
}
8282
impl Component<Msg, UserEvent> for FileTree {
8383
fn on(&mut self, ev: Event<UserEvent>) -> Option<Msg> {
84-
let result = match ev {
84+
match ev {
8585
Event::Keyboard(KeyEvent {
8686
code: Key::Left,
8787
modifiers: KeyModifiers::NONE,

coman/src/components/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
pub(crate) mod context_menu;
12
pub(crate) mod error_popup;
23
pub(crate) mod file_tree;
34
pub(crate) mod global_listener;
@@ -7,4 +8,3 @@ pub(crate) mod system_select_popup;
78
pub(crate) mod toolbar;
89
pub(crate) mod workload_list;
910
pub(crate) mod workload_log;
10-
pub(crate) mod context_menu;

coman/src/config.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@ use eyre::eyre;
88
use lazy_static::lazy_static;
99
use serde::{Deserialize, Serialize};
1010

11-
use crate::trace_dbg;
1211

1312
const DEFAULT_CONFIG_TOML: &str = include_str!("../.config/config.toml");
1413

coman/src/cscs/api_client.rs

Lines changed: 44 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,10 @@ use firecrest_client::{
1818
types::{
1919
DownloadFileResponseTransferDirectives, File as CSCSFile, FileStat as CSCSFileStat,
2020
FileSystem as CSCSFileSystem, FileSystemDataType, HPCCluster, HealthCheckType, JobMetadataModel, JobModel,
21-
SchedulerServiceHealth, UserInfoResponse,
21+
S3TransferResponse, SchedulerServiceHealth, UserInfoResponse,
2222
},
2323
};
24+
use reqwest::Url;
2425
use strum::Display;
2526

2627
use crate::trace_dbg;
@@ -121,7 +122,7 @@ where
121122

122123
impl From<CSCSFile> for PathEntry {
123124
fn from(value: CSCSFile) -> Self {
124-
let size = match usize::from_str_radix(&value.size, 10) {
125+
let size = match value.size.parse::<usize>() {
125126
Ok(size) => size,
126127
Err(err) => panic!("Couldn't parse file size {}: {:?}", value.size, err),
127128
};
@@ -334,6 +335,31 @@ fn display_health(h: &Option<Vec<ServicesHealth>>) -> String {
334335
.unwrap_or("".to_string())
335336
}
336337

338+
#[derive(Debug, Clone)]
339+
pub struct S3Upload {
340+
pub parts_upload_urls: Vec<Url>,
341+
pub complete_upload_url: Url,
342+
pub part_size: u64,
343+
pub num_parts: u64,
344+
}
345+
346+
impl S3Upload {
347+
fn convert(value: S3TransferResponse, size: u64) -> Result<Self> {
348+
let complete_url = value.complete_upload_url.ok_or(eyre!("no upload completion url set"))?;
349+
let part_urls = value.parts_upload_urls.ok_or(eyre!("no part upload urls set"))?;
350+
let part_size = value.max_part_size.ok_or(eyre!("couldn't parse size"))? as u64;
351+
Ok(Self {
352+
parts_upload_urls: part_urls
353+
.iter()
354+
.map(|u| Url::parse(u).wrap_err("couldn't parse url"))
355+
.collect::<Result<Vec<Url>>>()?,
356+
part_size,
357+
complete_upload_url: Url::parse(&complete_url)?,
358+
num_parts: (size.div_ceil(part_size)),
359+
})
360+
}
361+
}
362+
337363
pub struct CscsApi {
338364
client: FirecrestClient,
339365
}
@@ -442,16 +468,17 @@ impl CscsApi {
442468
account: &str,
443469
path: PathBuf,
444470
size: i64,
445-
) -> Result<(i64, Vec<String>, i64)> {
471+
) -> Result<(i64, S3Upload)> {
472+
if account.is_empty() {
473+
return Err(eyre!(
474+
"An account is required, set it in the config file or with the --account flag"
475+
));
476+
}
446477
let job = post_filesystem_transfer_upload(&self.client, system_name, account, path, size)
447478
.await
448479
.wrap_err("couldn't upload file")?;
449480
if let DownloadFileResponseTransferDirectives::S3(directives) = job.transfer_directives {
450-
Ok((
451-
job.transfer_job.job_id,
452-
directives.parts_upload_urls.unwrap(),
453-
directives.max_part_size.unwrap(),
454-
))
481+
Ok((job.transfer_job.job_id, S3Upload::convert(directives, size as u64)?))
455482
} else {
456483
trace_dbg!(job);
457484
Err(eyre!("didn't get S3 transfer directive"))
@@ -463,12 +490,18 @@ impl CscsApi {
463490
.wrap_err("couldn't download file")?;
464491
Ok(content)
465492
}
466-
pub async fn transfer_download(&self, system_name: &str, path: PathBuf) -> Result<(i64, String)> {
467-
let job = post_filesystem_transfer_download(&self.client, system_name, path)
493+
pub async fn transfer_download(&self, system_name: &str, account: &str, path: PathBuf) -> Result<(i64, Url)> {
494+
if account.is_empty() {
495+
return Err(eyre!(
496+
"An account is required, set it in the config file or with the --account flag"
497+
));
498+
}
499+
let job = post_filesystem_transfer_download(&self.client, system_name, account, path)
468500
.await
469501
.wrap_err("couldn't transfer file")?;
470502
if let DownloadFileResponseTransferDirectives::S3(directives) = job.transfer_directives {
471-
Ok((job.transfer_job.job_id, directives.download_url.unwrap()))
503+
let download_url = Url::parse(&directives.download_url.unwrap())?;
504+
Ok((job.transfer_job.job_id, download_url))
472505
} else {
473506
Err(eyre!("didn't get S3 transfer directive"))
474507
}

coman/src/cscs/cli.rs

Lines changed: 83 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,19 @@
1-
use std::{path::PathBuf, time::Duration};
1+
use std::{
2+
io::{SeekFrom, Write},
3+
path::PathBuf,
4+
time::{Duration, Instant},
5+
};
26

37
use color_eyre::{Result, eyre::Context};
48
use eyre::eyre;
9+
use futures::StreamExt;
510
use inquire::{Password, Text};
611
use itertools::Itertools;
712
use reqwest::Url;
13+
use tokio::{
14+
fs::File,
15+
io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt, BufReader},
16+
};
817

918
use crate::{
1019
cscs::{
@@ -14,7 +23,6 @@ use crate::{
1423
cscs_job_log, cscs_login, cscs_start_job, cscs_system_list, cscs_system_set,
1524
},
1625
},
17-
trace_dbg,
1826
util::types::DockerImageUrl,
1927
};
2028

@@ -123,8 +131,8 @@ pub(crate) async fn cli_cscs_file_list(path: PathBuf) -> Result<()> {
123131
}
124132
}
125133

126-
pub(crate) async fn cli_cscs_file_download(remote: PathBuf, local: PathBuf) -> Result<()> {
127-
match cscs_file_download(remote, local.clone()).await {
134+
pub(crate) async fn cli_cscs_file_download(remote: PathBuf, local: PathBuf, account: Option<String>) -> Result<()> {
135+
match cscs_file_download(remote, local.clone(), account).await {
128136
Ok(None) => {
129137
println!("File successfully downloaded");
130138
Ok(())
@@ -148,44 +156,91 @@ pub(crate) async fn cli_cscs_file_download(remote: PathBuf, local: PathBuf) -> R
148156

149157
// download from s3
150158
println!("Downloading file from s3, this might take a while");
151-
let credentials = s3::creds::Credentials::default()?;
152-
let url = Url::parse(&job_data.1)?;
153-
let region = s3::region::Region::Custom {
154-
region: "cscs-zonegroup".to_owned(),
155-
endpoint: "https://rgw.cscs.ch".to_owned(),
156-
};
157-
let mut segments = url.path_segments().unwrap();
158-
let bucket_name = segments.next().unwrap();
159-
let path = segments.join("/");
160-
let bucket = s3::bucket::Bucket::create_with_path_style(
161-
bucket_name,
162-
region,
163-
credentials,
164-
s3::BucketConfiguration::default(),
165-
)
166-
.await?
167-
.bucket;
168-
let mut async_output_file = tokio::fs::File::create(&local).await.expect("Unable to create file");
169-
let status = bucket.get_object_to_writer(path, &mut async_output_file).await?;
170-
println!("download finished with status {}", status);
159+
160+
let mut output = File::create(local).await?;
161+
let mut stream = reqwest::get(job_data.1).await?.bytes_stream();
162+
let mut progress = 0;
163+
let mut start_time = Instant::now();
164+
while let Some(chunk_result) = stream.next().await {
165+
let chunk = chunk_result?;
166+
output.write_all(&chunk).await?;
167+
progress += chunk.len();
168+
169+
if start_time.elapsed() >= Duration::from_secs(1) {
170+
print!("\rDownloaded {}/{}Mb", progress / 1024 / 1024, job_data.2 / 1024 / 1024);
171+
std::io::stdout().flush()?;
172+
start_time = Instant::now();
173+
}
174+
}
175+
output.flush().await?;
176+
println!(); //force newline
177+
println!("Download complete");
171178

172179
Ok(())
173180
}
174181
Err(e) => Err(e),
175182
}
176183
}
177184

178-
pub(crate) async fn cli_cscs_file_upload(local: PathBuf, remote: PathBuf) -> Result<()> {
179-
match cscs_file_upload(local, remote).await {
185+
pub(crate) async fn cli_cscs_file_upload(local: PathBuf, remote: PathBuf, account: Option<String>) -> Result<()> {
186+
match cscs_file_upload(local.clone(), remote, account).await {
180187
Ok(None) => {
181188
println!("File successfully uploaded");
182189
Ok(())
183190
}
184191
Ok(Some(transfer_data)) => {
185192
println!("starting file transfer, this might take a while");
186-
let transfer_data = trace_dbg!(transfer_data);
187-
Ok(())
193+
let mut etags: Vec<String> = Vec::new();
194+
let client = reqwest::Client::new();
195+
let num_parts = transfer_data.1.num_parts;
196+
for (chunk_id, transfer_url) in transfer_data.1.parts_upload_urls.into_iter().enumerate() {
197+
println!(
198+
"Uploading part {}/{} ({}Mb)",
199+
chunk_id + 1,
200+
num_parts,
201+
transfer_data.1.part_size / 1024 / 1024
202+
);
203+
let etag = upload_chunk(
204+
local.clone(),
205+
(chunk_id as u64) * transfer_data.1.part_size,
206+
transfer_data.1.part_size,
207+
transfer_url,
208+
)
209+
.await?;
210+
etags.push(etag);
211+
}
212+
213+
let body = etags
214+
.into_iter()
215+
.enumerate()
216+
.map(|(i, etag)| (i + 1, etag))
217+
.map(|(i, etag)| format!("<Part><PartNumber>{}</PartNumber><ETag>{}</ETag></Part>", i, etag))
218+
.join("");
219+
let body = format!("<CompleteMultipartUpload>{}</CompleteMultipartUpload>", body);
220+
let req = client.post(transfer_data.1.complete_upload_url).body(body).build()?;
221+
let resp = client.execute(req).await?;
222+
match resp.error_for_status() {
223+
Ok(_) => {
224+
println!("done");
225+
Ok(())
226+
}
227+
Err(e) => Err(e).wrap_err("failed to complete upload"),
228+
}
188229
}
189230
Err(e) => Err(e),
190231
}
191232
}
233+
234+
async fn upload_chunk(path: PathBuf, offset: u64, size: u64, url: Url) -> Result<String> {
235+
let client = reqwest::Client::new();
236+
237+
let source_file = File::open(path).await?;
238+
let mut buf = vec![];
239+
let mut reader = BufReader::new(source_file);
240+
reader.seek(SeekFrom::Start(offset)).await?;
241+
let mut chunk = reader.take(size);
242+
chunk.read_to_end(&mut buf).await?;
243+
let req = client.put(url).body(buf).build()?;
244+
let resp = client.execute(req).await?;
245+
Ok(resp.headers()["etag"].to_str()?.to_owned())
246+
}

0 commit comments

Comments
 (0)