Skip to content

Commit a53464d

Browse files
committed
add standalone clean subcommand
Signed-off-by: drindr <dreamchancn@qq.com>
1 parent 9573637 commit a53464d

3 files changed

Lines changed: 134 additions & 63 deletions

File tree

binaries/cli/src/command/clean.rs

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
use super::{
2+
Executable, default_tracing,
3+
list::{OutputEntry, display_entries},
4+
};
5+
use crate::{
6+
LOCALHOST,
7+
common::{connect_and_check_version, rpc},
8+
formatting::OutputFormat,
9+
};
10+
use clap::Args;
11+
use dora_core::topics::DORA_COORDINATOR_PORT_CONTROL_DEFAULT;
12+
use dora_message::{cli_to_coordinator::CliControlClient, tarpc};
13+
use eyre::Context;
14+
15+
#[derive(Debug, Args)]
16+
/// Remove finished and failed dataflows from the list.
17+
pub struct CleanArgs {
18+
/// Address of the dora coordinator
19+
#[clap(long, value_name = "IP", default_value_t = LOCALHOST)]
20+
pub coordinator_addr: std::net::IpAddr,
21+
/// Port number of the coordinator control server
22+
#[clap(long, value_name = "PORT", default_value_t = DORA_COORDINATOR_PORT_CONTROL_DEFAULT)]
23+
pub coordinator_port: u16,
24+
/// Output format
25+
#[clap(long, value_name = "FORMAT", default_value_t = OutputFormat::Table)]
26+
pub format: OutputFormat,
27+
}
28+
29+
impl Executable for CleanArgs {
30+
async fn execute(self) -> eyre::Result<()> {
31+
default_tracing()?;
32+
33+
let client = connect_and_check_version(self.coordinator_addr, self.coordinator_port)
34+
.await
35+
.wrap_err("failed to connect to dora coordinator")?;
36+
37+
clean(&client, self.format).await
38+
}
39+
}
40+
41+
async fn clean(client: &CliControlClient, format: OutputFormat) -> eyre::Result<()> {
42+
let list = rpc(
43+
"clean finished dataflows",
44+
client.clean(tarpc::context::current()),
45+
)
46+
.await?;
47+
48+
let entries: Vec<OutputEntry> = list
49+
.0
50+
.into_iter()
51+
.map(|entry| OutputEntry {
52+
uuid: entry.id.uuid,
53+
name: entry.id.name.unwrap_or_default(),
54+
status: entry.status,
55+
nodes: 0,
56+
cpu: 0.0,
57+
memory: 0.0,
58+
})
59+
.collect();
60+
61+
if entries.is_empty() {
62+
println!("No finished or failed dataflows to clean.");
63+
return Ok(());
64+
}
65+
66+
display_entries(&entries, format, false)
67+
}

binaries/cli/src/command/list.rs

Lines changed: 62 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -37,8 +37,6 @@ pub struct ListArgs {
3737
/// Sort by field (memory, cpu)
3838
#[clap(long, value_name = "FIELD")]
3939
pub sort_by: Option<String>,
40-
#[clap(long)]
41-
pub clean: bool,
4240
}
4341

4442
impl Executable for ListArgs {
@@ -49,26 +47,18 @@ impl Executable for ListArgs {
4947
.await
5048
.wrap_err("failed to connect to dora coordinator")?;
5149

52-
list(
53-
&client,
54-
self.format,
55-
self.status,
56-
self.name,
57-
self.sort_by,
58-
self.clean,
59-
)
60-
.await
50+
list(&client, self.format, self.status, self.name, self.sort_by).await
6151
}
6252
}
6353

6454
#[derive(Serialize)]
65-
struct OutputEntry {
66-
uuid: Uuid,
67-
name: String,
68-
status: DataflowStatus,
69-
nodes: usize,
70-
cpu: f64,
71-
memory: f64,
55+
pub(super) struct OutputEntry {
56+
pub uuid: Uuid,
57+
pub name: String,
58+
pub status: DataflowStatus,
59+
pub nodes: usize,
60+
pub cpu: f64,
61+
pub memory: f64,
7262
}
7363

7464
#[derive(Default)]
@@ -78,23 +68,66 @@ struct DataflowMetrics {
7868
total_memory_mb: f64,
7969
}
8070

71+
/// Render a list of [`OutputEntry`] values to stdout in either table or JSON format.
72+
///
73+
/// When `show_metrics` is `false` the Nodes, CPU and Memory columns are omitted.
74+
pub(super) fn display_entries(
75+
entries: &[OutputEntry],
76+
format: OutputFormat,
77+
show_metrics: bool,
78+
) -> eyre::Result<()> {
79+
match format {
80+
OutputFormat::Table => {
81+
let mut tw = TabWriter::new(std::io::stdout().lock());
82+
if show_metrics {
83+
tw.write_all(b"UUID\tName\tStatus\tNodes\tCPU\tMemory\n")?;
84+
} else {
85+
tw.write_all(b"UUID\tName\tStatus\n")?;
86+
}
87+
for entry in entries {
88+
let status = match entry.status {
89+
DataflowStatus::Running => "Running",
90+
DataflowStatus::Finished => "Succeeded",
91+
DataflowStatus::Failed => "Failed",
92+
};
93+
if show_metrics {
94+
tw.write_all(
95+
format!(
96+
"{}\t{}\t{}\t{}\t{}\t{}\n",
97+
entry.uuid,
98+
entry.name,
99+
status,
100+
entry.nodes,
101+
format!("{:.1}%", entry.cpu),
102+
format!("{:.1} GB", entry.memory),
103+
)
104+
.as_bytes(),
105+
)?;
106+
} else {
107+
tw.write_all(
108+
format!("{}\t{}\t{}\n", entry.uuid, entry.name, status).as_bytes(),
109+
)?;
110+
}
111+
}
112+
tw.flush()?;
113+
}
114+
OutputFormat::Json => {
115+
for entry in entries {
116+
println!("{}", serde_json::to_string(entry)?);
117+
}
118+
}
119+
}
120+
Ok(())
121+
}
122+
81123
async fn list(
82124
client: &CliControlClient,
83125
format: OutputFormat,
84126
status_filter: Option<String>,
85127
name_filter: Option<String>,
86128
sort_by: Option<String>,
87-
clean: bool,
88129
) -> Result<(), eyre::ErrReport> {
89-
let list = if !clean {
90-
query_running_dataflows(client).await
91-
} else {
92-
rpc(
93-
"clean finished dataflows",
94-
client.clean(tarpc::context::current()),
95-
)
96-
.await
97-
}?;
130+
let list = query_running_dataflows(client).await?;
98131

99132
// Get node information via tarpc
100133
let node_infos = rpc(
@@ -197,39 +230,5 @@ async fn list(
197230
}
198231
}
199232

200-
match format {
201-
OutputFormat::Table => {
202-
let mut tw = TabWriter::new(std::io::stdout().lock());
203-
// Header
204-
tw.write_all(format!("UUID\tName\tStatus\tNodes\tCPU\tMemory\n").as_bytes())?;
205-
for entry in entries {
206-
let status = match entry.status {
207-
DataflowStatus::Running => "Running",
208-
DataflowStatus::Finished => "Succeeded",
209-
DataflowStatus::Failed => "Failed",
210-
};
211-
212-
tw.write_all(
213-
format!(
214-
"{}\t{}\t{}\t{}\t{}\t{}\n",
215-
entry.uuid,
216-
entry.name,
217-
status,
218-
entry.nodes,
219-
format!("{:.1}%", entry.cpu),
220-
format!("{:.1} GB", entry.memory)
221-
)
222-
.as_bytes(),
223-
)?;
224-
}
225-
tw.flush()?;
226-
}
227-
OutputFormat::Json => {
228-
for entry in entries {
229-
println!("{}", serde_json::to_string(&entry)?);
230-
}
231-
}
232-
}
233-
234-
Ok(())
233+
display_entries(&entries, format, true)
235234
}

binaries/cli/src/command/mod.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
mod build;
2+
mod clean;
23
mod completion;
34
mod coordinator;
45
mod daemon;
@@ -23,6 +24,7 @@ pub use build::{build, build_async};
2324
pub use run::{Run, run, run_func};
2425

2526
use build::Build;
27+
use clean::CleanArgs;
2628
use completion::Completion;
2729
use coordinator::Coordinator;
2830
use daemon::Daemon;
@@ -60,6 +62,8 @@ pub enum Command {
6062
Stop(Stop),
6163
#[clap(alias = "ps")]
6264
List(ListArgs),
65+
/// Remove finished and failed dataflows from the list
66+
Clean(CleanArgs),
6367
// Planned for future releases:
6468
// Dashboard,
6569
#[command(allow_missing_positional = true)]
@@ -119,6 +123,7 @@ impl Executable for Command {
119123
Command::Start(args) => args.execute().await,
120124
Command::Stop(args) => args.execute().await,
121125
Command::List(args) => args.execute().await,
126+
Command::Clean(args) => args.execute().await,
122127
Command::Logs(args) => args.execute().await,
123128
Command::Inspect(args) => args.execute().await,
124129
Command::Daemon(args) => args.execute().await,

0 commit comments

Comments
 (0)