Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion demo/kill-bill/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ From this directory:
The script does this:

1. starts `ljd` in file mode
2. writes 100 numbered OTLP log messages into one `killbill.logjet` file
2. writes 100 numbered OTLP log messages into one `killbill.logjet` file with a short pacing delay so several physical blocks are created
3. stops `ljd`
4. cuts out the middle third of the file by raw bytes
5. saves only that byte slice as `./damaged/killbill.logjet`
Expand All @@ -40,6 +40,9 @@ The script does this:

The damaged file does not start on a real block boundary. Its front is just a
raw byte slice from the middle of the original file.
The pacing delay is intentional: it makes the demo deterministic across fast
machines by ensuring the original file contains several independently
recoverable blocks before the byte cut.

## Expected Result

Expand Down
3 changes: 3 additions & 0 deletions demo/kill-bill/run-demo.sh
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ LOG_DIR="$SCRIPT_DIR/logs"
DAMAGED_DIR="$SCRIPT_DIR/damaged"
ORIGINAL_FILE="$LOG_DIR/killbill.logjet"
DAMAGED_FILE="$DAMAGED_DIR/killbill.logjet"
EMIT_DELAY_S=0.03

for bin in "$LJD" "$EMITTER" "$COLLECTOR"; do
if [ ! -x "$bin" ]; then
Expand All @@ -34,6 +35,7 @@ rm -rf "$LOG_DIR" "$DAMAGED_DIR"
mkdir -p "$LOG_DIR" "$DAMAGED_DIR"

echo "starting ljd to write one .logjet file with 100 messages"
echo "pacing writes by ${EMIT_DELAY_S}s so the demo reliably produces multiple recoverable blocks"
"$LJD" --config "$CONFIG" &
LJD_PID=$!

Expand All @@ -43,6 +45,7 @@ i=1
while [ "$i" -le 100 ]; do
MESSAGE=$(printf 'KILL BILL %03d: the reader should recover this if its block survives the byte cut' "$i")
"$EMITTER" 127.0.0.1:4318 --once --service-name KILL-BILL --message "$MESSAGE" >/dev/null
sleep "$EMIT_DELAY_S"
i=$((i + 1))
done

Expand Down
51 changes: 42 additions & 9 deletions ljx/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ pub enum Command {
Stats(StatsArgs),
#[command(name = "view", alias = "cat", about = "Interactively browse filtered records in a terminal UI")]
View(ViewArgs),
#[command(about = "Deduplicate log records, collapsing identical or similar bodies")]
#[command(about = "Deduplicate log records across the whole selection or collapse nearby bursts")]
Dedup(DedupArgs),
}

Expand All @@ -67,26 +67,36 @@ pub struct DedupArgs {
#[arg(short, long, value_name = "OUTPUT", help = "Output .logjet file or - for stdout")]
pub output: PathBuf,

#[arg(long, value_enum, default_value_t = DedupModeArg::Hash2)]
pub mode: DedupModeArg,
#[arg(long, value_enum, default_value_t = DedupBehaviorArg::Distinct, help = "Dedup behavior: distinct across the whole selection, or collapse nearby bursts")]
pub mode: DedupBehaviorArg,

#[arg(long, value_name = "KEYS", help = "Comma-separated bucket extensions: scope, source_line")]
#[arg(long = "match", alias = "matcher", value_enum, default_value_t = DedupMatchArg::Canon, help = "Matcher level inside the selected mode: exact, canon, or full")]
pub matcher: DedupMatchArg,

#[arg(long, value_name = "KEYS", help = "Comma-separated bucket extensions: scope, source_line (used by collapse mode)")]
pub bucket_by: Option<String>,

#[arg(long, value_name = "FLOAT", help = "Drain3 similarity threshold (full mode only) [default: 0.7]")]
#[arg(long, value_name = "FLOAT", help = "Drain3 similarity threshold (full match only) [default: 0.7]")]
pub sim_th: Option<f64>,

#[arg(long, value_name = "INT", help = "Drain3 prefix tree depth (full mode only) [default: 3]")]
#[arg(long, value_name = "INT", help = "Drain3 prefix tree depth (full match only) [default: 3]")]
pub drain_depth: Option<i64>,

#[arg(long, value_name = "DELIMS", help = "Drain3 extra delimiters, comma-separated (full mode only)")]
#[arg(long, value_name = "DELIMS", help = "Drain3 extra delimiters, comma-separated (full match only)")]
pub extra_delimiters: Option<String>,
}

#[derive(Debug, Clone, Copy, ValueEnum)]
pub enum DedupModeArg {
pub enum DedupBehaviorArg {
Distinct,
Collapse,
}

#[derive(Debug, Clone, Copy, ValueEnum)]
pub enum DedupMatchArg {
Exact,
Hash2,
#[value(alias = "hash2")]
Canon,
Full,
}

Expand Down Expand Up @@ -183,3 +193,26 @@ impl From<OutputCodec> for logjet::Codec {
}
}
}

#[cfg(test)]
mod cli_utst {
use super::{Cli, Command, DedupBehaviorArg, DedupMatchArg};
use clap::Parser;

#[test]
fn dedup_defaults_to_distinct_canon() {
let cli = Cli::try_parse_from(["ljx", "dedup", "input.logjet", "-o", "out.logjet"]).expect("cli parses");
let Command::Dedup(args) = cli.command else { panic!("expected dedup command") };
assert!(matches!(args.mode, DedupBehaviorArg::Distinct));
assert!(matches!(args.matcher, DedupMatchArg::Canon));
}

#[test]
fn dedup_accepts_collapse_and_full_matcher() {
let cli =
Cli::try_parse_from(["ljx", "dedup", "input.logjet", "-o", "out.logjet", "--mode", "collapse", "--match", "full"]).expect("cli parses");
let Command::Dedup(args) = cli.command else { panic!("expected dedup command") };
assert!(matches!(args.mode, DedupBehaviorArg::Collapse));
assert!(matches!(args.matcher, DedupMatchArg::Full));
}
}
25 changes: 17 additions & 8 deletions ljx/src/commands/dedup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,27 @@ use std::io::Write;

use logjet::{LogjetReader, LogjetWriter};

use crate::cli::{DedupArgs, DedupModeArg};
use crate::cli::{DedupArgs, DedupBehaviorArg, DedupMatchArg};
use crate::dedup::flat_record::BucketKeyKind;
use crate::dedup::{self, DedupMode, DedupOpts, DrainOpts};
use crate::dedup::{self, DedupMatchMode, DedupMode, DedupOpts, DrainOpts};
use crate::error::Result;
use crate::input::{InputHandle, open_output};

impl From<DedupModeArg> for DedupMode {
fn from(value: DedupModeArg) -> Self {
impl From<DedupBehaviorArg> for DedupMode {
fn from(value: DedupBehaviorArg) -> Self {
match value {
DedupModeArg::Exact => Self::Exact,
DedupModeArg::Hash2 => Self::Hash2,
DedupModeArg::Full => Self::Full,
DedupBehaviorArg::Distinct => Self::Distinct,
DedupBehaviorArg::Collapse => Self::Collapse,
}
}
}

impl From<DedupMatchArg> for DedupMatchMode {
fn from(value: DedupMatchArg) -> Self {
match value {
DedupMatchArg::Exact => Self::Exact,
DedupMatchArg::Canon => Self::Hash2,
DedupMatchArg::Full => Self::Full,
}
}
}
Expand All @@ -27,7 +36,7 @@ pub fn run(args: DedupArgs) -> Result<()> {
depth: args.drain_depth.unwrap_or(3),
extra_delimiters: args.extra_delimiters.as_ref().map(|s| s.split(',').map(String::from).collect()).unwrap_or_default(),
};
let opts = DedupOpts { mode: args.mode.into(), bucket_key, drain };
let opts = DedupOpts { mode: args.mode.into(), match_mode: args.matcher.into(), bucket_key, drain };

let input = InputHandle::open(&args.input)?;
let mut reader = LogjetReader::new(input.into_buf_reader());
Expand Down
Loading
Loading