From c5ab03014a2e13c8b5573e99a4c96f16fe841f7d Mon Sep 17 00:00:00 2001 From: Andrew Huang Date: Wed, 7 May 2025 16:43:08 -0700 Subject: [PATCH 1/5] Ensure consistent embeddings used --- lumen/ai/agents.py | 4 +- lumen/ai/vector_store.py | 140 ++++++++++++++++++++++++++++ lumen/tests/ai/test_vector_store.py | 15 ++- 3 files changed, 156 insertions(+), 3 deletions(-) diff --git a/lumen/ai/agents.py b/lumen/ai/agents.py index d5e60bc7b..93ef8c7a2 100644 --- a/lumen/ai/agents.py +++ b/lumen/ai/agents.py @@ -241,7 +241,7 @@ class ChatAgent(Agent): "Best for high-level information about data or general conversation", "Can be used to describe available tables", "Not useful for answering data specific questions", - "Must be paired with TableLookup or DocumentLookup", + "Can be paired with TableLookup or DocumentLookup", ]) purpose = param.String(default=""" @@ -257,7 +257,7 @@ class ChatAgent(Agent): ) # technically not required if appended manually with tool in coordinator - requires = param.List(default=["vector_metaset"], readonly=True) + requires = param.List(default=[], readonly=True) async def respond( self, diff --git a/lumen/ai/vector_store.py b/lumen/ai/vector_store.py index 930bd63eb..43af9b03d 100644 --- a/lumen/ai/vector_store.py +++ b/lumen/ai/vector_store.py @@ -1,4 +1,5 @@ import asyncio +import importlib import io import json import os @@ -68,6 +69,7 @@ class VectorStore(LLMUser): embeddings = param.ClassSelector( class_=Embeddings, default=NumpyEmbeddings(), + allow_None=True, doc="Embeddings object for text processing.", ) @@ -85,6 +87,9 @@ class VectorStore(LLMUser): def __init__(self, **params): super().__init__(**params) + # If embeddings is None, use NumpyEmbeddings as default + if self.embeddings is None: + self.embeddings = NumpyEmbeddings() if self.chunk_func is None: self.chunk_func = semchunk.chunkerify( self.chunk_tokenizer, chunk_size=self.chunk_size @@ -897,8 +902,16 @@ class DuckDBVectorStore(VectorStore): uri = param.String(default=":memory:", doc="The URI of the DuckDB database") + embeddings = param.ClassSelector( + class_=Embeddings, + default=None, + allow_None=True, + doc="Embeddings object for text processing. If None and a URI is provided, loads from the database; else NumpyEmbeddings.", + ) + def __init__(self, **params): super().__init__(**params) + connection = duckdb.connect(":memory:") # following the instructions from # https://duckdb.org/docs/stable/extensions/vss.html#persistence @@ -930,6 +943,19 @@ def __init__(self, **params): ) self._initialized = uri_exists and has_documents + if self.uri != ":memory:" and self._initialized: + config = self._get_embeddings_config() + if config and self.embeddings is None: + module_name, class_name = config["class"].rsplit(".", 1) + module = importlib.import_module(module_name) + embedding_class = getattr(module, class_name) + self.embeddings = embedding_class(**config["params"]) + log_debug(f"Loaded embeddings {class_name} from database.") + self._check_embeddings_consistency() + + if self.embeddings is None: + self.embeddings = NumpyEmbeddings() + def _setup_database(self, embedding_dim: int) -> None: """Set up the DuckDB database with necessary tables and indexes.""" self.connection.execute("CREATE SEQUENCE IF NOT EXISTS documents_id_seq;") @@ -945,6 +971,34 @@ def _setup_database(self, embedding_dim: int) -> None: """ ) + self.connection.execute( + """ + CREATE TABLE IF NOT EXISTS vector_store_metadata ( + key VARCHAR PRIMARY KEY, + value JSON + ); + """ + ) + + # Store embedding configuration + embedding_info = { + "class": self.embeddings.__class__.__module__ + "." + self.embeddings.__class__.__name__, + "params": {} + } + for param_name, param_obj in self.embeddings.param.objects().items(): + if param_name not in ['name']: + value = getattr(self.embeddings, param_name) + if isinstance(value, (str, int, float, bool, list, dict)) or value is None: + embedding_info["params"][param_name] = value + + self.connection.execute( + """ + INSERT OR REPLACE INTO vector_store_metadata (key, value) + VALUES ('embeddings', ?::JSON); + """, + [json.dumps(embedding_info)] + ) + self.connection.execute( """ CREATE INDEX IF NOT EXISTS embedding_index @@ -953,6 +1007,91 @@ def _setup_database(self, embedding_dim: int) -> None: ) self._initialized = True + def _check_embeddings_consistency(self): + """ + Check if the provided embeddings are consistent with the stored configuration. + Raises ValueError if there's a mismatch that would cause empty query results. + """ + # Check if metadata table exists + has_metadata = ( + self.connection.execute( + "SELECT COUNT(*) FROM information_schema.tables WHERE table_name = 'vector_store_metadata';" + ).fetchone()[0] + > 0 + ) + + if not has_metadata: + return # No metadata table, can't check consistency + + result = self.connection.execute( + "SELECT value FROM vector_store_metadata WHERE key = 'embeddings';" + ).fetchone() + + if not result: + return + + stored_config = json.loads(result[0]) + stored_class = stored_config["class"] + stored_params = stored_config["params"] + + # Get current embeddings class + current_class = self.embeddings.__class__.__module__ + "." + self.embeddings.__class__.__name__ + + # Check if classes match + if current_class != stored_class: + raise ValueError( + f"Provided embeddings class '{current_class}' does not match the stored class " + f"'{stored_class}' for this vector store. This would result in empty query results. " + f"Use compatible embeddings or create a new vector store." + ) + + # Check if critical parameters match + for param_name, stored_value in stored_params.items(): + if hasattr(self.embeddings, param_name): + current_value = getattr(self.embeddings, param_name) + if current_value != stored_value and param_name in ['model', 'dimensions', 'chunk_size']: + raise ValueError( + f"Provided embeddings parameter '{param_name}' value '{current_value}' " + f"does not match stored value '{stored_value}'. This would result in " + f"empty query results. Use compatible embeddings or create a new vector store." + ) + + + def _get_embeddings_config(self): + """ + Get the embeddings configuration stored in the vector store. + + Returns + ------- + dict or None + The embeddings configuration or None if not available. + """ + if not self._initialized: + return None + + # Check if metadata table exists + has_metadata = ( + self.connection.execute( + "SELECT COUNT(*) FROM information_schema.tables WHERE table_name = 'vector_store_metadata';" + ).fetchone()[0] + > 0 + ) + + if not has_metadata: + return None + + try: + result = self.connection.execute( + "SELECT value FROM vector_store_metadata WHERE key = 'embeddings';" + ).fetchone() + + if result: + return json.loads(result[0]) + return None + except Exception as e: + log_debug(f"Error retrieving embeddings configuration: {e}") + return None + async def _add_items( self, texts: list[str], @@ -1072,6 +1211,7 @@ async def query( try: result = self.connection.execute(base_query, params).fetchall() + print(result) return [ { "id": row[0], diff --git a/lumen/tests/ai/test_vector_store.py b/lumen/tests/ai/test_vector_store.py index b9a0356f2..b42f91ce1 100644 --- a/lumen/tests/ai/test_vector_store.py +++ b/lumen/tests/ai/test_vector_store.py @@ -5,7 +5,7 @@ except ModuleNotFoundError: pytest.skip("lumen.ai could not be imported, skipping tests.", allow_module_level=True) -from lumen.ai.embeddings import NumpyEmbeddings +from lumen.ai.embeddings import Embeddings, NumpyEmbeddings from lumen.ai.vector_store import DuckDBVectorStore, NumpyVectorStore @@ -545,3 +545,16 @@ async def test_not_initalized(self, tmp_path): results = await store.query("First doc") assert len(results) == 0 store.close() + + async def test_check_embeddings_consistency(self, tmp_path): + db_path = str(tmp_path / "test_duckdb.db") + store = DuckDBVectorStore(uri=db_path, embeddings=NumpyEmbeddings()) + store.add([{"text": "First doc"}]) + store.close() + + store = DuckDBVectorStore(uri=db_path, embeddings=NumpyEmbeddings()) + assert len(store.query("First doc")) == 1 + store.close() + + with pytest.raises(ValueError, match="Provided embeddings class"): + DuckDBVectorStore(uri=db_path, embeddings=Embeddings()) From 08fa30da76fbe22f9800a6dbb5207a9266e1905d Mon Sep 17 00:00:00 2001 From: Andrew Huang Date: Wed, 7 May 2025 16:44:43 -0700 Subject: [PATCH 2/5] Remove extraneous comments --- lumen/ai/agents.py | 4 ++-- lumen/ai/vector_store.py | 4 ---- 2 files changed, 2 insertions(+), 6 deletions(-) diff --git a/lumen/ai/agents.py b/lumen/ai/agents.py index 93ef8c7a2..d5e60bc7b 100644 --- a/lumen/ai/agents.py +++ b/lumen/ai/agents.py @@ -241,7 +241,7 @@ class ChatAgent(Agent): "Best for high-level information about data or general conversation", "Can be used to describe available tables", "Not useful for answering data specific questions", - "Can be paired with TableLookup or DocumentLookup", + "Must be paired with TableLookup or DocumentLookup", ]) purpose = param.String(default=""" @@ -257,7 +257,7 @@ class ChatAgent(Agent): ) # technically not required if appended manually with tool in coordinator - requires = param.List(default=[], readonly=True) + requires = param.List(default=["vector_metaset"], readonly=True) async def respond( self, diff --git a/lumen/ai/vector_store.py b/lumen/ai/vector_store.py index 43af9b03d..029a7e46f 100644 --- a/lumen/ai/vector_store.py +++ b/lumen/ai/vector_store.py @@ -87,9 +87,6 @@ class VectorStore(LLMUser): def __init__(self, **params): super().__init__(**params) - # If embeddings is None, use NumpyEmbeddings as default - if self.embeddings is None: - self.embeddings = NumpyEmbeddings() if self.chunk_func is None: self.chunk_func = semchunk.chunkerify( self.chunk_tokenizer, chunk_size=self.chunk_size @@ -1211,7 +1208,6 @@ async def query( try: result = self.connection.execute(base_query, params).fetchall() - print(result) return [ { "id": row[0], From a7aa06c38f43a4be647f637ead3a0b4fc146e4bf Mon Sep 17 00:00:00 2001 From: Andrew Huang Date: Wed, 7 May 2025 16:46:54 -0700 Subject: [PATCH 3/5] Update param --- lumen/ai/vector_store.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lumen/ai/vector_store.py b/lumen/ai/vector_store.py index 029a7e46f..46083b5ff 100644 --- a/lumen/ai/vector_store.py +++ b/lumen/ai/vector_store.py @@ -1046,7 +1046,7 @@ def _check_embeddings_consistency(self): for param_name, stored_value in stored_params.items(): if hasattr(self.embeddings, param_name): current_value = getattr(self.embeddings, param_name) - if current_value != stored_value and param_name in ['model', 'dimensions', 'chunk_size']: + if current_value != stored_value and param_name in ['model', 'embedding_dim', 'chunk_size']: raise ValueError( f"Provided embeddings parameter '{param_name}' value '{current_value}' " f"does not match stored value '{stored_value}'. This would result in " From a3109fc1699af868270e714296f6c272df07745b Mon Sep 17 00:00:00 2001 From: Andrew Huang Date: Thu, 8 May 2025 08:56:44 -0700 Subject: [PATCH 4/5] fix test --- lumen/tests/ai/test_vector_store.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lumen/tests/ai/test_vector_store.py b/lumen/tests/ai/test_vector_store.py index b42f91ce1..94c5cacd7 100644 --- a/lumen/tests/ai/test_vector_store.py +++ b/lumen/tests/ai/test_vector_store.py @@ -549,11 +549,11 @@ async def test_not_initalized(self, tmp_path): async def test_check_embeddings_consistency(self, tmp_path): db_path = str(tmp_path / "test_duckdb.db") store = DuckDBVectorStore(uri=db_path, embeddings=NumpyEmbeddings()) - store.add([{"text": "First doc"}]) + await store.add([{"text": "First doc"}]) store.close() store = DuckDBVectorStore(uri=db_path, embeddings=NumpyEmbeddings()) - assert len(store.query("First doc")) == 1 + assert len(await store.query("First doc")) == 1 store.close() with pytest.raises(ValueError, match="Provided embeddings class"): From cd9b01db6ddb689df8b828389f22037ed3eb2803 Mon Sep 17 00:00:00 2001 From: Andrew <15331990+ahuang11@users.noreply.github.com> Date: Fri, 9 May 2025 08:35:02 -0700 Subject: [PATCH 5/5] Update lumen/ai/vector_store.py Co-authored-by: Andy Maloney --- lumen/ai/vector_store.py | 19 +------------------ 1 file changed, 1 insertion(+), 18 deletions(-) diff --git a/lumen/ai/vector_store.py b/lumen/ai/vector_store.py index 46083b5ff..f627ed398 100644 --- a/lumen/ai/vector_store.py +++ b/lumen/ai/vector_store.py @@ -1010,24 +1010,7 @@ def _check_embeddings_consistency(self): Raises ValueError if there's a mismatch that would cause empty query results. """ # Check if metadata table exists - has_metadata = ( - self.connection.execute( - "SELECT COUNT(*) FROM information_schema.tables WHERE table_name = 'vector_store_metadata';" - ).fetchone()[0] - > 0 - ) - - if not has_metadata: - return # No metadata table, can't check consistency - - result = self.connection.execute( - "SELECT value FROM vector_store_metadata WHERE key = 'embeddings';" - ).fetchone() - - if not result: - return - - stored_config = json.loads(result[0]) + stored_config = self._get_embeddings_config() or {"class": "", "params": {}} stored_class = stored_config["class"] stored_params = stored_config["params"]