Merged
8 changes: 8 additions & 0 deletions Cargo.lock


60 changes: 60 additions & 0 deletions doc/ljx.md
@@ -37,6 +37,7 @@ Documented command set:
- `view`
- `split`
- `join`
- `dedup`

Current implementation status for release `0.1`:

@@ -155,6 +156,65 @@ Potential validation:
- sequence continuity checks
- timestamp monotonicity checks

## `ljx dedup`

Deduplicate log records by collapsing identical or structurally similar bodies.

Three modes, each building on the previous:

- `exact` -- collapse records with byte-identical bodies within the same bucket.
- `hash2` (default) -- canonicalise bodies (normalise numbers, IDs, paths, timestamps),
then collapse records sharing the same canonical form.
- `full` -- after hash2, run Drain3 template mining on remaining singletons to catch
near-duplicates that differ by alphabetic tokens.
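
The canonicalisation step in `hash2` can be pictured roughly as below. This is a minimal sketch under assumed rules — the actual normaliser and its placeholder tokens are not shown in this PR:

```rust
// Sketch of hash2-style canonicalisation (rules and placeholder names are
// assumptions): replace numeric, hex-ID, and path tokens with fixed
// placeholders so records differing only in those tokens share one
// canonical form.
fn canonicalise(body: &str) -> String {
    body.split_whitespace()
        .map(|tok| {
            if tok.starts_with('/') {
                "<PATH>"
            } else if tok.len() >= 8 && tok.chars().all(|c| c.is_ascii_hexdigit()) {
                "<ID>"
            } else if tok.contains(|c: char| c.is_ascii_digit())
                && tok.chars().all(|c| c.is_ascii_digit() || c == '.')
            {
                "<NUM>"
            } else {
                tok
            }
        })
        .collect::<Vec<_>>()
        .join(" ")
}
```

Two records like `user 42 read /var/log/app.log in 350 ms` and `user 7 read /tmp/x.log in 12 ms` would then collapse to the same canonical form.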

Records are partitioned into buckets by `(service.name, severity_number)` before any
dedup. No stage ever merges records across buckets.
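
The bucketing invariant can be sketched like this (the field and type names here are assumptions, not the PR's actual types):

```rust
use std::collections::HashMap;

// Sketch of bucket partitioning: group records by (service.name,
// severity_number) and dedup each bucket independently, so no stage can
// ever merge records across buckets.
#[derive(Debug, Hash, PartialEq, Eq)]
struct BucketKey {
    service_name: String,
    severity_number: i32,
}

fn partition(records: Vec<(String, i32, String)>) -> HashMap<BucketKey, Vec<String>> {
    let mut buckets: HashMap<BucketKey, Vec<String>> = HashMap::new();
    for (service, severity, body) in records {
        let key = BucketKey { service_name: service, severity_number: severity };
        // Records in different buckets never meet again after this point.
        buckets.entry(key).or_default().push(body);
    }
    buckets
}
```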

Intended examples:

```text
ljx dedup telemetry.logjet -o deduped.logjet
ljx dedup telemetry.logjet -o deduped.logjet --mode=exact
ljx dedup telemetry.logjet -o deduped.logjet --mode=full
ljx dedup telemetry.logjet -o deduped.logjet --bucket-by=scope
ljx dedup telemetry.logjet -o deduped.logjet --mode=full --sim-th=0.8
```

Each output record represents a group of collapsed inputs. The original body from the
first-seen record is preserved. Dedup metadata is added as attributes:

- `dedup.count` -- number of records collapsed into this group
- `dedup.mode` -- which stage produced the group (`exact`, `hash2`, `full/canon`, `full/drain3`)
- `dedup.signature` -- hex hash identifying the group
- `dedup.canonical_body` -- normalised body form (hash2 and full modes)
- `dedup.body_shape` -- detected body type (`json`, `kv`, `prefixed`, `freetext`)
- `dedup.first_seen_ns`, `dedup.last_seen_ns` -- timestamp range of the group
- `dedup.time_span_ms` -- duration the pattern was active
- `dedup.exemplar_trace_ids`, `dedup.exemplar_span_ids` -- up to 3 trace/span IDs for RCA
- `dedup.drain3_template` -- Drain3 template with `<*>` wildcards (full mode only)
- `dedup.drain3_cluster_id` -- Drain3 cluster ID (full mode only)
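
The `dedup.signature` hash could be derived from the canonical form roughly as below. This sketch uses std's `DefaultHasher` as a stand-in; the Cargo.toml change in this PR pulls in `xxhash-rust` with the `xxh3` feature, which is presumably what the real implementation uses:

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

// Sketch: hash the canonical body into a hex string identifying the group.
// DefaultHasher is a stand-in for xxh3; the exact scheme is an assumption.
fn signature(canonical_body: &str) -> String {
    let mut h = DefaultHasher::new();
    canonical_body.hash(&mut h);
    format!("{:016x}", h.finish())
}
```

The key property is only that equal canonical forms map to equal signatures, so the hex value can serve as a stable group ID.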

Non-log records (metrics, traces) pass through unchanged.

Bucket extensions via `--bucket-by`:

- `scope` -- add `instrumentation_scope.name` to the bucket key
- `source_line` -- add `code.filepath` + `code.lineno` to the bucket key

Drain3-specific options (full mode only):

- `--sim-th` -- similarity threshold, 0.0 to 1.0 (default 0.7)
- `--drain-depth` -- prefix tree depth (default 3)
- `--extra-delimiters` -- comma-separated extra token delimiters

Expected properties:

- output is valid `.logjet`
- non-log records preserved in original order
- deterministic for exact and hash2 modes (same input, same output)
- full mode is order-dependent (Drain3 produces different templates for different input orders)

## Implementation Notes

The simplest useful internal shape for `ljx` is:
2 changes: 2 additions & 0 deletions ljx/Cargo.toml
@@ -14,4 +14,6 @@ opentelemetry-proto = { version = "0.31", features = ["gen-tonic", "logs"] }
prost = "0.14"
ratatui = "0.29"
regex = "1.12"
serde_json = "1"
shlex = "1.3"
xxhash-rust = { version = "0.8", features = ["xxh3"] }
33 changes: 33 additions & 0 deletions ljx/src/cli.rs
@@ -55,6 +55,39 @@ pub enum Command {
Stats(StatsArgs),
#[command(name = "view", alias = "cat", about = "Interactively browse filtered records in a terminal UI")]
View(ViewArgs),
#[command(about = "Deduplicate log records, collapsing identical or similar bodies")]
Dedup(DedupArgs),
}

#[derive(Debug, Clone, Args)]
pub struct DedupArgs {
#[arg(value_name = "INPUT", help = "Input .logjet file or - for stdin")]
pub input: PathBuf,

#[arg(short, long, value_name = "OUTPUT", help = "Output .logjet file or - for stdout")]
pub output: PathBuf,

#[arg(long, value_enum, default_value_t = DedupModeArg::Hash2)]
pub mode: DedupModeArg,

#[arg(long, value_name = "KEYS", help = "Comma-separated bucket extensions: scope, source_line")]
pub bucket_by: Option<String>,

#[arg(long, value_name = "FLOAT", help = "Drain3 similarity threshold (full mode only) [default: 0.7]")]
pub sim_th: Option<f64>,

#[arg(long, value_name = "INT", help = "Drain3 prefix tree depth (full mode only) [default: 3]")]
pub drain_depth: Option<i64>,

#[arg(long, value_name = "DELIMS", help = "Drain3 extra delimiters, comma-separated (full mode only)")]
pub extra_delimiters: Option<String>,
}

#[derive(Debug, Clone, Copy, ValueEnum)]
pub enum DedupModeArg {
Exact,
Hash2,
Full,
}

#[derive(Debug, Clone, Args)]
66 changes: 66 additions & 0 deletions ljx/src/commands/dedup.rs
@@ -0,0 +1,66 @@
//! `ljx dedup` command: run the dedup pipeline on a .logjet file.

use std::io::Write;

use logjet::{LogjetReader, LogjetWriter};

use crate::cli::{DedupArgs, DedupModeArg};
use crate::dedup::flat_record::BucketKeyKind;
use crate::dedup::{self, DedupMode, DedupOpts, DrainOpts};
use crate::error::Result;
use crate::input::{InputHandle, open_output};

impl From<DedupModeArg> for DedupMode {
fn from(value: DedupModeArg) -> Self {
match value {
DedupModeArg::Exact => Self::Exact,
DedupModeArg::Hash2 => Self::Hash2,
DedupModeArg::Full => Self::Full,
}
}
}

pub fn run(args: DedupArgs) -> Result<()> {
let bucket_key = parse_bucket_by(&args.bucket_by)?;
let drain = DrainOpts {
sim_th: args.sim_th.unwrap_or(0.7),
depth: args.drain_depth.unwrap_or(3),
extra_delimiters: args.extra_delimiters.as_ref().map(|s| s.split(',').map(String::from).collect()).unwrap_or_default(),
};
let opts = DedupOpts { mode: args.mode.into(), bucket_key, drain };

let input = InputHandle::open(&args.input)?;
let mut reader = LogjetReader::new(input.into_buf_reader());
let unpacked = dedup::unpack::unpack(&mut reader)?;

let output = open_output(&args.output)?;
let mut writer = LogjetWriter::new(output);

let stats = dedup::dedup(unpacked.records, unpacked.passthrough, &mut writer, &opts)?;

let mut out = writer.into_inner()?;
out.flush()?;

eprintln!("{} records → {} groups ({:.1}% reduction)", stats.total_records, stats.group_count, stats.reduction_pct());
Ok(())
}

fn parse_bucket_by(bucket_by: &Option<String>) -> Result<BucketKeyKind> {
let Some(val) = bucket_by else {
return Ok(BucketKeyKind::Default);
};
let parts: Vec<&str> = val.split(',').map(str::trim).collect();
let has_scope = parts.contains(&"scope");
let has_source = parts.contains(&"source_line");
for &p in &parts {
if p != "scope" && p != "source_line" {
return Err(crate::error::Error::Usage(format!("unknown --bucket-by value: {p:?} (valid: scope, source_line)")));
}
}
Ok(match (has_scope, has_source) {
(true, true) => BucketKeyKind::ScopeAndSourceLine,
(true, false) => BucketKeyKind::Scope,
(false, true) => BucketKeyKind::SourceLine,
(false, false) => BucketKeyKind::Default,
})
}
1 change: 1 addition & 0 deletions ljx/src/commands/mod.rs
@@ -1,4 +1,5 @@
pub mod count;
pub mod dedup;
pub mod filter;
pub mod join;
pub mod split;