1- # kll_sketch.py — KLL Streaming Quantile Sketch (Pass #5: constants + style + docs)
1+ # KLL Streaming Quantile Sketch (Python)
2+ # Production-ready implementation with:
3+ # - Named constants (caps, loop guards)
4+ # - Deterministic RNG salting (seed + salt mixer)
5+ # - Weight-conserving compaction (both boundaries preserved)
6+ # - Aligned materialization (values + weights stay in sync)
7+ # - Merge + serialize/deserialize
28# Python 3.9+
39
410from __future__ import annotations
@@ -15,16 +21,15 @@ class KLL:
1521
1622 Paper:
1723 - Karnin, Zohar, Kevin Lang, and Edo Liberty. "Optimal quantile approximation
18- in streams." 2016 IEEE 57th Annual Symposium on Foundations of Computer
19- Science (FOCS). IEEE, 2016.
24+ in streams." FOCS 2016.
2025
2126 Strategy (high level):
2227 - Maintain multiple levels of buffers; level ℓ items represent weight 2^ℓ.
2328 - When over capacity, compact a level by sampling one element from each
2429 disjoint pair (parity-controlled), promoting the sampled elements to the
2530 next level (thus doubling their weight).
26- - Boundary elements not participating in any pair are preserved at the
27- current level to conserve total weight (sum of weights == n) .
31+ - Boundary elements NOT in any pair are preserved at the current level.
32+ This guarantees total weight conservation: Σ(weights) == n.
2833
2934 Public API:
3035 add(x), extend(xs), quantile(q), median(), rank(x), cdf(xs),
@@ -34,13 +39,13 @@ class KLL:
3439 # ---------------------------- Tunable constants ----------------------------
3540 _MIN_CAPACITY : int = 40 # minimal allowed capacity for accuracy
3641 _SOFT_CAP_FACTOR : float = 1.15 # global soft overfill before compaction
37- _LEVEL_BASE_MIN : int = 8 # base per-level capacity (~k//8, min 8)
42+ _LEVEL_BASE_MIN : int = 8 # per-level base cap (~k//8, min 8)
3843 _STALL_BREAK_LOOPS : int = 16 # break if no progress after this many loops
3944 _MAX_COMPACT_LOOPS : int = 10_000 # absolute safety bound on compaction loops
4045 _DEFAULT_SEED : int = 0xA5B357 # deterministic default RNG seed
4146
4247 # 64-bit odd constant (golden ratio scaled) for hashing the RNG salt.
43- # This mixes the configured seed with evolving salt (level + n + buffer size)
48+ # We mix the configured seed with an evolving salt (level + n + buffer size)
4449 # to get stable but well-dispersed pseudo-randomness per compaction event.
4550 _SALT_MIX64 : int = 0x9E3779B185EBCA87
4651
@@ -87,6 +92,7 @@ def quantile(self, q: float) -> float:
8792 if self ._n == 0 :
8893 raise ValueError ("empty sketch" )
8994 vals , wts = self ._materialize_aligned ()
95+ # invariant: sum(wts) == n
9096 target = q * (self ._n - 1 ) # rank target in [0, n-1]
9197 cum = 0.0
9298 for v , w in zip (vals , wts ):
@@ -188,13 +194,13 @@ def _level_capacity(self, level: int) -> int:
188194
189195 def _rng (self , salt : int ) -> random .Random :
190196 # Deterministic per-event RNG using a 64-bit mix of seed and salt.
191- # The SALT combines level + current n + buffer length to vary across events.
197+ # SALT combines level + current n + buffer length to vary across events.
192198 mix = (self ._rng_seed * self ._SALT_MIX64 + (salt & 0xFFFFFFFFFFFF )) & 0xFFFFFFFFFFFFFFFF
193199 return random .Random (mix )
194200
195201 def _find_compactable_level (self ) -> Optional [int ]:
196202 """
197- Return a level index that can form at least one pair under the current parity .
203+ Return a level index that can form at least one pair.
198204 Preference: first level exceeding its cap (and len>=2), else the lowest
199205 level with len>=2; else None.
200206 """
@@ -229,7 +235,7 @@ def _compress_once(self) -> bool:
229235 return False
230236
231237 promoted : List [float ] = []
232- # True KLL: sample one from each disjoint pair; parity controls pairing.
238+ # True KLL: choose one from each adjacent pair (unbiased)
233239 for i in range (start , len (buf ) - 1 , 2 ):
234240 promoted .append (buf [i ] if rng .getrandbits (1 ) else buf [i + 1 ])
235241
@@ -295,7 +301,7 @@ def _materialize_aligned(self) -> Tuple[List[float], List[float]]:
295301 best , best_j = v , j
296302 if best_j < 0 :
297303 break
298- out_v .append (best ) # value
304+ out_v .append (best ) # value
299305 out_w .append (per_level [best_j ][1 ]) # weight aligned to source level
300306 idx [best_j ] += 1
301307 return out_v , out_w
@@ -334,5 +340,5 @@ def _materialize_aligned(self) -> Tuple[List[float], List[float]]:
334340 print (f"q={ q :>4} : est={ est :+.4f} truth={ truth :+.4f} |err|={ err :.4f} " )
335341
336342 # Weight conservation sanity check
337- vals , wts = sk ._materialize_aligned ()
338- assert abs (sum (wts ) - sk .size ()) < 1e-9 , "weight conservation violated"
343+ _ , wts = sk ._materialize_aligned ()
344+ assert abs (sum (wts ) - sk .size ()) < 1e-9 , "weight conservation violated"
0 commit comments