forked from HKUDS/LightRAG
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathlightrag_gemini_workspace_demo.py
More file actions
146 lines (122 loc) · 4.61 KB
/
lightrag_gemini_workspace_demo.py
File metadata and controls
146 lines (122 loc) · 4.61 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
"""
LightRAG Data Isolation Demo: Workspace Management
This example demonstrates how to maintain multiple isolated knowledge bases
within a single application using LightRAG's 'workspace' feature.
Key Concepts:
- Workspace Isolation: Each RAG instance is assigned a unique workspace name,
which ensures that Knowledge Graphs, Vector Databases, and Chunks are
stored in separate, non-conflicting directories.
- Independent Configuration: Different workspaces can utilize different
ENTITY_TYPES and document sets simultaneously.
Prerequisites:
1. Set the following environment variables:
- GEMINI_API_KEY: Your Google Gemini API key.
- ENTITY_TYPES: A JSON string of entity categories (e.g., '["Person", "Organization"]').
2. Ensure your data directory contains:
- Data/book-small.txt
- Data/HR_policies.txt
Usage:
    python lightrag_gemini_workspace_demo.py
"""
import os
import asyncio
import json
import numpy as np
from lightrag import LightRAG, QueryParam
from lightrag.llm.gemini import gemini_model_complete, gemini_embed
from lightrag.utils import wrap_embedding_func_with_attrs
from lightrag.constants import DEFAULT_ENTITY_TYPES
async def llm_model_func(
    prompt, system_prompt=None, history_messages=None, keyword_extraction=False, **kwargs
) -> str:
    """Complete *prompt* with Google Gemini via LightRAG's wrapper.

    Args:
        prompt: The user prompt to complete.
        system_prompt: Optional system instruction for the model.
        history_messages: Prior conversation turns; defaults to an empty list.
        keyword_extraction: Accepted to match LightRAG's LLM calling
            convention; not forwarded explicitly here.
        **kwargs: Extra options passed through to ``gemini_model_complete``.

    Returns:
        The model's text completion.
    """
    # Fix: the original used `history_messages=[]`, a mutable default that is
    # created once and shared across every call to this function.
    if history_messages is None:
        history_messages = []
    return await gemini_model_complete(
        prompt,
        system_prompt=system_prompt,
        history_messages=history_messages,
        api_key=os.getenv("GEMINI_API_KEY"),
        model_name="gemini-2.0-flash-exp",
        **kwargs,
    )
@wrap_embedding_func_with_attrs(
    embedding_dim=768, max_token_size=2048, model_name="models/text-embedding-004"
)
async def embedding_func(texts: list[str]) -> np.ndarray:
    """Embed *texts* with Gemini's text-embedding-004 model.

    The decorator attaches the embedding dimension (768) and token limit
    that LightRAG reads when sizing its vector storage.
    """
    vectors = await gemini_embed.func(
        texts,
        api_key=os.getenv("GEMINI_API_KEY"),
        model="models/text-embedding-004",
    )
    return vectors
async def initialize_rag(
    workspace: str = "default_workspace",
    entities=None,
) -> LightRAG:
    """
    Initialize a LightRAG instance bound to an isolated *workspace*.

    Entity-type resolution precedence:
      1. the ``entities`` argument, if provided;
      2. the ``ENTITY_TYPES`` environment variable (a JSON-encoded list);
      3. the library's ``DEFAULT_ENTITY_TYPES``.

    Args:
        workspace: Unique workspace name; LightRAG namespaces its Knowledge
            Graph, vector DB, and chunk storage by this value.
        entities: Optional explicit list of entity-type labels.

    Returns:
        A LightRAG instance whose storages have been initialized.

    Raises:
        ValueError: If ENTITY_TYPES is set but is not valid JSON
            (``json.JSONDecodeError`` is a ``ValueError`` subclass, so
            existing callers catching either still work).
    """
    if entities is not None:
        entity_types = entities
    else:
        env_entities = os.getenv("ENTITY_TYPES")
        if env_entities:
            try:
                entity_types = json.loads(env_entities)
            except json.JSONDecodeError as err:
                # A malformed env var is a configuration error — fail loudly
                # with context instead of a bare JSONDecodeError.
                raise ValueError(
                    f"ENTITY_TYPES must be a JSON list, got: {env_entities!r}"
                ) from err
        else:
            entity_types = DEFAULT_ENTITY_TYPES

    # NOTE(review): llm_model_name here ("gemini-2.0-flash") differs from the
    # model actually invoked in llm_model_func ("gemini-2.0-flash-exp") —
    # confirm which is intended.
    rag = LightRAG(
        workspace=workspace,
        llm_model_name="gemini-2.0-flash",
        llm_model_func=llm_model_func,
        embedding_func=embedding_func,
        embedding_func_max_async=4,
        embedding_batch_num=8,
        llm_model_max_async=2,
        addon_params={"entity_types": entity_types},
    )
    await rag.initialize_storages()
    return rag
async def main():
    """Build two isolated workspaces, index one document into each, query both."""
    rag_1 = None
    rag_2 = None
    try:
        # 1. Initialize isolated workspaces:
        #    rag_1 -> literary analysis, rag_2 -> corporate HR documentation.
        print("Initializing isolated LightRAG workspaces...")
        rag_1 = await initialize_rag("rag_workspace_book")
        rag_2 = await initialize_rag("rag_workspace_hr")

        # 2. Populate Workspace 1 (Literature).
        book_path = "Data/book-small.txt"
        if os.path.exists(book_path):
            with open(book_path, "r", encoding="utf-8") as f:
                print(f"Indexing {book_path} into Literature Workspace...")
                await rag_1.ainsert(f.read())
        else:
            # A missing input file used to be skipped silently, leaving the
            # later query to run against an empty workspace — surface it.
            print(f"Warning: {book_path} not found; Literature Workspace is empty.")

        # 3. Populate Workspace 2 (Corporate).
        hr_path = "Data/HR_policies.txt"
        if os.path.exists(hr_path):
            with open(hr_path, "r", encoding="utf-8") as f:
                print(f"Indexing {hr_path} into HR Workspace...")
                await rag_2.ainsert(f.read())
        else:
            print(f"Warning: {hr_path} not found; HR Workspace is empty.")

        # 4. Context-specific querying: each query only sees the documents
        #    inserted into its own workspace.
        print("\n--- Querying Literature Workspace ---")
        res1 = await rag_1.aquery(
            "What is the main theme?",
            param=QueryParam(mode="hybrid", stream=False),
        )
        print(f"Book Analysis: {res1[:200]}...")

        print("\n--- Querying HR Workspace ---")
        res2 = await rag_2.aquery(
            "What is the leave policy?", param=QueryParam(mode="hybrid")
        )
        print(f"HR Response: {res2[:200]}...")
    except Exception as e:
        # Demo-level boundary: report the failure, then fall through to cleanup.
        print(f"An error occurred: {e}")
    finally:
        # Finalize storage to safely close DB connections and write buffers
        # even when indexing or querying failed part-way.
        if rag_1:
            await rag_1.finalize_storages()
        if rag_2:
            await rag_2.finalize_storages()


if __name__ == "__main__":
    asyncio.run(main())