1212import struct
1313import random
1414import heapq
15+ from bisect import bisect_left
1516from typing import Iterable , List , Tuple , Optional
1617
1718
@@ -32,8 +33,8 @@ class KLL:
3233 This guarantees total weight conservation: Σ(weights) == n.
3334
3435 Public API:
35- add(x, weight=1), extend(xs), quantile(q), quantiles(m), median(), rank(x ),
36- cdf(xs), merge(other), to_bytes(), from_bytes()
36+ add(x, weight=1), extend(xs), quantile(q), quantiles(m), quantiles_at(qs ),
37+ median(), rank(x), cdf(xs), merge(other), to_bytes(), from_bytes()
3738 """
3839
3940 # ---------------------------- Tunable constants ----------------------------
@@ -100,17 +101,22 @@ def median(self) -> float:
100101 def quantile (self , q : float ) -> float :
101102 if not (0.0 <= q <= 1.0 ):
102103 raise ValueError ("q must be in [0,1]" )
103- if self ._n == 0 :
104- raise ValueError ("empty sketch" )
105- vals , wts = self ._materialize_aligned ()
106- # invariant: sum(wts) == n
107- target = q * (self ._n - 1 ) # rank target in [0, n-1]
108- cum = 0.0
109- for v , w in zip (vals , wts ):
110- cum += w
111- if cum >= target - 1e-12 :
112- return v
113- return vals [- 1 ]
104+ return self ._batched_quantiles ([q ])[0 ]
105+
106+ def quantiles_at (self , probabilities : Iterable [float ]) -> List [float ]:
107+ """Return the approximate quantiles for each entry in ``probabilities``.
108+
109+ This method evaluates all requested quantiles using a single materialized
110+ pass through the sketch, which is significantly faster than issuing
111+ repeated :meth:`quantile` calls for large query batches.
112+ """
113+
114+ qs = [float (q ) for q in probabilities ]
115+ if any (not (0.0 <= q <= 1.0 ) for q in qs ):
116+ raise ValueError ("all probabilities must be in [0,1]" )
117+ if not qs :
118+ return []
119+ return self ._batched_quantiles (qs )
114120
115121 def rank (self , x : float ) -> float :
116122 """Approximate rank in [0, n]."""
@@ -350,7 +356,8 @@ def quantiles(self, m: int) -> List[float]:
350356 if m == 1 :
351357 return [self .quantile (0.5 )]
352358 step = 1.0 / m
353- return [self .quantile (step * i ) for i in range (1 , m )]
359+ qs = [step * i for i in range (1 , m )]
360+ return self .quantiles_at (qs )
354361
355362 # ---------------------- weighted ingestion internals ----------------------
356363 def _ingest_weighted_value (self , value : float , weight : int ) -> None :
@@ -368,6 +375,31 @@ def _ingest_weighted_value(self, value: float, weight: int) -> None:
368375 if self ._capacity_exceeded ():
369376 self ._compress_until_ok ()
370377
378+ def _batched_quantiles (self , qs : List [float ]) -> List [float ]:
379+ if self ._n == 0 :
380+ raise ValueError ("empty sketch" )
381+ vals , wts = self ._materialize_aligned ()
382+ if not vals :
383+ raise ValueError ("empty sketch" )
384+
385+ prefix : List [float ] = []
386+ total = 0.0
387+ for w in wts :
388+ total += w
389+ prefix .append (total )
390+
391+ ordered = sorted (enumerate (qs ), key = lambda item : item [1 ])
392+ out = [0.0 ] * len (qs )
393+ search_lo = 0
394+ for idx , q in ordered :
395+ target = q * (self ._n - 1 )
396+ pos = bisect_left (prefix , target - 1e-12 , lo = search_lo )
397+ if pos >= len (vals ):
398+ pos = len (vals ) - 1
399+ out [idx ] = vals [pos ]
400+ search_lo = pos
401+ return out
402+
371403
372404# ----------------------------- quick self-test --------------------------------
373405if __name__ == "__main__" :
0 commit comments