Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
108 changes: 70 additions & 38 deletions backend/app/services/graph_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -184,20 +184,29 @@ def _build_graph_worker(
error_msg = f"{str(e)}\n{traceback.format_exc()}"
self.task_manager.fail_task(task_id, error_msg)

def create_graph(self, name: str, max_retries: int = 3) -> str:
    """Create a Zep graph, retrying transient API failures.

    Args:
        name: Human-readable name for the graph.
        max_retries: Total number of attempts. Values below 1 are clamped
            to 1 so the method always either returns an id or raises.

    Returns:
        The generated graph id, prefixed with ``mirofish_`` plus 16 hex chars.

    Raises:
        Exception: Re-raises the last error from the Zep client once all
            attempts are exhausted.
    """
    graph_id = f"mirofish_{uuid.uuid4().hex[:16]}"

    # Guard: with max_retries <= 0, range() would skip the loop entirely
    # and the method would silently fall through returning None.
    attempts = max(1, max_retries)

    for attempt in range(attempts):
        try:
            self.client.graph.create(
                graph_id=graph_id,
                name=name,
                description="MiroFish Social Simulation Graph"
            )
            return graph_id
        except Exception as e:
            if attempt < attempts - 1:
                # Linear backoff: 2s, 4s, 6s, ...
                # NOTE(review): if the first create() committed server-side
                # but the response was lost, retrying the same graph_id may
                # fail with "already exists" — confirm against Zep API.
                wait_time = (attempt + 1) * 2
                print(f"创建图谱失败 (尝试 {attempt + 1}/{attempts}): {str(e)}")
                print(f"等待 {wait_time} 秒后重试...")
                time.sleep(wait_time)
            else:
                raise

def set_ontology(self, graph_id: str, ontology: Dict[str, Any]):
"""设置图谱本体(公开方法)"""
def set_ontology(self, graph_id: str, ontology: Dict[str, Any], max_retries: int = 3):
"""设置图谱本体(公开方法,带重试机制)"""
import warnings
from typing import Optional
from pydantic import Field
Expand Down Expand Up @@ -277,13 +286,24 @@ def safe_attr_name(attr_name: str) -> str:
if source_targets:
edge_definitions[name] = (edge_class, source_targets)

# 调用Zep API设置本体
# 调用Zep API设置本体(带重试)
if entity_types or edge_definitions:
self.client.graph.set_ontology(
graph_ids=[graph_id],
entities=entity_types if entity_types else None,
edges=edge_definitions if edge_definitions else None,
)
for attempt in range(max_retries):
try:
self.client.graph.set_ontology(
graph_ids=[graph_id],
entities=entity_types if entity_types else None,
edges=edge_definitions if edge_definitions else None,
)
break
except Exception as e:
if attempt < max_retries - 1:
wait_time = (attempt + 1) * 2
print(f"设置本体失败 (尝试 {attempt + 1}/{max_retries}): {str(e)}")
print(f"等待 {wait_time} 秒后重试...")
time.sleep(wait_time)
else:
raise

def add_text_batches(
self,
Expand Down Expand Up @@ -314,27 +334,39 @@ def add_text_batches(
for chunk in batch_chunks
]

# 发送到Zep
try:
batch_result = self.client.graph.add_batch(
graph_id=graph_id,
episodes=episodes
)

# 收集返回的 episode uuid
if batch_result and isinstance(batch_result, list):
for ep in batch_result:
ep_uuid = getattr(ep, 'uuid_', None) or getattr(ep, 'uuid', None)
if ep_uuid:
episode_uuids.append(ep_uuid)

# 避免请求过快
time.sleep(1)

except Exception as e:
if progress_callback:
progress_callback(f"批次 {batch_num} 发送失败: {str(e)}", 0)
raise
# 发送到Zep(带重试)
max_retries = 3
for attempt in range(max_retries):
try:
batch_result = self.client.graph.add_batch(
graph_id=graph_id,
episodes=episodes
)

# 收集返回的 episode uuid
if batch_result and isinstance(batch_result, list):
for ep in batch_result:
ep_uuid = getattr(ep, 'uuid_', None) or getattr(ep, 'uuid', None)
if ep_uuid:
episode_uuids.append(ep_uuid)

# 避免请求过快
time.sleep(1)
break

except Exception as e:
if attempt < max_retries - 1:
wait_time = (attempt + 1) * 2
if progress_callback:
progress_callback(
f"批次 {batch_num} 发送失败,{wait_time}秒后重试 ({attempt + 1}/{max_retries})...",
(i + len(batch_chunks)) / total_chunks
)
time.sleep(wait_time)
else:
if progress_callback:
progress_callback(f"批次 {batch_num} 发送失败: {str(e)}", 0)
raise

return episode_uuids

Expand Down