Skip to content

Commit c8a5d32

Browse files
committed
Fix memory leak on short-lived reactive pipelines
Derived rx nodes registered their invalidation callbacks (_invalidate_obj/_invalidate_current) on their source parameter as strong bound-method references. Since the source is typically long-lived, this pinned every derived node — and anything it captured, such as large arrays — alive indefinitely, with no way to reclaim it. Wrap the invalidation callbacks in a weakref.WeakMethod (_WeakInvalidator) and register a finalizer to prune the watcher once the node is collected. The node now becomes garbage collectable as soon as it is no longer referenced, and the source's watcher list no longer grows without bound. This changes no observable public API behavior: while an expression is referenced it behaves exactly as before, and watch()/_watch() are untouched. Only a node that was previously unreachable-but-uncollectable (pinned solely by the source's internal watcher) is now reclaimed. Assisted-by: Claude:claude-opus-4-8
1 parent 2ec0571 commit c8a5d32

2 files changed

Lines changed: 75 additions & 2 deletions

File tree

param/reactive.py

Lines changed: 48 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,7 @@
9494
import operator
9595
import typing as t
9696
import warnings
97+
import weakref
9798

9899
from collections.abc import (
99100
AsyncGenerator, Callable, Coroutine, Generator, Iterable, Iterator, Sized
@@ -1359,6 +1360,39 @@ def wrapped_sync(*wargs, **wkwargs):
13591360
setattr(wrapped, name, t.cast('Callable', accessor)(wrapped))
13601361
return wrapped
13611362

1363+
class _WeakInvalidator:
1364+
"""
1365+
A weakly-bound invalidation callback for an ``rx`` pipeline node.
1366+
1367+
``rx`` nodes register invalidation callbacks on their *source* parameters.
1368+
If those callbacks were strong bound-method references the source (which is
1369+
typically long-lived) would keep every derived node — and anything it
1370+
captured, such as large arrays — alive indefinitely. Wrapping the bound
1371+
method in a :class:`weakref.WeakMethod` lets the derived node be garbage
1372+
collected as soon as nothing else references it; once collected the call
1373+
becomes a no-op and the now-dead watcher is removed by a finalizer (see
1374+
``rx._watch_invalidation``).
1375+
"""
1376+
1377+
__slots__ = ('_ref', '__weakref__')
1378+
1379+
def __init__(self, method):
1380+
self._ref = weakref.WeakMethod(method)
1381+
1382+
def __call__(self, *events):
1383+
method = self._ref()
1384+
if method is not None:
1385+
return method(*events)
1386+
1387+
1388+
def _remove_watcher(owner, watcher):
1389+
"""Unwatch ``watcher`` from ``owner``, ignoring if it is already gone."""
1390+
try:
1391+
owner.param.unwatch(watcher)
1392+
except Exception:
1393+
pass
1394+
1395+
13621396
# When we only support python >= 3.11 we should exchange 'rx' with Self type annotation below.
13631397
# See https://peps.python.org/pep-0673/
13641398

@@ -1712,9 +1746,21 @@ def _setup_invalidations(self, depth: int = 0):
17121746
for _, params in full_groupby(self._fn_params, lambda x: id(x.owner)):
17131747
fps = [p.name for p in params if p in self._root._fn_params]
17141748
if fps:
1715-
params[0].owner.param._watch(self._invalidate_obj, fps, precedence=-1)
1749+
self._watch_invalidation(params[0].owner, self._invalidate_obj, fps)
17161750
for _, params in full_groupby(self._internal_params, lambda x: id(x.owner)):
1717-
params[0].owner.param._watch(self._invalidate_current, [p.name for p in params], precedence=-1)
1751+
self._watch_invalidation(params[0].owner, self._invalidate_current, [p.name for p in params])
1752+
1753+
def _watch_invalidation(self, owner, method, names):
1754+
"""
1755+
Register a *weak* invalidation watcher on a source parameter.
1756+
1757+
The callback only holds a weak reference to this node, so a long-lived
1758+
source does not pin the (potentially short-lived) derived node alive.
1759+
A finalizer removes the watcher automatically once this node is garbage
1760+
collected, keeping the source's watcher list from growing without bound.
1761+
"""
1762+
watcher = owner.param._watch(_WeakInvalidator(method), names, precedence=-1)
1763+
weakref.finalize(self, _remove_watcher, owner, watcher)
17181764

17191765
def _invalidate_current(self, *events):
17201766
if all(event.obj is self._trigger for event in events):

tests/testreactive.py

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
11
import asyncio
2+
import gc
23
import math
34
import operator
45
import re
56
import time
7+
import weakref
68

79
import param
810
import pytest
@@ -1029,3 +1031,28 @@ async def expensive_compute(a):
10291031
assert y_rx.rx.value == 4
10301032

10311033
assert call_count == 2
1034+
1035+
1036+
def test_reactive_derived_node_is_garbage_collected():
1037+
def watcher_count(source):
1038+
watchers = source._internal_params[0].owner._param__private.watchers
1039+
return sum(len(lst) for what in watchers.values() for lst in what.values())
1040+
1041+
a = param.rx(1)
1042+
baseline = watcher_count(a)
1043+
assert baseline == 2
1044+
1045+
b = a + 1
1046+
assert b.rx.value == 2
1047+
assert watcher_count(a) > baseline
1048+
1049+
ref = weakref.ref(b)
1050+
del b
1051+
gc.collect()
1052+
assert ref() is None
1053+
assert watcher_count(a) == baseline
1054+
1055+
c = a + 10
1056+
a.rx.value = 5
1057+
assert c.rx.value == 15
1058+
assert watcher_count(a) > baseline

0 commit comments

Comments
 (0)