This repository was archived by the owner on Dec 7, 2025. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathproject_memory.py
More file actions
63 lines (57 loc) · 2.35 KB
/
project_memory.py
File metadata and controls
63 lines (57 loc) · 2.35 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
from typing import Dict, List, Optional
from app.schemas import ProjectInfo
from app.models import ChatHistory, StoredChatMessage
from uuid import uuid4
from app.logger import enrich_context
class ProjectMemory:
    """In-memory registry of projects and their chat histories.

    Projects live in ``self.projects`` and chat histories in
    ``self.histories``; both dictionaries are keyed by the project id
    generated in :meth:`create_project`. Every operation emits a log record
    through ``enrich_context``.
    """

    def __init__(self):
        """Create empty project/history stores and log the initialization."""
        self.projects: Dict[str, ProjectInfo] = {}
        self.histories: Dict[str, List[ChatHistory]] = {}
        enrich_context(event="memory_init").info("Project memory initialized")

    def _require_project(self, project_id: str, warning_message: str) -> None:
        """Raise ``ValueError`` unless ``project_id`` is a registered project.

        Args:
            project_id: Identifier to look up in ``self.projects``.
            warning_message: Text logged at warning level before raising, so
                each caller keeps its own operation-specific log message.

        Raises:
            ValueError: If ``project_id`` is not a key of ``self.projects``.
        """
        if project_id in self.projects:
            return
        enrich_context(event="project_not_found", project_id=project_id).warning(
            warning_message
        )
        raise ValueError(f"Проект {project_id} не найден")

    def create_project(self, name: str) -> ProjectInfo:
        """Register a new project under a freshly generated UUID4 string id.

        Args:
            name: Name passed to ``ProjectInfo`` for the new project.

        Returns:
            The ``ProjectInfo`` that was created and stored.
        """
        project_id = str(uuid4())
        project = ProjectInfo(id=project_id, name=name)
        self.projects[project_id] = project
        # Start every project with an empty history list.
        self.histories[project_id] = []
        enrich_context(
            event="project_created", project_id=project_id, project_name=name
        ).info("Project created in memory")
        return project

    def list_projects(self) -> List[ProjectInfo]:
        """Return a new list containing every stored ``ProjectInfo``."""
        enrich_context(event="projects_listed", count=len(self.projects)).info(
            "Listing projects"
        )
        return list(self.projects.values())

    def add_chat(
        self,
        project_id: str,
        messages: List[StoredChatMessage],
        trace_id: Optional[str] = None,
        span_id: Optional[str] = None,
    ) -> ChatHistory:
        """Append a new chat history entry to an existing project.

        Args:
            project_id: Id of the project the chat belongs to.
            messages: Messages passed to ``ChatHistory`` for the new entry.
            trace_id: Optional trace id forwarded to ``ChatHistory``.
            span_id: Optional span id forwarded to ``ChatHistory``.

        Returns:
            The ``ChatHistory`` that was created and stored.

        Raises:
            ValueError: If ``project_id`` is not a registered project.
        """
        self._require_project(project_id, "Attempt to add chat to missing project")
        history = ChatHistory(
            project_id=project_id,
            messages=messages,
            trace_id=trace_id,
            span_id=span_id,
        )
        # setdefault keeps this in line with get_project_history, which also
        # tolerates a registered project that has no history entry yet,
        # instead of failing with a bare KeyError.
        self.histories.setdefault(project_id, []).append(history)
        enrich_context(event="chat_saved", project_id=project_id).info(
            "Chat added to project"
        )
        return history

    def get_project_history(self, project_id: str) -> List[ChatHistory]:
        """Return the chat histories stored for a project.

        Args:
            project_id: Id of the project whose history is requested.

        Returns:
            The list held in ``self.histories`` for the project (the stored
            list itself, not a copy), or a new empty list when the project
            has no history entry.

        Raises:
            ValueError: If ``project_id`` is not a registered project.
        """
        self._require_project(project_id, "History requested for missing project")
        enrich_context(event="history_retrieved", project_id=project_id).info(
            "Returning project history"
        )
        return self.histories.get(project_id, [])