Skip to content

Commit 1bb0ce7

Browse files
authored
Add HeapSet._sorted internal flag (#6949)
1 parent a9e5790 commit 1bb0ce7

File tree

2 files changed

+105
-3
lines changed

2 files changed

+105
-3
lines changed

distributed/collections.py

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -38,18 +38,20 @@ class HeapSet(MutableSet[T]):
3838
Values must be compatible with :mod:`weakref`.
3939
"""
4040

41-
__slots__ = ("key", "_data", "_heap", "_inc")
41+
__slots__ = ("key", "_data", "_heap", "_inc", "_sorted")
4242
key: Callable[[T], Any]
4343
_data: set[T]
4444
_inc: int
4545
_heap: list[tuple[Any, int, weakref.ref[T]]]
46+
_sorted: bool
4647

4748
def __init__(self, *, key: Callable[[T], Any]):
4849
# FIXME https://github.com/python/mypy/issues/708
4950
self.key = key # type: ignore
5051
self._data = set()
5152
self._inc = 0
5253
self._heap = []
54+
self._sorted = True
5355

5456
def __repr__(self) -> str:
5557
return f"<{type(self).__name__}: {len(self)} items>"
@@ -68,6 +70,7 @@ def _unpickle(
6870
self._inc = inc
6971
self._heap = [(k, i, weakref.ref(v)) for k, i, v in heap]
7072
heapq.heapify(self._heap)
73+
self._sorted = not heap
7174
return self
7275

7376
def __contains__(self, value: object) -> bool:
@@ -82,13 +85,14 @@ def add(self, value: T) -> None:
8285
k = self.key(value) # type: ignore
8386
vref = weakref.ref(value)
8487
heapq.heappush(self._heap, (k, self._inc, vref))
88+
self._sorted = False
8589
self._data.add(value)
8690
self._inc += 1
8791

8892
def discard(self, value: T) -> None:
8993
self._data.discard(value)
9094
if not self._data:
91-
self._heap.clear()
95+
self.clear()
9296

9397
def peek(self) -> T:
9498
"""Return the smallest element without removing it"""
@@ -99,6 +103,7 @@ def peek(self) -> T:
99103
if value in self._data:
100104
return value
101105
heapq.heappop(self._heap)
106+
self._sorted = False
102107

103108
def peekn(self, n: int) -> Iterator[T]:
104109
"Iterator over the N smallest elements. This is O(1) for n == 1, O(n*logn) otherwise."
@@ -119,9 +124,12 @@ def pop(self) -> T:
119124
raise KeyError("pop from an empty set")
120125
while True:
121126
_, _, vref = heapq.heappop(self._heap)
127+
self._sorted = False
122128
value = vref()
123129
if value in self._data:
124130
self._data.discard(value)
131+
if not self._data:
132+
self.clear()
125133
return value
126134

127135
def peekright(self) -> T:
@@ -147,6 +155,8 @@ def popright(self) -> T:
147155
value = vref()
148156
if value in self._data:
149157
self._data.discard(value)
158+
if not self._data:
159+
self.clear()
150160
return value
151161

152162
def __iter__(self) -> Iterator[T]:
@@ -160,11 +170,15 @@ def sorted(self) -> Iterator[T]:
160170
elements in order, from smallest to largest according to the key and insertion
161171
order.
162172
"""
163-
for _, _, vref in sorted(self._heap):
173+
if not self._sorted:
174+
self._heap.sort() # A sorted list maintains the heap invariant
175+
self._sorted = True
176+
for _, _, vref in self._heap:
164177
value = vref()
165178
if value in self._data:
166179
yield value
167180

168181
def clear(self) -> None:
169182
self._data.clear()
170183
self._heap.clear()
184+
self._sorted = True

distributed/tests/test_collections.py

Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -176,6 +176,91 @@ def __init__(self, i):
176176
assert set(heap) == {cx}
177177

178178

179+
def assert_heap_sorted(heap: HeapSet) -> None:
180+
assert heap._sorted
181+
assert heap._heap == sorted(heap._heap)
182+
183+
184+
def test_heapset_sorted_flag_left():
185+
heap = HeapSet(key=operator.attrgetter("i"))
186+
assert heap._sorted
187+
c1 = C("1", 1)
188+
c2 = C("2", 2)
189+
c3 = C("3", 3)
190+
c4 = C("4", 4)
191+
192+
heap.add(c4)
193+
assert not heap._sorted
194+
heap.add(c3)
195+
heap.add(c2)
196+
heap.add(c1)
197+
198+
list(heap.sorted())
199+
assert_heap_sorted(heap)
200+
201+
# `peek` maintains sort if first element is not discarded
202+
assert heap.peek() is c1
203+
assert_heap_sorted(heap)
204+
205+
# `pop` de-sorts, unless it removes the last element (which clears the heap and resets the flag)
206+
assert heap.pop() is c1
207+
assert not heap._sorted
208+
209+
list(heap.sorted())
210+
211+
# discard first element
212+
heap.discard(c2)
213+
assert heap.peek() is c3
214+
assert not heap._sorted
215+
216+
# popping the last element resets the sorted flag
217+
assert heap.pop() is c3
218+
assert heap.pop() is c4
219+
assert not heap
220+
assert_heap_sorted(heap)
221+
222+
# discarding the last element resets the sorted flag
223+
heap.add(c1)
224+
heap.add(c2)
225+
assert not heap._sorted
226+
heap.discard(c1)
227+
assert not heap._sorted
228+
heap.discard(c2)
229+
assert not heap
230+
assert_heap_sorted(heap)
231+
232+
233+
def test_heapset_sorted_flag_right():
234+
"Verify right operations don't affect sortedness"
235+
heap = HeapSet(key=operator.attrgetter("i"))
236+
c1 = C("1", 1)
237+
c2 = C("2", 2)
238+
c3 = C("3", 3)
239+
240+
heap.add(c2)
241+
heap.add(c3)
242+
heap.add(c1)
243+
244+
assert not heap._sorted
245+
list(heap.sorted())
246+
assert_heap_sorted(heap)
247+
248+
assert heap.peekright() is c3
249+
assert_heap_sorted(heap)
250+
assert heap.popright() is c3
251+
assert_heap_sorted(heap)
252+
assert heap.popright() is c2
253+
assert_heap_sorted(heap)
254+
255+
heap.add(c2)
256+
assert not heap._sorted
257+
assert heap.popright() is c2
258+
assert not heap._sorted
259+
assert heap.popright() is c1
260+
assert not heap
261+
assert_heap_sorted(heap)
262+
263+
179264
@pytest.mark.parametrize("peek", [False, True])
180265
def test_heapset_popright(peek):
181266
heap = HeapSet(key=operator.attrgetter("i"))
@@ -231,8 +316,11 @@ def test_heapset_pickle():
231316
if random.random() > 0.7:
232317
heap.remove(c)
233318

319+
list(heap.sorted()) # trigger sort
320+
assert heap._sorted
234321
heap2 = pickle.loads(pickle.dumps(heap))
235322
assert len(heap) == len(heap2)
323+
assert not heap2._sorted # re-heapification may have broken the sort
236324
# Test that the heap has been re-heapified upon unpickle
237325
assert len(heap2._heap) < len(heap._heap)
238326
while heap:

0 commit comments

Comments
 (0)