Skip to content

Commit 8bee7c5

Browse files
authored
Merge pull request #11 from tinythings/isbm-data-deduplication
Data deduplication
2 parents 4051228 + e8e92b7 commit 8bee7c5

29 files changed

Lines changed: 3463 additions & 68 deletions

Cargo.lock

Lines changed: 8 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

doc/ljx.md

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ Documented command set:
3737
- `view`
3838
- `split`
3939
- `join`
40+
- `dedup`
4041

4142
Current implementation status for release `0.1`:
4243

@@ -155,6 +156,65 @@ Potential validation:
155156
- sequence continuity checks
156157
- timestamp monotonicity checks
157158

159+
## `ljx dedup`
160+
161+
Deduplicate log records by collapsing identical or structurally similar bodies.
162+
163+
Three modes, each building on the previous:
164+
165+
- `exact` -- collapse records with byte-identical bodies within the same bucket.
166+
- `hash2` (default) -- canonicalise bodies (normalise numbers, IDs, paths, timestamps),
167+
then collapse records sharing the same canonical form.
168+
- `full` -- after hash2, run Drain3 template mining on remaining singletons to catch
169+
near-duplicates that differ by alphabetic tokens.
170+
171+
Records are partitioned into buckets by `(service.name, severity_number)` before any
172+
dedup. No stage ever merges records across buckets.
173+
174+
Intended examples:
175+
176+
```text
177+
ljx dedup telemetry.logjet -o deduped.logjet
178+
ljx dedup telemetry.logjet -o deduped.logjet --mode=exact
179+
ljx dedup telemetry.logjet -o deduped.logjet --mode=full
180+
ljx dedup telemetry.logjet -o deduped.logjet --bucket-by=scope
181+
ljx dedup telemetry.logjet -o deduped.logjet --mode=full --sim-th=0.8
182+
```
183+
184+
Each output record represents a group of collapsed inputs. The original body from the
185+
first-seen record is preserved. Dedup metadata is added as attributes:
186+
187+
- `dedup.count` -- number of records collapsed into this group
188+
- `dedup.mode` -- which stage produced the group (`exact`, `hash2`, `full/canon`, `full/drain3`)
189+
- `dedup.signature` -- hex hash identifying the group
190+
- `dedup.canonical_body` -- normalised body form (hash2 and full modes)
191+
- `dedup.body_shape` -- detected body type (`json`, `kv`, `prefixed`, `freetext`)
192+
- `dedup.first_seen_ns`, `dedup.last_seen_ns` -- timestamp range of the group
193+
- `dedup.time_span_ms` -- duration the pattern was active
194+
- `dedup.exemplar_trace_ids`, `dedup.exemplar_span_ids` -- up to 3 trace/span IDs for RCA
195+
- `dedup.drain3_template` -- Drain3 template with `<*>` wildcards (full mode only)
196+
- `dedup.drain3_cluster_id` -- Drain3 cluster ID (full mode only)
197+
198+
Non-log records (metrics, traces) pass through unchanged.
199+
200+
Bucket extensions via `--bucket-by`:
201+
202+
- `scope` -- add `instrumentation_scope.name` to the bucket key
203+
- `source_line` -- add `code.filepath` + `code.lineno` to the bucket key
204+
205+
Drain3-specific options (full mode only):
206+
207+
- `--sim-th` -- similarity threshold, 0.0 to 1.0 (default 0.7)
208+
- `--drain-depth` -- prefix tree depth (default 3)
209+
- `--extra-delimiters` -- comma-separated extra token delimiters
210+
211+
Expected properties:
212+
213+
- output is valid `.logjet`
214+
- non-log records preserved in original order
215+
- deterministic for exact and hash2 modes (same input, same output)
216+
- full mode is order-dependent (Drain3 produces different templates for different input orders)
217+
158218
## Implementation Notes
159219

160220
The simplest useful internal shape for `ljx` is:

ljx/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,4 +14,6 @@ opentelemetry-proto = { version = "0.31", features = ["gen-tonic", "logs"] }
1414
prost = "0.14"
1515
ratatui = "0.29"
1616
regex = "1.12"
17+
serde_json = "1"
1718
shlex = "1.3"
19+
xxhash-rust = { version = "0.8", features = ["xxh3"] }

ljx/src/cli.rs

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,39 @@ pub enum Command {
5555
Stats(StatsArgs),
5656
#[command(name = "view", alias = "cat", about = "Interactively browse filtered records in a terminal UI")]
5757
View(ViewArgs),
58+
#[command(about = "Deduplicate log records, collapsing identical or similar bodies")]
59+
Dedup(DedupArgs),
60+
}
61+
62+
#[derive(Debug, Clone, Args)]
63+
pub struct DedupArgs {
64+
#[arg(value_name = "INPUT", help = "Input .logjet file or - for stdin")]
65+
pub input: PathBuf,
66+
67+
#[arg(short, long, value_name = "OUTPUT", help = "Output .logjet file or - for stdout")]
68+
pub output: PathBuf,
69+
70+
#[arg(long, value_enum, default_value_t = DedupModeArg::Hash2)]
71+
pub mode: DedupModeArg,
72+
73+
#[arg(long, value_name = "KEYS", help = "Comma-separated bucket extensions: scope, source_line")]
74+
pub bucket_by: Option<String>,
75+
76+
#[arg(long, value_name = "FLOAT", help = "Drain3 similarity threshold (full mode only) [default: 0.7]")]
77+
pub sim_th: Option<f64>,
78+
79+
#[arg(long, value_name = "INT", help = "Drain3 prefix tree depth (full mode only) [default: 3]")]
80+
pub drain_depth: Option<i64>,
81+
82+
#[arg(long, value_name = "DELIMS", help = "Drain3 extra delimiters, comma-separated (full mode only)")]
83+
pub extra_delimiters: Option<String>,
84+
}
85+
86+
#[derive(Debug, Clone, Copy, ValueEnum)]
87+
pub enum DedupModeArg {
88+
Exact,
89+
Hash2,
90+
Full,
5891
}
5992

6093
#[derive(Debug, Clone, Args)]

ljx/src/commands/dedup.rs

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
//! `ljx dedup` command: run the dedup pipeline on a .logjet file.
2+
3+
use std::io::Write;
4+
5+
use logjet::{LogjetReader, LogjetWriter};
6+
7+
use crate::cli::{DedupArgs, DedupModeArg};
8+
use crate::dedup::flat_record::BucketKeyKind;
9+
use crate::dedup::{self, DedupMode, DedupOpts, DrainOpts};
10+
use crate::error::Result;
11+
use crate::input::{InputHandle, open_output};
12+
13+
impl From<DedupModeArg> for DedupMode {
14+
fn from(value: DedupModeArg) -> Self {
15+
match value {
16+
DedupModeArg::Exact => Self::Exact,
17+
DedupModeArg::Hash2 => Self::Hash2,
18+
DedupModeArg::Full => Self::Full,
19+
}
20+
}
21+
}
22+
23+
pub fn run(args: DedupArgs) -> Result<()> {
24+
let bucket_key = parse_bucket_by(&args.bucket_by)?;
25+
let drain = DrainOpts {
26+
sim_th: args.sim_th.unwrap_or(0.7),
27+
depth: args.drain_depth.unwrap_or(3),
28+
extra_delimiters: args.extra_delimiters.as_ref().map(|s| s.split(',').map(String::from).collect()).unwrap_or_default(),
29+
};
30+
let opts = DedupOpts { mode: args.mode.into(), bucket_key, drain };
31+
32+
let input = InputHandle::open(&args.input)?;
33+
let mut reader = LogjetReader::new(input.into_buf_reader());
34+
let unpacked = dedup::unpack::unpack(&mut reader)?;
35+
36+
let output = open_output(&args.output)?;
37+
let mut writer = LogjetWriter::new(output);
38+
39+
let stats = dedup::dedup(unpacked.records, unpacked.passthrough, &mut writer, &opts)?;
40+
41+
let mut out = writer.into_inner()?;
42+
out.flush()?;
43+
44+
eprintln!("{} records → {} groups ({:.1}% reduction)", stats.total_records, stats.group_count, stats.reduction_pct(),);
45+
Ok(())
46+
}
47+
48+
fn parse_bucket_by(bucket_by: &Option<String>) -> Result<BucketKeyKind> {
49+
let Some(val) = bucket_by else {
50+
return Ok(BucketKeyKind::Default);
51+
};
52+
let parts: Vec<&str> = val.split(',').map(str::trim).collect();
53+
let has_scope = parts.contains(&"scope");
54+
let has_source = parts.contains(&"source_line");
55+
for &p in &parts {
56+
if p != "scope" && p != "source_line" {
57+
return Err(crate::error::Error::Usage(format!("unknown --bucket-by value: {p:?} (valid: scope, source_line)")));
58+
}
59+
}
60+
Ok(match (has_scope, has_source) {
61+
(true, true) => BucketKeyKind::ScopeAndSourceLine,
62+
(true, false) => BucketKeyKind::Scope,
63+
(false, true) => BucketKeyKind::SourceLine,
64+
(false, false) => BucketKeyKind::Default,
65+
})
66+
}

ljx/src/commands/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
pub mod count;
2+
pub mod dedup;
23
pub mod filter;
34
pub mod join;
45
pub mod split;

0 commit comments

Comments
 (0)