Skip to content

Commit 058965e

Browse files
author
Bo Maryniuk
committed
Add ljx tool for working with logjet files
1 parent 8cffdc4 commit 058965e

13 files changed

Lines changed: 665 additions & 0 deletions

File tree

ljx/Cargo.toml

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
[package]
2+
name = "ljx"
3+
version = "0.1.0"
4+
edition = "2024"
5+
license = "Apache-2.0"
6+
7+
[dependencies]
8+
clap = { version = "4.5", features = ["derive", "wrap_help"] }
9+
colored = "3"
10+
logjet = { path = ".." }
11+
regex = "1.12"

ljx/src/cli.rs

Lines changed: 151 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,151 @@
1+
use std::path::PathBuf;
2+
3+
use clap::builder::styling;
4+
use clap::{Args, CommandFactory, Parser, Subcommand, ValueEnum};
5+
use colored::Colorize;
6+
7+
use crate::predicate::PredicateArgs;
8+
9+
#[derive(Debug, Parser)]
10+
#[command(
11+
name = "ljx",
12+
version,
13+
about = "Offline toolbox for .logjet files",
14+
long_about = None
15+
)]
16+
pub struct Cli {
17+
#[command(subcommand)]
18+
pub command: Command,
19+
}
20+
21+
pub fn build_cli() -> clap::Command {
22+
let appname = "ljx";
23+
let styles = styling::Styles::styled()
24+
.header(styling::AnsiColor::Yellow.on_default())
25+
.usage(styling::AnsiColor::Yellow.on_default())
26+
.literal(styling::AnsiColor::BrightGreen.on_default())
27+
.placeholder(styling::AnsiColor::BrightMagenta.on_default());
28+
29+
Cli::command()
30+
.styles(styles)
31+
.about(format!(
32+
"{} - {}",
33+
appname.bright_magenta().bold(),
34+
"offline toolbox for .logjet streams"
35+
))
36+
.override_usage(format!(
37+
"{appname} <COMMAND> [OPTIONS] [ARGS]"
38+
))
39+
.after_help(
40+
"Examples:\n ljx count telemetry.logjet -F error -i\n ljx filter telemetry.logjet -o only-logs.logjet -e 'java\\..*\\.bs'",
41+
)
42+
}
43+
44+
#[derive(Debug, Subcommand)]
45+
pub enum Command {
46+
Split(SplitArgs),
47+
Join(JoinArgs),
48+
Filter(FilterArgs),
49+
Count(CountArgs),
50+
Stats(StatsArgs),
51+
Cat(CatArgs),
52+
}
53+
54+
#[derive(Debug, Clone, Args)]
55+
pub struct CountArgs {
56+
#[arg(value_name = "INPUT", help = "Input .logjet file or - for stdin")]
57+
pub input: PathBuf,
58+
59+
#[command(flatten)]
60+
pub predicate: PredicateArgs,
61+
}
62+
63+
#[derive(Debug, Clone, Args)]
64+
pub struct FilterArgs {
65+
#[arg(value_name = "INPUT", help = "Input .logjet file or - for stdin")]
66+
pub input: PathBuf,
67+
68+
#[arg(
69+
short,
70+
long,
71+
value_name = "OUTPUT",
72+
help = "Output .logjet file or - for stdout"
73+
)]
74+
pub output: PathBuf,
75+
76+
#[arg(long, value_enum, default_value_t = OutputCodec::Lz4)]
77+
pub codec: OutputCodec,
78+
79+
#[arg(
80+
long,
81+
default_value_t = logjet::DEFAULT_BLOCK_TARGET_SIZE,
82+
help = "Target uncompressed bytes per output block"
83+
)]
84+
pub block_target_size: usize,
85+
86+
#[command(flatten)]
87+
pub predicate: PredicateArgs,
88+
}
89+
90+
#[derive(Debug, Clone, Args)]
91+
pub struct SplitArgs {
92+
#[arg(value_name = "INPUT")]
93+
pub input: PathBuf,
94+
95+
#[arg(value_name = "OUTPUT_PREFIX")]
96+
pub output_prefix: PathBuf,
97+
98+
#[arg(long)]
99+
pub max_bytes: Option<u64>,
100+
101+
#[arg(long)]
102+
pub max_records: Option<u64>,
103+
104+
#[arg(long, help = "RFC 3339 or unix-ns range support to be added")]
105+
pub timestamp_range: Option<String>,
106+
}
107+
108+
#[derive(Debug, Clone, Args)]
109+
pub struct JoinArgs {
110+
#[arg(value_name = "INPUT", required = true)]
111+
pub inputs: Vec<PathBuf>,
112+
113+
#[arg(short, long, value_name = "OUTPUT")]
114+
pub output: PathBuf,
115+
116+
#[arg(long)]
117+
pub validate_sequence_continuity: bool,
118+
}
119+
120+
#[derive(Debug, Clone, Args)]
121+
pub struct StatsArgs {
122+
#[arg(value_name = "INPUT")]
123+
pub input: PathBuf,
124+
125+
#[arg(long, help = "Compute payload size summaries by record type")]
126+
pub field_stats: bool,
127+
}
128+
129+
#[derive(Debug, Clone, Args)]
130+
pub struct CatArgs {
131+
#[arg(value_name = "INPUT")]
132+
pub input: PathBuf,
133+
134+
#[arg(long, default_value_t = false)]
135+
pub hex_payload: bool,
136+
}
137+
138+
#[derive(Debug, Clone, Copy, ValueEnum)]
139+
pub enum OutputCodec {
140+
None,
141+
Lz4,
142+
}
143+
144+
impl From<OutputCodec> for logjet::Codec {
145+
fn from(value: OutputCodec) -> Self {
146+
match value {
147+
OutputCodec::None => Self::None,
148+
OutputCodec::Lz4 => Self::Lz4,
149+
}
150+
}
151+
}

ljx/src/commands/cat.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
use crate::cli::CatArgs;
2+
use crate::error::{Error, Result};
3+
4+
pub fn run(_args: CatArgs) -> Result<()> {
5+
Err(Error::Unimplemented(
6+
"cat is not implemented yet; the CLI shape is defined but record rendering still needs to be added",
7+
))
8+
}

ljx/src/commands/count.rs

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
use logjet::LogjetReader;
2+
3+
use crate::cli::CountArgs;
4+
use crate::error::Result;
5+
use crate::input::InputHandle;
6+
7+
pub fn run(args: CountArgs) -> Result<()> {
8+
let predicate = args.predicate.build()?;
9+
let input = InputHandle::open(&args.input)?;
10+
let mut reader = LogjetReader::new(input.into_buf_reader());
11+
12+
let mut count = 0u64;
13+
while let Some(record) = reader.next_record()? {
14+
if predicate.matches(&record) {
15+
count = count
16+
.checked_add(1)
17+
.ok_or(logjet::Error::NumericOverflow("count"))?;
18+
}
19+
}
20+
21+
println!("{count}");
22+
Ok(())
23+
}

ljx/src/commands/filter.rs

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
use std::io::Write;
2+
3+
use logjet::{LogjetReader, LogjetWriter, WriterConfig};
4+
5+
use crate::cli::FilterArgs;
6+
use crate::error::Result;
7+
use crate::input::{InputHandle, open_output};
8+
9+
pub fn run(args: FilterArgs) -> Result<()> {
10+
let predicate = args.predicate.build()?;
11+
let input = InputHandle::open(&args.input)?;
12+
let output = open_output(&args.output)?;
13+
let config = WriterConfig {
14+
block_target_size: args.block_target_size,
15+
codec: args.codec.into(),
16+
sync_marker: logjet::DEFAULT_SYNC_MARKER,
17+
};
18+
19+
let mut reader = LogjetReader::new(input.into_buf_reader());
20+
let mut writer = LogjetWriter::with_config(output, config);
21+
22+
while let Some(record) = reader.next_record()? {
23+
if predicate.matches(&record) {
24+
writer.push(
25+
record.record_type,
26+
record.seq,
27+
record.ts_unix_ns,
28+
&record.payload,
29+
)?;
30+
}
31+
}
32+
33+
let mut output = writer.into_inner()?;
34+
output.flush()?;
35+
Ok(())
36+
}

ljx/src/commands/join.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
use crate::cli::JoinArgs;
2+
use crate::error::{Error, Result};
3+
4+
pub fn run(_args: JoinArgs) -> Result<()> {
5+
Err(Error::Unimplemented(
6+
"join is not implemented yet; the CLI shape is defined but ordered multi-segment merging still needs to be added",
7+
))
8+
}

ljx/src/commands/mod.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
pub mod cat;
2+
pub mod count;
3+
pub mod filter;
4+
pub mod join;
5+
pub mod split;
6+
pub mod stats;

ljx/src/commands/split.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
use crate::cli::SplitArgs;
2+
use crate::error::{Error, Result};
3+
4+
pub fn run(_args: SplitArgs) -> Result<()> {
5+
Err(Error::Unimplemented(
6+
"split is not implemented yet; the CLI shape is defined but block-preserving file partitioning still needs to be added",
7+
))
8+
}

ljx/src/commands/stats.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
use crate::cli::StatsArgs;
2+
use crate::error::{Error, Result};
3+
4+
pub fn run(_args: StatsArgs) -> Result<()> {
5+
Err(Error::Unimplemented(
6+
"stats is not implemented yet; the CLI shape is defined but streaming summaries still need to be added",
7+
))
8+
}

ljx/src/error.rs

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
use std::fmt::{Display, Formatter};
2+
use std::io;
3+
4+
pub type Result<T> = std::result::Result<T, Error>;
5+
6+
#[derive(Debug)]
7+
pub enum Error {
8+
Io(io::Error),
9+
Logjet(logjet::Error),
10+
Usage(String),
11+
Unimplemented(&'static str),
12+
}
13+
14+
impl Display for Error {
15+
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
16+
match self {
17+
Self::Io(err) => write!(f, "{err}"),
18+
Self::Logjet(err) => write!(f, "{err}"),
19+
Self::Usage(msg) => write!(f, "{msg}"),
20+
Self::Unimplemented(msg) => write!(f, "{msg}"),
21+
}
22+
}
23+
}
24+
25+
impl std::error::Error for Error {}
26+
27+
impl From<io::Error> for Error {
28+
fn from(value: io::Error) -> Self {
29+
Self::Io(value)
30+
}
31+
}
32+
33+
impl From<logjet::Error> for Error {
34+
fn from(value: logjet::Error) -> Self {
35+
Self::Logjet(value)
36+
}
37+
}

0 commit comments

Comments
 (0)