Skip to content

Commit 2da939a

Browse files
committed
update
1 parent fa46926 commit 2da939a

File tree

3 files changed

+490
-0
lines changed

3 files changed

+490
-0
lines changed
Lines changed: 275 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,275 @@
1+
# Streaming Median (Two-Heaps)
2+
3+
A scalable data structure to maintain the median of a stream of integers with:
4+
5+
- `add(x)`: O(log n)
6+
- `median()`: O(1)
7+
- Handles up to ~10 million inserts
8+
- Memory: O(n)
9+
10+
## Table of Contents
11+
12+
- [Streaming Median (Two-Heaps)](#streaming-median-two-heaps)
13+
- [Table of Contents](#table-of-contents)
14+
- [Concept](#concept)
15+
- [Invariants](#invariants)
16+
- [Median Logic](#median-logic)
17+
- [Insert Algorithm (`add(x)`)](#insert-algorithm-addx)
18+
- [Operations](#operations)
19+
- [Complexity](#complexity)
20+
- [Thread Safety](#thread-safety)
21+
- [Python Implementation](#python-implementation)
22+
- [Go Implementation](#go-implementation)
23+
- [Notes for Large Streams](#notes-for-large-streams)
24+
- [Assumptions](#assumptions)
25+
26+
## Concept
27+
28+
Use two heaps:
29+
30+
- `low`: max-heap for the lower half of numbers
31+
- `high`: min-heap for the upper half
32+
33+
### Invariants
34+
35+
- Size balance: `len(low) == len(high)` or `len(low) == len(high) + 1`
36+
- Order property: `max(low) ≤ min(high)`
37+
38+
### Median Logic
39+
40+
- If sizes equal: `median = (top(low) + top(high)) / 2`
41+
- Else: `median = top(low)`
42+
43+
### Insert Algorithm (`add(x)`)
44+
45+
1. If `low` is empty or `x ≤ top(low)`, push to `low`; otherwise push to `high`.
46+
2. Rebalance:
47+
- If `len(low) > len(high) + 1`: move `top(low)``high`
48+
- If `len(high) > len(low)`: move `top(high)``low`
49+
50+
## Operations
51+
52+
- `add(x)`: O(log n) due to heap push/pop
53+
- `median()`: O(1) by reading heap tops
54+
55+
## Complexity
56+
57+
- Time: `add(x)` O(log n), `median()` O(1)
58+
- Space: O(n), store all elements across both heaps
59+
60+
## Thread Safety
61+
62+
- Recommended: single RW lock protecting both heaps
63+
- `add(x)`: acquire write lock (exclusive)
64+
- `median()`: acquire read lock (shared)
65+
- Avoid partial locking—both heaps must be mutated atomically.
66+
- For even-size average, use widened arithmetic (e.g., float64) to avoid integer overflow.
67+
68+
## Python Implementation
69+
70+
```python
71+
# median_stream.py
72+
import heapq
73+
import threading
74+
from typing import Optional, Union
75+
76+
Number = Union[int, float]
77+
78+
class MedianStream:
79+
def __init__(self) -> None:
80+
self.low = [] # max-heap via negatives
81+
self.high = [] # min-heap
82+
self._lock = threading.Lock()
83+
84+
def add(self, x: Number) -> None:
85+
with self._lock:
86+
if not self.low:
87+
heapq.heappush(self.low, -float(x))
88+
else:
89+
low_top = -self.low[0]
90+
if x <= low_top:
91+
heapq.heappush(self.low, -float(x))
92+
else:
93+
heapq.heappush(self.high, float(x))
94+
95+
# Rebalance
96+
if len(self.low) > len(self.high) + 1:
97+
moved = -heapq.heappop(self.low)
98+
heapq.heappush(self.high, moved)
99+
elif len(self.high) > len(self.low):
100+
moved = heapq.heappop(self.high)
101+
heapq.heappush(self.low, -moved)
102+
103+
def median(self) -> Optional[float]:
104+
with self._lock:
105+
total = len(self.low) + len(self.high)
106+
if total == 0:
107+
return None
108+
109+
if len(self.low) > len(self.high):
110+
return -self.low[0]
111+
else:
112+
low_top = -self.low[0]
113+
high_top = self.high[0]
114+
return (low_top + high_top) / 2.0
115+
116+
if __name__ == "__main__":
117+
ms = MedianStream()
118+
data = [5, 15, 1, 3]
119+
expected = [5.0, 10.0, 5.0, 4.0]
120+
out = []
121+
for x in data:
122+
ms.add(x)
123+
out.append(ms.median())
124+
print("Data:", data)
125+
print("Medians:", out)
126+
print("Expected:", expected)
127+
assert out == expected
128+
129+
ms2 = MedianStream()
130+
for x in range(1, 11):
131+
ms2.add(x)
132+
assert ms2.median() == 5.5
133+
134+
ms3 = MedianStream()
135+
for x in range(10, 0, -1):
136+
ms3.add(x)
137+
assert ms3.median() == 5.5
138+
139+
ms4 = MedianStream()
140+
for v in [2, 2, 2, 2, 2]:
141+
ms4.add(v)
142+
assert ms4.median() == 2.0
143+
144+
print("All tests passed.")
145+
```
146+
147+
## Go Implementation
148+
149+
```go
150+
// median_stream.go
151+
package main
152+
153+
import (
154+
"container/heap"
155+
"fmt"
156+
"math"
157+
"sync"
158+
)
159+
160+
type FloatMinHeap []float64
161+
func (h FloatMinHeap) Len() int { return len(h) }
162+
func (h FloatMinHeap) Less(i, j int) bool { return h[i] < h[j] }
163+
func (h FloatMinHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] }
164+
func (h *FloatMinHeap) Push(x interface{}) { *h = append(*h, x.(float64)) }
165+
func (h *FloatMinHeap) Pop() interface{} {
166+
old := *h; n := len(old); x := old[n-1]; *h = old[:n-1]; return x
167+
}
168+
func (h FloatMinHeap) Peek() float64 { return h[0] }
169+
170+
type FloatMaxHeap []float64
171+
func (h FloatMaxHeap) Len() int { return len(h) }
172+
func (h FloatMaxHeap) Less(i, j int) bool { return h[i] > h[j] } // max-heap
173+
func (h FloatMaxHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] }
174+
func (h *FloatMaxHeap) Push(x interface{}) { *h = append(*h, x.(float64)) }
175+
func (h *FloatMaxHeap) Pop() interface{} {
176+
old := *h; n := len(old); x := old[n-1]; *h = old[:n-1]; return x
177+
}
178+
func (h FloatMaxHeap) Peek() float64 { return h[0] }
179+
180+
type MedianStream struct {
181+
low FloatMaxHeap
182+
high FloatMinHeap
183+
mu sync.RWMutex
184+
}
185+
186+
func NewMedianStream() *MedianStream {
187+
ms := &MedianStream{low: make(FloatMaxHeap, 0), high: make(FloatMinHeap, 0)}
188+
heap.Init(&ms.low); heap.Init(&ms.high)
189+
return ms
190+
}
191+
192+
func (ms *MedianStream) Add(x float64) {
193+
ms.mu.Lock()
194+
defer ms.mu.Unlock()
195+
196+
if ms.low.Len() == 0 {
197+
heap.Push(&ms.low, x)
198+
} else if x <= ms.low.Peek() {
199+
heap.Push(&ms.low, x)
200+
} else {
201+
heap.Push(&ms.high, x)
202+
}
203+
204+
if ms.low.Len() > ms.high.Len()+1 {
205+
moved := heap.Pop(&ms.low).(float64)
206+
heap.Push(&ms.high, moved)
207+
} else if ms.high.Len() > ms.low.Len() {
208+
moved := heap.Pop(&ms.high).(float64)
209+
heap.Push(&ms.low, moved)
210+
}
211+
}
212+
213+
func (ms *MedianStream) Median() (float64, bool) {
214+
ms.mu.RLock()
215+
defer ms.mu.RUnlock()
216+
217+
total := ms.low.Len() + ms.high.Len()
218+
if total == 0 { return 0, false }
219+
220+
if ms.low.Len() > ms.high.Len() {
221+
return ms.low.Peek(), true
222+
}
223+
lowTop := ms.low.Peek()
224+
highTop := ms.high.Peek()
225+
return (lowTop + highTop) / 2.0, true
226+
}
227+
228+
func main() {
229+
ms := NewMedianStream()
230+
data := []float64{5, 15, 1, 3}
231+
expected := []float64{5, 10, 5, 4}
232+
out := make([]float64, 0, len(data))
233+
234+
for _, x := range data {
235+
ms.Add(x)
236+
m, ok := ms.Median()
237+
if !ok { panic("median not available") }
238+
out = append(out, m)
239+
}
240+
fmt.Println("Data: ", data)
241+
fmt.Println("Medians: ", out)
242+
fmt.Println("Expected:", expected)
243+
244+
eq := func(a, b float64) bool { return math.Abs(a-b) < 1e-9 }
245+
for i := range out {
246+
if !eq(out[i], expected[i]) {
247+
panic(fmt.Sprintf("median mismatch at %d: got %v want %v", i, out[i], expected[i]))
248+
}
249+
}
250+
251+
ms2 := NewMedianStream()
252+
for i := 1.0; i <= 10.0; i++ { ms2.Add(i) }
253+
m2, _ := ms2.Median(); if !eq(m2, 5.5) { panic("median incorrect for 1..10") }
254+
255+
ms3 := NewMedianStream()
256+
for i := 10.0; i >= 1.0; i-- { ms3.Add(i) }
257+
m3, _ := ms3.Median(); if !eq(m3, 5.5) { panic("median incorrect for 10..1") }
258+
259+
ms4 := NewMedianStream()
260+
for i := 0; i < 5; i++ { ms4.Add(2.0) }
261+
m4, _ := ms4.Median(); if !eq(m4, 2.0) { panic("median incorrect for duplicates") }
262+
263+
fmt.Println("All tests passed.")
264+
}
265+
```
266+
267+
## Notes for Large Streams
268+
269+
- Memory: ~160 MB for 10M `float64` total across two heaps (plus overhead). Use `int64` for integers.
270+
- For exact rational median on even counts (no floating rounding), return the two middles and compute externally.
271+
272+
## Assumptions
273+
274+
- Inputs fit in 64-bit numeric types.
275+
- Even-count median returns a floating average. Adjust if your API requires integer or rational results.

0 commit comments

Comments
 (0)