diff --git a/README.md b/README.md index 95d5a19..25dcd17 100644 --- a/README.md +++ b/README.md @@ -15,8 +15,10 @@ Fast, mergeable **KLL** sketch for streaming quantiles — deterministic, zero d ## ✨ Features - **Accurate ε-quantiles** under tight memory bounds (KLL algorithm) - **Deterministic compaction** (parity sampling) + **weight conservation** +- **Weighted ingestion** via `add(x, weight)` for aggregated data - **Mergeable** sketches for distributed/parallel ingestion - **Serializable** (`to_bytes` / `from_bytes`) +- **Convenience helpers** such as `quantiles(m)` for evenly spaced cuts - **Zero dependencies**, Python 3.9+ --- @@ -31,7 +33,8 @@ sk.extend([1, 5, 2, 9, 3, 6, 4, 8, 7]) print("n =", sk.size()) print("median ≈", sk.median()) print("q(0.9) ≈", sk.quantile(0.9)) -```` +print("quartiles ≈", sk.quantiles(4)) +``` ### Merge & Serialize @@ -52,10 +55,11 @@ assert abs(a2.quantile(0.5) - a.quantile(0.5)) < 1e-12 | Method | Description | | ----------------------------- | --------------------------------------- | -| `add(x)` | Ingest one value. | +| `add(x, weight=1)` | Ingest one value with optional weight. | | `extend(xs)` | Ingest an iterable of values. | | `size()` | Total number of ingested items `n`. | | `quantile(q)` | Approximate `q`-quantile for `q∈[0,1]`. | +| `quantiles(m)` | Evenly spaced cut points. | | `median()` | Convenience for `quantile(0.5)`. | | `rank(x)` | Approximate rank of `x` in `[0, n]`. | | `cdf(xs)` | CDF values for a sequence `xs`. | @@ -92,8 +96,6 @@ python -m pytest -q ## 🗺️ Roadmap -* Weighted ingestion (`add(x, w)`). -* `quantiles(m)` helper (evenly spaced cut points). * Optional NumPy/C hot paths for sort/merge. --- diff --git a/kll_sketch/kll_sketch.py b/kll_sketch/kll_sketch.py index ea220e0..7e2e509 100644 --- a/kll_sketch/kll_sketch.py +++ b/kll_sketch/kll_sketch.py @@ -17,7 +17,7 @@ class KLL: """ - KLL streaming quantile sketch (unweighted, mergeable, serializable). + KLL streaming quantile sketch (supports weighted ingestion, mergeable, serializable). Paper: - Karnin, Zohar, Edo Liberty, and Liran Lang. "Optimal quantile approximation @@ -32,8 +32,8 @@ class KLL: This guarantees total weight conservation: Σ(weights) == n. Public API: - add(x), extend(xs), quantile(q), median(), rank(x), cdf(xs), - merge(other), to_bytes(), from_bytes() + add(x, weight=1), extend(xs), quantile(q), quantiles(m), median(), rank(x), + cdf(xs), merge(other), to_bytes(), from_bytes() """ # ---------------------------- Tunable constants ---------------------------- @@ -60,27 +60,36 @@ def __init__(self, capacity: int = 200, rng_seed: int = _DEFAULT_SEED): self._rng_seed = int(rng_seed) # ------------------------------- Public API -------------------------------- - def add(self, x: float) -> None: + def add(self, x: float, weight: float = 1.0) -> None: + """Ingest a value with an optional (integer) weight.""" + + # ``weight`` may be provided as an ``int`` or any float that rounds to an + # integer (for compatibility with NumPy scalars). Each unit of weight is + # equivalent to inserting ``x`` once; internally we fold the binary + # decomposition of ``weight`` across the sketch levels to avoid O(weight) + # work for large aggregates. + xv = float(x) if math.isnan(xv) or math.isinf(xv): raise ValueError("x must be finite") - self._levels[0].append(xv) - self._n += 1 - if self._capacity_exceeded(): - self._compress_until_ok() + + wv = float(weight) + if math.isnan(wv) or math.isinf(wv): + raise ValueError("weight must be finite") + if wv <= 0.0: + raise ValueError("weight must be > 0") + + rounded = int(round(wv)) + if abs(wv - rounded) > 1e-9: + raise ValueError("weight must be an integer") + if rounded <= 0: + raise ValueError("weight must be > 0") + + self._ingest_weighted_value(xv, rounded) def extend(self, xs: Iterable[float]) -> None: for x in xs: - xv = float(x) - if math.isnan(xv) or math.isinf(xv): - raise ValueError("values must be finite") - # ``self._levels[0]`` can be replaced during compaction, so we must - # append directly to the current buffer each iteration instead of - # keeping a stale reference (which would silently drop values). - self._levels[0].append(xv) - self._n += 1 - if self._capacity_exceeded(): - self._compress_until_ok() + self.add(x) def size(self) -> int: return self._n @@ -326,6 +335,39 @@ def _materialize_aligned(self) -> Tuple[List[float], List[float]]: heapq.heappush(heap, (arr[i], j, i, w)) return out_v2, out_w2 + def quantiles(self, m: int) -> List[float]: + """Return evenly spaced quantile cut points. + + ``m`` corresponds to the number of equal-mass buckets. For ``m > 1`` the + return value contains ``m-1`` interior cut points. ``m == 1`` yields the + median for convenience. + """ + + if m <= 0: + raise ValueError("m must be positive") + if self._n == 0: + raise ValueError("empty sketch") + if m == 1: + return [self.quantile(0.5)] + step = 1.0 / m + return [self.quantile(step * i) for i in range(1, m)] + + # ---------------------- weighted ingestion internals ---------------------- + def _ingest_weighted_value(self, value: float, weight: int) -> None: + """Fold ``weight`` copies of ``value`` into the level buffers.""" + + remaining = weight + level = 0 + while remaining: + if remaining & 1: + self._ensure_levels(level + 1) + self._levels[level].append(value) + remaining >>= 1 + level += 1 + self._n += weight + if self._capacity_exceeded(): + self._compress_until_ok() + # ----------------------------- quick self-test -------------------------------- if __name__ == "__main__": diff --git a/kll_sketch/tests/test_kll.py b/kll_sketch/tests/test_kll.py index f22cfdd..30796cf 100644 --- a/kll_sketch/tests/test_kll.py +++ b/kll_sketch/tests/test_kll.py @@ -130,6 +130,12 @@ def test_invalid_inputs_raise() -> None: sketch.add(float("nan")) with pytest.raises(ValueError): sketch.add(float("inf")) + with pytest.raises(ValueError): + sketch.add(1.0, weight=0) + with pytest.raises(ValueError): + sketch.add(1.0, weight=-2) + with pytest.raises(ValueError): + sketch.add(1.0, weight=1.5) with pytest.raises(ValueError): sketch.quantile(-0.01) with pytest.raises(ValueError): @@ -167,3 +173,36 @@ def test_rank_brackets_quantile() -> None: target_rank = q * (len(xs) - 1) assert lower <= target_rank + 200 assert upper >= target_rank - 200 + + +def test_weighted_ingestion_matches_repetition() -> None: + weighted = KLL(capacity=128) + weighted.add(-1.0, weight=3) + weighted.add(2.5, weight=5) + weighted.add(10.0, weight=2) + + expanded = KLL(capacity=128) + expanded.extend([-1.0] * 3) + expanded.extend([2.5] * 5) + expanded.extend([10.0] * 2) + + assert weighted.size() == expanded.size() == 10 + for q in [0.1, 0.5, 0.9]: + assert weighted.quantile(q) == pytest.approx(expanded.quantile(q), abs=1e-9) + + +def test_quantiles_helper_even_spacing() -> None: + sketch = KLL(capacity=128) + sketch.extend(range(100)) + + quartiles = sketch.quantiles(4) + assert len(quartiles) == 3 + for lhs, rhs in zip(quartiles, quartiles[1:]): + assert lhs <= rhs + # Check against expected percentile anchors from the raw data. + truth = [float(i) for i in (25, 50, 75)] + for estimate, reference in zip(quartiles, truth): + assert estimate == pytest.approx(reference, abs=5.0) + + median_only = sketch.quantiles(1) + assert median_only == pytest.approx([sketch.median()])