Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions coman/src/app/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,9 @@ pub enum CscsMsg {
}
#[derive(Debug, PartialEq)]
pub enum JobMsg {
ShowLog(usize),
CloseLog,
Show(usize),
Switch,
Close,
}
#[derive(Debug, Clone, Copy, Default, PartialEq, PartialOrd, Eq, Ord, strum::Display)]
pub enum View {
Expand Down
21 changes: 14 additions & 7 deletions coman/src/app/model.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use crate::{
},
cscs::{
handlers::{cscs_login, cscs_system_set},
ports::TreeAction,
ports::{JobLogAction, TreeAction},
},
trace_dbg,
util::ui::{draw_area_in_absolute, draw_area_in_absolute_fixed_height},
Expand Down Expand Up @@ -56,7 +56,7 @@ where

/// Triggers watching job logs
/// sending None stops watching
pub job_log_tx: mpsc::Sender<Option<usize>>,
pub job_log_tx: mpsc::Sender<JobLogAction>,

/// Allows creating user events based on messages
pub user_event_tx: mpsc::Sender<UserEvent>,
Expand All @@ -74,7 +74,7 @@ where
bridge: TerminalBridge<T>,
error_tx: mpsc::Sender<String>,
select_system_tx: mpsc::Sender<()>,
job_log_tx: mpsc::Sender<Option<usize>>,
job_log_tx: mpsc::Sender<JobLogAction>,
user_event_tx: mpsc::Sender<UserEvent>,
file_tree_tx: mpsc::Sender<TreeAction>,
) -> Self {
Expand Down Expand Up @@ -305,7 +305,7 @@ where
}
fn handle_job_msg(&mut self, msg: JobMsg) -> Option<Msg> {
match msg {
JobMsg::ShowLog(jobid) => {
JobMsg::Show(jobid) => {
if self.app.mounted(&Id::WorkloadList) {
assert!(self.app.umount(&Id::WorkloadList).is_ok());
}
Expand All @@ -319,11 +319,18 @@ where
assert!(self.app.active(&Id::WorkloadLogs).is_ok());
let job_log_tx = self.job_log_tx.clone();
tokio::spawn(async move {
job_log_tx.send(Some(jobid)).await.unwrap();
job_log_tx.send(JobLogAction::Job(jobid)).await.unwrap();
});
None
}
JobMsg::CloseLog => {
JobMsg::Switch => {
let job_log_tx = self.job_log_tx.clone();
tokio::spawn(async move {
job_log_tx.send(JobLogAction::SwitchLog).await.unwrap();
});
None
}
JobMsg::Close => {
if self.app.mounted(&Id::WorkloadLogs) {
assert!(self.app.umount(&Id::WorkloadLogs).is_ok());
}
Expand All @@ -338,7 +345,7 @@ where
let job_log_tx = self.job_log_tx.clone();
tokio::spawn(async move {
// stopp polling for logs
job_log_tx.send(None).await.unwrap();
job_log_tx.send(JobLogAction::Stop).await.unwrap();
});
None
}
Expand Down
6 changes: 5 additions & 1 deletion coman/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,11 @@ pub enum CscsJobCommands {
#[clap(alias("g"), about = "Get metadata for a specific job [aliases: g]")]
Get { job_id: i64 },
#[clap(about = "Get the stdout of a job")]
Log { job_id: i64 },
Log {
#[clap(short, long, action, help = "whether to get stderr instead of stdout")]
stderr: bool,
job_id: i64,
},

#[clap(alias("s"), about = "Submit a new compute job [aliases: s]")]
Submit {
Expand Down
19 changes: 16 additions & 3 deletions coman/src/components/file_tree.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use std::{iter, path::PathBuf};

use tokio::sync::mpsc;
use tui_realm_treeview::{Node, NodeValue, TREE_CMD_CLOSE, TREE_CMD_OPEN, Tree, TreeView};
use tui_realm_treeview::{Node, NodeValue, TREE_CMD_CLOSE, TREE_CMD_OPEN, TREE_INITIAL_NODE, Tree, TreeView};
use tuirealm::{
Component, Event, MockComponent, State, StateValue,
AttrValue, Attribute, Component, Event, MockComponent, State, StateValue,
command::{Cmd, CmdResult, Direction, Position},
event::{Key, KeyEvent, KeyModifiers},
props::{Alignment, BorderType, Borders, Color, Style},
Expand Down Expand Up @@ -84,7 +84,20 @@ impl Component<Msg, UserEvent> for FileTree {
Event::Keyboard(KeyEvent {
code: Key::Left,
modifiers: KeyModifiers::NONE,
}) => self.perform(Cmd::Custom(TREE_CMD_CLOSE)),
}) => {
let current_id = self.state().unwrap_one().unwrap_string();
let node = self.component.tree().root().query(&current_id).unwrap();
if self.component.tree_state().is_closed(node) {
// current node is already closed, so we select and close the parent
if let Some(parent) = self.component.tree().root().parent(node.id()) {
self.attr(
Attribute::Custom(TREE_INITIAL_NODE),
AttrValue::String(parent.id().clone()),
);
}
}
self.perform(Cmd::Custom(TREE_CMD_CLOSE))
}
Event::Keyboard(KeyEvent {
code: Key::Right,
modifiers: KeyModifiers::NONE,
Expand Down
2 changes: 1 addition & 1 deletion coman/src/components/toolbar.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use crate::{
},
trace_dbg,
};
const WORKLOAD_TOOLTIP: &str = "q: quit, Esc: close/back, l: logs, f: File view, x: menu, ?: help";
const WORKLOAD_TOOLTIP: &str = "q: quit, Esc: close/back, l: logs, f: File view, x: menu, tab: switch view, ?: help";
const FILETREE_TOOLTIP: &str = "q: quit, ↑↓: navigate,←→: collapse/expand, x: menu, ?: help";

#[derive(MockComponent)]
Expand Down
2 changes: 1 addition & 1 deletion coman/src/components/workload_list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ impl Component<Msg, UserEvent> for WorkloadList {
&& !self.jobs.is_empty()
{
let job = self.jobs[index].clone();
return Some(Msg::Job(JobMsg::ShowLog(job.id)));
return Some(Msg::Job(JobMsg::Show(job.id)));
}
CmdResult::None
}
Expand Down
34 changes: 24 additions & 10 deletions coman/src/components/workload_log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,35 +6,32 @@ use tuirealm::{
props::{Alignment, BorderType, Borders, Color, PropPayload, PropValue, TextSpan},
};

use crate::{
app::{
messages::{JobMsg, Msg},
user_events::{CscsEvent, UserEvent},
},
trace_dbg,
use crate::app::{
messages::{JobMsg, Msg},
user_events::{CscsEvent, UserEvent},
};

#[derive(MockComponent)]
pub struct WorkloadLog {
component: Textarea,
stderr: bool,
}

impl WorkloadLog {
pub fn new() -> Self {
Self {
component: Textarea::default()
.borders(Borders::default().modifiers(BorderType::Rounded).color(Color::Yellow))
.title("Workload Log", Alignment::Center)
.title("Workload Log (stdout)", Alignment::Center)
.step(4),
stderr: false,
}
}
}
impl Component<Msg, UserEvent> for WorkloadLog {
fn on(&mut self, ev: tuirealm::Event<UserEvent>) -> Option<Msg> {
let _ = match ev {
Event::User(UserEvent::Cscs(CscsEvent::GotJobLog(log))) => {
let _ = trace_dbg!("got log component");
let log = trace_dbg!(log);
self.attr(
Attribute::Text,
AttrValue::Payload(PropPayload::Vec(
Expand All @@ -51,8 +48,25 @@ impl Component<Msg, UserEvent> for WorkloadLog {
Event::Keyboard(KeyEvent { code: Key::PageUp, .. }) => self.perform(Cmd::Scroll(Direction::Up)),
Event::Keyboard(KeyEvent { code: Key::Home, .. }) => self.perform(Cmd::GoTo(Position::Begin)),
Event::Keyboard(KeyEvent { code: Key::End, .. }) => self.perform(Cmd::GoTo(Position::End)),
Event::Keyboard(KeyEvent { code: Key::Tab, .. }) => {
self.stderr = !self.stderr;
if self.stderr {
self.attr(
Attribute::Title,
AttrValue::Title(("Workload Log (stderr)".to_owned(), Alignment::Center)),
);
} else {
self.attr(
Attribute::Title,
AttrValue::Title(("Workload Log (stdout)".to_owned(), Alignment::Center)),
);
}
// empty log view
self.attr(Attribute::Text, AttrValue::Payload(PropPayload::Vec(vec![])));
return Some(Msg::Job(JobMsg::Switch));
}
Event::Keyboard(KeyEvent { code: Key::Esc, .. }) => {
return Some(Msg::Job(JobMsg::CloseLog));
return Some(Msg::Job(JobMsg::Close));
}
_ => CmdResult::None,
};
Expand Down
8 changes: 7 additions & 1 deletion coman/src/cscs/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,10 +87,11 @@ pub(crate) async fn cli_cscs_job_detail(

pub(crate) async fn cli_cscs_job_log(
job_id: i64,
stderr: bool,
system: Option<String>,
platform: Option<ComputePlatform>,
) -> Result<()> {
match cscs_job_log(job_id, system, platform).await {
match cscs_job_log(job_id, stderr, system, platform).await {
Ok(content) => {
println!("{}", content);
Ok(())
Expand Down Expand Up @@ -171,6 +172,11 @@ pub(crate) async fn cli_cscs_file_download(
system: Option<String>,
platform: Option<ComputePlatform>,
) -> Result<()> {
let local = if local.is_dir() {
local.join(remote.file_name().ok_or(eyre!("couldn't get name of remote file"))?)
} else {
local
};
match cscs_file_download(remote, local.clone(), account, system.clone(), platform.clone()).await {
Ok(None) => {
println!("File successfully downloaded");
Expand Down
36 changes: 27 additions & 9 deletions coman/src/cscs/handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@ use reqwest::Url;
use crate::{
config::{ComputePlatform, Config},
cscs::{
api_client::{CscsApi, FileStat, FileSystemType, Job, JobDetail, PathEntry, S3Upload, System, UserInfo},
api_client::{
CscsApi, FileStat, FileSystemType, Job, JobDetail, PathEntry, PathType, S3Upload, System, UserInfo,
},
oauth2::{
CLIENT_ID_SECRET_NAME, CLIENT_SECRET_SECRET_NAME, client_credentials_login, finish_cscs_device_login,
start_cscs_device_login,
Expand Down Expand Up @@ -115,7 +117,12 @@ pub async fn cscs_job_details(
}
}

pub async fn cscs_job_log(job_id: i64, system: Option<String>, platform: Option<ComputePlatform>) -> Result<String> {
pub async fn cscs_job_log(
job_id: i64,
stderr: bool,
system: Option<String>,
platform: Option<ComputePlatform>,
) -> Result<String> {
match get_access_token().await {
Ok(access_token) => {
let api_client = CscsApi::new(access_token.0, platform).unwrap();
Expand All @@ -125,9 +132,12 @@ pub async fn cscs_job_log(job_id: i64, system: Option<String>, platform: Option<
if job.is_none() {
return Err(eyre!("couldn't find job {}", job_id));
}
api_client
.tail(current_system, PathBuf::from(job.unwrap().stdout), 100)
.await
let path = if stderr {
PathBuf::from(job.unwrap().stderr)
} else {
PathBuf::from(job.unwrap().stdout)
};
api_client.tail(current_system, path, 100).await
}
Err(e) => Err(e),
}
Expand Down Expand Up @@ -321,6 +331,16 @@ pub async fn cscs_file_upload(
Ok(access_token) => {
let api_client = CscsApi::new(access_token.0, platform).unwrap();
let config = Config::new().unwrap();
let current_system = &system.unwrap_or(config.cscs.current_system);
let existing = api_client.list_path(current_system, remote.clone()).await?;
let remote = if !existing.is_empty() {
if existing.len() == 1 && existing[0].path_type == PathType::File {
return Err(eyre!("remote file already exists"));
}
remote.join(local.file_name().ok_or(eyre!("couldn't get filename for local file"))?)
} else {
remote
};

let file_meta = std::fs::metadata(local.clone())?;

Expand All @@ -333,15 +353,13 @@ pub async fn cscs_file_upload(
if size < CSCS_MAX_DIRECT_SIZE {
// upload directly
let contents = std::fs::read(local)?;
api_client
.upload(&system.unwrap_or(config.cscs.current_system), remote, contents)
.await?;
api_client.upload(current_system, remote, contents).await?;
Ok(None)
} else {
// upload via s3
let account = account.or(config.cscs.account);
let transfer_data = api_client
.transfer_upload(&config.cscs.current_system, account, remote, size as i64)
.transfer_upload(current_system, account, remote, size as i64)
.await?;

Ok(Some(transfer_data))
Expand Down
26 changes: 22 additions & 4 deletions coman/src/cscs/ports.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,17 +154,25 @@ impl PollAsync<UserEvent> for AsyncSelectSystemPort {
}
}

pub enum JobLogAction {
Job(usize),
SwitchLog,
Stop,
}

/// This port handles polling the logs of a CSCS job
pub(crate) struct AsyncJobLogPort {
receiver: mpsc::Receiver<Option<usize>>,
receiver: mpsc::Receiver<JobLogAction>,
current_job: Option<usize>,
stderr: bool,
}

impl AsyncJobLogPort {
pub fn new(receiver: mpsc::Receiver<Option<usize>>) -> Self {
pub fn new(receiver: mpsc::Receiver<JobLogAction>) -> Self {
Self {
receiver,
current_job: None,
stderr: false,
}
}
}
Expand All @@ -176,11 +184,21 @@ impl PollAsync<UserEvent> for AsyncJobLogPort {
}
if !self.receiver.is_empty() {
if let Some(val) = self.receiver.recv().await {
self.current_job = val;
match val {
JobLogAction::Job(jobid) => {
self.current_job = Some(jobid);
}
JobLogAction::SwitchLog => {
self.stderr = !self.stderr;
}
JobLogAction::Stop => {
self.current_job = None;
}
}
}
Ok(Some(Event::None))
} else if let Some(job_id) = self.current_job {
match cscs_job_log(job_id as i64, None, None).await {
match cscs_job_log(job_id as i64, self.stderr, None, None).await {
Ok(log) => {
let log = trace_dbg!(log);
Ok(Some(Event::User(UserEvent::Cscs(CscsEvent::GotJobLog(log)))))
Expand Down
4 changes: 3 additions & 1 deletion coman/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,9 @@ async fn main() -> Result<()> {
cli::CscsCommands::Job { command } => match command {
cli::CscsJobCommands::List => cli_cscs_job_list(system, platform).await?,
cli::CscsJobCommands::Get { job_id } => cli_cscs_job_detail(job_id, system, platform).await?,
cli::CscsJobCommands::Log { job_id } => cli_cscs_job_log(job_id, system, platform).await?,
cli::CscsJobCommands::Log { job_id, stderr } => {
cli_cscs_job_log(job_id, stderr, system, platform).await?
}
cli::CscsJobCommands::Submit {
script_file,
image,
Expand Down