
Commit 33101c2

Merge branch 'release-1.10.0' into feat/kb-v1-db-connectors
2 parents 5d81cb4 + 5bf7371

11 files changed: 148 additions & 286 deletions


docs/docs/Develop/memory.mdx

Lines changed: 1 addition & 1 deletion
@@ -119,7 +119,7 @@ If you want to use a non-default cache setting, you can use the following enviro
 
 | Variable | Type | Default | Description |
 |----------|------|---------|-------------|
-| `LANGFLOW_CACHE_TYPE` | String | `async` | Set the cache type for Langflow's internal caching system. Possible values: `async`, `redis`, `memory`, `disk`. If you set the type to `redis`, then you must also set the `LANGFLOW_REDIS_*` environment variables. |
+| `LANGFLOW_CACHE_TYPE` | String | `async` | Set the cache type for Langflow's internal caching system. Possible values: `async`, `redis`, `memory`. If you set the type to `redis`, then you must also set the `LANGFLOW_REDIS_*` environment variables. The `disk` backend was removed in 1.10 — switch to `async` for in-memory or `redis` for cross-worker. |
 | `LANGFLOW_LANGCHAIN_CACHE` | String | `InMemoryCache` | Set the cache storage type for the LangChain caching system (a Langflow dependency), either `InMemoryCache` or `SQLiteCache`. |
 | `LANGFLOW_REDIS_HOST` | String | `localhost` | Redis server hostname if `LANGFLOW_CACHE_TYPE=redis`. |
 | `LANGFLOW_REDIS_PORT` | Integer | `6379` | Redis server port if `LANGFLOW_CACHE_TYPE=redis`. |

pyproject.toml

Lines changed: 0 additions & 1 deletion
@@ -114,7 +114,6 @@ cassio = [
     "cassio>=0.1.7"
 ]
 local = [
-    "llama-cpp-python~=0.2.0",
     "sentence-transformers>=2.3.1",
     "ctransformers>=0.2.10"
 ]

src/backend/base/langflow/services/cache/disk.py

Lines changed: 0 additions & 95 deletions
This file was deleted.

src/backend/base/langflow/services/cache/factory.py

Lines changed: 0 additions & 7 deletions
@@ -5,7 +5,6 @@
 from lfx.log.logger import logger
 from typing_extensions import override
 
-from langflow.services.cache.disk import AsyncDiskCache
 from langflow.services.cache.service import AsyncInMemoryCache, CacheService, RedisCache, ThreadingInMemoryCache
 from langflow.services.factory import ServiceFactory
 
@@ -36,10 +35,4 @@ def create(self, settings_service: SettingsService):
             return ThreadingInMemoryCache(expiration_time=settings_service.settings.cache_expire)
         if settings_service.settings.cache_type == "async":
             return AsyncInMemoryCache(expiration_time=settings_service.settings.cache_expire)
-        if settings_service.settings.cache_type == "disk":
-            cache_dir = settings_service.settings.cache_dir or settings_service.settings.config_dir
-            return AsyncDiskCache(
-                cache_dir=cache_dir,
-                expiration_time=settings_service.settings.cache_expire,
-            )
         return None
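The net effect on cache-type resolution: a leftover `disk` setting no longer matches any branch. A sketch of the resulting `create` dispatch, reconstructed from the hunks above (the `redis` branch sits outside the hunk and is elided here):

def create(self, settings_service: SettingsService):
    cache_type = settings_service.settings.cache_type
    if cache_type == "memory":
        return ThreadingInMemoryCache(expiration_time=settings_service.settings.cache_expire)
    if cache_type == "async":
        return AsyncInMemoryCache(expiration_time=settings_service.settings.cache_expire)
    # "disk" (like any other unrecognized value) now falls through to None.
    return None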
src/backend/base/langflow/services/flow_events/factory.py

Lines changed: 9 additions & 2 deletions
@@ -1,13 +1,20 @@
+from __future__ import annotations
+
+from typing import TYPE_CHECKING
+
 from typing_extensions import override
 
 from langflow.services.factory import ServiceFactory
 from langflow.services.flow_events.service import FlowEventsService
 
+if TYPE_CHECKING:
+    from lfx.services.settings.service import SettingsService
+
 
 class FlowEventsServiceFactory(ServiceFactory):
     def __init__(self) -> None:
         super().__init__(FlowEventsService)
 
     @override
-    def create(self):
-        return FlowEventsService()
+    def create(self, settings_service: SettingsService):
+        return FlowEventsService(cache_dir=settings_service.settings.cache_dir)
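With the factory now threading `cache_dir` through from settings, every worker in a deployment points at the same database directory. A hypothetical wiring sketch (the `settings_service` instance is assumed to exist; the names come from the diff above):

factory = FlowEventsServiceFactory()
service = factory.create(settings_service)

# Equivalent direct construction, as the factory now does internally:
service = FlowEventsService(cache_dir=settings_service.settings.cache_dir)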

src/backend/base/langflow/services/flow_events/service.py

Lines changed: 89 additions & 27 deletions
@@ -1,14 +1,13 @@
 from __future__ import annotations
 
-import json
+import sqlite3
 import tempfile
+import threading
 import time
-from dataclasses import asdict, dataclass
+from dataclasses import dataclass
 from pathlib import Path
 from typing import Literal, get_args
 
-from diskcache import Cache
-
 from langflow.services.base import Service
 
 FLOW_EVENT_TYPES = Literal[
@@ -29,12 +28,25 @@ class FlowEvent:
     summary: str = ""
 
 
+_SCHEMA = """
+CREATE TABLE IF NOT EXISTS flow_events (
+    flow_id TEXT NOT NULL,
+    ts REAL NOT NULL,
+    type TEXT NOT NULL,
+    summary TEXT NOT NULL DEFAULT '',
+    expires_at REAL NOT NULL
+);
+CREATE INDEX IF NOT EXISTS idx_flow_events_flow_ts ON flow_events(flow_id, ts);
+CREATE INDEX IF NOT EXISTS idx_flow_events_expires ON flow_events(expires_at);
+"""
+
+
 class FlowEventsService(Service):
-    """Disk-backed event queue keyed by flow_id.
+    """SQLite-backed event queue keyed by flow_id.
 
-    Uses diskcache for cross-worker visibility (multiple uvicorn/gunicorn workers
-    share the same SQLite-backed cache directory). TTL-based cleanup is handled
-    by diskcache's built-in expiry.
+    Uses Python's stdlib sqlite3 in WAL mode for cross-worker visibility (multiple
+    uvicorn/gunicorn workers share the same on-disk database file). TTL-based
+    cleanup is performed lazily on each write and read.
 
     Limitations:
     - Events are ephemeral: lost on disk cleanup or container restart.
@@ -49,28 +61,68 @@ class FlowEventsService(Service):
     SETTLE_TIMEOUT: float = 10.0
     MAX_EVENTS_PER_FLOW: int = 1000
 
+    _VALID_EVENT_TYPES: frozenset[str] = frozenset(get_args(FLOW_EVENT_TYPES))
+
     def __init__(self, cache_dir: str | Path | None = None) -> None:
         if cache_dir is None:
             cache_dir = Path(tempfile.gettempdir()) / "langflow_flow_events"
-        self._cache = Cache(str(cache_dir))
-
-    _VALID_EVENT_TYPES: frozenset[str] = frozenset(get_args(FLOW_EVENT_TYPES))
+        cache_dir = Path(cache_dir)
+        cache_dir.mkdir(parents=True, exist_ok=True)
+        self._db_path = cache_dir / "flow_events.sqlite"
+        # isolation_level=None puts pysqlite in autocommit mode so we manage
+        # transactions explicitly via BEGIN/COMMIT.
+        self._conn = sqlite3.connect(
+            str(self._db_path),
+            isolation_level=None,
+            check_same_thread=False,
+            timeout=5.0,
+        )
+        # Serialize use of the single connection across asyncio tasks / FastAPI threads
+        # in this worker. WAL handles cross-worker concurrency.
+        self._lock = threading.Lock()
+        with self._lock:
+            self._conn.execute("PRAGMA journal_mode=WAL")
+            self._conn.execute("PRAGMA synchronous=NORMAL")
+            self._conn.execute("PRAGMA busy_timeout=5000")
+            self._conn.executescript(_SCHEMA)
 
     def append(self, flow_id: str, event_type: str, summary: str = "") -> FlowEvent:
         if event_type not in self._VALID_EVENT_TYPES:
             msg = f"Invalid event type: {event_type!r}. Must be one of {sorted(self._VALID_EVENT_TYPES)}"
             raise ValueError(msg)
-        event = FlowEvent(type=event_type, timestamp=time.time(), summary=summary)
-        key = f"flow_events:{flow_id}"
-
-        with self._cache.transact():
-            raw = self._cache.get(key, default=None)
-            events: list[dict] = json.loads(raw) if raw else []
-            events.append(asdict(event))
-            # Trim to max size
-            if len(events) > self.MAX_EVENTS_PER_FLOW:
-                events = events[-self.MAX_EVENTS_PER_FLOW :]
-            self._cache.set(key, json.dumps(events), expire=self.TTL_SECONDS)
+
+        now = time.time()
+        event = FlowEvent(type=event_type, timestamp=now, summary=summary)
+        expires_at = now + self.TTL_SECONDS
+
+        with self._lock:
+            self._conn.execute("BEGIN IMMEDIATE")
+            try:
+                # Opportunistic TTL cleanup across all flows.
+                self._conn.execute("DELETE FROM flow_events WHERE expires_at < ?", (now,))
+                self._conn.execute(
+                    "INSERT INTO flow_events (flow_id, ts, type, summary, expires_at) VALUES (?, ?, ?, ?, ?)",
+                    (flow_id, now, event_type, summary, expires_at),
+                )
+                # Bound per-flow size: keep only the most recent MAX_EVENTS_PER_FLOW rows.
+                # Order by (ts, rowid) so events appended in the same microsecond have a
+                # stable, insertion-aware ordering when picking which rows to drop.
+                self._conn.execute(
+                    """
+                    DELETE FROM flow_events
+                    WHERE rowid IN (
+                        SELECT rowid FROM flow_events
+                        WHERE flow_id = ?
+                        ORDER BY ts DESC, rowid DESC
+                        LIMIT -1 OFFSET ?
+                    )
+                    """,
+                    (flow_id, self.MAX_EVENTS_PER_FLOW),
+                )
+                self._conn.execute("COMMIT")
+            except Exception:
+                self._conn.execute("ROLLBACK")
+                raise
 
         return event
 
@@ -82,10 +134,19 @@ def get_since(self, flow_id: str, since: float) -> tuple[list[FlowEvent], bool]:
         - A flow_settled event exists after `since`, OR
         - The most recent event is older than SETTLE_TIMEOUT seconds.
         """
-        key = f"flow_events:{flow_id}"
-        raw = self._cache.get(key, default=None)
-        all_events = [FlowEvent(**e) for e in json.loads(raw)] if raw else []
-
+        now = time.time()
+        with self._lock:
+            rows = self._conn.execute(
+                """
+                SELECT type, ts, summary
+                FROM flow_events
+                WHERE flow_id = ? AND expires_at >= ?
+                ORDER BY ts ASC, rowid ASC
+                """,
+                (flow_id, now),
+            ).fetchall()
+
+        all_events = [FlowEvent(type=r[0], timestamp=r[1], summary=r[2]) for r in rows]
         after = [e for e in all_events if e.timestamp > since]
 
         if not after and not all_events:
@@ -100,4 +161,5 @@ def get_since(self, flow_id: str, since: float) -> tuple[list[FlowEvent], bool]:
         return after, settled
 
     async def teardown(self) -> None:
-        self._cache.close()
+        with self._lock:
+            self._conn.close()
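A minimal usage sketch of the reworked service, assuming `flow_settled` is one of the `FLOW_EVENT_TYPES` literals (the `get_since` docstring above refers to it); the flow ID and directory are illustrative:

import time

from langflow.services.flow_events.service import FlowEventsService

svc = FlowEventsService(cache_dir="/tmp/langflow_flow_events")  # shared by all workers on this host

# Writes trim expired rows and cap the per-flow backlog in one IMMEDIATE transaction;
# an unknown event type raises ValueError before touching the database.
svc.append("demo-flow", "flow_settled", summary="run finished")

# Reads return only unexpired events newer than `since`, plus a settled flag.
events, settled = svc.get_since("demo-flow", since=time.time() - 60)

Note the `LIMIT -1 OFFSET ?` idiom in `append`: SQLite treats a negative LIMIT as "no limit", so the subquery selects every row beyond the newest MAX_EVENTS_PER_FLOW and the outer DELETE drops exactly those.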

src/backend/base/pyproject.toml

Lines changed: 0 additions & 5 deletions
@@ -73,7 +73,6 @@ dependencies = [
     "filelock>=3.20.1,<4.0.0",
     "grandalf>=0.8.0,<1.0.0",
     "spider-client>=0.0.27,<1.0.0",
-    "diskcache>=5.6.3,<6.0.0",
     "clickhouse-connect==0.7.19",
     "assemblyai>=0.33.0,<1.0.0",
     "fastapi-pagination>=0.13.1,<1.0.0",
@@ -176,7 +175,6 @@ postgresql = [
 ]
 
 local = [
-    "llama-cpp-python>=0.2.0",
     "sentence-transformers>=2.0.0",
     "ctransformers>=0.2"
 ]
@@ -224,7 +222,6 @@ sambanova = ["langchain-sambanova~=1.0.0"]
 groq = ["langchain-groq~=1.1.1"]
 
 # Individual local LLM providers
-llama-cpp = ["llama-cpp-python~=0.2.0"]
 sentence-transformers = ["sentence-transformers>=2.3.1"]
 ctransformers = ["ctransformers>=0.2.10"]
 
@@ -242,7 +239,6 @@ pytube = ["pytube==15.0.0"]
 # Individual agent frameworks
 # crewai is commented out due to httpx version conflict
 # crewai = ["crewai>=0.126.0"]
-dspy = ["dspy-ai==2.5.41"]
 smolagents = ["smolagents>=1.8.0"]
 
 # Individual tools
@@ -399,7 +395,6 @@ complete = [
     "langflow-base[docx]",
     "langflow-base[pytube]",
     # "langflow-base[crewai]", # commented out due to httpx version conflict
-    "langflow-base[dspy]",
     "langflow-base[smolagents]",
     "langflow-base[beautifulsoup]",
     "langflow-base[serpapi]",
