Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -57,4 +57,7 @@ backend/logs/
backend/uploads/

# Docker 数据
data/
data/
# Personal configuration
CLAUDE.md
skills/
44 changes: 42 additions & 2 deletions backend/app/api/graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from ..utils.logger import get_logger
from ..utils.locale import t, get_locale, set_locale
from ..models.task import TaskManager, TaskStatus
from ..utils.zep_rate_limiter import graph_data_cache
from ..models.project import ProjectManager, ProjectStatus

# 获取日志器
Expand Down Expand Up @@ -564,12 +565,35 @@ def list_tasks():
})


# ============== 配置接口 ==============

@graph_bp.route('/config', methods=['GET'])
def get_graph_config():
    """
    Return the graph polling configuration consumed by the frontend.

    The frontend uses these values to decide whether to poll automatically
    and at what interval. As a side effect, each call re-applies
    ``Config.ZEP_CACHE_TTL`` to the shared ``graph_data_cache``.
    """
    # Keep the cache TTL aligned with Config before reporting it.
    graph_data_cache.ttl = Config.ZEP_CACHE_TTL

    polling_settings = {
        "poll_interval": Config.ZEP_GRAPH_POLL_INTERVAL,  # 0 = manual refresh only
        "cache_ttl": Config.ZEP_CACHE_TTL,
        "rate_limit": Config.ZEP_RATE_LIMIT,
        "rate_limit_window": Config.ZEP_RATE_LIMIT_WINDOW,
    }
    return jsonify({"success": True, "data": polling_settings})


# ============== 图谱数据接口 ==============

@graph_bp.route('/data/<graph_id>', methods=['GET'])
def get_graph_data(graph_id: str):
"""
获取图谱数据(节点和边)
获取图谱数据(节点和边)。
使用响应缓存避免频繁调用 Zep API。
"""
try:
if not Config.ZEP_API_KEY:
Expand All @@ -578,12 +602,28 @@ def get_graph_data(graph_id: str):
"error": t('api.zepApiKeyMissing')
}), 500

# 检查缓存
cache_key = f"graph_data:{graph_id}"
cached = graph_data_cache.get(cache_key)
if cached is not None:
logger.debug(f"Serving cached graph data for {graph_id}")
return jsonify({
"success": True,
"data": cached,
"cached": True
})

# 缓存未命中,调用 Zep API
builder = GraphBuilderService(api_key=Config.ZEP_API_KEY)
graph_data = builder.get_graph_data(graph_id)

# 缓存成功响应
graph_data_cache.set(cache_key, graph_data)

return jsonify({
"success": True,
"data": graph_data
"data": graph_data,
"cached": False
})

except Exception as e:
Expand Down
11 changes: 11 additions & 0 deletions backend/app/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,20 @@ class Config:
LLM_BASE_URL = os.environ.get('LLM_BASE_URL', 'https://api.openai.com/v1')
LLM_MODEL_NAME = os.environ.get('LLM_MODEL_NAME', 'gpt-4o-mini')

# Boost/Fallback LLM配置(可选,主 LLM 失败时自动回退)
LLM_BOOST_API_KEY = os.environ.get('LLM_BOOST_API_KEY')
LLM_BOOST_BASE_URL = os.environ.get('LLM_BOOST_BASE_URL')
LLM_BOOST_MODEL_NAME = os.environ.get('LLM_BOOST_MODEL_NAME')

# Zep配置
ZEP_API_KEY = os.environ.get('ZEP_API_KEY')

# Zep 速率限制配置(可通过 .env 调整,升级付费计划后放宽)
ZEP_RATE_LIMIT = int(os.environ.get('ZEP_RATE_LIMIT', '5')) # 每个窗口期允许的请求数
ZEP_RATE_LIMIT_WINDOW = int(os.environ.get('ZEP_RATE_LIMIT_WINDOW', '60')) # 窗口期(秒)
ZEP_CACHE_TTL = int(os.environ.get('ZEP_CACHE_TTL', '30')) # graph data 缓存时间(秒),0=不缓存
ZEP_GRAPH_POLL_INTERVAL = int(os.environ.get('ZEP_GRAPH_POLL_INTERVAL', '0')) # 前端自动轮询间隔(秒),0=仅手动刷新

# 文件上传配置
MAX_CONTENT_LENGTH = 50 * 1024 * 1024 # 50MB
UPLOAD_FOLDER = os.path.join(os.path.dirname(__file__), '../uploads')
Expand Down
34 changes: 24 additions & 10 deletions backend/app/services/graph_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from ..utils.zep_paging import fetch_all_nodes, fetch_all_edges
from .text_processor import TextProcessor
from ..utils.locale import t, get_locale, set_locale
from ..utils.zep_retry import with_zep_retry


@dataclass
Expand Down Expand Up @@ -190,6 +191,7 @@ def _build_graph_worker(
error_msg = f"{str(e)}\n{traceback.format_exc()}"
self.task_manager.fail_task(task_id, error_msg)

@with_zep_retry(max_retries=3, operation_name="create_graph")
def create_graph(self, name: str) -> str:
"""创建Zep图谱(公开方法)"""
graph_id = f"mirofish_{uuid.uuid4().hex[:16]}"
Expand Down Expand Up @@ -285,11 +287,14 @@ def safe_attr_name(attr_name: str) -> str:

# 调用Zep API设置本体
if entity_types or edge_definitions:
self.client.graph.set_ontology(
graph_ids=[graph_id],
entities=entity_types if entity_types else None,
edges=edge_definitions if edge_definitions else None,
)
@with_zep_retry(max_retries=3, operation_name="set_ontology")
def _set_ontology():
self.client.graph.set_ontology(
graph_ids=[graph_id],
entities=entity_types if entity_types else None,
edges=edge_definitions if edge_definitions else None,
)
_set_ontology()

def add_text_batches(
self,
Expand Down Expand Up @@ -322,10 +327,14 @@ def add_text_batches(

# 发送到Zep
try:
batch_result = self.client.graph.add_batch(
graph_id=graph_id,
episodes=episodes
)
@with_zep_retry(max_retries=3, operation_name=f"add_batch {batch_num}/{total_batches}")
def _add_batch():
return self.client.graph.add_batch(
graph_id=graph_id,
episodes=episodes
)

batch_result = _add_batch()

# 收集返回的 episode uuid
if batch_result and isinstance(batch_result, list):
Expand Down Expand Up @@ -376,7 +385,11 @@ def _wait_for_episodes(
# 检查每个 episode 的处理状态
for ep_uuid in list(pending_episodes):
try:
episode = self.client.graph.episode.get(uuid_=ep_uuid)
@with_zep_retry(max_retries=2, initial_delay=1.0, operation_name="get_episode")
def _get_episode():
return self.client.graph.episode.get(uuid_=ep_uuid)

episode = _get_episode()
is_processed = getattr(episode, 'processed', False)

if is_processed:
Expand Down Expand Up @@ -500,6 +513,7 @@ def get_graph_data(self, graph_id: str) -> Dict[str, Any]:
"edge_count": len(edges_data),
}

@with_zep_retry(max_retries=3, operation_name="delete_graph")
def delete_graph(self, graph_id: str):
"""删除图谱"""
self.client.graph.delete(graph_id=graph_id)
Expand Down
61 changes: 17 additions & 44 deletions backend/app/services/oasis_profile_generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
from ..config import Config
from ..utils.logger import get_logger
from ..utils.locale import get_language_instruction, get_locale, set_locale, t
from ..utils.zep_retry import with_zep_retry
from .zep_entity_reader import EntityNode, ZepEntityReader

logger = get_logger('mirofish.oasis_profile')
Expand Down Expand Up @@ -316,55 +317,27 @@ def _search_zep_for_entity(self, entity: EntityNode) -> Dict[str, Any]:

comprehensive_query = t('progress.zepSearchQuery', name=entity_name)

@with_zep_retry(max_retries=3, initial_delay=2.0, operation_name="Zep Edge Search")
def search_edges():
"""搜索边(事实/关系)- 带重试机制"""
max_retries = 3
last_exception = None
delay = 2.0

for attempt in range(max_retries):
try:
return self.zep_client.graph.search(
query=comprehensive_query,
graph_id=self.graph_id,
limit=30,
scope="edges",
reranker="rrf"
)
except Exception as e:
last_exception = e
if attempt < max_retries - 1:
logger.debug(f"Zep边搜索第 {attempt + 1} 次失败: {str(e)[:80]}, 重试中...")
time.sleep(delay)
delay *= 2
else:
logger.debug(f"Zep边搜索在 {max_retries} 次尝试后仍失败: {e}")
return None
return self.zep_client.graph.search(
query=comprehensive_query,
graph_id=self.graph_id,
limit=30,
scope="edges",
reranker="rrf"
)

@with_zep_retry(max_retries=3, initial_delay=2.0, operation_name="Zep Node Search")
def search_nodes():
"""搜索节点(实体摘要)- 带重试机制"""
max_retries = 3
last_exception = None
delay = 2.0

for attempt in range(max_retries):
try:
return self.zep_client.graph.search(
query=comprehensive_query,
graph_id=self.graph_id,
limit=20,
scope="nodes",
reranker="rrf"
)
except Exception as e:
last_exception = e
if attempt < max_retries - 1:
logger.debug(f"Zep节点搜索第 {attempt + 1} 次失败: {str(e)[:80]}, 重试中...")
time.sleep(delay)
delay *= 2
else:
logger.debug(f"Zep节点搜索在 {max_retries} 次尝试后仍失败: {e}")
return None
return self.zep_client.graph.search(
query=comprehensive_query,
graph_id=self.graph_id,
limit=20,
scope="nodes",
reranker="rrf"
)

try:
# 并行执行edges和nodes搜索
Expand Down
5 changes: 3 additions & 2 deletions backend/app/services/ontology_generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -225,8 +225,9 @@ def generate(

return result

# 传给 LLM 的文本最大长度(5万字)
MAX_TEXT_LENGTH_FOR_LLM = 50000
# 传给 LLM 的文本最大长度(2万字)
# 本体分析只需识别实体/关系类型,不需要完整文本;完整文本仍用于后续图谱构建
MAX_TEXT_LENGTH_FOR_LLM = 20000

def _build_user_message(
self,
Expand Down
25 changes: 6 additions & 19 deletions backend/app/services/zep_entity_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from ..config import Config
from ..utils.logger import get_logger
from ..utils.zep_paging import fetch_all_nodes, fetch_all_edges
from ..utils.zep_retry import with_zep_retry

logger = get_logger('mirofish.zep_entity_reader')

Expand Down Expand Up @@ -104,25 +105,11 @@ def _call_with_retry(
Returns:
API调用结果
"""
last_exception = None
delay = initial_delay

for attempt in range(max_retries):
try:
return func()
except Exception as e:
last_exception = e
if attempt < max_retries - 1:
logger.warning(
f"Zep {operation_name} 第 {attempt + 1} 次尝试失败: {str(e)[:100]}, "
f"{delay:.1f}秒后重试..."
)
time.sleep(delay)
delay *= 2 # 指数退避
else:
logger.error(f"Zep {operation_name} 在 {max_retries} 次尝试后仍失败: {str(e)}")

raise last_exception
@with_zep_retry(max_retries=max_retries, initial_delay=initial_delay, operation_name=operation_name)
def _execute():
return func()

return _execute()

def get_all_nodes(self, graph_id: str) -> List[Dict[str, Any]]:
"""
Expand Down
Loading