diff --git a/README.md b/README.md
index 25dcd17..954de5e 100644
--- a/README.md
+++ b/README.md
@@ -18,7 +18,8 @@ Fast, mergeable **KLL** sketch for streaming quantiles — deterministic, zero d
 - **Weighted ingestion** via `add(x, weight)` for aggregated data
 - **Mergeable** sketches for distributed/parallel ingestion
 - **Serializable** (`to_bytes` / `from_bytes`)
-- **Convenience helpers** such as `quantiles(m)` for evenly spaced cuts
+- **Convenience helpers** such as `quantiles(m)` and `quantiles_at(qs)` for
+  evenly spaced or ad-hoc cuts
 - **Zero dependencies**, Python 3.9+
 
 ---
@@ -60,6 +61,7 @@ assert abs(a2.quantile(0.5) - a.quantile(0.5)) < 1e-12
 | `size()` | Total number of ingested items `n`. |
 | `quantile(q)` | Approximate `q`-quantile for `q∈[0,1]`. |
 | `quantiles(m)` | Evenly spaced cut points. |
+| `quantiles_at(qs)` | Batched quantiles for arbitrary `qs`. |
 | `median()` | Convenience for `quantile(0.5)`. |
 | `rank(x)` | Approximate rank of `x` in `[0, n]`. |
 | `cdf(xs)` | CDF values for a sequence `xs`. |
@@ -81,6 +83,7 @@ This implementation follows **Karnin–Lang–Liberty (2016)**: a space-optimal
 * Typical error ≈ **O(1/k)** in rank space (increase `capacity` to tighten ε).
 * Updates amortized **O(1)** with occasional compactions.
 * Queries merge level buffers (**k-way**) and scan weights to the target rank.
+  Use `quantiles_at` to answer multiple quantiles with a single scan.
 
 > Tip: For heavy query loads, cache materialized arrays between queries.
 
diff --git a/kll_sketch/kll_sketch.py b/kll_sketch/kll_sketch.py
index 7e2e509..228a0ad 100644
--- a/kll_sketch/kll_sketch.py
+++ b/kll_sketch/kll_sketch.py
@@ -12,6 +12,7 @@
 import struct
 import random
 import heapq
+from bisect import bisect_left
 from typing import Iterable, List, Tuple, Optional
 
 
@@ -32,8 +33,8 @@ class KLL:
     This guarantees total weight conservation: Σ(weights) == n.
 
     Public API:
-      add(x, weight=1), extend(xs), quantile(q), quantiles(m), median(), rank(x),
-      cdf(xs), merge(other), to_bytes(), from_bytes()
+      add(x, weight=1), extend(xs), quantile(q), quantiles(m), quantiles_at(qs),
+      median(), rank(x), cdf(xs), merge(other), to_bytes(), from_bytes()
     """
 
     # ---------------------------- Tunable constants ----------------------------
@@ -100,17 +101,26 @@ def median(self) -> float:
     def quantile(self, q: float) -> float:
         if not (0.0 <= q <= 1.0):
             raise ValueError("q must be in [0,1]")
-        if self._n == 0:
-            raise ValueError("empty sketch")
-        vals, wts = self._materialize_aligned()
-        # invariant: sum(wts) == n
-        target = q * (self._n - 1)  # rank target in [0, n-1]
-        cum = 0.0
-        for v, w in zip(vals, wts):
-            cum += w
-            if cum >= target - 1e-12:
-                return v
-        return vals[-1]
+        return self._batched_quantiles([q])[0]
+
+    def quantiles_at(self, probabilities: Iterable[float]) -> List[float]:
+        """Return the approximate quantiles for each entry in ``probabilities``.
+
+        This method evaluates all requested quantiles using a single materialized
+        pass through the sketch, which is significantly faster than issuing
+        repeated :meth:`quantile` calls for large query batches.
+
+        Results are returned in the order the probabilities were given. An
+        empty input yields ``[]``; otherwise ``ValueError`` is raised when any
+        probability lies outside [0, 1] or the sketch is empty.
+        """
+
+        qs = [float(q) for q in probabilities]
+        if any(not (0.0 <= q <= 1.0) for q in qs):
+            raise ValueError("all probabilities must be in [0,1]")
+        if not qs:
+            return []
+        return self._batched_quantiles(qs)
 
     def rank(self, x: float) -> float:
         """Approximate rank in [0, n]."""
@@ -350,7 +360,8 @@ def quantiles(self, m: int) -> List[float]:
         if m == 1:
             return [self.quantile(0.5)]
         step = 1.0 / m
-        return [self.quantile(step * i) for i in range(1, m)]
+        qs = [step * i for i in range(1, m)]
+        return self.quantiles_at(qs)
 
     # ---------------------- weighted ingestion internals ----------------------
     def _ingest_weighted_value(self, value: float, weight: int) -> None:
@@ -368,6 +379,41 @@ def _ingest_weighted_value(self, value: float, weight: int) -> None:
         if self._capacity_exceeded():
             self._compress_until_ok()
 
+    def _batched_quantiles(self, qs: List[float]) -> List[float]:
+        """Resolve every probability in ``qs`` from one materialized view.
+
+        ``qs`` must already be validated to lie in [0, 1]; results are
+        returned in the same order as ``qs``. Raises ``ValueError`` when the
+        sketch is empty.
+        """
+        if self._n == 0:
+            raise ValueError("empty sketch")
+        vals, wts = self._materialize_aligned()
+        if not vals:
+            raise ValueError("empty sketch")
+
+        # Running weight totals; invariant: prefix[-1] == sum(wts) == n.
+        prefix: List[float] = []
+        total = 0.0
+        for w in wts:
+            total += w
+            prefix.append(total)
+
+        # Visit probabilities in ascending order so each binary search can
+        # resume from the previous hit instead of from index 0.
+        ordered = sorted(enumerate(qs), key=lambda item: item[1])
+        out = [0.0] * len(qs)
+        search_lo = 0
+        for idx, q in ordered:
+            target = q * (self._n - 1)  # rank target in [0, n-1]
+            pos = bisect_left(prefix, target - 1e-12, lo=search_lo)
+            # Fallback mirrors the old linear scan's `return vals[-1]`.
+            if pos >= len(vals):
+                pos = len(vals) - 1
+            out[idx] = vals[pos]
+            search_lo = pos
+        return out
+
 
 # ----------------------------- quick self-test --------------------------------
 if __name__ == "__main__":
diff --git a/kll_sketch/tests/test_kll.py b/kll_sketch/tests/test_kll.py
index 30796cf..7a67f62 100644
--- a/kll_sketch/tests/test_kll.py
+++ b/kll_sketch/tests/test_kll.py
@@ -206,3 +206,49 @@ def test_quantiles_helper_even_spacing() -> None:
 
     median_only = sketch.quantiles(1)
     assert median_only == pytest.approx([sketch.median()])
+
+
+def test_quantiles_at_matches_repeated_calls() -> None:
+    rng = random.Random(123)
+    xs = [rng.gauss(0.0, 2.0) for _ in range(5000)]
+
+    sketch = KLL(capacity=200)
+    sketch.extend(xs)
+
+    qs = [0.05, 0.2, 0.33, 0.5, 0.75, 0.95]
+    batched = sketch.quantiles_at(qs)
+    repeated = [sketch.quantile(q) for q in qs]
+    assert batched == pytest.approx(repeated, abs=1e-12)
+
+
+def test_quantiles_at_accepts_unsorted_probabilities() -> None:
+    sketch = KLL(capacity=128)
+    sketch.extend(range(1000))
+
+    qs = [0.9, 0.1, 0.5]
+    values = sketch.quantiles_at(qs)
+    assert values[0] == pytest.approx(sketch.quantile(0.9))
+    assert values[1] == pytest.approx(sketch.quantile(0.1))
+    assert values[2] == pytest.approx(sketch.quantile(0.5))
+
+
+def test_quantiles_at_matches_linear_scan_reference() -> None:
+    sketch = KLL(capacity=128)
+    sketch.extend(range(1000))
+
+    # Independent oracle: the linear cumulative-weight scan that quantile()
+    # used before it delegated to the batched helper.
+    vals, wts = sketch._materialize_aligned()
+    n = sketch.size()
+
+    def reference(q: float) -> float:
+        target = q * (n - 1)
+        cum = 0.0
+        for v, w in zip(vals, wts):
+            cum += w
+            if cum >= target - 1e-12:
+                return v
+        return vals[-1]
+
+    qs = [0.0, 0.1, 0.5, 0.9, 1.0]
+    assert sketch.quantiles_at(qs) == pytest.approx([reference(q) for q in qs])