Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ Fast, mergeable **KLL** sketch for streaming quantiles — deterministic, zero d
- **Weighted ingestion** via `add(x, weight)` for aggregated data
- **Mergeable** sketches for distributed/parallel ingestion
- **Serializable** (`to_bytes` / `from_bytes`)
- **Convenience helpers** such as `quantiles(m)` for evenly spaced cuts
- **Convenience helpers** such as `quantiles(m)` and `quantiles_at(qs)` for
evenly spaced or ad-hoc cuts
- **Zero dependencies**, Python 3.9+

---
Expand Down Expand Up @@ -60,6 +61,7 @@ assert abs(a2.quantile(0.5) - a.quantile(0.5)) < 1e-12
| `size()` | Total number of ingested items `n`. |
| `quantile(q)` | Approximate `q`-quantile for `q∈[0,1]`. |
| `quantiles(m)` | Evenly spaced cut points. |
| `quantiles_at(qs)` | Batched quantiles for arbitrary `qs`. |
| `median()` | Convenience for `quantile(0.5)`. |
| `rank(x)` | Approximate rank of `x` in `[0, n]`. |
| `cdf(xs)` | CDF values for a sequence `xs`. |
Expand All @@ -81,6 +83,7 @@ This implementation follows **Karnin–Lang–Liberty (2016)**: a space-optimal
* Typical error ≈ **O(1/k)** in rank space, where `k` is the sketch `capacity` (increase `capacity` to tighten ε).
* Updates amortized **O(1)** with occasional compactions.
* Queries merge level buffers (**k-way**) and scan weights to the target rank.
Use `quantiles_at` to answer multiple quantiles with a single scan.

> Tip: For heavy query loads, cache materialized arrays between queries.

Expand Down
60 changes: 46 additions & 14 deletions kll_sketch/kll_sketch.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import struct
import random
import heapq
from bisect import bisect_left
from typing import Iterable, List, Tuple, Optional


Expand All @@ -32,8 +33,8 @@ class KLL:
This guarantees total weight conservation: Σ(weights) == n.

Public API:
add(x, weight=1), extend(xs), quantile(q), quantiles(m), median(), rank(x),
cdf(xs), merge(other), to_bytes(), from_bytes()
add(x, weight=1), extend(xs), quantile(q), quantiles(m), quantiles_at(qs),
median(), rank(x), cdf(xs), merge(other), to_bytes(), from_bytes()
"""

# ---------------------------- Tunable constants ----------------------------
Expand Down Expand Up @@ -100,17 +101,22 @@ def median(self) -> float:
def quantile(self, q: float) -> float:
    """Return the approximate ``q``-quantile of the ingested values.

    Args:
        q: Probability in ``[0, 1]``.

    Returns:
        The retained value selected for rank ``q * (n - 1)``.

    Raises:
        ValueError: If ``q`` is outside ``[0, 1]`` (NaN included), or if the
            sketch is empty (raised by ``_batched_quantiles``).
    """
    if not (0.0 <= q <= 1.0):
        raise ValueError("q must be in [0,1]")
    # Single source of truth: the weighted-rank lookup lives in
    # _batched_quantiles, so quantile() and quantiles_at() cannot drift apart.
    return self._batched_quantiles([q])[0]

def quantiles_at(self, probabilities: Iterable[float]) -> List[float]:
    """Look up several quantiles in one call.

    Every entry of ``probabilities`` is coerced with ``float`` and checked
    against ``[0, 1]`` before any lookup happens; the answers come back in
    the same order as the inputs. The whole batch is handed to
    ``_batched_quantiles`` in a single call.

    Args:
        probabilities: Any iterable of probabilities (consumed once).

    Returns:
        One approximate quantile per requested probability; ``[]`` when
        nothing was requested.

    Raises:
        ValueError: If any probability lies outside ``[0, 1]``.
    """
    requested = list(map(float, probabilities))
    in_range = all(0.0 <= p <= 1.0 for p in requested)
    if not in_range:
        raise ValueError("all probabilities must be in [0,1]")
    # An empty request never touches the sketch.
    return self._batched_quantiles(requested) if requested else []

def rank(self, x: float) -> float:
"""Approximate rank in [0, n]."""
Expand Down Expand Up @@ -350,7 +356,8 @@ def quantiles(self, m: int) -> List[float]:
if m == 1:
return [self.quantile(0.5)]
step = 1.0 / m
return [self.quantile(step * i) for i in range(1, m)]
qs = [step * i for i in range(1, m)]
return self.quantiles_at(qs)

# ---------------------- weighted ingestion internals ----------------------
def _ingest_weighted_value(self, value: float, weight: int) -> None:
Expand All @@ -368,6 +375,31 @@ def _ingest_weighted_value(self, value: float, weight: int) -> None:
if self._capacity_exceeded():
self._compress_until_ok()

def _batched_quantiles(self, qs: List[float]) -> List[float]:
    """Resolve every probability in ``qs`` with one pass over the sketch.

    Each probability ``q`` is mapped to the 0-based rank target
    ``q * (n - 1)``. The answer is the first materialized value whose
    cumulative weight strictly exceeds that target, i.e. the value whose
    weight span contains the rank. The earlier ``>=`` comparison picked
    the preceding value whenever the target fell exactly on a
    cumulative-weight boundary, so ``q = 1.0`` could miss the largest
    retained value.

    Args:
        qs: Probabilities, already validated by the caller to lie in
            ``[0, 1]``. They may be in any order.

    Returns:
        A list the same length as ``qs`` with ``out[i]`` answering ``qs[i]``.

    Raises:
        ValueError: If the sketch holds no items.
    """
    if self._n == 0:
        raise ValueError("empty sketch")
    # NOTE(review): the scan assumes ``vals`` is sorted ascending with
    # ``wts`` aligned to it and sum(wts) == n -- confirm against
    # _materialize_aligned.
    vals, wts = self._materialize_aligned()
    if not vals:
        raise ValueError("empty sketch")

    span = self._n - 1
    last = len(vals) - 1
    out = [0.0] * len(qs)

    # Visit the probabilities in ascending order so the cursor only ever
    # moves forward: one scan of the materialized arrays serves the whole
    # batch. The original index is kept to restore the caller's order.
    pos = 0
    cum = 0.0 + wts[0]
    for idx, q in sorted(enumerate(qs), key=lambda item: item[1]):
        # The small slack absorbs float rounding in q * span so that a
        # target meant to be an exact integer rank is treated as one.
        threshold = q * span + 1e-12
        while pos < last and cum <= threshold:
            pos += 1
            cum += wts[pos]
        out[idx] = vals[pos]
    return out


# ----------------------------- quick self-test --------------------------------
if __name__ == "__main__":
Expand Down
24 changes: 24 additions & 0 deletions kll_sketch/tests/test_kll.py
Original file line number Diff line number Diff line change
Expand Up @@ -206,3 +206,27 @@ def test_quantiles_helper_even_spacing() -> None:

median_only = sketch.quantiles(1)
assert median_only == pytest.approx([sketch.median()])


def test_quantiles_at_matches_repeated_calls() -> None:
    """A batched lookup must agree with one-at-a-time ``quantile`` calls."""
    rng = random.Random(123)
    samples = [rng.gauss(0.0, 2.0) for _ in range(5000)]

    sketch = KLL(capacity=200)
    sketch.extend(samples)

    probabilities = [0.05, 0.2, 0.33, 0.5, 0.75, 0.95]
    # Query the batch first, then the same probabilities individually.
    from_batch = sketch.quantiles_at(probabilities)
    one_by_one = []
    for p in probabilities:
        one_by_one.append(sketch.quantile(p))
    assert from_batch == pytest.approx(one_by_one, abs=1e-12)


def test_quantiles_at_accepts_unsorted_probabilities() -> None:
    """Results must come back in request order, not in sorted order."""
    sketch = KLL(capacity=128)
    sketch.extend(range(1000))

    unsorted_qs = [0.9, 0.1, 0.5]
    answers = sketch.quantiles_at(unsorted_qs)
    # Position i of the batch must answer unsorted_qs[i].
    for position, q in enumerate(unsorted_qs):
        assert answers[position] == pytest.approx(sketch.quantile(q))