Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 6 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,10 @@ Fast, mergeable **KLL** sketch for streaming quantiles — deterministic, zero d
## ✨ Features
- **Accurate ε-quantiles** under tight memory bounds (KLL algorithm)
- **Deterministic compaction** (parity sampling) + **weight conservation**
- **Weighted ingestion** via `add(x, weight)` for aggregated data
- **Mergeable** sketches for distributed/parallel ingestion
- **Serializable** (`to_bytes` / `from_bytes`)
- **Convenience helpers** such as `quantiles(m)` for evenly spaced cuts
- **Zero dependencies**, Python 3.9+

---
Expand All @@ -31,7 +33,8 @@ sk.extend([1, 5, 2, 9, 3, 6, 4, 8, 7])
print("n =", sk.size())
print("median ≈", sk.median())
print("q(0.9) ≈", sk.quantile(0.9))
````
print("quartiles ≈", sk.quantiles(4))
```

### Merge & Serialize

Expand All @@ -52,10 +55,11 @@ assert abs(a2.quantile(0.5) - a.quantile(0.5)) < 1e-12

| Method | Description |
| ----------------------------- | --------------------------------------- |
| `add(x)` | Ingest one value. |
| `add(x, weight=1)` | Ingest one value with optional weight. |
| `extend(xs)` | Ingest an iterable of values. |
| `size()` | Total number of ingested items `n`. |
| `quantile(q)` | Approximate `q`-quantile for `q∈[0,1]`. |
| `quantiles(m)` | Evenly spaced cut points. |
| `median()` | Convenience for `quantile(0.5)`. |
| `rank(x)` | Approximate rank of `x` in `[0, n]`. |
| `cdf(xs)` | CDF values for a sequence `xs`. |
Expand Down Expand Up @@ -92,8 +96,6 @@ python -m pytest -q

## 🗺️ Roadmap

* Weighted ingestion (`add(x, w)`).
* `quantiles(m)` helper (evenly spaced cut points).
* Optional NumPy/C hot paths for sort/merge.

---
Expand Down
78 changes: 60 additions & 18 deletions kll_sketch/kll_sketch.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

class KLL:
"""
KLL streaming quantile sketch (unweighted, mergeable, serializable).
KLL streaming quantile sketch (supports weighted ingestion, mergeable, serializable).

Paper:
- Karnin, Zohar, Edo Liberty, and Liran Lang. "Optimal quantile approximation
Expand All @@ -32,8 +32,8 @@ class KLL:
This guarantees total weight conservation: Σ(weights) == n.

Public API:
add(x), extend(xs), quantile(q), median(), rank(x), cdf(xs),
merge(other), to_bytes(), from_bytes()
add(x, weight=1), extend(xs), quantile(q), quantiles(m), median(), rank(x),
cdf(xs), merge(other), to_bytes(), from_bytes()
"""

# ---------------------------- Tunable constants ----------------------------
Expand All @@ -60,27 +60,36 @@ def __init__(self, capacity: int = 200, rng_seed: int = _DEFAULT_SEED):
self._rng_seed = int(rng_seed)

# ------------------------------- Public API --------------------------------
def add(self, x: float) -> None:
def add(self, x: float, weight: float = 1.0) -> None:
"""Ingest a value with an optional (integer) weight."""

# ``weight`` may be provided as an ``int`` or any float that rounds to an
# integer (for compatibility with NumPy scalars). Each unit of weight is
# equivalent to inserting ``x`` once; internally we fold the binary
# decomposition of ``weight`` across the sketch levels to avoid O(weight)
# work for large aggregates.

xv = float(x)
if math.isnan(xv) or math.isinf(xv):
raise ValueError("x must be finite")
self._levels[0].append(xv)
self._n += 1
if self._capacity_exceeded():
self._compress_until_ok()

wv = float(weight)
if math.isnan(wv) or math.isinf(wv):
raise ValueError("weight must be finite")
if wv <= 0.0:
raise ValueError("weight must be > 0")

rounded = int(round(wv))
if abs(wv - rounded) > 1e-9:
raise ValueError("weight must be an integer")
if rounded <= 0:
raise ValueError("weight must be > 0")

self._ingest_weighted_value(xv, rounded)

def extend(self, xs: Iterable[float]) -> None:
for x in xs:
xv = float(x)
if math.isnan(xv) or math.isinf(xv):
raise ValueError("values must be finite")
# ``self._levels[0]`` can be replaced during compaction, so we must
# append directly to the current buffer each iteration instead of
# keeping a stale reference (which would silently drop values).
self._levels[0].append(xv)
self._n += 1
if self._capacity_exceeded():
self._compress_until_ok()
self.add(x)

def size(self) -> int:
return self._n
Expand Down Expand Up @@ -326,6 +335,39 @@ def _materialize_aligned(self) -> Tuple[List[float], List[float]]:
heapq.heappush(heap, (arr[i], j, i, w))
return out_v2, out_w2

def quantiles(self, m: int) -> List[float]:
"""Return evenly spaced quantile cut points.

``m`` corresponds to the number of equal-mass buckets. For ``m > 1`` the
return value contains ``m-1`` interior cut points. ``m == 1`` yields the
median for convenience.
"""

if m <= 0:
raise ValueError("m must be positive")
if self._n == 0:
raise ValueError("empty sketch")
if m == 1:
return [self.quantile(0.5)]
step = 1.0 / m
return [self.quantile(step * i) for i in range(1, m)]

# ---------------------- weighted ingestion internals ----------------------
def _ingest_weighted_value(self, value: float, weight: int) -> None:
"""Fold ``weight`` copies of ``value`` into the level buffers."""

remaining = weight
level = 0
while remaining:
if remaining & 1:
self._ensure_levels(level + 1)
self._levels[level].append(value)
remaining >>= 1
level += 1
self._n += weight
if self._capacity_exceeded():
self._compress_until_ok()


# ----------------------------- quick self-test --------------------------------
if __name__ == "__main__":
Expand Down
39 changes: 39 additions & 0 deletions kll_sketch/tests/test_kll.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,12 @@ def test_invalid_inputs_raise() -> None:
sketch.add(float("nan"))
with pytest.raises(ValueError):
sketch.add(float("inf"))
with pytest.raises(ValueError):
sketch.add(1.0, weight=0)
with pytest.raises(ValueError):
sketch.add(1.0, weight=-2)
with pytest.raises(ValueError):
sketch.add(1.0, weight=1.5)
with pytest.raises(ValueError):
sketch.quantile(-0.01)
with pytest.raises(ValueError):
Expand Down Expand Up @@ -167,3 +173,36 @@ def test_rank_brackets_quantile() -> None:
target_rank = q * (len(xs) - 1)
assert lower <= target_rank + 200
assert upper >= target_rank - 200


def test_weighted_ingestion_matches_repetition() -> None:
weighted = KLL(capacity=128)
weighted.add(-1.0, weight=3)
weighted.add(2.5, weight=5)
weighted.add(10.0, weight=2)

expanded = KLL(capacity=128)
expanded.extend([-1.0] * 3)
expanded.extend([2.5] * 5)
expanded.extend([10.0] * 2)

assert weighted.size() == expanded.size() == 10
for q in [0.1, 0.5, 0.9]:
assert weighted.quantile(q) == pytest.approx(expanded.quantile(q), abs=1e-9)


def test_quantiles_helper_even_spacing() -> None:
sketch = KLL(capacity=128)
sketch.extend(range(100))

quartiles = sketch.quantiles(4)
assert len(quartiles) == 3
for lhs, rhs in zip(quartiles, quartiles[1:]):
assert lhs <= rhs
# Check against expected percentile anchors from the raw data.
truth = [float(i) for i in (25, 50, 75)]
for estimate, reference in zip(quartiles, truth):
assert estimate == pytest.approx(reference, abs=5.0)

median_only = sketch.quantiles(1)
assert median_only == pytest.approx([sketch.median()])