Skip to content

Commit d5904fd

Browse files
authored
Merge pull request #12 from tinythings/isbm-data-deduplication-modes
Add global dedup mode alongside existing one
2 parents c328b03 + aa18e67 commit d5904fd

15 files changed

Lines changed: 960 additions & 102 deletions

File tree

demo/kill-bill/README.md

Lines changed: 4 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -30,7 +30,7 @@ From this directory:
3030
The script does this:
3131

3232
1. starts `ljd` in file mode
33-
2. writes 100 numbered OTLP log messages into one `killbill.logjet` file
33+
2. writes 100 numbered OTLP log messages into one `killbill.logjet` file with a short pacing delay so several physical blocks are created
3434
3. stops `ljd`
3535
4. cuts out the middle third of the file by raw bytes
3636
5. saves only that byte slice as `./damaged/killbill.logjet`
@@ -40,6 +40,9 @@ The script does this:
4040

4141
The damaged file does not start on a real block boundary. Its front is just a
4242
raw byte slice from the middle of the original file.
43+
The pacing delay is intentional: it makes the demo deterministic across fast
44+
machines by ensuring the original file contains several independently
45+
recoverable blocks before the byte cut.
4346

4447
## Expected Result
4548

demo/kill-bill/run-demo.sh

Lines changed: 3 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -11,6 +11,7 @@ LOG_DIR="$SCRIPT_DIR/logs"
1111
DAMAGED_DIR="$SCRIPT_DIR/damaged"
1212
ORIGINAL_FILE="$LOG_DIR/killbill.logjet"
1313
DAMAGED_FILE="$DAMAGED_DIR/killbill.logjet"
14+
EMIT_DELAY_S=0.03
1415

1516
for bin in "$LJD" "$EMITTER" "$COLLECTOR"; do
1617
if [ ! -x "$bin" ]; then
@@ -34,6 +35,7 @@ rm -rf "$LOG_DIR" "$DAMAGED_DIR"
3435
mkdir -p "$LOG_DIR" "$DAMAGED_DIR"
3536

3637
echo "starting ljd to write one .logjet file with 100 messages"
38+
echo "pacing writes by ${EMIT_DELAY_S}s so the demo reliably produces multiple recoverable blocks"
3739
"$LJD" --config "$CONFIG" &
3840
LJD_PID=$!
3941

@@ -43,6 +45,7 @@ i=1
4345
while [ "$i" -le 100 ]; do
4446
MESSAGE=$(printf 'KILL BILL %03d: the reader should recover this if its block survives the byte cut' "$i")
4547
"$EMITTER" 127.0.0.1:4318 --once --service-name KILL-BILL --message "$MESSAGE" >/dev/null
48+
sleep "$EMIT_DELAY_S"
4649
i=$((i + 1))
4750
done
4851

ljx/src/cli.rs

Lines changed: 42 additions & 9 deletions
Original file line number | Diff line number | Diff line change
@@ -55,7 +55,7 @@ pub enum Command {
5555
Stats(StatsArgs),
5656
#[command(name = "view", alias = "cat", about = "Interactively browse filtered records in a terminal UI")]
5757
View(ViewArgs),
58-
#[command(about = "Deduplicate log records, collapsing identical or similar bodies")]
58+
#[command(about = "Deduplicate log records across the whole selection or collapse nearby bursts")]
5959
Dedup(DedupArgs),
6060
}
6161

@@ -67,26 +67,36 @@ pub struct DedupArgs {
6767
#[arg(short, long, value_name = "OUTPUT", help = "Output .logjet file or - for stdout")]
6868
pub output: PathBuf,
6969

70-
#[arg(long, value_enum, default_value_t = DedupModeArg::Hash2)]
71-
pub mode: DedupModeArg,
70+
#[arg(long, value_enum, default_value_t = DedupBehaviorArg::Distinct, help = "Dedup behavior: distinct across the whole selection, or collapse nearby bursts")]
71+
pub mode: DedupBehaviorArg,
7272

73-
#[arg(long, value_name = "KEYS", help = "Comma-separated bucket extensions: scope, source_line")]
73+
#[arg(long = "match", alias = "matcher", value_enum, default_value_t = DedupMatchArg::Canon, help = "Matcher level inside the selected mode: exact, canon, or full")]
74+
pub matcher: DedupMatchArg,
75+
76+
#[arg(long, value_name = "KEYS", help = "Comma-separated bucket extensions: scope, source_line (used by collapse mode)")]
7477
pub bucket_by: Option<String>,
7578

76-
#[arg(long, value_name = "FLOAT", help = "Drain3 similarity threshold (full mode only) [default: 0.7]")]
79+
#[arg(long, value_name = "FLOAT", help = "Drain3 similarity threshold (full match only) [default: 0.7]")]
7780
pub sim_th: Option<f64>,
7881

79-
#[arg(long, value_name = "INT", help = "Drain3 prefix tree depth (full mode only) [default: 3]")]
82+
#[arg(long, value_name = "INT", help = "Drain3 prefix tree depth (full match only) [default: 3]")]
8083
pub drain_depth: Option<i64>,
8184

82-
#[arg(long, value_name = "DELIMS", help = "Drain3 extra delimiters, comma-separated (full mode only)")]
85+
#[arg(long, value_name = "DELIMS", help = "Drain3 extra delimiters, comma-separated (full match only)")]
8386
pub extra_delimiters: Option<String>,
8487
}
8588

8689
#[derive(Debug, Clone, Copy, ValueEnum)]
87-
pub enum DedupModeArg {
90+
pub enum DedupBehaviorArg {
91+
Distinct,
92+
Collapse,
93+
}
94+
95+
#[derive(Debug, Clone, Copy, ValueEnum)]
96+
pub enum DedupMatchArg {
8897
Exact,
89-
Hash2,
98+
#[value(alias = "hash2")]
99+
Canon,
90100
Full,
91101
}
92102

@@ -183,3 +193,26 @@ impl From<OutputCodec> for logjet::Codec {
183193
}
184194
}
185195
}
196+
197+
#[cfg(test)]
198+
mod cli_utst {
199+
use super::{Cli, Command, DedupBehaviorArg, DedupMatchArg};
200+
use clap::Parser;
201+
202+
#[test]
203+
fn dedup_defaults_to_distinct_canon() {
204+
let cli = Cli::try_parse_from(["ljx", "dedup", "input.logjet", "-o", "out.logjet"]).expect("cli parses");
205+
let Command::Dedup(args) = cli.command else { panic!("expected dedup command") };
206+
assert!(matches!(args.mode, DedupBehaviorArg::Distinct));
207+
assert!(matches!(args.matcher, DedupMatchArg::Canon));
208+
}
209+
210+
#[test]
211+
fn dedup_accepts_collapse_and_full_matcher() {
212+
let cli =
213+
Cli::try_parse_from(["ljx", "dedup", "input.logjet", "-o", "out.logjet", "--mode", "collapse", "--match", "full"]).expect("cli parses");
214+
let Command::Dedup(args) = cli.command else { panic!("expected dedup command") };
215+
assert!(matches!(args.mode, DedupBehaviorArg::Collapse));
216+
assert!(matches!(args.matcher, DedupMatchArg::Full));
217+
}
218+
}

ljx/src/commands/dedup.rs

Lines changed: 17 additions & 8 deletions
Original file line number | Diff line number | Diff line change
@@ -4,18 +4,27 @@ use std::io::Write;
44

55
use logjet::{LogjetReader, LogjetWriter};
66

7-
use crate::cli::{DedupArgs, DedupModeArg};
7+
use crate::cli::{DedupArgs, DedupBehaviorArg, DedupMatchArg};
88
use crate::dedup::flat_record::BucketKeyKind;
9-
use crate::dedup::{self, DedupMode, DedupOpts, DrainOpts};
9+
use crate::dedup::{self, DedupMatchMode, DedupMode, DedupOpts, DrainOpts};
1010
use crate::error::Result;
1111
use crate::input::{InputHandle, open_output};
1212

13-
impl From<DedupModeArg> for DedupMode {
14-
fn from(value: DedupModeArg) -> Self {
13+
impl From<DedupBehaviorArg> for DedupMode {
14+
fn from(value: DedupBehaviorArg) -> Self {
1515
match value {
16-
DedupModeArg::Exact => Self::Exact,
17-
DedupModeArg::Hash2 => Self::Hash2,
18-
DedupModeArg::Full => Self::Full,
16+
DedupBehaviorArg::Distinct => Self::Distinct,
17+
DedupBehaviorArg::Collapse => Self::Collapse,
18+
}
19+
}
20+
}
21+
22+
impl From<DedupMatchArg> for DedupMatchMode {
23+
fn from(value: DedupMatchArg) -> Self {
24+
match value {
25+
DedupMatchArg::Exact => Self::Exact,
26+
DedupMatchArg::Canon => Self::Hash2,
27+
DedupMatchArg::Full => Self::Full,
1928
}
2029
}
2130
}
@@ -27,7 +36,7 @@ pub fn run(args: DedupArgs) -> Result<()> {
2736
depth: args.drain_depth.unwrap_or(3),
2837
extra_delimiters: args.extra_delimiters.as_ref().map(|s| s.split(',').map(String::from).collect()).unwrap_or_default(),
2938
};
30-
let opts = DedupOpts { mode: args.mode.into(), bucket_key, drain };
39+
let opts = DedupOpts { mode: args.mode.into(), match_mode: args.matcher.into(), bucket_key, drain };
3140

3241
let input = InputHandle::open(&args.input)?;
3342
let mut reader = LogjetReader::new(input.into_buf_reader());

0 commit comments

Comments
 (0)